Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix live query UUID #236

Merged
merged 2 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/surreal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { Engine, HttpEngine, WebsocketEngine } from "./library/engine.ts";
import { RecordId } from "./library/cbor/recordid.ts";
import { Emitter } from "./library/emitter.ts";
import { processAuthVars } from "./library/processAuthVars.ts";
import type { UUID } from "./library/cbor/uuid.ts";
import {
Action,
type ActionResult,
Expand Down Expand Up @@ -302,7 +303,7 @@ export class Surreal {
>,
>(table: string, callback?: LiveHandler<Result>, diff?: boolean) {
await this.ready;
const res = await this.rpc<string>("live", [table, diff]);
const res = await this.rpc<UUID>("live", [table, diff]);

if (res.error) throw new ResponseError(res.error.message);
if (callback) this.subscribeLive<Result>(res.result, callback);
Expand All @@ -320,7 +321,7 @@ export class Surreal {
string,
unknown
>,
>(queryUuid: string, callback: LiveHandler<Result>) {
>(queryUuid: UUID, callback: LiveHandler<Result>) {
await this.ready;
if (!this.connection) throw new NoActiveSocket();
this.connection.emitter.subscribe(
Expand All @@ -343,7 +344,7 @@ export class Surreal {
string,
unknown
>,
>(queryUuid: string, callback: LiveHandler<Result>) {
>(queryUuid: UUID, callback: LiveHandler<Result>) {
await this.ready;
if (!this.connection) throw new NoActiveSocket();
this.connection.emitter.unSubscribe(
Expand All @@ -357,9 +358,9 @@ export class Surreal {

/**
* Kill a live query
* @param uuid - The query that you want to kill.
* @param queryUuid - The query that you want to kill.
*/
async kill(queryUuid: string | string[]) {
async kill(queryUuid: UUID | readonly UUID[]) {
await this.ready;
if (!this.connection) throw new NoActiveSocket();
if (Array.isArray(queryUuid)) {
Expand Down
7 changes: 6 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { z } from "npm:zod";
import { RecordId } from "./library/cbor/recordid.ts";
import { Surreal } from "./surreal.ts";
import { UUID } from "./library/cbor/uuid.ts";

export const UseOptions = z.object({
namespace: z.coerce.string(),
Expand Down Expand Up @@ -260,7 +261,11 @@ export const Action = z.union([
export type Action = z.infer<typeof Action>;

export const LiveResult = z.object({
id: z.string().uuid(),
id: z.instanceof(UUID as never) as z.ZodType<
typeof UUID,
z.ZodTypeDef,
typeof UUID
>,
action: Action,
result: z.record(z.unknown()),
});
Expand Down
1 change: 1 addition & 0 deletions tests/integration/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ new Deno.Command("surreal", {
await new Promise((r) => setTimeout(r, 1000));
import "./tests/auth.ts";
import "./tests/querying.ts";
import "./tests/live.ts";
175 changes: 175 additions & 0 deletions tests/integration/tests/live.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import { createSurreal } from "../surreal.ts";

import {
assert,
assertEquals,
assertGreater,
assertNotEquals,
assertRejects,
} from "https://deno.land/std@0.223.0/assert/mod.ts";
import { type Action, RecordId, UUID } from "../../../mod.ts";

async function withTimeout(p: Promise<void>, ms: number): Promise<void> {
const { promise, resolve, reject } = Promise.withResolvers<void>();
const timeout = setTimeout(() => reject(new Error("Timeout")), ms);

await Promise.race([
promise,
p.then(resolve).finally(() => clearTimeout(timeout)),
]);
}

Deno.test("live", async () => {
const surreal = await createSurreal();

if (surreal.connection?.connection.url?.protocol !== "ws:") {
await assertRejects(async () => {
await surreal.live("person");
});
} else {
const events: { action: Action; result: Record<string, unknown> }[] =
[];
const { promise, resolve } = Promise.withResolvers<void>();

const queryUuid = await surreal.live("person", (action, result) => {
events.push({ action, result });
if (action === "DELETE") resolve();
});

assert(queryUuid instanceof UUID);

await surreal.create(new RecordId("person", 1), {
firstname: "John",
lastname: "Doe",
});
await surreal.update(new RecordId("person", 1), {
firstname: "Jane",
lastname: "Doe",
});
await surreal.delete(new RecordId("person", 1));

await withTimeout(promise, 5e3); // Wait for the DELETE event

assertEquals(events, [
{
action: "CREATE",
result: {
id: new RecordId("person", 1),
firstname: "John",
lastname: "Doe",
},
},
{
action: "UPDATE",
result: {
id: new RecordId("person", 1),
firstname: "Jane",
lastname: "Doe",
},
},
{
action: "DELETE",
result: {
id: new RecordId("person", 1),
firstname: "Jane",
lastname: "Doe",
},
},
]);
}

await surreal.close();
});

Deno.test("unsubscribe live", async () => {
const surreal = await createSurreal();

if (surreal.connection?.connection.url?.protocol !== "ws:") {
// Not supported
} else {
const { promise, resolve } = Promise.withResolvers<void>();

let primaryLiveHandlerCallCount = 0;
let secondaryLiveHandlerCallCount = 0;

const primaryLiveHandler = () => {
primaryLiveHandlerCallCount += 1;
};
const secondaryLiveHandler = () => {
secondaryLiveHandlerCallCount += 1;
};

const queryUuid = await surreal.live("person", (action: Action) => {
if (action === "DELETE") resolve();
});
await surreal.subscribeLive(queryUuid, primaryLiveHandler);
await surreal.subscribeLive(queryUuid, secondaryLiveHandler);

await surreal.create(new RecordId("person", 1), { firstname: "John" });

await surreal.unSubscribeLive(queryUuid, secondaryLiveHandler);

await surreal.update(new RecordId("person", 1), { firstname: "Jane" });
await surreal.delete(new RecordId("person", 1));

await withTimeout(promise, 5e3); // Wait for the DELETE event

assertGreater(
primaryLiveHandlerCallCount,
secondaryLiveHandlerCallCount,
);
}

await surreal.close();
});

Deno.test("kill", async () => {
const surreal = await createSurreal();

if (surreal.connection?.connection.url?.protocol !== "ws:") {
// Not supported
} else {
const { promise, resolve } = Promise.withResolvers<void>();

let primaryLiveHandlerCallCount = 0;
let secondaryLiveHandlerCallCount = 0;

const primaryLiveHandler = (action: Action) => {
primaryLiveHandlerCallCount += 1;
if (action === "DELETE") resolve();
};
const secondaryLiveHandler = () => {
secondaryLiveHandlerCallCount += 1;
};

const primaryQueryUuid = await surreal.live(
"person",
primaryLiveHandler,
);
const secondaryQueryUuid = await surreal.live(
"person",
secondaryLiveHandler,
);

assertNotEquals(
primaryQueryUuid.toString(),
secondaryQueryUuid.toString(),
);

await surreal.create(new RecordId("person", 1), { firstname: "John" });

await surreal.kill(secondaryQueryUuid);

await surreal.update(new RecordId("person", 1), { firstname: "Jane" });
await surreal.delete(new RecordId("person", 1));

await withTimeout(promise, 5e3); // Wait for the DELETE event

assertGreater(
primaryLiveHandlerCallCount,
secondaryLiveHandlerCallCount,
);
}

await surreal.close();
});