Skip to content

Commit

Permalink
Merge pull request #659 from nats-io/fix_async_err_cbs
Browse files Browse the repository at this point in the history
[FIXED] Possible crash if error handler if subscription was destroyed
  • Loading branch information
levb committed Jun 2, 2023
2 parents 8c02dcc + 56d3d5e commit 585abc6
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 9 deletions.
14 changes: 10 additions & 4 deletions src/asynccb.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "natsp.h"
#include "mem.h"
#include "conn.h"
#include "sub.h"
#if defined(NATS_HAS_STREAMING)
#include "stan/conn.h"
#endif
Expand Down Expand Up @@ -46,6 +47,8 @@ _createAndPostCb(natsAsyncCbType type, natsConnection *nc, natsSubscription *sub
cb->type = type;
cb->nc = nc;
cb->sub = sub;
if (sub != NULL)
natsSub_retain(sub);
cb->err = err;
cb->errTxt = errTxt;
#if defined(NATS_HAS_STREAMING)
Expand Down Expand Up @@ -85,20 +88,23 @@ natsAsyncCb_PostStanConnLostHandler(stanConnection *sc)
void
natsAsyncCb_Destroy(natsAsyncCbInfo *info)
{
natsConnection *nc = NULL;
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
#if defined(NATS_HAS_STREAMING)
stanConnection *sc = NULL;
stanConnection *sc = NULL;
#endif

if (info == NULL)
return;

nc = info->nc;
nc = info->nc;
sub = info->sub;
#if defined(NATS_HAS_STREAMING)
sc = info->sc;
sc = info->sc;
#endif

_freeAsyncCbInfo(info);
natsSub_release(sub);
natsConn_release(nc);
#if defined(NATS_HAS_STREAMING)
stanConn_release(sc);
Expand Down
10 changes: 9 additions & 1 deletion src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2792,7 +2792,11 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
}
}

natsSubAndLdw_UnlockAndRelease(sub);
// If we are going to post to the error handler, do not release yet.
if (sc || sm)
natsSubAndLdw_Unlock(sub);
else
natsSubAndLdw_UnlockAndRelease(sub);

if ((s == NATS_OK) && fcReply)
s = natsConnection_Publish(nc, fcReply, NULL, 0);
Expand All @@ -2807,6 +2811,10 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
nc->err = (sc ? NATS_SLOW_CONSUMER : NATS_MISMATCH);
natsAsyncCb_PostErrHandler(nc, sub, nc->err, NULL);

// Now release the subscription (it has been retained in
// natsAsyncCb_PostErrHandler function).
natsSub_release(sub);

natsConn_Unlock(nc);
}

Expand Down
4 changes: 2 additions & 2 deletions src/crypto.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <sodium.h>

natsStatus
natsCrypto_Init()
natsCrypto_Init(void)
{
return ((sodium_init() == -1) ? NATS_ERR : NATS_OK);
}
Expand Down Expand Up @@ -485,7 +485,7 @@ secure_memzero(void * const pnt, const size_t len)
}

natsStatus
natsCrypto_Init()
natsCrypto_Init(void)
{
return NATS_OK;
}
Expand Down
4 changes: 2 additions & 2 deletions src/nats.c
Original file line number Diff line number Diff line change
Expand Up @@ -1969,13 +1969,13 @@ natsLib_msgDeliveryAssignWorker(natsSubscription *sub)
}

bool
natsLib_isLibHandlingMsgDeliveryByDefault()
natsLib_isLibHandlingMsgDeliveryByDefault(void)
{
return gLib.libHandlingMsgDeliveryByDefault;
}

int64_t
natsLib_defaultWriteDeadline()
natsLib_defaultWriteDeadline(void)
{
return gLib.libDefaultWriteDeadline;
}
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ AsyncSubscriptionPendingDrain
SyncSubscriptionPending
SyncSubscriptionPendingDrain
AsyncErrHandler
AsyncErrHandlerSubDestroyed
AsyncSubscriberStarvation
AsyncSubscriberOnClose
NextMsgCallOnAsyncSub
Expand Down
125 changes: 125 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -14373,6 +14373,130 @@ test_AsyncErrHandler(void)
_stopServer(serverPid);
}

static void
_asyncErrBlockingCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void* closure)
{
struct threadArg *arg = (struct threadArg*) closure;

natsMutex_Lock(arg->m);

arg->sum++;

while ((arg->sum == 1) && !arg->closed)
natsCondition_Wait(arg->c, arg->m);

if (sub != arg->sub)
arg->status = NATS_ERR;

if ((arg->status == NATS_OK) && (err != NATS_SLOW_CONSUMER))
arg->status = NATS_ERR;

if (arg->status == NATS_OK)
{
// Call some subscription API to make sure that the pointer has not been freed.
arg->current = natsSubscription_IsValid(sub);
}

arg->done = true;
natsCondition_Signal(arg->c);

natsMutex_Unlock(arg->m);
}

static void
test_AsyncErrHandlerSubDestroyed(void)
{
natsStatus s;
natsConnection *nc = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsPid serverPid = NATS_INVALID_PID;
natsMsg *msg = NULL;
struct threadArg arg;

s = _createDefaultThreadArgsForCbTests(&arg);
if (s != NATS_OK)
FAIL("Unable to setup test!");

s = natsOptions_Create(&opts);
IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL));
IFOK(s, natsOptions_SetMaxPendingMsgs(opts, 1));
IFOK(s, natsOptions_SetErrorHandler(opts, _asyncErrBlockingCb, (void*) &arg));

if (s != NATS_OK)
FAIL("Unable to create options for test AsyncErrHandler");

serverPid = _startServer("nats://127.0.0.1:4222", NULL, true);
CHECK_SERVER_STARTED(serverPid);

test("Connect: ");
s = natsConnection_Connect(&nc, opts);
testCond(s == NATS_OK);

test("Create sync sub: ");
s = natsConnection_SubscribeSync(&sub, nc, "foo");
testCond(s == NATS_OK);

natsMutex_Lock(arg.m);
arg.sub = sub;
arg.current = true;
natsMutex_Unlock(arg.m);

test("Cause error: ");
s = natsConnection_PublishString(nc, "foo", "msg1");
IFOK(s, natsConnection_PublishString(nc, "foo", "msg2"));
testCond(s == NATS_OK);

// Wait a bit to make sure that we have a slow consumer.
nats_Sleep(250);

test("Next should be error: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond((s == NATS_SLOW_CONSUMER) && (msg == NULL));
nats_clearLastError();
s = NATS_OK;

test("Consume 1: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Cause error again: ");
s = natsConnection_PublishString(nc, "foo", "msg3");
IFOK(s, natsConnection_PublishString(nc, "foo", "msg4"));
testCond(s == NATS_OK);

test("Wait a bit for async error to be posted: ");
nats_Sleep(200);
testCond(true);

test("Destroy subscription: ");
natsSubscription_Destroy(sub);

test("Wait for async error callback to return: ");
natsMutex_Lock(arg.m);
// First unblock the first instance
arg.closed = true;
natsCondition_Signal(arg.c);
// Now wait for the callback to have processed both and be done.
while ((s != NATS_TIMEOUT) && !arg.done)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
// If ok, arg.current should be false since the subscription should
// not be valid (closed) at the second callback iteration.
if ((s == NATS_OK) && arg.current)
s = NATS_ERR;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

natsOptions_Destroy(opts);
natsConnection_Destroy(nc);

_destroyDefaultThreadArgs(&arg);

_stopServer(serverPid);
}

static void
_responseCb(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
Expand Down Expand Up @@ -34440,6 +34564,7 @@ static testInfo allTests[] =
{"SyncSubscriptionPending", test_SyncSubscriptionPending},
{"SyncSubscriptionPendingDrain", test_SyncSubscriptionPendingDrain},
{"AsyncErrHandler", test_AsyncErrHandler},
{"AsyncErrHandlerSubDestroyed", test_AsyncErrHandlerSubDestroyed},
{"AsyncSubscriberStarvation", test_AsyncSubscriberStarvation},
{"AsyncSubscriberOnClose", test_AsyncSubscriberOnClose},
{"NextMsgCallOnAsyncSub", test_NextMsgCallOnAsyncSub},
Expand Down

0 comments on commit 585abc6

Please sign in to comment.