Skip to content

Commit

Permalink
fix(jetstream): ordered pull consumer would never fail to reset on in…
Browse files Browse the repository at this point in the history
…itial creation if there was an error

Fix #725
  • Loading branch information
aricart committed Aug 20, 2024
1 parent 9c09cd6 commit 9f46065
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
7 changes: 5 additions & 2 deletions jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,7 @@ export class OrderedPullConsumerImpl implements Consumer {
iter: OrderedConsumerMessages | null;
type: PullConsumerType;
startSeq: number;
maxInitialReset: number;

constructor(
api: ConsumerAPI,
Expand All @@ -998,6 +999,7 @@ export class OrderedPullConsumerImpl implements Consumer {
this.iter = null;
this.type = PullConsumerType.Unset;
this.consumerOpts = opts;
this.maxInitialReset = 30;

// to support a random start sequence we need to update the cursor
this.startSeq = this.consumerOpts.opt_start_seq || 0;
Expand Down Expand Up @@ -1067,6 +1069,7 @@ export class OrderedPullConsumerImpl implements Consumer {
}

async resetConsumer(seq = 0): Promise<ConsumerInfo> {
const isNew = this.serial === 0;
// try to delete the consumer
this.consumer?.delete().catch(() => {});
seq = seq === 0 ? 1 : seq;
Expand Down Expand Up @@ -1096,7 +1099,7 @@ export class OrderedPullConsumerImpl implements Consumer {
}
}

if (seq === 0 && i >= 30) {
if (isNew && i >= this.maxInitialReset) {
// consumer was never created, so we can fail this
throw err;
} else {
Expand Down Expand Up @@ -1294,7 +1297,7 @@ export class OrderedPullConsumerImpl implements Consumer {

async info(cached?: boolean): Promise<ConsumerInfo> {
if (this.currentConsumer == null) {
this.currentConsumer = await this.resetConsumer(this.serial);
this.currentConsumer = await this.resetConsumer(this.startSeq);
return Promise.resolve(this.currentConsumer);
}
if (cached && this.currentConsumer) {
Expand Down
21 changes: 21 additions & 0 deletions jetstream/tests/consumers_ordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1052,3 +1052,24 @@ Deno.test("ordered consumers - next reset", async () => {

await cleanup(ns, nc);
});

Deno.test("ordered consumers - initial creation fails, consumer fails", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();

await jsm.streams.add({ name: "A", subjects: ["a"] });
const js = nc.jetstream();

const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
await jsm.streams.delete("A");
c.maxInitialReset = 3;
await assertRejects(
() => {
return c.consume();
},
Error,
"stream not found",
);

await cleanup(ns, nc);
});

0 comments on commit 9f46065

Please sign in to comment.