From 56d3d5e34536cbd1b55e06be8216930d1dd9d13c Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 1 Jun 2023 15:34:27 -0600 Subject: [PATCH] [FIXED] Possible crash if error handler if subscription was destroyed Since error handler is invoked asynchronously, if the user had destroyed the subscription, it was possible that the error handler would be presented with a subscription that was already freed. Signed-off-by: Ivan Kozlovic --- src/asynccb.c | 14 ++++-- src/conn.c | 10 +++- src/crypto.c | 4 +- src/nats.c | 4 +- test/list.txt | 1 + test/test.c | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 149 insertions(+), 9 deletions(-) diff --git a/src/asynccb.c b/src/asynccb.c index 30b707db0..57d20ceff 100644 --- a/src/asynccb.c +++ b/src/asynccb.c @@ -14,6 +14,7 @@ #include "natsp.h" #include "mem.h" #include "conn.h" +#include "sub.h" #if defined(NATS_HAS_STREAMING) #include "stan/conn.h" #endif @@ -46,6 +47,8 @@ _createAndPostCb(natsAsyncCbType type, natsConnection *nc, natsSubscription *sub cb->type = type; cb->nc = nc; cb->sub = sub; + if (sub != NULL) + natsSub_retain(sub); cb->err = err; cb->errTxt = errTxt; #if defined(NATS_HAS_STREAMING) @@ -85,20 +88,23 @@ natsAsyncCb_PostStanConnLostHandler(stanConnection *sc) void natsAsyncCb_Destroy(natsAsyncCbInfo *info) { - natsConnection *nc = NULL; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; #if defined(NATS_HAS_STREAMING) - stanConnection *sc = NULL; + stanConnection *sc = NULL; #endif if (info == NULL) return; - nc = info->nc; + nc = info->nc; + sub = info->sub; #if defined(NATS_HAS_STREAMING) - sc = info->sc; + sc = info->sc; #endif _freeAsyncCbInfo(info); + natsSub_release(sub); natsConn_release(nc); #if defined(NATS_HAS_STREAMING) stanConn_release(sc); diff --git a/src/conn.c b/src/conn.c index 8e7d66deb..accdb0577 100644 --- a/src/conn.c +++ b/src/conn.c @@ -2792,7 +2792,11 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) } } - natsSubAndLdw_UnlockAndRelease(sub); + // If we are going to post to the error handler, do not release yet. + if (sc || sm) + natsSubAndLdw_Unlock(sub); + else + natsSubAndLdw_UnlockAndRelease(sub); if ((s == NATS_OK) && fcReply) s = natsConnection_Publish(nc, fcReply, NULL, 0); @@ -2807,6 +2811,10 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) nc->err = (sc ? NATS_SLOW_CONSUMER : NATS_MISMATCH); natsAsyncCb_PostErrHandler(nc, sub, nc->err, NULL); + // Now release the subscription (it has been retained in + // natsAsyncCb_PostErrHandler function). + natsSub_release(sub); + natsConn_Unlock(nc); } diff --git a/src/crypto.c b/src/crypto.c index ee8aa38fa..23c205c35 100644 --- a/src/crypto.c +++ b/src/crypto.c @@ -17,7 +17,7 @@ #include natsStatus -natsCrypto_Init() +natsCrypto_Init(void) { return ((sodium_init() == -1) ? NATS_ERR : NATS_OK); } @@ -485,7 +485,7 @@ secure_memzero(void * const pnt, const size_t len) } natsStatus -natsCrypto_Init() +natsCrypto_Init(void) { return NATS_OK; } diff --git a/src/nats.c b/src/nats.c index fcaaf0e14..d1b2c7852 100644 --- a/src/nats.c +++ b/src/nats.c @@ -1969,13 +1969,13 @@ natsLib_msgDeliveryAssignWorker(natsSubscription *sub) } bool -natsLib_isLibHandlingMsgDeliveryByDefault() +natsLib_isLibHandlingMsgDeliveryByDefault(void) { return gLib.libHandlingMsgDeliveryByDefault; } int64_t -natsLib_defaultWriteDeadline() +natsLib_defaultWriteDeadline(void) { return gLib.libDefaultWriteDeadline; } diff --git a/test/list.txt b/test/list.txt index 0429b024c..3fda01ac8 100644 --- a/test/list.txt +++ b/test/list.txt @@ -147,6 +147,7 @@ AsyncSubscriptionPendingDrain SyncSubscriptionPending SyncSubscriptionPendingDrain AsyncErrHandler +AsyncErrHandlerSubDestroyed AsyncSubscriberStarvation AsyncSubscriberOnClose NextMsgCallOnAsyncSub diff --git a/test/test.c b/test/test.c index a7cf40ed7..89d54bc52 100644 --- a/test/test.c +++ b/test/test.c @@ -14373,6 +14373,130 @@ test_AsyncErrHandler(void) _stopServer(serverPid); } +static void +_asyncErrBlockingCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void* closure) +{ + struct threadArg *arg = (struct threadArg*) closure; + + natsMutex_Lock(arg->m); + + arg->sum++; + + while ((arg->sum == 1) && !arg->closed) + natsCondition_Wait(arg->c, arg->m); + + if (sub != arg->sub) + arg->status = NATS_ERR; + + if ((arg->status == NATS_OK) && (err != NATS_SLOW_CONSUMER)) + arg->status = NATS_ERR; + + if (arg->status == NATS_OK) + { + // Call some subscription API to make sure that the pointer has not been freed. + arg->current = natsSubscription_IsValid(sub); + } + + arg->done = true; + natsCondition_Signal(arg->c); + + natsMutex_Unlock(arg->m); +} + +static void +test_AsyncErrHandlerSubDestroyed(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsOptions *opts = NULL; + natsSubscription *sub = NULL; + natsPid serverPid = NATS_INVALID_PID; + natsMsg *msg = NULL; + struct threadArg arg; + + s = _createDefaultThreadArgsForCbTests(&arg); + if (s != NATS_OK) + FAIL("Unable to setup test!"); + + s = natsOptions_Create(&opts); + IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL)); + IFOK(s, natsOptions_SetMaxPendingMsgs(opts, 1)); + IFOK(s, natsOptions_SetErrorHandler(opts, _asyncErrBlockingCb, (void*) &arg)); + + if (s != NATS_OK) + FAIL("Unable to create options for test AsyncErrHandler"); + + serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); + CHECK_SERVER_STARTED(serverPid); + + test("Connect: "); + s = natsConnection_Connect(&nc, opts); + testCond(s == NATS_OK); + + test("Create sync sub: "); + s = natsConnection_SubscribeSync(&sub, nc, "foo"); + testCond(s == NATS_OK); + + natsMutex_Lock(arg.m); + arg.sub = sub; + arg.current = true; + natsMutex_Unlock(arg.m); + + test("Cause error: "); + s = natsConnection_PublishString(nc, "foo", "msg1"); + IFOK(s, natsConnection_PublishString(nc, "foo", "msg2")); + testCond(s == NATS_OK); + + // Wait a bit to make sure that we have a slow consumer. + nats_Sleep(250); + + test("Next should be error: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_SLOW_CONSUMER) && (msg == NULL)); + nats_clearLastError(); + s = NATS_OK; + + test("Consume 1: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + + test("Cause error again: "); + s = natsConnection_PublishString(nc, "foo", "msg3"); + IFOK(s, natsConnection_PublishString(nc, "foo", "msg4")); + testCond(s == NATS_OK); + + test("Wait a bit for async error to be posted: "); + nats_Sleep(200); + testCond(true); + + test("Destroy subscription: "); + natsSubscription_Destroy(sub); + + test("Wait for async error callback to return: "); + natsMutex_Lock(arg.m); + // First unblock the first instance + arg.closed = true; + natsCondition_Signal(arg.c); + // Now wait for the callback to have processed both and be done. + while ((s != NATS_TIMEOUT) && !arg.done) + s = natsCondition_TimedWait(arg.c, arg.m, 2000); + // If ok, arg.current should be false since the subscription should + // not be valid (closed) at the second callback iteration. + if ((s == NATS_OK) && arg.current) + s = NATS_ERR; + natsMutex_Unlock(arg.m); + testCond(s == NATS_OK); + + natsOptions_Destroy(opts); + natsConnection_Destroy(nc); + + _destroyDefaultThreadArgs(&arg); + + _stopServer(serverPid); +} + static void _responseCb(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { @@ -34440,6 +34564,7 @@ static testInfo allTests[] = {"SyncSubscriptionPending", test_SyncSubscriptionPending}, {"SyncSubscriptionPendingDrain", test_SyncSubscriptionPendingDrain}, {"AsyncErrHandler", test_AsyncErrHandler}, + {"AsyncErrHandlerSubDestroyed", test_AsyncErrHandlerSubDestroyed}, {"AsyncSubscriberStarvation", test_AsyncSubscriberStarvation}, {"AsyncSubscriberOnClose", test_AsyncSubscriberOnClose}, {"NextMsgCallOnAsyncSub", test_NextMsgCallOnAsyncSub},