Skip to content

Commit

Permalink
feat: Allow cancelling in-flight RPC operations with per-link*() fu…
Browse files Browse the repository at this point in the history
…nction context in TypeScript registry implementation

Signed-off-by: Felicitas Pojtinger <felicitas@pojtinger.com>
  • Loading branch information
pojntfx committed Aug 9, 2024
1 parent 079231a commit 8521e10
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions ts/src/rpc/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
import { ILocalContext, IRemoteContext } from "./context";
import { ClosureManager, registerClosure } from "./manager";

export const ErrorCallCancelled = "call timed out";
export const ErrorCallAborted = "call aborted";
export const ErrorCannotCallNonFunction = "can not call non function";

const constructorFunctionName = "constructor";
Expand Down Expand Up @@ -50,6 +50,10 @@ export const remoteClosure = (

const makeRPC =
<T>(
// This is separate from the AbortSignal that is the first argument to each RPC because we also
// want to be able to cancel all in-flight RPCs if the signal passed to a `link*()` function is cancelled
linkSignal: AbortSignal | undefined,

name: string,
responseResolver: EventTarget,

Expand All @@ -61,8 +65,8 @@ const makeRPC =
) =>
async (ctx: IRemoteContext, ...rest: any[]) =>
new Promise((res, rej) => {
if (ctx?.signal?.aborted) {
rej(new Error(ErrorCallCancelled));
if (ctx?.signal?.aborted || linkSignal?.aborted) {
rej(new Error(ErrorCallAborted));

return;
}
Expand Down Expand Up @@ -91,14 +95,15 @@ const makeRPC =
closureFreers.map((free) => free());

const callResponse: ICallResponse = {
err: ErrorCallCancelled,
err: ErrorCallAborted,
};

responseResolver.dispatchEvent(
new CustomEvent(`rpc:${callID}`, { detail: callResponse })
);
};
ctx?.signal?.addEventListener("abort", abortListener);
linkSignal?.addEventListener("abort", abortListener);

const returnListener = (event: Event) => {
const { value, err } = (event as CustomEvent<ICallResponse>).detail;
Expand Down Expand Up @@ -147,6 +152,7 @@ export class Registry<L extends Object, R extends Object> {

/**
* Expose local RPCs and implement remote RPCs via a message-based transport
* @param signal AbortSignal for in-flight RPC operations
* @param requestWriter Stream to write requests to
* @param responseWriter Stream to write responses to
* @param requestReader Stream to read requests from
Expand All @@ -156,6 +162,8 @@ export class Registry<L extends Object, R extends Object> {
* @param hooks Link hooks
*/
linkMessage = <T>(
signal: AbortSignal | undefined,

requestWriter: WritableStreamDefaultWriter<T>,
responseWriter: WritableStreamDefaultWriter<T>,

Expand All @@ -180,6 +188,8 @@ export class Registry<L extends Object, R extends Object> {
}

(r as any)[functionName] = makeRPC(
signal,

functionName,
responseResolver,

Expand Down Expand Up @@ -255,6 +265,8 @@ export class Registry<L extends Object, R extends Object> {
remoteClosureParameterIndexes?.includes(index + 1)
? (closureCtx: IRemoteContext, ...closureArgs: any[]) => {
const rpc = makeRPC<T>(
signal,

"CallClosure",
responseResolver,

Expand Down Expand Up @@ -338,13 +350,16 @@ export class Registry<L extends Object, R extends Object> {

/**
* Expose local RPCs and implement remote RPCs via a stream-based transport
* @param signal AbortSignal for in-flight RPC operations
* @param encoder Stream to write messages to
* @param decoder Stream to read messages from
* @param marshal Function to marshal nested values with
* @param unmarshal Function to unmarshal nested values with
* @param hooks Link hooks
*/
linkStream = <T>(
signal: AbortSignal | undefined,

encoder: WritableStream,
decoder: ReadableStream,

Expand Down Expand Up @@ -454,6 +469,8 @@ export class Registry<L extends Object, R extends Object> {
});

this.linkMessage(
signal,

requestWriter.getWriter(),
responseWriter.getWriter(),

Expand Down

0 comments on commit 8521e10

Please sign in to comment.