Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Possible crash if error handler if subscription was destroyed #659

Merged
merged 1 commit into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/asynccb.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "natsp.h"
#include "mem.h"
#include "conn.h"
#include "sub.h"
#if defined(NATS_HAS_STREAMING)
#include "stan/conn.h"
#endif
Expand Down Expand Up @@ -46,6 +47,8 @@ _createAndPostCb(natsAsyncCbType type, natsConnection *nc, natsSubscription *sub
cb->type = type;
cb->nc = nc;
cb->sub = sub;
if (sub != NULL)
natsSub_retain(sub);
cb->err = err;
cb->errTxt = errTxt;
#if defined(NATS_HAS_STREAMING)
Expand Down Expand Up @@ -85,20 +88,23 @@ natsAsyncCb_PostStanConnLostHandler(stanConnection *sc)
void
natsAsyncCb_Destroy(natsAsyncCbInfo *info)
{
natsConnection *nc = NULL;
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
#if defined(NATS_HAS_STREAMING)
stanConnection *sc = NULL;
stanConnection *sc = NULL;
#endif

if (info == NULL)
return;

nc = info->nc;
nc = info->nc;
sub = info->sub;
#if defined(NATS_HAS_STREAMING)
sc = info->sc;
sc = info->sc;
#endif

_freeAsyncCbInfo(info);
natsSub_release(sub);
natsConn_release(nc);
#if defined(NATS_HAS_STREAMING)
stanConn_release(sc);
Expand Down
10 changes: 9 additions & 1 deletion src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2792,7 +2792,11 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
}
}

natsSubAndLdw_UnlockAndRelease(sub);
// If we are going to post to the error handler, do not release yet.
if (sc || sm)
natsSubAndLdw_Unlock(sub);
else
natsSubAndLdw_UnlockAndRelease(sub);

if ((s == NATS_OK) && fcReply)
s = natsConnection_Publish(nc, fcReply, NULL, 0);
Expand All @@ -2807,6 +2811,10 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
nc->err = (sc ? NATS_SLOW_CONSUMER : NATS_MISMATCH);
natsAsyncCb_PostErrHandler(nc, sub, nc->err, NULL);

// Now release the subscription (it has been retained in
// natsAsyncCb_PostErrHandler function).
natsSub_release(sub);

natsConn_Unlock(nc);
}

Expand Down
4 changes: 2 additions & 2 deletions src/crypto.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <sodium.h>

natsStatus
natsCrypto_Init()
natsCrypto_Init(void)
{
return ((sodium_init() == -1) ? NATS_ERR : NATS_OK);
}
Expand Down Expand Up @@ -485,7 +485,7 @@ secure_memzero(void * const pnt, const size_t len)
}

natsStatus
natsCrypto_Init()
natsCrypto_Init(void)
{
return NATS_OK;
}
Expand Down
4 changes: 2 additions & 2 deletions src/nats.c
Original file line number Diff line number Diff line change
Expand Up @@ -1969,13 +1969,13 @@ natsLib_msgDeliveryAssignWorker(natsSubscription *sub)
}

bool
natsLib_isLibHandlingMsgDeliveryByDefault()
natsLib_isLibHandlingMsgDeliveryByDefault(void)
{
return gLib.libHandlingMsgDeliveryByDefault;
}

int64_t
natsLib_defaultWriteDeadline()
natsLib_defaultWriteDeadline(void)
{
return gLib.libDefaultWriteDeadline;
}
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ AsyncSubscriptionPendingDrain
SyncSubscriptionPending
SyncSubscriptionPendingDrain
AsyncErrHandler
AsyncErrHandlerSubDestroyed
AsyncSubscriberStarvation
AsyncSubscriberOnClose
NextMsgCallOnAsyncSub
Expand Down
125 changes: 125 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -14373,6 +14373,130 @@ test_AsyncErrHandler(void)
_stopServer(serverPid);
}

static void
_asyncErrBlockingCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void* closure)
{
struct threadArg *arg = (struct threadArg*) closure;

natsMutex_Lock(arg->m);

arg->sum++;

while ((arg->sum == 1) && !arg->closed)
natsCondition_Wait(arg->c, arg->m);

if (sub != arg->sub)
arg->status = NATS_ERR;

if ((arg->status == NATS_OK) && (err != NATS_SLOW_CONSUMER))
arg->status = NATS_ERR;

if (arg->status == NATS_OK)
{
// Call some subscription API to make sure that the pointer has not been freed.
arg->current = natsSubscription_IsValid(sub);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

arg->done = true;
natsCondition_Signal(arg->c);

natsMutex_Unlock(arg->m);
}

static void
test_AsyncErrHandlerSubDestroyed(void)
{
natsStatus s;
natsConnection *nc = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsPid serverPid = NATS_INVALID_PID;
natsMsg *msg = NULL;
struct threadArg arg;

s = _createDefaultThreadArgsForCbTests(&arg);
if (s != NATS_OK)
FAIL("Unable to setup test!");

s = natsOptions_Create(&opts);
IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL));
IFOK(s, natsOptions_SetMaxPendingMsgs(opts, 1));
IFOK(s, natsOptions_SetErrorHandler(opts, _asyncErrBlockingCb, (void*) &arg));

if (s != NATS_OK)
FAIL("Unable to create options for test AsyncErrHandler");

serverPid = _startServer("nats://127.0.0.1:4222", NULL, true);
CHECK_SERVER_STARTED(serverPid);

test("Connect: ");
s = natsConnection_Connect(&nc, opts);
testCond(s == NATS_OK);

test("Create sync sub: ");
s = natsConnection_SubscribeSync(&sub, nc, "foo");
testCond(s == NATS_OK);

natsMutex_Lock(arg.m);
arg.sub = sub;
arg.current = true;
natsMutex_Unlock(arg.m);

test("Cause error: ");
s = natsConnection_PublishString(nc, "foo", "msg1");
IFOK(s, natsConnection_PublishString(nc, "foo", "msg2"));
testCond(s == NATS_OK);

// Wait a bit to make sure that we have a slow consumer.
nats_Sleep(250);

test("Next should be error: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond((s == NATS_SLOW_CONSUMER) && (msg == NULL));
nats_clearLastError();
s = NATS_OK;

test("Consume 1: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Cause error again: ");
s = natsConnection_PublishString(nc, "foo", "msg3");
IFOK(s, natsConnection_PublishString(nc, "foo", "msg4"));
testCond(s == NATS_OK);

test("Wait a bit for async error to be posted: ");
nats_Sleep(200);
testCond(true);

test("Destroy subscription: ");
natsSubscription_Destroy(sub);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


test("Wait for async error callback to return: ");
natsMutex_Lock(arg.m);
// First unblock the first instance
arg.closed = true;
natsCondition_Signal(arg.c);
// Now wait for the callback to have processed both and be done.
while ((s != NATS_TIMEOUT) && !arg.done)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
// If ok, arg.current should be false since the subscription should
// not be valid (closed) at the second callback iteration.
if ((s == NATS_OK) && arg.current)
s = NATS_ERR;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

natsOptions_Destroy(opts);
natsConnection_Destroy(nc);

_destroyDefaultThreadArgs(&arg);

_stopServer(serverPid);
}

static void
_responseCb(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
Expand Down Expand Up @@ -34440,6 +34564,7 @@ static testInfo allTests[] =
{"SyncSubscriptionPending", test_SyncSubscriptionPending},
{"SyncSubscriptionPendingDrain", test_SyncSubscriptionPendingDrain},
{"AsyncErrHandler", test_AsyncErrHandler},
{"AsyncErrHandlerSubDestroyed", test_AsyncErrHandlerSubDestroyed},
{"AsyncSubscriberStarvation", test_AsyncSubscriberStarvation},
{"AsyncSubscriberOnClose", test_AsyncSubscriberOnClose},
{"NextMsgCallOnAsyncSub", test_NextMsgCallOnAsyncSub},
Expand Down