Skip to content

Commit

Permalink
exposed and enabled queue option for consumer opts (#162)
Browse files Browse the repository at this point in the history
FIX #nats-io/nats.js/issues/432
  • Loading branch information
aricart authored Jun 24, 2021
1 parent 340f418 commit 46c7f7b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 0 deletions.
1 change: 1 addition & 0 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ export class JetStreamClientImpl extends BaseApiClient
so.dispatchedFn = autoAckJsMsg;
}
so.max = jsi.max || 0;
so.queue = jsi.queue;
return so;
}

Expand Down
6 changes: 6 additions & 0 deletions nats-base-client/jsconsumeropts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
stream: string;
callbackFn?: JsMsgCallback;
max?: number;
qname?: string;

constructor(opts?: Partial<ConsumerConfig>) {
this.stream = "";
Expand All @@ -58,6 +59,7 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
o.stream = this.stream;
o.callbackFn = this.callbackFn;
o.max = this.max;
o.queue = this.qname;
return o;
}

Expand Down Expand Up @@ -129,6 +131,10 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
callback(fn: JsMsgCallback) {
this.callbackFn = fn;
}

queue(n: string) {
this.qname = n;
}
}

export function isConsumerOptsBuilder(
Expand Down
3 changes: 3 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ export interface ConsumerOpts {

// standard
max?: number;
queue?: string;
debug?: boolean;
}

Expand Down Expand Up @@ -355,6 +356,8 @@ export interface ConsumerOptsBuilder {
maxWaiting(max: number): void;
// standard nats subscribe option for the maximum number of messages to receive on the subscription
maxMessages(max: number): void;
// standard nats queue group option
queue(n: string): void;
// callback to process messages (or iterator is returned)
callback(fn: JsMsgCallback): void;
}
Expand Down
32 changes: 32 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1556,3 +1556,35 @@ Deno.test("jetstream - JSON", async () => {
}
await cleanup(ns, nc);
});

Deno.test("jetstream - qsub", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { subj } = await initStream(nc);
const js = nc.jetstream();

const opts = consumerOpts();
opts.queue("q");
opts.durable("n");
opts.deliverTo("here");
opts.callback((err, m) => {
if (m) {
m.ack();
}
});

const sub = await js.subscribe(subj, opts);
const sub2 = await js.subscribe(subj, opts);

for (let i = 0; i < 100; i++) {
await js.publish(subj, Empty);
}
await nc.flush();
await sub.drain();
await sub2.drain();

assert(sub.getProcessed() > 0);
assert(sub2.getProcessed() > 0);
assertEquals(sub.getProcessed() + sub2.getProcessed(), 100);

await cleanup(ns, nc);
});

0 comments on commit 46c7f7b

Please sign in to comment.