diff --git a/src/asynccb.c b/src/asynccb.c index 30b707db0..57d20ceff 100644 --- a/src/asynccb.c +++ b/src/asynccb.c @@ -14,6 +14,7 @@ #include "natsp.h" #include "mem.h" #include "conn.h" +#include "sub.h" #if defined(NATS_HAS_STREAMING) #include "stan/conn.h" #endif @@ -46,6 +47,8 @@ _createAndPostCb(natsAsyncCbType type, natsConnection *nc, natsSubscription *sub cb->type = type; cb->nc = nc; cb->sub = sub; + if (sub != NULL) + natsSub_retain(sub); cb->err = err; cb->errTxt = errTxt; #if defined(NATS_HAS_STREAMING) @@ -85,20 +88,23 @@ natsAsyncCb_PostStanConnLostHandler(stanConnection *sc) void natsAsyncCb_Destroy(natsAsyncCbInfo *info) { - natsConnection *nc = NULL; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; #if defined(NATS_HAS_STREAMING) - stanConnection *sc = NULL; + stanConnection *sc = NULL; #endif if (info == NULL) return; - nc = info->nc; + nc = info->nc; + sub = info->sub; #if defined(NATS_HAS_STREAMING) - sc = info->sc; + sc = info->sc; #endif _freeAsyncCbInfo(info); + natsSub_release(sub); natsConn_release(nc); #if defined(NATS_HAS_STREAMING) stanConn_release(sc); diff --git a/src/conn.c b/src/conn.c index 8e7d66deb..accdb0577 100644 --- a/src/conn.c +++ b/src/conn.c @@ -2792,7 +2792,11 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) } } - natsSubAndLdw_UnlockAndRelease(sub); + // If we are going to post to the error handler, do not release yet. + if (sc || sm) + natsSubAndLdw_Unlock(sub); + else + natsSubAndLdw_UnlockAndRelease(sub); if ((s == NATS_OK) && fcReply) s = natsConnection_Publish(nc, fcReply, NULL, 0); @@ -2807,6 +2811,10 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) nc->err = (sc ? NATS_SLOW_CONSUMER : NATS_MISMATCH); natsAsyncCb_PostErrHandler(nc, sub, nc->err, NULL); + // Now release the subscription (it has been retained in + // natsAsyncCb_PostErrHandler function). + natsSub_release(sub); + natsConn_Unlock(nc); } diff --git a/src/crypto.c b/src/crypto.c index ee8aa38fa..23c205c35 100644 --- a/src/crypto.c +++ b/src/crypto.c @@ -17,7 +17,7 @@ #include natsStatus -natsCrypto_Init() +natsCrypto_Init(void) { return ((sodium_init() == -1) ? NATS_ERR : NATS_OK); } @@ -485,7 +485,7 @@ secure_memzero(void * const pnt, const size_t len) } natsStatus -natsCrypto_Init() +natsCrypto_Init(void) { return NATS_OK; } diff --git a/src/nats.c b/src/nats.c index fcaaf0e14..d1b2c7852 100644 --- a/src/nats.c +++ b/src/nats.c @@ -1969,13 +1969,13 @@ natsLib_msgDeliveryAssignWorker(natsSubscription *sub) } bool -natsLib_isLibHandlingMsgDeliveryByDefault() +natsLib_isLibHandlingMsgDeliveryByDefault(void) { return gLib.libHandlingMsgDeliveryByDefault; } int64_t -natsLib_defaultWriteDeadline() +natsLib_defaultWriteDeadline(void) { return gLib.libDefaultWriteDeadline; } diff --git a/test/list.txt b/test/list.txt index 0429b024c..3fda01ac8 100644 --- a/test/list.txt +++ b/test/list.txt @@ -147,6 +147,7 @@ AsyncSubscriptionPendingDrain SyncSubscriptionPending SyncSubscriptionPendingDrain AsyncErrHandler +AsyncErrHandlerSubDestroyed AsyncSubscriberStarvation AsyncSubscriberOnClose NextMsgCallOnAsyncSub diff --git a/test/test.c b/test/test.c index a7cf40ed7..89d54bc52 100644 --- a/test/test.c +++ b/test/test.c @@ -14373,6 +14373,130 @@ test_AsyncErrHandler(void) _stopServer(serverPid); } +static void +_asyncErrBlockingCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void* closure) +{ + struct threadArg *arg = (struct threadArg*) closure; + + natsMutex_Lock(arg->m); + + arg->sum++; + + while ((arg->sum == 1) && !arg->closed) + natsCondition_Wait(arg->c, arg->m); + + if (sub != arg->sub) + arg->status = NATS_ERR; + + if ((arg->status == NATS_OK) && (err != NATS_SLOW_CONSUMER)) + arg->status = NATS_ERR; + + if (arg->status == NATS_OK) + { + // Call some subscription API to make sure that the pointer has not been freed. + arg->current = natsSubscription_IsValid(sub); + } + + arg->done = true; + natsCondition_Signal(arg->c); + + natsMutex_Unlock(arg->m); +} + +static void +test_AsyncErrHandlerSubDestroyed(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsOptions *opts = NULL; + natsSubscription *sub = NULL; + natsPid serverPid = NATS_INVALID_PID; + natsMsg *msg = NULL; + struct threadArg arg; + + s = _createDefaultThreadArgsForCbTests(&arg); + if (s != NATS_OK) + FAIL("Unable to setup test!"); + + s = natsOptions_Create(&opts); + IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL)); + IFOK(s, natsOptions_SetMaxPendingMsgs(opts, 1)); + IFOK(s, natsOptions_SetErrorHandler(opts, _asyncErrBlockingCb, (void*) &arg)); + + if (s != NATS_OK) + FAIL("Unable to create options for test AsyncErrHandler"); + + serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); + CHECK_SERVER_STARTED(serverPid); + + test("Connect: "); + s = natsConnection_Connect(&nc, opts); + testCond(s == NATS_OK); + + test("Create sync sub: "); + s = natsConnection_SubscribeSync(&sub, nc, "foo"); + testCond(s == NATS_OK); + + natsMutex_Lock(arg.m); + arg.sub = sub; + arg.current = true; + natsMutex_Unlock(arg.m); + + test("Cause error: "); + s = natsConnection_PublishString(nc, "foo", "msg1"); + IFOK(s, natsConnection_PublishString(nc, "foo", "msg2")); + testCond(s == NATS_OK); + + // Wait a bit to make sure that we have a slow consumer. + nats_Sleep(250); + + test("Next should be error: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_SLOW_CONSUMER) && (msg == NULL)); + nats_clearLastError(); + s = NATS_OK; + + test("Consume 1: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + + test("Cause error again: "); + s = natsConnection_PublishString(nc, "foo", "msg3"); + IFOK(s, natsConnection_PublishString(nc, "foo", "msg4")); + testCond(s == NATS_OK); + + test("Wait a bit for async error to be posted: "); + nats_Sleep(200); + testCond(true); + + test("Destroy subscription: "); + natsSubscription_Destroy(sub); + + test("Wait for async error callback to return: "); + natsMutex_Lock(arg.m); + // First unblock the first instance + arg.closed = true; + natsCondition_Signal(arg.c); + // Now wait for the callback to have processed both and be done. + while ((s != NATS_TIMEOUT) && !arg.done) + s = natsCondition_TimedWait(arg.c, arg.m, 2000); + // If ok, arg.current should be false since the subscription should + // not be valid (closed) at the second callback iteration. + if ((s == NATS_OK) && arg.current) + s = NATS_ERR; + natsMutex_Unlock(arg.m); + testCond(s == NATS_OK); + + natsOptions_Destroy(opts); + natsConnection_Destroy(nc); + + _destroyDefaultThreadArgs(&arg); + + _stopServer(serverPid); +} + static void _responseCb(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { @@ -34440,6 +34564,7 @@ static testInfo allTests[] = {"SyncSubscriptionPending", test_SyncSubscriptionPending}, {"SyncSubscriptionPendingDrain", test_SyncSubscriptionPendingDrain}, {"AsyncErrHandler", test_AsyncErrHandler}, + {"AsyncErrHandlerSubDestroyed", test_AsyncErrHandlerSubDestroyed}, {"AsyncSubscriberStarvation", test_AsyncSubscriberStarvation}, {"AsyncSubscriberOnClose", test_AsyncSubscriberOnClose}, {"NextMsgCallOnAsyncSub", test_NextMsgCallOnAsyncSub},