Skip to content

Commit

Permalink
[FEAT] start a watcher at a specific revision
Browse files Browse the repository at this point in the history
FIX #650
  • Loading branch information
aricart committed Jan 4, 2024
1 parent 5c8f1f8 commit 27083aa
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 14 deletions.
3 changes: 3 additions & 0 deletions jetstream/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,9 @@ export class Bucket implements KV, KvRemove {
const subj = cc.filter_subject!;
const copts = consumerOpts(cc);
copts.bindStream(this.stream);
if (opts.resumeFromRevision && opts.resumeFromRevision > 0) {
copts.startSequence(opts.resumeFromRevision);
}
copts.orderedConsumer();
copts.callback((err, jm) => {
if (err) {
Expand Down
21 changes: 21 additions & 0 deletions jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1945,3 +1945,24 @@ Deno.test("kv - compression", async () => {
assertEquals(status.compression, false);
await cleanup(ns, nc);
});

Deno.test("kv - watch start at", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const js = nc.jetstream();
const kv = await js.views.kv("a");
await kv.put("a", "1");
await kv.put("b", "2");
await kv.put("c", "3");

const iter = await kv.watch({ resumeFromRevision: 2 });
await (async () => {
for await (const o of iter) {
// expect first key to be "b"
assertEquals(o.key, "b");
assertEquals(o.revision, 2);
break;
}
})();

await cleanup(ns, nc);
});
6 changes: 6 additions & 0 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,12 @@ export type KvWatchOptions = {
* Specify what to include in the watcher, by default all last values.
*/
include?: KvWatchInclude;
/**
* Starts watching at the specified revision. This is intended for watchers
* that have restarted watching and have maintained some state of where they are
* in the watch.
*/
resumeFromRevision?: number;
};

export interface RoKV {
Expand Down
14 changes: 0 additions & 14 deletions nats-base-client/error.ts

This file was deleted.

0 comments on commit 27083aa

Please sign in to comment.