Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort handler signal on error/return #786

Merged
merged 1 commit into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions packages/connect/src/protocol-connect/handler-factory.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,14 @@ describe("createHandlerFactory()", function () {
it("should propogate errors back to the handler", async function () {
let resolve: (e: unknown) => void;
const catchError = new Promise<unknown>((r) => (resolve = r));
let abortResolve: () => void;
const abortCalled = new Promise<void>((r) => (abortResolve = r));
const { handler } = setupTestHandler(
testService.methods.serverStreaming,
{},
// eslint-disable-next-line @typescript-eslint/require-await
async function* (req) {
async function* (req, { signal }) {
signal.addEventListener("abort", abortResolve);
try {
yield { value: `${req.value}` };
fail("expected error");
Expand All @@ -192,7 +195,8 @@ describe("createHandlerFactory()", function () {
await it.next();
const writeError = new Error("write error");
await it.throw?.(writeError).catch(() => {});
expect(await catchError).toEqual(writeError);
await expectAsync(catchError).toBeResolvedTo(writeError);
await expectAsync(abortCalled).toBeResolved();
});
});

Expand Down
24 changes: 23 additions & 1 deletion packages/connect/src/protocol-connect/handler-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,30 @@ function createStreamHandler<I extends Message<I>, O extends Message<O>>(
// raises an error, but we want to be lenient
),
);
const it = transformInvokeImplementation<I, O>(
spec,
context,
)(inputIt)[Symbol.asyncIterator]();
const outputIt = pipe(
transformInvokeImplementation<I, O>(spec, context)(inputIt),
// We wrap the iterator in an async iterator to ensure that the
// abort signal is aborted when the iterator is done.
{
[Symbol.asyncIterator]() {
return {
next: () => it.next(),
throw: (e: unknown) => {
context.abort(e);
return it.throw?.(e) ?? Promise.reject({ done: true });
},
return: (v: unknown) => {
context.abort();
return (
it.return?.(v) ?? Promise.resolve({ done: true, value: v })
);
},
};
},
},
transformSerializeEnvelope(serialization.getO(type.binary)),
transformCatchFinally<EnvelopedMessage>((e) => {
context.abort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,14 @@ describe("createHandlerFactory()", function () {
it("should propogate errors back to the handler", async function () {
let resolve: (e: unknown) => void;
const catchError = new Promise<unknown>((r) => (resolve = r));
let abortResolve: () => void;
const abortCalled = new Promise<void>((r) => (abortResolve = r));
const { handler } = setupTestHandler(
testService.methods.serverStreaming,
{},
// eslint-disable-next-line @typescript-eslint/require-await
async function* (req) {
async function* (req, { signal }) {
signal.addEventListener("abort", abortResolve);
try {
yield { value: `${req.value}` };
fail("expected error");
Expand All @@ -155,7 +158,8 @@ describe("createHandlerFactory()", function () {
await it.next();
const writeError = new Error("write error");
await it.throw?.(writeError).catch(() => {});
expect(await catchError).toEqual(writeError);
await expectAsync(catchError).toBeResolvedTo(writeError);
await expectAsync(abortCalled).toBeResolved();
});
});

Expand Down
24 changes: 23 additions & 1 deletion packages/connect/src/protocol-grpc-web/handler-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,30 @@ function createHandler<I extends Message<I>, O extends Message<O>>(
// raises an error, but we want to be lenient
),
);
const it = transformInvokeImplementation<I, O>(
spec,
context,
)(inputIt)[Symbol.asyncIterator]();
const outputIt = pipe(
transformInvokeImplementation<I, O>(spec, context)(inputIt),
// We wrap the iterator in an async iterator to ensure that the
// abort signal is aborted when the iterator is done.
{
[Symbol.asyncIterator]() {
return {
next: () => it.next(),
throw: (e: unknown) => {
context.abort(e);
return it.throw?.(e) ?? Promise.reject({ done: true });
},
return: (v: unknown) => {
context.abort();
return (
it.return?.(v) ?? Promise.resolve({ done: true, value: v })
);
},
};
},
},
transformSerializeEnvelope(serialization.getO(type.binary)),
transformCatchFinally<EnvelopedMessage>((e) => {
context.abort();
Expand Down
8 changes: 6 additions & 2 deletions packages/connect/src/protocol-grpc/handler-factory.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,14 @@ describe("createHandlerFactory()", function () {
it("should propogate errors back to the handler", async function () {
let resolve: (e: unknown) => void;
const catchError = new Promise<unknown>((r) => (resolve = r));
let abortResolve: () => void;
const abortCalled = new Promise<void>((r) => (abortResolve = r));
const { handler } = setupTestHandler(
testService.methods.serverStreaming,
{},
// eslint-disable-next-line @typescript-eslint/require-await
async function* (req) {
async function* (req, { signal }) {
signal.addEventListener("abort", abortResolve);
try {
yield { value: `${req.value}` };
fail("expected error");
Expand All @@ -155,7 +158,8 @@ describe("createHandlerFactory()", function () {
await it.next();
const writeError = new Error("write error");
await it.throw?.(writeError).catch(() => {});
expect(await catchError).toEqual(writeError);
await expectAsync(catchError).toBeResolvedTo(writeError);
await expectAsync(abortCalled).toBeResolved();
});
});

Expand Down
24 changes: 23 additions & 1 deletion packages/connect/src/protocol-grpc/handler-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,30 @@ function createHandler<I extends Message<I>, O extends Message<O>>(
transformDecompressEnvelope(compression.request, opt.readMaxBytes),
transformParseEnvelope(serialization.getI(type.binary)),
);
const it = transformInvokeImplementation<I, O>(
spec,
context,
)(inputIt)[Symbol.asyncIterator]();
const outputIt = pipe(
transformInvokeImplementation<I, O>(spec, context)(inputIt),
// We wrap the iterator in an async iterator to ensure that the
// abort signal is aborted when the iterator is done.
{
[Symbol.asyncIterator]() {
return {
next: () => it.next(),
throw: (e: unknown) => {
context.abort(e);
return it.throw?.(e) ?? Promise.reject({ done: true });
},
return: (v: unknown) => {
context.abort();
return (
it.return?.(v) ?? Promise.resolve({ done: true, value: v })
);
},
};
},
},
transformSerializeEnvelope(serialization.getO(type.binary)),
transformCompressEnvelope(compression.response, opt.compressMinBytes),
transformJoinEnvelopes(),
Expand Down