Skip to content

Commit

Permalink
Merge pull request #588 from nats-io/js_stream_info_subjects
Browse files Browse the repository at this point in the history
[CHANGED] JetStream: js_GetStreamInfo() now returns all subjects
  • Loading branch information
kozlovic authored Sep 20, 2022
2 parents 3498a57 + d645790 commit eeafb9f
Show file tree
Hide file tree
Showing 2 changed files with 294 additions and 74 deletions.
252 changes: 184 additions & 68 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ typedef enum

} jsStreamAction;

typedef struct apiPaged
{
int64_t total;
int64_t offset;
int64_t limit;

} apiPaged;

static natsStatus
_marshalTimeUTC(natsBuffer *buf, const char *fieldName, int64_t timeUTC)
{
Expand Down Expand Up @@ -941,8 +949,8 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_unmarshalStreamInfo(nats_JSON *json, jsStreamInfo **new_si)
static natsStatus
_unmarshalStreamInfoPaged(nats_JSON *json, jsStreamInfo **new_si, apiPaged *page)
{
jsStreamInfo *si = NULL;
nats_JSON **sources = NULL;
Expand Down Expand Up @@ -977,6 +985,12 @@ js_unmarshalStreamInfo(nats_JSON *json, jsStreamInfo **new_si)
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(sources);
}
if ((s == NATS_OK) && (page != NULL))
{
IFOK(s, nats_JSONGetLong(json, "total", &page->total));
IFOK(s, nats_JSONGetLong(json, "offset", &page->offset));
IFOK(s, nats_JSONGetLong(json, "limit", &page->limit));
}

if (s == NATS_OK)
*new_si = si;
Expand All @@ -986,8 +1000,15 @@ js_unmarshalStreamInfo(nats_JSON *json, jsStreamInfo **new_si)
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_unmarshalStreamInfo(nats_JSON *json, jsStreamInfo **new_si)
{
natsStatus s = _unmarshalStreamInfoPaged(json, new_si, NULL);
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStreamCreateResp(jsStreamInfo **new_si, natsMsg *resp, jsErrCode *errCode)
_unmarshalStreamCreateResp(jsStreamInfo **new_si, apiPaged *page, natsMsg *resp, jsErrCode *errCode)
{
nats_JSON *json = NULL;
jsApiResponse ar;
Expand All @@ -1011,7 +1032,7 @@ _unmarshalStreamCreateResp(jsStreamInfo **new_si, natsMsg *resp, jsErrCode *errC
else if (new_si != NULL)
{
// At this point we need to unmarshal the stream info itself.
s = js_unmarshalStreamInfo(json, new_si);
s = _unmarshalStreamInfoPaged(json, new_si, page);
}

js_freeApiRespContent(&ar);
Expand All @@ -1038,45 +1059,11 @@ jsStreamConfig_Init(jsStreamConfig *cfg)
}

static natsStatus
_marshalStreamInfoReq(natsBuffer **new_buf, struct jsOptionsStreamInfo *o)
{
natsBuffer *buf = NULL;
natsStatus s;

*new_buf = NULL;
if (!o->DeletedDetails && nats_IsStringEmpty(o->SubjectsFilter))
return NATS_OK;

s = natsBuf_Create(&buf, 30);
IFOK(s, natsBuf_AppendByte(buf, '{'));
if ((s == NATS_OK) && o->DeletedDetails)
s = natsBuf_Append(buf, "\"deleted_details\":true", -1);
if ((s == NATS_OK) && !nats_IsStringEmpty(o->SubjectsFilter))
{
if (o->DeletedDetails)
s = natsBuf_AppendByte(buf, ',');
IFOK(s, natsBuf_Append(buf, "\"subjects_filter\":\"", -1));
IFOK(s, natsBuf_Append(buf, o->SubjectsFilter, -1));
IFOK(s, natsBuf_AppendByte(buf, '\"'));
}
IFOK(s, natsBuf_AppendByte(buf, '}'));

if (s == NATS_OK)
*new_buf = buf;
else
natsBuf_Destroy(buf);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_addUpdateOrGet(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamConfig *cfg, jsOptions *opts, jsErrCode *errCode)
_addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamConfig *cfg, jsOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;
natsBuffer *buf = NULL;
char *subj = NULL;
char *req = NULL;
int reqLen = 0;
natsMsg *resp = NULL;
natsConnection *nc = NULL;
const char *apiT = NULL;
Expand All @@ -1100,7 +1087,6 @@ _addUpdateOrGet(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStrea
{
case jsStreamActionCreate: apiT = jsApiStreamCreateT; break;
case jsStreamActionUpdate: apiT = jsApiStreamUpdateT; break;
case jsStreamActionGet: apiT = jsApiStreamInfoT; break;
default: abort();
}
s = js_setOpts(&nc, &freePfx, js, opts, &o);
Expand All @@ -1113,27 +1099,14 @@ _addUpdateOrGet(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStrea
NATS_FREE((char*) o.Prefix);
}

if (action != jsStreamActionGet)
{
// Marshal the stream create/update request
IFOK(s, js_marshalStreamConfig(&buf, cfg));
}
else
{
// For GetStreamInfo, if there are options, we need to marshal the request.
IFOK(s, _marshalStreamInfoReq(&buf, &(o.Stream.Info)));
}
if ((s == NATS_OK) && (buf != NULL))
{
req = natsBuf_Data(buf);
reqLen = natsBuf_Len(buf);
}
// Marshal the stream create/update request
IFOK(s, js_marshalStreamConfig(&buf, cfg));

// Send the request
IFOK_JSR(s, natsConnection_Request(&resp, nc, subj, req, reqLen, o.Wait));
IFOK_JSR(s, natsConnection_Request(&resp, nc, subj, natsBuf_Data(buf), natsBuf_Len(buf), o.Wait));

// If we got a response, check for error or return the stream info result.
IFOK(s, _unmarshalStreamCreateResp(new_si, resp, errCode));
IFOK(s, _unmarshalStreamCreateResp(new_si, NULL, resp, errCode));

natsBuf_Destroy(buf);
natsMsg_Destroy(resp);
Expand All @@ -1145,38 +1118,181 @@ _addUpdateOrGet(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStrea
natsStatus
js_AddStream(jsStreamInfo **new_si, jsCtx *js, jsStreamConfig *cfg, jsOptions *opts, jsErrCode *errCode)
{
natsStatus s = _addUpdateOrGet(new_si, jsStreamActionCreate, js, cfg, opts, errCode);
natsStatus s = _addOrUpdate(new_si, jsStreamActionCreate, js, cfg, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_UpdateStream(jsStreamInfo **new_si, jsCtx *js, jsStreamConfig *cfg, jsOptions *opts, jsErrCode *errCode)
{
natsStatus s = _addUpdateOrGet(new_si, jsStreamActionUpdate, js, cfg, opts, errCode);
natsStatus s = _addOrUpdate(new_si, jsStreamActionUpdate, js, cfg, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_marshalStreamInfoReq(natsBuffer **new_buf, struct jsOptionsStreamInfo *o, int64_t offset)
{
natsBuffer *buf = *new_buf;
natsStatus s = NATS_OK;

if (!o->DeletedDetails && nats_IsStringEmpty(o->SubjectsFilter))
return NATS_OK;

if (buf == NULL)
s = natsBuf_Create(&buf, 30);

IFOK(s, natsBuf_AppendByte(buf, '{'));
if ((s == NATS_OK) && o->DeletedDetails)
s = natsBuf_Append(buf, "\"deleted_details\":true", -1);
if ((s == NATS_OK) && !nats_IsStringEmpty(o->SubjectsFilter))
{
if (o->DeletedDetails)
s = natsBuf_AppendByte(buf, ',');
IFOK(s, natsBuf_Append(buf, "\"subjects_filter\":\"", -1));
IFOK(s, natsBuf_Append(buf, o->SubjectsFilter, -1));
IFOK(s, natsBuf_AppendByte(buf, '\"'));
if ((s == NATS_OK) && (offset > 0))
IFOK(s, nats_marshalLong(buf, true, "offset", offset));
}
IFOK(s, natsBuf_AppendByte(buf, '}'));

if (*new_buf == NULL)
*new_buf = buf;

return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_GetStreamInfo(jsStreamInfo **new_si, jsCtx *js, const char *stream, jsOptions *opts, jsErrCode *errCode)
{
natsStatus s;
jsStreamConfig cfg;
natsBuffer *buf = NULL;
char *subj = NULL;
char *req = NULL;
int reqLen = 0;
natsMsg *resp = NULL;
natsConnection *nc = NULL;
bool freePfx = false;
jsOptions o;
// For subjects pagination
int64_t offset = 0;
bool doPage = false;
bool done = false;
jsStreamInfo *si = NULL;
jsStreamStateSubjects *subjects = NULL;
apiPaged page;

if (errCode != NULL)
*errCode = 0;

// Check for new_si here, the js and stream name will be checked in _addUpdateOrGet.
if (new_si == NULL)
if ((js == NULL) || (new_si == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

jsStreamConfig_Init(&cfg);
cfg.Name = (char*) stream;
s = _checkStreamName(stream);
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);

s = _addUpdateOrGet(new_si, jsStreamActionGet, js, &cfg, opts, errCode);
if (s == NATS_NOT_FOUND)
s = js_setOpts(&nc, &freePfx, js, opts, &o);
if (s == NATS_OK)
{
nats_clearLastError();
return s;
if (nats_asprintf(&subj, jsApiStreamInfoT, js_lenWithoutTrailingDot(o.Prefix), o.Prefix, stream) < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);

if (freePfx)
NATS_FREE((char*) o.Prefix);
}

if (!nats_IsStringEmpty(o.Stream.Info.SubjectsFilter))
{
doPage = true;
memset(&page, 0, sizeof(apiPaged));
}

do
{
// This will return a buffer if the request was marshal'ed
// (due to presence of options)
IFOK(s, _marshalStreamInfoReq(&buf, &(o.Stream.Info), offset));
if ((s == NATS_OK) && (buf != NULL) && (natsBuf_Len(buf) > 0))
{
req = natsBuf_Data(buf);
reqLen = natsBuf_Len(buf);
}

// Send the request
IFOK_JSR(s, natsConnection_Request(&resp, nc, subj, req, reqLen, o.Wait));

// If we got a response, check for error or return the stream info result.
IFOK(s, _unmarshalStreamCreateResp(&si, &page, resp, errCode));

// If there was paging, we need to collect subjects until we get them all.
if ((s == NATS_OK) && doPage)
{
if (si->State.Subjects != NULL)
{
int sc = si->State.Subjects->Count;
offset += (int64_t) sc;
if (subjects == NULL)
subjects = si->State.Subjects;
else
{
jsStreamStateSubject *list = subjects->List;
int prev = subjects->Count;

list = (jsStreamStateSubject*) NATS_REALLOC(list, (prev + sc) * sizeof(jsStreamStateSubject));
if (list == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);
else
{
int i;
for (i=0; i<sc; i++)
{
list[prev+i].Subject = si->State.Subjects->List[i].Subject;
list[prev+i].Msgs = si->State.Subjects->List[i].Msgs;
}
NATS_FREE(si->State.Subjects->List);
NATS_FREE(si->State.Subjects);
subjects->List = list;
subjects->Count += sc;
}
}
if (s == NATS_OK)
si->State.Subjects = NULL;
}
done = offset >= page.total;
if (!done)
{
jsStreamInfo_Destroy(si);
si = NULL;
// Reset the request buffer, we may be able to reuse.
natsBuf_Reset(buf);
}
}
natsMsg_Destroy(resp);
resp = NULL;
}
while ((s == NATS_OK) && doPage && !done);

natsBuf_Destroy(buf);
NATS_FREE(subj);

if (s == NATS_OK)
{
if (doPage && (subjects != NULL))
si->State.Subjects = subjects;

*new_si = si;
}
else
{
if (subjects != NULL)
_destroyStreamStateSubjects(subjects);

if (s == NATS_NOT_FOUND)
{
nats_clearLastError();
return s;
}
}
return NATS_UPDATE_ERR_STACK(s);
}
Expand Down
Loading

0 comments on commit eeafb9f

Please sign in to comment.