Skip to content

Commit

Permalink
Add getConsumerFromInfo method to get a by Consumer with a ConsumerIn…
Browse files Browse the repository at this point in the history
…fo (bypassing additional lookup)
  • Loading branch information
aricart committed Jan 7, 2025
1 parent 052a20e commit f9c2ab9
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 1 deletion.
13 changes: 13 additions & 0 deletions jetstream/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
} from "../nats-base-client/core.ts";
import {
ApiPagedRequest,
ConsumerInfo,
ExternalStream,
MsgDeleteRequest,
MsgRequest,
Expand Down Expand Up @@ -110,6 +111,13 @@ export class ConsumersImpl implements Consumers {
return Promise.resolve();
}

getConsumerFromInfo(ci: ConsumerInfo): Consumer {
if (ci.config.deliver_subject !== undefined) {
throw new Error("push consumer not supported");
}
return new PullConsumerImpl(this.api, ci);
}

async get(
stream: string,
name: string | Partial<OrderedConsumerOptions> = {},
Expand Down Expand Up @@ -196,6 +204,11 @@ export class StreamImpl implements Stream {
});
}

getConsumerFromInfo(ci: ConsumerInfo): Consumer {
return new ConsumersImpl(new ConsumerAPIImpl(this.api.nc, this.api.opts))
.getConsumerFromInfo(ci);
}

getConsumer(
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer> {
Expand Down
96 changes: 95 additions & 1 deletion jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
assertRejects,
assertStringIncludes,
} from "https://deno.land/std@0.221.0/assert/mod.ts";
import { deferred, nanos } from "../../nats-base-client/mod.ts";
import { deferred, delay, nanos } from "../../nats-base-client/mod.ts";
import {
AckPolicy,
Consumer,
Expand Down Expand Up @@ -551,3 +551,97 @@ Deno.test("consumers - inboxPrefix is respected", async () => {
await done;
await cleanup(ns, nc);
});

Deno.test("consumers - getFromConsumerInfo", async () => {
const { ns, nc } = await setup(jetstreamServerConf(), { inboxPrefix: "x" });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = nc.jetstream();
await js.publish("hello");

const ci = await jsm.consumers.add("messages", {
durable_name: "c",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

let c = 0;
nc.subscribe("$JS.API.CONSUMER.INFO.>", {
callback: (_) => {
c++;
},
});

const consumer = js.consumers.getConsumerFromInfo(ci);
const iter = await consumer.consume({ bind: true });

for await (const _m of iter) {
break;
}

await nc.flush();
assertEquals(c, 0);

await cleanup(ns, nc);
});

Deno.test("consumers - getFromConsumerInfo non existing misses heartbeats", async () => {
const { ns, nc } = await setup(jetstreamServerConf(), { inboxPrefix: "x" });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = nc.jetstream();
await js.publish("hello");

const ci = await jsm.consumers.add("messages", {
durable_name: "c",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
inactive_threshold: nanos(2000),
});

const consumer = js.consumers.getConsumerFromInfo(ci);
await consumer.delete();

await delay(1000);
await assertRejects(
() => {
return jsm.consumers.info("messages", "c");
},
Error,
"consumer not found",
);

let c = 0;
nc.subscribe("$JS.API.CONSUMER.INFO.>", {
callback: (_) => {
c++;
},
});

const iter = await consumer.consume({ bind: true, idle_heartbeat: 1000 });
(async () => {
const status = await iter.status();
for await (const s of status) {
if (s.type === ConsumerEvents.HeartbeatsMissed) {
if (s.data === 2) {
iter.stop();
}
}
}
})().catch();

for await (const _m of iter) {
break;
}

await nc.flush();
assertEquals(c, 0);

await cleanup(ns, nc);
});
11 changes: 11 additions & 0 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,15 @@ export interface Consumers {
stream: string,
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer>;

/**
* Returns a Consumer API based on the ConsumerInfo specified.
* Note this method can throw, and it doesn't validate that the
* underlying consumer exists. When using a consumer obtained
* by this method it is important to check for ConsumerEvents#HeartbeatsMissed
* @param info
*/
getConsumerFromInfo(info: ConsumerInfo): Consumer;
}

export interface ConsumerOpts {
Expand Down Expand Up @@ -885,6 +894,8 @@ export interface Stream {
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer>;

getConsumerFromInfo(ci: ConsumerInfo): Consumer;

getMessage(query: MsgRequest): Promise<StoredMsg>;

deleteMessage(seq: number, erase?: boolean): Promise<boolean>;
Expand Down

0 comments on commit f9c2ab9

Please sign in to comment.