Skip to content

Commit

Permalink
Changed RePublish from jsSubjectMapping to jsRePublish + HeadersOnly
Browse files Browse the repository at this point in the history
Since this feature was not released, made the change to the name
of the structure and added the HeadersOnly boolean to be on-par
with the server.

Related to #546

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jul 22, 2022
1 parent 418dd00 commit 7f14956
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 50 deletions.
50 changes: 26 additions & 24 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ _destroyStreamSource(jsStreamSource *source)
}

static void
_destroySubjectMapping(jsSubjectMapping *sm)
_destroyRePublish(jsRePublish *rp)
{
if (sm == NULL)
if (rp == NULL)
return;

NATS_FREE((char*) sm->Source);
NATS_FREE((char*) sm->Destination);
NATS_FREE(sm);
NATS_FREE((char*) rp->Source);
NATS_FREE((char*) rp->Destination);
NATS_FREE(rp);
}

void
Expand All @@ -127,7 +127,7 @@ js_destroyStreamConfig(jsStreamConfig *cfg)
for (i=0; i<cfg->SourcesLen; i++)
_destroyStreamSource(cfg->Sources[i]);
NATS_FREE(cfg->Sources);
_destroySubjectMapping(cfg->RePublish);
_destroyRePublish(cfg->RePublish);
NATS_FREE(cfg);
}

Expand Down Expand Up @@ -510,27 +510,28 @@ _marshalStorageType(jsStorageType storage, natsBuffer *buf)
}

static natsStatus
_unmarshalRePublish(nats_JSON *json, const char *fieldName, jsSubjectMapping **new_mapping)
_unmarshalRePublish(nats_JSON *json, const char *fieldName, jsRePublish **new_republish)
{
jsSubjectMapping *sm = NULL;
jsRePublish *rp = NULL;
nats_JSON *jsm = NULL;
natsStatus s;

s = nats_JSONGetObject(json, fieldName, &jsm);
if (jsm == NULL)
return NATS_UPDATE_ERR_STACK(s);

sm = (jsSubjectMapping*) NATS_CALLOC(1, sizeof(jsSubjectMapping));
if (sm == NULL)
rp = (jsRePublish*) NATS_CALLOC(1, sizeof(jsRePublish));
if (rp == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

s = nats_JSONGetStr(jsm, "src", (char**) &(sm->Source));
IFOK(s, nats_JSONGetStr(jsm, "dest", (char**) &(sm->Destination)));
s = nats_JSONGetStr(jsm, "src", (char**) &(rp->Source));
IFOK(s, nats_JSONGetStr(jsm, "dest", (char**) &(rp->Destination)));
IFOK(s, nats_JSONGetBool(jsm, "headers_only", &(rp->HeadersOnly)));

if (s == NATS_OK)
*new_mapping = sm;
*new_republish = rp;
else
_destroySubjectMapping(sm);
_destroyRePublish(rp);

return NATS_UPDATE_ERR_STACK(s);

Expand Down Expand Up @@ -702,20 +703,22 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
IFOK(s, natsBuf_Append(buf, ",\"deny_purge\":true", -1));
if ((s == NATS_OK) && cfg->AllowRollup)
IFOK(s, natsBuf_Append(buf, ",\"allow_rollup_hdrs\":true", -1));
if ((s == NATS_OK) && (cfg->RePublish != NULL))
if ((s == NATS_OK) && (cfg->RePublish != NULL) && !nats_IsStringEmpty(cfg->RePublish->Destination))
{
// "dest" is not omitempty, in that the field will always be present.
IFOK(s, natsBuf_Append(buf, ",\"republish\":{\"dest\":\"", -1));
// Still check that our value is not NULL
if (!nats_IsStringEmpty(cfg->RePublish->Destination))
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Destination, -1));
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Destination, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
// Now the source...
if (!nats_IsStringEmpty(cfg->RePublish->Source))
{
IFOK(s, natsBuf_Append(buf, "\",\"src\":\"", -1))
IFOK(s, natsBuf_Append(buf, ",\"src\":\"", -1))
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Source, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
IFOK(s, natsBuf_Append(buf, "\"}", -1));
if (cfg->RePublish->HeadersOnly)
IFOK(s, natsBuf_Append(buf, ",\"headers_only\":true", -1));
IFOK(s, natsBuf_AppendByte(buf, '}'));
}
if ((s == NATS_OK) && cfg->AllowDirect)
IFOK(s, natsBuf_Append(buf, ",\"allow_direct\":true", -1));
Expand Down Expand Up @@ -1881,13 +1884,12 @@ jsExternalStream_Init(jsExternalStream *external)
}

natsStatus
jsSubjectMapping_Init(jsSubjectMapping *sm, const char *src, const char *dst)
jsRePublish_Init(jsRePublish *rp)
{
if (sm == NULL)
if (rp == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

sm->Source = src;
sm->Destination = dst;
memset(rp, 0, sizeof(jsRePublish));
return NATS_OK;
}

Expand Down
25 changes: 13 additions & 12 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,13 @@ typedef struct jsStreamSource
/**
* Allows a source subject to be mapped to a destination subject for republishing.
*/
typedef struct jsSubjectMapping
typedef struct jsRePublish
{
const char *Source;
const char *Destination;
bool HeadersOnly;

} jsSubjectMapping;
} jsRePublish;

/**
* Configuration of a JetStream stream.
Expand Down Expand Up @@ -414,7 +415,7 @@ typedef struct jsSubjectMapping
* const char *subjects[] = {"foo", "bar"};
* const char *tags[] = {"tag1", "tag2"};
* jsStreamSource *sources[] = {&s1, &s2};
* jsSubjectMapping sm;
* jsRePublish rp;
*
* jsStreamConfig_Init(&sc);
*
Expand Down Expand Up @@ -456,8 +457,10 @@ typedef struct jsSubjectMapping
* sc.SourcesLen = 2;
*
* // For RePublish subject:
* jsSubjectMapping_Init(&sm, ">", "RP.>")
* sc.RePublish = &sm;
* jsRePublish_Init(&rp);
* rp.Source = ">";
* rp.Destination = "RP.>";
* sc.RePublish = &rp;
*
* s = js_AddStream(&si, js, &sc, NULL, &jerr);
* \endcode
Expand Down Expand Up @@ -494,7 +497,7 @@ typedef struct jsStreamConfig {
bool AllowRollup;

// Allow republish of the message after being sequenced and stored.
jsSubjectMapping *RePublish;
jsRePublish *RePublish;

// Allow higher performance, direct access to get individual messages. E.g. KeyValue
bool AllowDirect;
Expand Down Expand Up @@ -5294,16 +5297,14 @@ jsStreamSource_Init(jsStreamSource *source);
NATS_EXTERN natsStatus
jsExternalStream_Init(jsExternalStream *external);

/** \brief Initializes a subject mapping structure.
/** \brief Initializes a republish structure.
*
* Use this to set the source and destination for a subject mapping.
* Use this to set the source, destination and/or headers only for a stream re-publish.
*
* @param sm the pointer to the #jsSubjectMapping to initialize.
* @param src the string for the source.
* @param dst the string for the destination.
* @param rp the pointer to the #jsRePublish to initialize.
*/
NATS_EXTERN natsStatus
jsSubjectMapping_Init(jsSubjectMapping *sm, const char *src, const char *dst);
jsRePublish_Init(jsRePublish *rp);

/** \brief Creates a stream.
*
Expand Down
25 changes: 11 additions & 14 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21603,7 +21603,7 @@ test_JetStreamMarshalStreamConfig(void)
nats_JSON *json = NULL;
jsStreamConfig *rsc = NULL;
int64_t optStartTime = 1624583232123456000;
jsSubjectMapping sm;
jsRePublish rp;

test("init bad args: ");
s = jsStreamConfig_Init(NULL);
Expand Down Expand Up @@ -21681,22 +21681,18 @@ test_JetStreamMarshalStreamConfig(void)
sc.AllowDirect = true;
sc.MirrorDirect = true;

test("Subject mapping init err: ");
s = jsSubjectMapping_Init(NULL, "a", "b");
test("RePublish init err: ");
s = jsRePublish_Init(NULL);
testCond(s == NATS_INVALID_ARG);
nats_clearLastError();
s = NATS_OK;

test("Subject mapping init, NULL or empty are ok: ");
s = jsSubjectMapping_Init(&sm, "A", NULL);
IFOK(s, jsSubjectMapping_Init(&sm, "A", ""));
IFOK(s, jsSubjectMapping_Init(&sm, NULL, "B"));
IFOK(s, jsSubjectMapping_Init(&sm, "", "B"));
testCond(s == NATS_OK);

// Republish
jsSubjectMapping_Init(&sm, ">", "RP.>");
sc.RePublish = &sm;
jsRePublish_Init(&rp);
rp.Source = ">";
rp.Destination = "RP.>";
rp.HeadersOnly = true;
sc.RePublish = &rp;

test("Marshal stream config: ");
s = js_marshalStreamConfig(&buf, &sc);
Expand Down Expand Up @@ -21764,6 +21760,7 @@ test_JetStreamMarshalStreamConfig(void)
&& (strcmp(rsc->RePublish->Source, ">") == 0)
&& (rsc->RePublish->Destination != NULL)
&& (strcmp(rsc->RePublish->Destination, "RP.>") == 0)
&& rsc->RePublish->HeadersOnly
&& rsc->AllowDirect
&& rsc->MirrorDirect);
js_destroyStreamConfig(rsc);
Expand Down Expand Up @@ -26562,9 +26559,9 @@ test_JetStreamSubscribePull(void)

test("Ack: ");
for (i=0; (s == NATS_OK) && (i<list.Count); i++)
s = natsMsg_Ack(list.Msgs[i], NULL);
s = natsMsg_AckSync(list.Msgs[i], NULL, &jerr);
natsMsgList_Destroy(&list);
testCond(s == NATS_OK);
testCond((s == NATS_OK) && (jerr == 0));

test("Send a message: ");
s = js_Publish(NULL, js, "foo", "hello", 5, NULL, &jerr);
Expand Down

0 comments on commit 7f14956

Please sign in to comment.