diff --git a/src/conn.c b/src/conn.c index 5d03f96eb..1644e83f4 100644 --- a/src/conn.c +++ b/src/conn.c @@ -2663,7 +2663,9 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) return NATS_OK; } - if ((jsi = sub->jsi) != NULL) + jsi = sub->jsi; + // For JS subscriptions (but not pull ones), handle hearbeat and flow control here. + if (jsi && !jsi->pull) { ctrlMsg = natsMsg_isJSCtrl(msg, &jct); if (ctrlMsg && jct == jsCtrlHeartbeat) diff --git a/src/js.c b/src/js.c index ab8bf16c0..a88bd2b4f 100644 --- a/src/js.c +++ b/src/js.c @@ -49,6 +49,10 @@ const int64_t jsOrderedHBInterval = NATS_SECONDS_TO_NANOS(5); #define jsLastConsumerSeqHdr "Nats-Last-Consumer" +// Forward declarations +static void _hbTimerFired(natsTimer *timer, void* closure); +static void _hbTimerStopped(natsTimer *timer, void* closure); + typedef struct __jsOrderedConsInfo { int64_t osid; @@ -1232,6 +1236,11 @@ jsSub_free(jsSub *jsi) js = jsi->js; natsTimer_Destroy(jsi->hbTimer); + if (jsi->mhMsg != NULL) + { + natsMsg_clearNoDestroy(jsi->mhMsg); + natsMsg_Destroy(jsi->mhMsg); + } NATS_FREE(jsi->stream); NATS_FREE(jsi->consumer); NATS_FREE(jsi->nxtMsgSubj); @@ -1509,12 +1518,12 @@ jsSub_trackSequences(jsSub *jsi, const char *reply) { natsStatus s = NATS_OK; - if ((reply == NULL) || (strstr(reply, jsAckPrefix) != reply)) - return NATS_OK; - // Data is equivalent to HB, so consider active. jsi->active = true; + if ((reply == NULL) || (strstr(reply, jsAckPrefix) != reply)) + return NATS_OK; + // Keep track of inbound message "sequence" for flow control purposes. jsi->fciseq++; @@ -1682,12 +1691,19 @@ jsSub_scheduleFlowControlResponse(jsSub *jsi, const char *reply) } static natsStatus -_checkMsg(natsMsg *msg, bool checkSts, bool *usrMsg) +_checkMsg(natsMsg *msg, bool checkSts, bool *usrMsg, natsMsg *mhMsg) { natsStatus s = NATS_OK; const char *val = NULL; const char *desc= NULL; + // Check for missed heartbeat special message + if (msg == mhMsg) + { + *usrMsg = false; + return NATS_MISSED_HEARTBEAT; + } + *usrMsg = true; if ((msg->dataLen > 0) || (msg->hdrLen <= 0)) @@ -1708,6 +1724,10 @@ _checkMsg(natsMsg *msg, bool checkSts, bool *usrMsg) if (!checkSts) return NATS_OK; + // 100 Idle hearbeat, return OK + if (strncmp(val, CTRL_STATUS, HDR_STATUS_LEN) == 0) + return NATS_OK; + // 404 indicating that there are no messages. if (strncmp(val, NOT_FOUND_STATUS, HDR_STATUS_LEN) == 0) return NATS_NOT_FOUND; @@ -1725,24 +1745,25 @@ _checkMsg(natsMsg *msg, bool checkSts, bool *usrMsg) static natsStatus _sendPullRequest(natsConnection *nc, const char *subj, const char *rply, - natsBuffer *buf, int batchSize, int64_t timeout, bool noWait) + natsBuffer *buf, jsFetchRequest *req) { natsStatus s; int64_t expires; - // Make our request expiration a bit shorter than the - // current timeout. - expires = (timeout >= 20 ? timeout - 10 : timeout); - - // Since "expires" is a Go time.Duration and our timeout - // is in milliseconds, convert it to nanos. - expires *= 1000000; + // Make our request expiration a bit shorter than user provided expiration. + expires = (req->Expires >= (int64_t) 20E6 ? req->Expires - (int64_t) 10E6 : req->Expires); natsBuf_Reset(buf); s = natsBuf_AppendByte(buf, '{'); - IFOK(s, nats_marshalLong(buf, false, "batch", (int64_t) batchSize)); - IFOK(s, nats_marshalLong(buf, true, "expires", expires)); - if ((s == NATS_OK) && noWait) + // Currently, Batch is required, so will always be > 0 + IFOK(s, nats_marshalLong(buf, false, "batch", (int64_t) req->Batch)); + if ((s == NATS_OK) && (req->MaxBytes > 0)) + s = nats_marshalLong(buf, true, "max_bytes", req->MaxBytes); + if ((s == NATS_OK) && (expires > 0)) + s = nats_marshalLong(buf, true, "expires", expires); + if ((s == NATS_OK) && (req->Heartbeat > 0)) + s = nats_marshalLong(buf, true, "idle_heartbeat", req->Heartbeat); + if ((s == NATS_OK) && req->NoWait) s = natsBuf_Append(buf, ",\"no_wait\":true", -1); IFOK(s, natsBuf_AppendByte(buf, '}')); @@ -1754,8 +1775,7 @@ _sendPullRequest(natsConnection *nc, const char *subj, const char *rply, } natsStatus -natsSubscription_Fetch(natsMsgList *list, natsSubscription *sub, int batch, int64_t timeout, - jsErrCode *errCode) +_fetch(natsMsgList *list, natsSubscription *sub, jsFetchRequest *req, bool simpleFetch) { natsStatus s = NATS_OK; natsMsg **msgs = NULL; @@ -1766,29 +1786,39 @@ natsSubscription_Fetch(natsMsgList *list, natsSubscription *sub, int batch, int6 int pmc = 0; char buffer[64]; natsBuffer buf; - int64_t start; - - if (errCode != NULL) - *errCode = 0; + int64_t start = 0; + int64_t deadline = 0; + int64_t timeout = 0; + int size = 0; + bool sendReq = true; + jsSub *jsi = NULL; + natsMsg *mhMsg = NULL; + bool noWait; if (list == NULL) return nats_setDefaultError(NATS_INVALID_ARG); memset(list, 0, sizeof(natsMsgList)); - if ((sub == NULL) || (batch <= 0)) + if ((sub == NULL) || (req == NULL) || (req->Batch <= 0) || (req->MaxBytes < 0)) return nats_setDefaultError(NATS_INVALID_ARG); - if (timeout <= 0) + if (!req->NoWait && req->Expires <= 0) return nats_setDefaultError(NATS_INVALID_TIMEOUT); natsSub_Lock(sub); - if ((sub->jsi == NULL) || !sub->jsi->pull) + jsi = sub->jsi; + if ((jsi == NULL) || !jsi->pull) { natsSub_Unlock(sub); return nats_setError(NATS_INVALID_SUBSCRIPTION, "%s", jsErrNotAPullSubscription); } - msgs = (natsMsg**) NATS_CALLOC(batch, sizeof(natsMsg*)); + if (jsi->inFetch) + { + natsSub_Unlock(sub); + return nats_setError(NATS_ERR, "%s", jsErrConcurrentFetchNotAllowed); + } + msgs = (natsMsg**) NATS_CALLOC(req->Batch, sizeof(natsMsg*)); if (msgs == NULL) { natsSub_Unlock(sub); @@ -1797,15 +1827,41 @@ natsSubscription_Fetch(natsMsgList *list, natsSubscription *sub, int batch, int6 natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)); nc = sub->conn; rply = (const char*) sub->subject; - subj = sub->jsi->nxtMsgSubj; + subj = jsi->nxtMsgSubj; pmc = (sub->msgList.msgs > 0); + jsi->inFetch = true; + if (req->Heartbeat) + { + int64_t hbi = req->Heartbeat / 1000000; + sub->refs++; + if (jsi->hbTimer == NULL) + { + s = natsMsg_create(&jsi->mhMsg, NULL, 0, NULL, 0, NULL, 0, -1); + if (s == NATS_OK) + { + natsMsg_setNoDestroy(jsi->mhMsg); + s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _hbTimerStopped, hbi*2, (void*) sub); + } + if (s != NATS_OK) + sub->refs--; + } + else + natsTimer_Reset(jsi->hbTimer, hbi); + + mhMsg = jsi->mhMsg; + } natsSub_Unlock(sub); - start = nats_Now(); + if (req->Expires > 0) + { + start = nats_Now(); + timeout = req->Expires / (int64_t) 1E6; + deadline = start + timeout; + } // First, if there are already pending messages in the internal sub, // then get as much messages as we can (but not more than the batch). - while (pmc && (s == NATS_OK) && (count < batch)) + while (pmc && (s == NATS_OK) && (count < req->Batch) && ((req->MaxBytes == 0) || (size < req->MaxBytes))) { natsMsg *msg = NULL; bool usrMsg= false; @@ -1814,66 +1870,76 @@ natsSubscription_Fetch(natsMsgList *list, natsSubscription *sub, int batch, int6 // but will not wait (and return NATS_TIMEOUT without updating // the error stack) if there are no messages. s = natsSub_nextMsg(&msg, sub, 0, true); + if (s == NATS_TIMEOUT) + { + s = NATS_OK; + break; + } if (s == NATS_OK) { // Here we care only about user messages. - s = _checkMsg(msg, false, &usrMsg); + s = _checkMsg(msg, false, &usrMsg, mhMsg); if ((s == NATS_OK) && usrMsg) + { msgs[count++] = msg; + size += msg->wsz; + } else natsMsg_Destroy(msg); } } + if (s == NATS_OK) + { + // If we come from natsSubscription_Fetch() (simpleFetch is true), then + // we decide on the NoWait value. + if (simpleFetch) + noWait = (req->Batch - count > 1 ? true : false); + else + noWait = req->NoWait; + } - // If we have OK or TIMEOUT and not all messages, we will send a fetch + // If we have OK and not all messages, we will send a fetch // request to the server. - if (((s == NATS_OK) || (s == NATS_TIMEOUT)) && (count != batch)) + while ((s == NATS_OK) && (count != req->Batch) && ((req->MaxBytes == 0) || (size < req->MaxBytes))) { - bool sendReq = true; - bool doNoWait = (batch-count > 1 ? true : false); + natsMsg *msg = NULL; + bool usrMsg = false; - // Reset status in case we entered with timeout - s = NATS_OK; - - // Now wait for messages or a 404 saying that there are no more. - while ((s == NATS_OK) && (count < batch)) + if (req->Expires > 0) { - natsMsg *msg = NULL; - bool usrMsg = false; - - timeout -= (nats_Now()-start); + timeout = deadline - nats_Now(); if (timeout <= 0) s = NATS_TIMEOUT; + } - if ((s == NATS_OK) && sendReq) + if ((s == NATS_OK) && sendReq) + { + sendReq = false; + req->Batch = req->Batch - (int64_t) count; + req->Expires = NATS_MILLIS_TO_NANOS(timeout); + req->NoWait = noWait; + s = _sendPullRequest(nc, subj, rply, &buf, req); + } + IFOK(s, natsSub_nextMsg(&msg, sub, timeout, true)); + if (s == NATS_OK) + { + s = _checkMsg(msg, true, &usrMsg, mhMsg); + if ((s == NATS_OK) && usrMsg) { - sendReq = false; - s = _sendPullRequest(nc, subj, rply, &buf, batch-count, timeout, doNoWait); + msgs[count++] = msg; + size += msg->wsz; } - IFOK(s, natsSub_nextMsg(&msg, sub, timeout, true)); - if (s == NATS_OK) + else { - s = _checkMsg(msg, true, &usrMsg); - if ((s == NATS_OK) && usrMsg) - msgs[count++] = msg; - else + natsMsg_Destroy(msg); + // If we come from "simpleFetch" and we have a 404 for + // the noWait request and have not collected any message, + // then resend the request and ask to wait this time. + if (simpleFetch && noWait && (s == NATS_NOT_FOUND) && (count == 0)) { - natsMsg_Destroy(msg); - // If we have a 404 for our "no_wait" request and have - // not collected any message, then resend request to - // wait this time. - if (doNoWait && (s == NATS_NOT_FOUND) && (count == 0)) - { - s = NATS_OK; - doNoWait = false; - sendReq = true; - } - else if ((s == NATS_TIMEOUT) && (count == 0)) - { - // If we get a 408, we will bail if we already collected some - // messages, otherwise ignore and go back calling nextMsg. - s = NATS_OK; - } + s = NATS_OK; + noWait = false; + sendReq = true; } } } @@ -1888,17 +1954,61 @@ natsSubscription_Fetch(natsMsgList *list, natsSubscription *sub, int batch, int6 // If there was an error, we need to clear the error stack, // since we return NATS_OK. if (s != NATS_OK) + { nats_clearLastError(); + s = NATS_OK; + } // Update the list with what we have collected. list->Msgs = msgs; list->Count = count; - - return NATS_OK; } - NATS_FREE(msgs); + if (s != NATS_OK) + NATS_FREE(msgs); + + natsSub_Lock(sub); + jsi->inFetch = false; + if (req->Heartbeat && (jsi->hbTimer != NULL)) + natsTimer_Stop(jsi->hbTimer); + natsSub_Unlock(sub); + + return NATS_UPDATE_ERR_STACK(s); +} + +natsStatus +jsFetchRequest_Init(jsFetchRequest *request) +{ + if (request == NULL) + return nats_setDefaultError(NATS_INVALID_ARG); + + memset(request, 0, sizeof(jsFetchRequest)); + return NATS_OK; +} + +natsStatus +natsSubscription_Fetch(natsMsgList *list, natsSubscription *sub, int batch, int64_t timeout, + jsErrCode *errCode) +{ + natsStatus s; + jsFetchRequest req; + + if (errCode != NULL) + *errCode = 0; + + jsFetchRequest_Init(&req); + req.Batch = batch; + req.Expires = NATS_MILLIS_TO_NANOS(timeout); + s = _fetch(list, sub, &req, true); + return NATS_UPDATE_ERR_STACK(s); +} + +natsStatus +natsSubscription_FetchRequest(natsMsgList *list, natsSubscription *sub, jsFetchRequest *req) +{ + natsStatus s; + s = _fetch(list, sub, req, false); return NATS_UPDATE_ERR_STACK(s); } @@ -1910,11 +2020,28 @@ _hbTimerFired(natsTimer *timer, void* closure) bool alert= false; natsConnection *nc = NULL; - natsSubAndLdw_Lock(sub); + natsSub_Lock(sub); alert = !jsi->active; jsi->active = false; + if (alert && jsi->pull) + { + // If there are messages pending then we can't really consider + // that we missed hearbeats. Wait for those to be processed and + // we will check missed HBs again. + if (sub->msgList.msgs == 0) + { + sub->msgList.msgs++; + sub->msgList.head = jsi->mhMsg; + sub->msgList.tail = jsi->mhMsg; + sub->msgList.bytes = natsMsg_dataAndHdrLen(jsi->mhMsg); + natsCondition_Signal(sub->cond); + natsTimer_Stop(timer); + } + natsSub_Unlock(sub); + return; + } nc = sub->conn; - natsSubAndLdw_Unlock(sub); + natsSub_Unlock(sub); if (!alert) return; @@ -2394,7 +2521,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha // cb/cbClosure will be NULL for sync or pull subscriptions. IFOK(s, natsConn_subscribeImpl(&sub, nc, true, deliver, opts->Queue, 0, cb, cbClosure, false, jsi)); - if ((s == NATS_OK) && (hbi > 0)) + if ((s == NATS_OK) && (hbi > 0) && !isPullMode) { bool ct = false; // create timer or not. diff --git a/src/js.h b/src/js.h index 6cbdd1113..92f7c0a62 100644 --- a/src/js.h +++ b/src/js.h @@ -65,6 +65,7 @@ extern const int64_t jsDefaultRequestWait; #define jsErrConsumerConfigRequired "consumer configuration required" #define jsErrInvalidDurableName "invalid durable name" #define jsErrInvalidConsumerName "invalid consumer name" +#define jsErrConcurrentFetchNotAllowed "concurrent fetch request not allowed" #define jsCtrlHeartbeat (1) #define jsCtrlFlowControl (2) diff --git a/src/jsm.c b/src/jsm.c index 5ecae934d..2e72a3e31 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -2042,6 +2042,8 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo s = nats_marshalLong(buf, true, "max_batch", cfg->MaxRequestBatch); if ((s == NATS_OK) && (cfg->MaxRequestExpires > 0)) s = nats_marshalLong(buf, true, "max_expires", cfg->MaxRequestExpires); + if ((s == NATS_OK) && (cfg->MaxRequestMaxBytes > 0)) + s = nats_marshalLong(buf, true, "max_bytes", cfg->MaxRequestMaxBytes); if ((s == NATS_OK) && (cfg->InactiveThreshold > 0)) s = nats_marshalLong(buf, true, "inactive_threshold", cfg->InactiveThreshold); if ((s == NATS_OK) && (cfg->BackOff != NULL) && (cfg->BackOffLen > 0)) @@ -2200,6 +2202,7 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi IFOK(s, nats_JSONGetBool(cjson, "headers_only", &(cc->HeadersOnly))); IFOK(s, nats_JSONGetLong(cjson, "max_batch", &(cc->MaxRequestBatch))); IFOK(s, nats_JSONGetLong(cjson, "max_expires", &(cc->MaxRequestExpires))); + IFOK(s, nats_JSONGetLong(cjson, "max_bytes", &(cc->MaxRequestMaxBytes))); IFOK(s, nats_JSONGetLong(cjson, "inactive_threshold", &(cc->InactiveThreshold))); IFOK(s, nats_JSONGetArrayLong(cjson, "backoff", &(cc->BackOff), &(cc->BackOffLen))); IFOK(s, nats_JSONGetLong(cjson, "num_replicas", &(cc->Replicas))); diff --git a/src/msg.c b/src/msg.c index a344852a5..1eaba6f6b 100644 --- a/src/msg.c +++ b/src/msg.c @@ -829,6 +829,10 @@ natsMsg_create(natsMsg **newMsg, memcpy(ptr, buf, dataLen); ptr += dataLen; *(ptr) = '\0'; + // This is essentially to match server's view of a message size + // when sending messages to pull consumers and keeping track + // of size in regards to a max_bytes setting. + msg->wsz = subjLen + replyLen + bufLen; // Setting the callback will trigger garbage collection when // natsMsg_Destroy() is invoked. diff --git a/src/msg.h b/src/msg.h index 663f9a789..e01da05fa 100644 --- a/src/msg.h +++ b/src/msg.h @@ -62,6 +62,7 @@ struct __natsMsg const char *data; int dataLen; int hdrLen; + int wsz; int flags; uint64_t seq; int64_t time; diff --git a/src/nats.h b/src/nats.h index f9bd96a5e..0d2005900 100644 --- a/src/nats.h +++ b/src/nats.h @@ -708,6 +708,7 @@ typedef struct jsConsumerConfig // Pull based options. int64_t MaxRequestBatch; ///< Maximum Pull Consumer request batch size. int64_t MaxRequestExpires; ///< Maximum Pull Consumer request expiration, expressed in number of nanoseconds. + int64_t MaxRequestMaxBytes; ///< Maximum Pull Consumer request maximum bytes. // Push based options. const char *DeliverSubject; @@ -973,6 +974,21 @@ typedef struct jsDirectGetMsgOptions } jsDirectGetMsgOptions; +/** + * Options for the natsSubscription_FetchRequest() call, which is + * similar to natsSubscription_Fetch() but gives more control in + * the configuration of the fetch. + */ +typedef struct jsFetchRequest +{ + int64_t Expires; ///< Expiration of the request, expressed in nanoseconds + int Batch; ///< Maximum number of messages to be received (see MaxBytes) + int64_t MaxBytes; ///< Maximum bytes for the request (request complete based on whichever Batch or MaxBytes comes first) + bool NoWait; ///< Will not wait if the request cannot be completed + int64_t Heartbeat; ///< Have server sends heartbeats to help detect communication failures + +} jsFetchRequest; + /** * JetStream context options. * @@ -5997,6 +6013,30 @@ NATS_EXTERN natsStatus natsSubscription_Fetch(natsMsgList *list, natsSubscription *sub, int batch, int64_t timeout, jsErrCode *errCode); +/** \brief Initializes a fetch request options structure. + * + * Use this before setting specific fetch options and passing it to #natsSubscription_FetchRequest. + * + * @param request the pointer to the #jsFetchRequest object. + */ +NATS_EXTERN natsStatus +jsFetchRequest_Init(jsFetchRequest *request); + +/** \brief Fetches messages for a pull subscription with a complete request configuration + * + * Similar to #natsSubscription_Fetch but a full #jsFetchRequest configuration is provided + * for maximum control. + * + * Initialize the #jsFetchRequest structure using #jsFetchRequest_Init and then set + * the parameters desired, then invoke this function. + * + * @param list the location to a #natsMsgList that will be filled by the result of this call. + * @param sub the pointer to the #natsSubscription object. + * @param request the pointer to a #jsFetchRequest configuration. + */ +NATS_EXTERN natsStatus +natsSubscription_FetchRequest(natsMsgList *list, natsSubscription *sub, jsFetchRequest *request); + /** \brief Returns the jsConsumerInfo associated with this subscription. * * Returns the #jsConsumerInfo associated with this subscription. diff --git a/src/natsp.h b/src/natsp.h index 34e195ad4..d5fdcdf8a 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -365,6 +365,7 @@ typedef struct __jsSub char *consumer; char *nxtMsgSubj; bool pull; + bool inFetch; bool ordered; bool dc; // delete JS consumer in Unsub()/Drain() bool ackNone; @@ -378,8 +379,9 @@ typedef struct __jsSub uint64_t pending; int64_t hbi; - int64_t active; + bool active; natsTimer *hbTimer; + natsMsg *mhMsg; char *cmeta; uint64_t sseq; diff --git a/test/test.c b/test/test.c index b448492e7..b1211f6cc 100644 --- a/test/test.c +++ b/test/test.c @@ -21844,10 +21844,9 @@ test_JetStreamUnmarshalConsumerInfo(void) "{\"config\":{\"headers_only\":true}}", "{\"config\":{\"max_batch\":1}}", "{\"config\":{\"max_expires\":123456789}}", + "{\"config\":{\"max_bytes\":1024}}", "{\"config\":{\"inactive_threshold\":123456789}}", "{\"config\":{\"backoff\":[50000000,250000000]}}", - "{\"config\":{\"max_batch\":100}}", - "{\"config\":{\"max_expires\":1000000000}}", "{\"config\":{\"num_replicas\":1}}", "{\"config\":{\"mem_storage\":true}}", }; @@ -21877,10 +21876,12 @@ test_JetStreamUnmarshalConsumerInfo(void) "{\"config\":{\"headers_only\":123}}", "{\"config\":{\"max_batch\":\"1\"}}", "{\"config\":{\"max_expires\":\"123456789\"}}", + "{\"config\":{\"max_bytes\":\"123456789\"}}", "{\"config\":{\"inactive_threshold\":\"123456789\"}}", "{\"config\":{\"backoff\":true}}", "{\"config\":{\"max_batch\":\"abc\"}}", "{\"config\":{\"max_expires\":false}}", + "{\"config\":{\"max_bytes\":false}}", "{\"config\":{\"mem_storage\":\"abc\"}}", "{\"delivered\":123}", "{\"delivered\":{\"consumer_seq\":\"abc\"}}", @@ -26616,6 +26617,7 @@ _sendToPullSub(void *closure) natsMsg *msg = NULL; natsStatus s; + nats_Sleep(250); natsMutex_Lock(args->m); s = natsMsg_create(&msg, args->sub->subject, (int) strlen(args->sub->subject), NULL, 0, args->string, (int) strlen(args->string), (int) strlen(args->string)); @@ -26624,6 +26626,50 @@ _sendToPullSub(void *closure) natsMsg_Destroy(msg); } +static void +_fetchRequest(void *closure) +{ + struct threadArg *args = (struct threadArg*) closure; + natsSubscription *sub = NULL; + jsFetchRequest fr; + natsStatus s; + natsMsgList list; + int64_t start; + + natsMutex_Lock(args->m); + sub = args->sub; + natsMutex_Unlock(args->m); + + jsFetchRequest_Init(&fr); + // With current messages, for a MaxBytes of 150, we should get 2 messages, + // for a total size of 142. + fr.Batch = 10; + fr.MaxBytes = 150; + fr.Expires = NATS_SECONDS_TO_NANOS(2); + start = nats_Now(); + s = natsSubscription_FetchRequest(&list, sub, &fr); + if (s == NATS_OK) + { + int i; + int total = 0; + + for (i=0; iwsz; + natsMsg_AckSync(list.Msgs[i], NULL, NULL); + } + + if ((total > 150) || (list.Count != 2) || ((nats_Now() - start) >= 1900)) + s = NATS_ERR; + + natsMsgList_Destroy(&list); + } + + natsMutex_Lock(args->m); + args->status = s; + natsMutex_Unlock(args->m); +} + static void test_JetStreamSubscribePull(void) { @@ -26641,6 +26687,9 @@ test_JetStreamSubscribePull(void) const char *badAckStr[] = {jsAckNoneStr, jsAckAllStr}; jsAckPolicy badAck[] = {js_AckNone, js_AckAll}; jsConsumerConfig cc; + jsFetchRequest fr; + natsSubscription *sub2 = NULL; + natsSubscription *sub3 = NULL; JS_SETUP(2, 7, 0); @@ -26746,7 +26795,7 @@ test_JetStreamSubscribePull(void) test("Create pull sub: "); jsSubOptions_Init(&so); so.Config.MaxAckPending = 10; - so.Config.AckWait = 300*1000000; + so.Config.AckWait = NATS_MILLIS_TO_NANOS(300); s = js_PullSubscribe(&sub, js, "foo", "dur", NULL, &so, &jerr); testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0)); @@ -26762,6 +26811,7 @@ test_JetStreamSubscribePull(void) dur = nats_Now() - start; testCond((s == NATS_TIMEOUT) && (list.Msgs == NULL) && (list.Count == 0) && (jerr == 0) && (dur >= 450) && (dur <= 600)); + nats_clearLastError(); test("Send a message: "); s = js_Publish(NULL, js, "foo", "hello", 5, NULL, &jerr); @@ -26829,6 +26879,10 @@ test_JetStreamSubscribePull(void) natsMsgList_Destroy(&list); testCond(s == NATS_OK); + natsThread_Join(t); + natsThread_Destroy(t); + t = NULL; + test("Receive msg with header no data: "); s = natsMsg_create(&msg, sub->subject, (int) strlen(sub->subject), NULL, 0, "NATS/1.0\r\nk:v\r\n\r\n", 17, 17); @@ -26855,25 +26909,24 @@ test_JetStreamSubscribePull(void) msg = NULL; natsMsgList_Destroy(&list); - natsThread_Join(t); - natsThread_Destroy(t); - t = NULL; + // Since we faked the 404, the server is going to send a 408 when the request + // expires, so wait for it to be sent. + nats_Sleep(200); - test("Fetch ignores 408: "); + test("Fetch returns on 408: "); natsMutex_Lock(args.m); args.nc = nc; args.sub = sub; args.string = "NATS/1.0 408 Request Timeout\r\n\r\n"; natsMutex_Unlock(args.m); - s = natsThread_Create(&t, _sendToPullSub, (void*) &args); start = nats_Now(); - IFOK(s, natsSubscription_Fetch(&list, sub, 1, 500, &jerr)); + s = natsThread_Create(&t, _sendToPullSub, (void*) &args); + IFOK(s, natsSubscription_Fetch(&list, sub, 1, 1000, &jerr)); dur = nats_Now() - start; + // Since we wait 250ms to publish, it will take aound 250ms testCond((s == NATS_TIMEOUT) && (list.Msgs == NULL) && (list.Count == 0) && (jerr == 0) - && (dur > 400)); + && (dur < 500)); nats_clearLastError(); - natsMsg_Destroy(msg); - msg = NULL; natsMsgList_Destroy(&list); natsThread_Join(t); @@ -26900,8 +26953,8 @@ test_JetStreamSubscribePull(void) test("Create stream: "); jsStreamConfig_Init(&sc); sc.Name = "TEST2"; - sc.Subjects = (const char*[2]){"bar", "baz"}; - sc.SubjectsLen = 2; + sc.Subjects = (const char*[4]){"bar", "baz", "bat", "box"}; + sc.SubjectsLen = 4; s = js_AddStream(NULL, js, &sc, NULL, &jerr); testCond((s == NATS_OK) && (jerr == 0)); @@ -26911,30 +26964,21 @@ test_JetStreamSubscribePull(void) s = js_PullSubscribe(&sub, js, "bar", "pullmaxwaiting", NULL, &so, &jerr); testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0)); - test("Max requests: "); - for (i=0; (s == NATS_OK) && (i<2); i++) - { - s = natsSubscription_Fetch(&list, sub, 1, 50, &jerr); - if (s == NATS_TIMEOUT) - s = NATS_OK; - } + test("Fill requests: "); + // Send requests manually to use the max requests + s = natsConnection_SubscribeSync(&sub2, nc, "my.pull.cons.inbox1"); + IFOK(s, natsConnection_SubscribeSync(&sub3, nc, "my.pull.cons.inbox2")); + IFOK(s, natsConnection_PublishRequestString(nc, "$JS.API.CONSUMER.MSG.NEXT.TEST2.pullmaxwaiting", "my.pull.cons.inbox1", "{\"batch\":1,\"expires\":1000000000}")); + IFOK(s, natsConnection_PublishRequestString(nc, "$JS.API.CONSUMER.MSG.NEXT.TEST2.pullmaxwaiting", "my.pull.cons.inbox1", "{\"batch\":1,\"expires\":1000000000}")); testCond(s == NATS_OK); - nats_clearLastError(); - - // Wait for more than expiration of above pull requests - nats_Sleep(100); - test("Next does not return early: "); - for (i=0; i<2; i++) - { - int batchSize = (i == 0 ? 1 : 10); - start = nats_Now(); - s = natsSubscription_Fetch(&list, sub, batchSize, 250, &jerr); - dur = nats_Now() - start; - s = (((s == NATS_TIMEOUT) && (list.Count == 0) && (dur >= 200)) ? NATS_OK : NATS_ERR); - } - testCond(s == NATS_OK); + test("Max waiting error: "); + s = natsSubscription_Fetch(&list, sub, 1, 1000, &jerr); + testCond((s == NATS_ERR) && (strstr(nats_GetLastError(NULL), "Exceeded") != NULL)); + nats_clearLastError(); + natsSubscription_Destroy(sub2); + natsSubscription_Destroy(sub3); natsSubscription_Destroy(sub); sub = NULL; @@ -26986,6 +27030,155 @@ test_JetStreamSubscribePull(void) testCond((s == NATS_OK) && (list.Msgs != NULL) && (list.Count == 1) && (jerr == 0)); natsMsgList_Destroy(&list); + natsSubscription_Destroy(sub); + sub = NULL; + + test("jsFetchRequest init bad args: "); + s = jsFetchRequest_Init(NULL); + testCond(s == NATS_INVALID_ARG); + nats_clearLastError(); + + test("Create pull consumer with MaxRequestBytes: "); + jsSubOptions_Init(&so); + so.Config.MaxRequestMaxBytes = 1024; + s = js_PullSubscribe(&sub, js, "bat", "max-request-bytes", NULL, &so, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + jsFetchRequest_Init(&fr); + + test("FetchRequest bad args: "); + s = natsSubscription_FetchRequest(NULL, sub, &fr); + if (s == NATS_INVALID_ARG) + s = natsSubscription_FetchRequest(&list, NULL, &fr); + if (s == NATS_INVALID_ARG) + s = natsSubscription_FetchRequest(&list, sub, NULL); + testCond(s == NATS_INVALID_ARG); + nats_clearLastError(); + + test("FetchRequest no expiration err: "); + jsFetchRequest_Init(&fr); + // If NoWait is false, then Expires must be set. + fr.Batch = 1; + s = natsSubscription_FetchRequest(&list, sub, &fr); + testCond((s == NATS_INVALID_TIMEOUT && (list.Count == 0) && (list.Msgs == NULL))); + nats_clearLastError(); + + test("Batch must be set: "); + jsFetchRequest_Init(&fr); + fr.MaxBytes = 100; + fr.Expires = NATS_SECONDS_TO_NANOS(1); + s = natsSubscription_FetchRequest(&list, sub, &fr); + testCond((s == NATS_INVALID_ARG) && (list.Count == 0) && (list.Msgs == NULL)); + nats_clearLastError(); + + test("MaxBytes must be > 0: "); + jsFetchRequest_Init(&fr); + fr.Batch = 1; + fr.MaxBytes = -100; + fr.Expires = NATS_SECONDS_TO_NANOS(1); + s = natsSubscription_FetchRequest(&list, sub, &fr); + testCond((s == NATS_INVALID_ARG) && (list.Count == 0) && (list.Msgs == NULL)); + nats_clearLastError(); + + test("Requesting more than allowed max bytes: "); + jsFetchRequest_Init(&fr); + fr.Batch = 1; + fr.MaxBytes = 2048; + fr.Expires = NATS_SECONDS_TO_NANOS(1); + s = natsSubscription_FetchRequest(&list, sub, &fr); + testCond((s == NATS_ERR) && (list.Count == 0) && (list.Msgs == NULL) + && (strstr(nats_GetLastError(NULL), "Exceeded MaxRequestMaxBytes") != NULL)); + nats_clearLastError(); + + test("No concurrent call: "); + natsMutex_Lock(args.m); + args.sub = sub; + args.status = NATS_OK; + natsMutex_Unlock(args.m); + s = natsThread_Create(&t, _fetchRequest, (void*) &args); + if (s == NATS_OK) + { + nats_Sleep(250); + jsFetchRequest_Init(&fr); + fr.Batch = 1; + fr.Expires = NATS_SECONDS_TO_NANOS(1); + s = natsSubscription_FetchRequest(&list, sub, &fr); + } + testCond((s == NATS_ERR) && (strstr(nats_GetLastError(NULL), jsErrConcurrentFetchNotAllowed) != NULL)); + nats_clearLastError(); + s = NATS_OK; + + test("Populate: "); + for (i=0; (s == NATS_OK) && (i<10); i++) + s = js_PublishAsync(js, "bat", (const void*) "abcdefghij", 10, NULL); + testCond(s == NATS_OK); + + test("Received ok: "); + natsThread_Join(t); + natsMutex_Lock(args.m); + s = args.status; + natsMutex_Unlock(args.m); + testCond(s == NATS_OK); + + natsThread_Destroy(t); + natsSubscription_Destroy(sub); + sub = NULL; + + test("Create pull consumer: "); + s = js_PullSubscribe(&sub, js, "box", "feth-request", NULL, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Populate: "); + s = js_PublishAsync(js, "box", (const void*) "abcdefghij", 10, NULL); + testCond(s == NATS_OK); + + test("Check expiration: "); + // Unlike with the simple fetch, asking for more than is avail will + // wait until expiration to return. + jsFetchRequest_Init(&fr); + fr.Batch = 10; + fr.Expires = NATS_MILLIS_TO_NANOS(500); + fr.Heartbeat = NATS_MILLIS_TO_NANOS(50); + start = nats_Now(); + s = natsSubscription_FetchRequest(&list, sub, &fr); + dur = nats_Now() - start; + testCond((s == NATS_OK) && (list.Count == 1) && (list.Msgs != NULL) + && (dur > 400) && (dur < 600)); + natsMsgList_Destroy(&list); + +#if _WIN32 + nats_Sleep(1000); +#endif + + test("Check invalid hb: "); + jsFetchRequest_Init(&fr); + fr.Batch = 10; + fr.Expires = NATS_SECONDS_TO_NANOS(1); + fr.Heartbeat = NATS_SECONDS_TO_NANOS(10); + s = natsSubscription_FetchRequest(&list, sub, &fr); + testCond((s == NATS_ERR) && (strstr(nats_GetLastError(NULL), "too large") != NULL)); + nats_clearLastError(); + + test("Check idle hearbeat: "); + jsFetchRequest_Init(&fr); + fr.Batch = 10; + // Let's make it wait for 2 seconds + fr.Expires = NATS_SECONDS_TO_NANOS(2); + // And have HBs every 50ms + fr.Heartbeat = NATS_MILLIS_TO_NANOS(50); + // Schedule the server to be stopped in a different thread + s = natsThread_Create(&t, _stopServerInThread, (void*) &pid); + start = nats_Now(); + // We should be kicked out of the fetch request with an error indicating + // that we missed hearbeats. + IFOK(s, natsSubscription_FetchRequest(&list, sub, &fr)); + dur = nats_Now() - start; + testCond((s == NATS_MISSED_HEARTBEAT) && (dur < 500)); + + natsThread_Join(t); + natsThread_Destroy(t); + pid = NATS_INVALID_PID; + natsSubscription_Destroy(sub); JS_TEARDOWN;