diff --git a/src/js.h b/src/js.h index 92f7c0a62..ed2465efa 100644 --- a/src/js.h +++ b/src/js.h @@ -157,6 +157,12 @@ extern const int64_t jsDefaultRequestWait; // jsApiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject. #define jsApiDirectMsgGetLastBySubjectT "%.*s.DIRECT.GET.%s.%s" +// jsApiStreamListT is the endpoint to get the list of stream infos. +#define jsApiStreamListT "%.*s.STREAM.LIST" + +// jsApiStreamNamesT is the endpoint to get the list of stream names. +#define jsApiStreamNamesT "%.*s.STREAM.NAMES" + // Creates a subject based on the option's prefix, the subject format and its values. #define js_apiSubj(s, o, f, ...) (nats_asprintf((s), (f), (o)->Prefix, __VA_ARGS__) < 0 ? NATS_NO_MEMORY : NATS_OK) diff --git a/src/jsm.c b/src/jsm.c index 9b0695355..0b100d1f9 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -1830,6 +1830,404 @@ js_EraseMsg(jsCtx *js, const char *stream, uint64_t seq, jsOptions *opts, jsErrC return NATS_UPDATE_ERR_STACK(s); } +static natsStatus +_unmarshalStreamInfoListResp(natsStrHash *m, apiPaged *page, natsMsg *resp, jsErrCode *errCode) +{ + nats_JSON *json = NULL; + jsApiResponse ar; + natsStatus s; + + s = js_unmarshalResponse(&ar, &json, resp); + if (s != NATS_OK) + return NATS_UPDATE_ERR_STACK(s); + + if (js_apiResponseIsErr(&ar)) + { + if (errCode != NULL) + *errCode = (int) ar.Error.ErrCode; + + // If the error code is JSStreamNotFoundErr then pick NATS_NOT_FOUND. + if (ar.Error.ErrCode == JSStreamNotFoundErr) + s = NATS_NOT_FOUND; + else + s = nats_setError(NATS_ERR, "%s", ar.Error.Description); + } + else + { + nats_JSON **streams = NULL; + int streamsLen = 0; + + IFOK(s, nats_JSONGetLong(json, "total", &page->total)); + IFOK(s, nats_JSONGetLong(json, "offset", &page->offset)); + IFOK(s, nats_JSONGetLong(json, "limit", &page->limit)); + IFOK(s, nats_JSONGetArrayObject(json, "streams", &streams, &streamsLen)); + if ((s == NATS_OK) && (streams != NULL)) + { + int i; + + for (i=0; (s == NATS_OK) && (iConfig == NULL) || nats_IsStringEmpty(si->Config->Name))) + s = nats_setError(NATS_ERR, "%s", "stream name missing from configuration"); + IFOK(s, natsStrHash_SetEx(m, (char*) si->Config->Name, true, true, si, (void**) &osi)); + if (osi != NULL) + jsStreamInfo_Destroy(osi); + } + // Free the array of JSON objects that was allocated by nats_JSONGetArrayObject. + NATS_FREE(streams); + } + } + js_freeApiRespContent(&ar); + nats_JSONDestroy(json); + return NATS_UPDATE_ERR_STACK(s); +} + +natsStatus +js_Streams(jsStreamInfoList **new_list, jsCtx *js, jsOptions *opts, jsErrCode *errCode) +{ + natsStatus s = NATS_OK; + natsBuffer *buf = NULL; + char *subj = NULL; + natsMsg *resp = NULL; + natsConnection *nc = NULL; + bool freePfx = false; + bool done = false; + int64_t offset = 0; + int64_t start = 0; + int64_t timeout = 0; + natsStrHash *streams= NULL; + jsStreamInfoList *list = NULL; + jsOptions o; + apiPaged page; + + if (errCode != NULL) + *errCode = 0; + + if ((new_list == NULL) || (js == NULL)) + return nats_setDefaultError(NATS_INVALID_ARG); + + s = js_setOpts(&nc, &freePfx, js, opts, &o); + if (s == NATS_OK) + { + if (nats_asprintf(&subj, jsApiStreamListT, js_lenWithoutTrailingDot(o.Prefix), o.Prefix) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + + if (freePfx) + NATS_FREE((char*) o.Prefix); + } + IFOK(s, natsBuf_Create(&buf, 64)); + IFOK(s, natsStrHash_Create(&streams, 16)); + + if (s == NATS_OK) + { + memset(&page, 0, sizeof(apiPaged)); + start = nats_Now(); + } + + do + { + IFOK(s, natsBuf_AppendByte(buf, '{')); + IFOK(s, nats_marshalLong(buf, false, "offset", offset)); + if (!nats_IsStringEmpty(o.Stream.Info.SubjectsFilter)) + { + IFOK(s, natsBuf_Append(buf, ",\"subject\":\"", -1)); + IFOK(s, natsBuf_Append(buf, o.Stream.Info.SubjectsFilter, -1)); + IFOK(s, natsBuf_AppendByte(buf, '\"')); + } + IFOK(s, natsBuf_AppendByte(buf, '}')); + + timeout = o.Wait - (nats_Now() - start); + if (timeout <= 0) + s = NATS_TIMEOUT; + + // Send the request + IFOK_JSR(s, natsConnection_Request(&resp, nc, subj, natsBuf_Data(buf), natsBuf_Len(buf), timeout)); + + IFOK(s, _unmarshalStreamInfoListResp(streams, &page, resp, errCode)); + if (s == NATS_OK) + { + offset += page.limit; + done = offset >= page.total; + if (!done) + { + // Reset the request buffer, we may be able to reuse. + natsBuf_Reset(buf); + } + } + natsMsg_Destroy(resp); + resp = NULL; + } + while ((s == NATS_OK) && !done); + + natsBuf_Destroy(buf); + NATS_FREE(subj); + + if (s == NATS_OK) + { + if (natsStrHash_Count(streams) == 0) + { + natsStrHash_Destroy(streams); + return NATS_NOT_FOUND; + } + list = (jsStreamInfoList*) NATS_CALLOC(1, sizeof(jsStreamInfoList)); + if (list == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + { + list->List = (jsStreamInfo**) NATS_CALLOC(natsStrHash_Count(streams), sizeof(jsStreamInfo*)); + if (list->List == NULL) + { + NATS_FREE(list); + list = NULL; + s = nats_setDefaultError(NATS_NO_MEMORY); + } + else + { + natsStrHashIter iter; + void *val = NULL; + + natsStrHashIter_Init(&iter, streams); + while (natsStrHashIter_Next(&iter, NULL, (void**) &val)) + { + jsStreamInfo *si = (jsStreamInfo*) val; + + list->List[list->Count++] = si; + natsStrHashIter_RemoveCurrent(&iter); + } + natsStrHashIter_Done(&iter); + + *new_list = list; + } + } + } + if (s != NATS_OK) + { + natsStrHashIter iter; + void *val = NULL; + + natsStrHashIter_Init(&iter, streams); + while (natsStrHashIter_Next(&iter, NULL, (void**) &val)) + { + jsStreamInfo *si = (jsStreamInfo*) val; + + natsStrHashIter_RemoveCurrent(&iter); + jsStreamInfo_Destroy(si); + } + natsStrHashIter_Done(&iter); + } + natsStrHash_Destroy(streams); + + return NATS_UPDATE_ERR_STACK(s); +} + +void +jsStreamInfoList_Destroy(jsStreamInfoList *list) +{ + int i; + + if (list == NULL) + return; + + for (i=0; iCount; i++) + jsStreamInfo_Destroy(list->List[i]); + + NATS_FREE(list->List); + NATS_FREE(list); +} + +static natsStatus +_unmarshalStreamNamesListResp(natsStrHash *m, apiPaged *page, natsMsg *resp, jsErrCode *errCode) +{ + nats_JSON *json = NULL; + jsApiResponse ar; + natsStatus s; + + s = js_unmarshalResponse(&ar, &json, resp); + if (s != NATS_OK) + return NATS_UPDATE_ERR_STACK(s); + + if (js_apiResponseIsErr(&ar)) + { + if (errCode != NULL) + *errCode = (int) ar.Error.ErrCode; + + // If the error code is JSStreamNotFoundErr then pick NATS_NOT_FOUND. + if (ar.Error.ErrCode == JSStreamNotFoundErr) + s = NATS_NOT_FOUND; + else + s = nats_setError(NATS_ERR, "%s", ar.Error.Description); + } + else + { + char **streams = NULL; + int streamsLen = 0; + + IFOK(s, nats_JSONGetLong(json, "total", &page->total)); + IFOK(s, nats_JSONGetLong(json, "offset", &page->offset)); + IFOK(s, nats_JSONGetLong(json, "limit", &page->limit)); + IFOK(s, nats_JSONGetArrayStr(json, "streams", &streams, &streamsLen)); + if ((s == NATS_OK) && (streams != NULL)) + { + int i; + + for (i=0; (s == NATS_OK) && (i= page.total; + if (!done) + { + // Reset the request buffer, we may be able to reuse. + natsBuf_Reset(buf); + } + } + natsMsg_Destroy(resp); + resp = NULL; + } + while ((s == NATS_OK) && !done); + + natsBuf_Destroy(buf); + NATS_FREE(subj); + + if (s == NATS_OK) + { + if (natsStrHash_Count(names) == 0) + { + natsStrHash_Destroy(names); + return NATS_NOT_FOUND; + } + list = (jsStreamNamesList*) NATS_CALLOC(1, sizeof(jsStreamNamesList)); + if (list == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + { + list->List = (char**) NATS_CALLOC(natsStrHash_Count(names), sizeof(char*)); + if (list->List == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + { + natsStrHashIter iter; + char *sname = NULL; + + natsStrHashIter_Init(&iter, names); + while ((s == NATS_OK) && natsStrHashIter_Next(&iter, &sname, NULL)) + { + char *copyName = NULL; + + DUP_STRING(s, copyName, sname); + if (s == NATS_OK) + { + list->List[list->Count++] = copyName; + natsStrHashIter_RemoveCurrent(&iter); + } + } + natsStrHashIter_Done(&iter); + } + if (s == NATS_OK) + *new_list = list; + else + jsStreamNamesList_Destroy(list); + } + } + natsStrHash_Destroy(names); + + return NATS_UPDATE_ERR_STACK(s); +} + +void +jsStreamNamesList_Destroy(jsStreamNamesList *list) +{ + int i; + + if (list == NULL) + return; + + for (i=0; iCount; i++) + NATS_FREE(list->List[i]); + + NATS_FREE(list->List); + NATS_FREE(list); +} + // // Account related functions // diff --git a/src/nats.h b/src/nats.h index 41cdac7d2..959416f76 100644 --- a/src/nats.h +++ b/src/nats.h @@ -646,6 +646,34 @@ typedef struct jsStreamInfo } jsStreamInfo; +/** + * List of stream information objects returned by #js_Streams + * + * \note Once done, the list should be destroyed calling #jsStreamInfoList_Destroy + * + * @see jsStreamInfoList_Destroy + */ +typedef struct jsStreamInfoList +{ + jsStreamInfo **List; + int Count; + +} jsStreamInfoList; + +/** + * List of stream names returned by #js_StreamNames + * + * \note Once done, the list should be destroyed calling #jsStreamNamesList_Destroy + * + * @see jsStreamNamesList_Destroy + */ +typedef struct jsStreamNamesList +{ + char **List; + int Count; + +} jsStreamNamesList; + /** * Configuration of a JetStream consumer. * @@ -5596,6 +5624,64 @@ js_GetStreamInfo(jsStreamInfo **si, jsCtx *js, const char *stream, jsOptions *op NATS_EXTERN void jsStreamInfo_Destroy(jsStreamInfo *si); +/** \brief Retrieves the list of all available streams. + * + * Retrieves the list of all #jsStreamInfo. It is possible to filter + * which streams are to be retrieved based on a subject filter. + * + * \warning The list should be destroyed when no longer used by + * calling #jsStreamInfoList_Destroy. + * + * @param list the location where to store the pointer to the new #jsStreamInfoList object. + * @param js the pointer to the #jsCtx context. + * @param opts the pointer to the #jsOptions object, possibly `NULL`. + * @param errCode the location where to store the JetStream specific error code, or `NULL` + * if not needed. + */ +NATS_EXTERN natsStatus +js_Streams(jsStreamInfoList **list, jsCtx *js, jsOptions *opts, jsErrCode *errCode); + +/** \brief Destroys the stream information list object. + * + * Releases memory allocated for this stream information list. + * + * \warning All #jsStreamInfo pointers contained in the list will + * be destroyed by this call. + * + * @param list the pointer to the #jsStreamInfoList object. + */ +NATS_EXTERN void +jsStreamInfoList_Destroy(jsStreamInfoList *list); + +/** \brief Retrieves the list of all available stream names. + * + * Retrieves the list of all stream names. It is possible to filter + * which streams are to be retrieved based on a subject filter. + * + * \warning The list should be destroyed when no longer used by + * calling #jsStreamNamesList_Destroy. + * + * @param list the location where to store the pointer to the new #jsStreamNamesList object. + * @param js the pointer to the #jsCtx context. + * @param opts the pointer to the #jsOptions object, possibly `NULL`. + * @param errCode the location where to store the JetStream specific error code, or `NULL` + * if not needed. + */ +NATS_EXTERN natsStatus +js_StreamNames(jsStreamNamesList **list, jsCtx *js, jsOptions *opts, jsErrCode *errCode); + +/** \brief Destroys the stream names list object. + * + * Releases memory allocated for this list of stream names. + * + * \warning All string pointers contained in the list will + * be destroyed by this call. + * + * @param list the pointer to the #jsStreamNamesList object. + */ +NATS_EXTERN void +jsStreamNamesList_Destroy(jsStreamNamesList *list); + /** \brief Initializes a consumer configuration structure. * * Use this before adding a consumer. diff --git a/test/test.c b/test/test.c index fa6aa41df..0e1cb21ff 100644 --- a/test/test.c +++ b/test/test.c @@ -22482,6 +22482,94 @@ test_JetStreamContextDomain(void) remove(confFile); } +static void +_streamsInfoListReq(natsConnection *nc, natsMsg **msg, void *closure) +{ + int *count = (int*) closure; + const char *payload = NULL; + natsMsg *newMsg = NULL; + + if (strstr(natsMsg_GetData(*msg), "stream_list_response") == NULL) + return; + + (*count)++; + if (*count == 1) + { + // Pretend limit is 2 and send 2 simplified stream infos + payload = "{\"type\":\"io.nats.jetstream.api.v1.stream_list_response\",\"total\":5,\"offset\":0,\"limit\":2,"\ + "\"streams\":[{\"config\":{\"name\":\"S1\"}},{\"config\":{\"name\":\"S2\"}}]}"; + } + else if (*count == 2) + { + // Pretend that there is a repeat of a stream name to check + // that we are properly replacing and not leaking memory. + payload = "{\"type\":\"io.nats.jetstream.api.v1.stream_list_response\",\"total\":5,\"offset\":2,\"limit\":2,"\ + "\"streams\":[{\"config\":{\"name\":\"S2\"}},{\"config\":{\"name\":\"S3\"}}]}"; + } + else if (*count == 3) + { + // Pretend that our next page was over the limit (say streams were removed) + // and therefore the server returned no streams (but set offset to total) + payload = "{\"type\":\"io.nats.jetstream.api.v1.stream_list_response\",\"total\":3,\"offset\":3,\"limit\":2,"\ + "\"streams\":[]}"; + } + else + { + // Use original message + return; + } + if (natsMsg_create(&newMsg, (*msg)->subject, (int) strlen((*msg)->subject), NULL, 0, + payload, (int) strlen(payload), 0) == NATS_OK) + { + natsMsg_Destroy(*msg); + *msg = newMsg; + } +} + +static void +_streamsNamesListReq(natsConnection *nc, natsMsg **msg, void *closure) +{ + int *count = (int*) closure; + const char *payload = NULL; + natsMsg *newMsg = NULL; + + if (strstr(natsMsg_GetData(*msg), "stream_names_response") == NULL) + return; + + (*count)++; + if (*count == 1) + { + // Pretend limit is 2 and send 2 stream names + payload = "{\"type\":\"io.nats.jetstream.api.v1.stream_names_response\",\"total\":5,\"offset\":0,\"limit\":2,"\ + "\"streams\":[\"S1\",\"S2\"]}"; + } + else if (*count == 2) + { + // Pretend that there is a repeat of a stream name to check + // that we are properly replacing and not leaking memory. + payload = "{\"type\":\"io.nats.jetstream.api.v1.stream_names_response\",\"total\":5,\"offset\":2,\"limit\":2,"\ + "\"streams\":[\"S2\",\"S3\"]}"; + } + else if (*count == 3) + { + // Pretend that our next page was over the limit (say streams were removed) + // and therefore the server returned no streams (but set offset to total) + payload = "{\"type\":\"io.nats.jetstream.api.v1.stream_names_response\",\"total\":3,\"offset\":3,\"limit\":2,"\ + "\"streams\":null}"; + } + else + { + // Use original message + return; + } + if (natsMsg_create(&newMsg, (*msg)->subject, (int) strlen((*msg)->subject), NULL, 0, + payload, (int) strlen(payload), 0) == NATS_OK) + { + natsMsg_Destroy(*msg); + *msg = newMsg; + } +} + static void test_JetStreamMgtStreams(void) { @@ -22494,6 +22582,9 @@ test_JetStreamMgtStreams(void) natsMsg *msg = NULL; natsSubscription *sub = NULL; char *desc = NULL; + jsStreamInfoList *siList = NULL; + jsStreamNamesList *snList = NULL; + int count = 0; jsOptions o; int i; @@ -22948,6 +23039,7 @@ test_JetStreamMgtStreams(void) free(desc); jsCtx_Destroy(js2); natsSubscription_Destroy(sub); + sub = NULL; test("Create stream with wilcards: "); jsStreamConfig_Init(&cfg); @@ -22960,6 +23052,142 @@ test_JetStreamMgtStreams(void) && (strcmp(si->Config->Subjects[0], "foo.>") == 0) && (strcmp(si->Config->Subjects[1], "bar.*") == 0)); jsStreamInfo_Destroy(si); + + test("List stream infos (bad args): "); + s = js_Streams(NULL, js, NULL, NULL); + if (s == NATS_INVALID_ARG) + s = js_Streams(&siList, NULL, NULL, NULL); + testCond(s == NATS_INVALID_ARG); + nats_clearLastError(); + + test("Create sub for pagination check: "); + s = natsConnection_SubscribeSync(&sub, nc, "$JS.API.STREAM.LIST"); + testCond(s == NATS_OK); + + natsConn_setFilterWithClosure(nc, _streamsInfoListReq, (void*) &count); + + test("List stream infos: "); + s = js_Streams(&siList, js, NULL, &jerr); + testCond((s == NATS_OK) && (siList != NULL) && (siList->List != NULL) && (siList->Count == 3)); + + natsConn_setFilter(nc, NULL); + + test("Check 1st request: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) + && (strstr(natsMsg_GetData(msg), "offset\":0") != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Check 2nd request: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) + && (strstr(natsMsg_GetData(msg), "offset\":2") != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Check 3rd request: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) + && (strstr(natsMsg_GetData(msg), "offset\":4") != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + natsSubscription_Destroy(sub); + sub = NULL; + + test("Destroy list: "); + // Will see with valgrind if this is doing the right thing + jsStreamInfoList_Destroy(siList); + siList = NULL; + // Check this does not crash + jsStreamInfoList_Destroy(siList); + testCond(true); + + test("List stream infos with filter: "); + jsOptions_Init(&o); + o.Stream.Info.SubjectsFilter = "TEST"; + s = js_Streams(&siList, js, &o, &jerr); + testCond((s == NATS_OK) && (siList != NULL) && (siList->List != NULL) && (siList->Count == 1) + && (strcmp(siList->List[0]->Config->Name, "TEST") == 0)); + jsStreamInfoList_Destroy(siList); + siList = NULL; + + test("List stream infos with filter no match: "); + jsOptions_Init(&o); + o.Stream.Info.SubjectsFilter = "no.match"; + s = js_Streams(&siList, js, &o, &jerr); + testCond((s == NATS_NOT_FOUND) && (siList == NULL)); + + // Do names now + + test("List stream names (bad args): "); + s = js_StreamNames(NULL, js, NULL, NULL); + if (s == NATS_INVALID_ARG) + s = js_StreamNames(&snList, NULL, NULL, NULL); + testCond(s == NATS_INVALID_ARG); + nats_clearLastError(); + + test("Create sub for pagination check: "); + s = natsConnection_SubscribeSync(&sub, nc, "$JS.API.STREAM.NAMES"); + testCond(s == NATS_OK); + + count = 0; + natsConn_setFilterWithClosure(nc, _streamsNamesListReq, (void*) &count); + + test("List stream names: "); + s = js_StreamNames(&snList, js, NULL, &jerr); + testCond((s == NATS_OK) && (snList != NULL) && (snList->List != NULL) && (snList->Count == 3)); + + natsConn_setFilter(nc, NULL); + + test("Check 1st request: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) + && (strstr(natsMsg_GetData(msg), "offset\":0") != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Check 2nd request: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) + && (strstr(natsMsg_GetData(msg), "offset\":2") != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Check 3rd request: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) + && (strstr(natsMsg_GetData(msg), "offset\":4") != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + natsSubscription_Destroy(sub); + sub = NULL; + + test("Destroy list: "); + // Will see with valgrind if this is doing the right thing + jsStreamNamesList_Destroy(snList); + snList = NULL; + // Check this does not crash + jsStreamNamesList_Destroy(snList); + testCond(true); + + test("List stream names with filter: "); + jsOptions_Init(&o); + o.Stream.Info.SubjectsFilter = "TEST"; + s = js_StreamNames(&snList, js, &o, &jerr); + testCond((s == NATS_OK) && (snList != NULL) && (snList->List != NULL) && (snList->Count == 1) + && (strcmp(snList->List[0], "TEST") == 0)); + jsStreamNamesList_Destroy(snList); + snList = NULL; + + test("List stream names with filter no match: "); + jsOptions_Init(&o); + o.Stream.Info.SubjectsFilter = "no.match"; + s = js_StreamNames(&snList, js, &o, &jerr); + testCond((s == NATS_NOT_FOUND) && (snList == NULL)); + JS_TEARDOWN; }