Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support timeouts in handlers #591

Merged
merged 2 commits into from
Apr 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,87 +42,41 @@ describe("timeout_on_sleeping_server", function () {
const options: CallOptions = {
timeoutMs: 5,
};
// TODO(TCN-761) support deadlines in connect-es handlers
servers.describeTransportsExcluding(
[
"@bufbuild/connect-node (gRPC, binary, http2) against @bufbuild/connect-node (h2)",
"@bufbuild/connect-node (gRPC, binary, http2) against @bufbuild/connect-node (h2c)",
"@bufbuild/connect-node (gRPC, JSON, http2) against @bufbuild/connect-node (h2c)",
"@bufbuild/connect-node (gRPC, binary, http2, gzip) against @bufbuild/connect-node (h2c)",
"@bufbuild/connect-node (gRPC, JSON, http2, gzip) against @bufbuild/connect-node (h2c)",
"@bufbuild/connect-node (gRPC, binary, http) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (gRPC, JSON, http) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (gRPC, JSON, https) against @bufbuild/connect-node (h1 + tls)",
"@bufbuild/connect-node (gRPC, binary, https) against @bufbuild/connect-node (h1 + tls)",
"@bufbuild/connect-node (gRPC, binary, http, gzip) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (gRPC, JSON, http, gzip) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (gRPC, binary, http, gzip) against @bufbuild/connect-fastify (h2c)",
"@bufbuild/connect-node (gRPC, JSON, http, gzip) against @bufbuild/connect-fastify (h2c)",
"@bufbuild/connect-node (gRPC, binary, http, gzip) against @bufbuild/connect-express (h1)",
"@bufbuild/connect-node (gRPC, JSON, http, gzip) against @bufbuild/connect-express (h1)",
"@bufbuild/connect-node (Connect, binary, http2, gzip) against @bufbuild/connect-node (h2c)",
"@bufbuild/connect-node (Connect, JSON, http2, gzip) against @bufbuild/connect-node (h2c)",
"@bufbuild/connect-node (Connect, JSON, http) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (Connect, binary, http) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (Connect, binary, https) against @bufbuild/connect-node (h1 + tls)",
"@bufbuild/connect-node (Connect, JSON, https) against @bufbuild/connect-node (h1 + tls)",
"@bufbuild/connect-node (Connect, JSON, http, gzip) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (Connect, binary, http, gzip) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (Connect, JSON, http, gzip) against @bufbuild/connect-fastify (h2c)",
"@bufbuild/connect-node (Connect, binary, http, gzip) against @bufbuild/connect-fastify (h2c)",
"@bufbuild/connect-node (Connect, JSON, http, gzip) against @bufbuild/connect-express (h1)",
"@bufbuild/connect-node (Connect, binary, http, gzip) against @bufbuild/connect-express (h1)",
"@bufbuild/connect-node (gRPC-web, binary, http2) against @bufbuild/connect-node (h2c)",
"@bufbuild/connect-node (gRPC-web, JSON, http2) against @bufbuild/connect-node (h2c)",
"@bufbuild/connect-node (gRPC-web, binary, http2, gzip) against @bufbuild/connect-node (h2c)",
"@bufbuild/connect-node (gRPC-web, JSON, http2, gzip) against @bufbuild/connect-node (h2c)",
"@bufbuild/connect-node (gRPC-web, binary, http) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (gRPC-web, JSON, http) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (gRPC-web, JSON, https) against @bufbuild/connect-node (h1 + tls)",
"@bufbuild/connect-node (gRPC-web, binary, https) against @bufbuild/connect-node (h1 + tls)",
"@bufbuild/connect-node (gRPC-web, binary, http, gzip) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (gRPC-web, JSON, http, gzip) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (gRPC-web, binary, http, gzip against @bufbuild/connect-fastify (h2c)",
"@bufbuild/connect-node (gRPC-web, JSON, http, gzip) against @bufbuild/connect-fastify (h2c)",
"@bufbuild/connect-node (gRPC-web, JSON, http, gzip) against @bufbuild/connect-express (h1)",
"@bufbuild/connect-node (gRPC-web, binary, http, gzip) against @bufbuild/connect-express (h1)",
],
(transport) => {
it("with promise client", async function () {
const client = createPromiseClient(TestService, transport());
try {
for await (const response of client.streamingOutputCall(
request,
options
)) {
fail(
`expecting no response from sleeping server, got: ${response.toJsonString()}`
);
}
fail("expected to catch an error");
} catch (e) {
expect(e).toBeInstanceOf(ConnectError);
expect(connectErrorFromReason(e).code).toBe(Code.DeadlineExceeded);
}
});
it("with callback client", function (done) {
const client = createCallbackClient(TestService, transport());
client.streamingOutputCall(
servers.describeTransports((transport) => {
it("with promise client", async function () {
const client = createPromiseClient(TestService, transport());
try {
for await (const response of client.streamingOutputCall(
request,
(response) => {
fail(
`expecting no response from sleeping server, got: ${response.toJsonString()}`
);
},
(err: ConnectError | undefined) => {
expect(err?.code).toBe(Code.DeadlineExceeded);
done();
},
options
);
});
}
);
)) {
fail(
`expecting no response from sleeping server, got: ${response.toJsonString()}`
);
}
fail("expected to catch an error");
} catch (e) {
expect(e).toBeInstanceOf(ConnectError);
expect(connectErrorFromReason(e).code).toBe(Code.DeadlineExceeded);
}
});
it("with callback client", function (done) {
const client = createCallbackClient(TestService, transport());
client.streamingOutputCall(
request,
(response) => {
fail(
`expecting no response from sleeping server, got: ${response.toJsonString()}`
);
},
(err: ConnectError | undefined) => {
expect(err?.code).toBe(Code.DeadlineExceeded);
done();
},
options
);
});
});

afterAll(async () => await servers.stop());
});
4 changes: 4 additions & 0 deletions packages/connect-node-test/src/helpers/test-routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const testService: ServiceImpl<typeof TestService> = {
);
for (const param of request.responseParameters) {
await maybeDelayResponse(param);
context.deadline?.throwIfAborted();
yield {
payload: interop.makeServerPayload(request.responseType, param.size),
};
Expand All @@ -84,6 +85,7 @@ const testService: ServiceImpl<typeof TestService> = {
);
for (const param of request.responseParameters) {
await maybeDelayResponse(param);
context.deadline?.throwIfAborted();
yield {
payload: interop.makeServerPayload(request.responseType, param.size),
};
Expand Down Expand Up @@ -117,6 +119,7 @@ const testService: ServiceImpl<typeof TestService> = {
for await (const req of requests) {
for (const param of req.responseParameters) {
await maybeDelayResponse(param);
context.deadline?.throwIfAborted();
yield {
payload: interop.makeServerPayload(req.responseType, param.size),
};
Expand All @@ -138,6 +141,7 @@ const testService: ServiceImpl<typeof TestService> = {
for await (const req of buffer) {
for (const param of req.responseParameters) {
await maybeDelayResponse(param);
context.deadline?.throwIfAborted();
yield {
payload: interop.makeServerPayload(req.responseType, param.size),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
connectErrorFromReason,
} from "@bufbuild/connect";
import { TestService } from "../gen/grpc/testing/test_connect.js";
import { describeTransportsExcluding } from "../helpers/crosstestserver.js";
import { describeTransports } from "../helpers/crosstestserver.js";
import { StreamingOutputCallRequest } from "../gen/grpc/testing/messages_pb.js";

describe("timeout_on_sleeping_server", function () {
Expand All @@ -39,48 +39,39 @@ describe("timeout_on_sleeping_server", function () {
const options: CallOptions = {
timeoutMs: 5,
};
// TODO(TCN-761) support deadlines in connect-es handlers
describeTransportsExcluding(
[
"@bufbuild/connect-web (Connect, JSON) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-web (Connect, binary) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-web (gRPC-web, binary) gRPC-web against @bufbuild/connect-node (h1)",
"@bufbuild/connect-web (gRPC-web, JSON) gRPC-web against @bufbuild/connect-node (h1)",
],
(transport) => {
it("with promise client", async function () {
const client = createPromiseClient(TestService, transport());
try {
for await (const response of client.streamingOutputCall(
request,
options
)) {
fail(
`expecting no response from sleeping server, got: ${response.toJsonString()}`
);
}
fail("expected to catch an error");
} catch (e) {
expect(e).toBeInstanceOf(ConnectError);
expect(connectErrorFromReason(e).code).toBe(Code.DeadlineExceeded);
}
});
it("with callback client", function (done) {
const client = createCallbackClient(TestService, transport());
client.streamingOutputCall(
describeTransports((transport) => {
it("with promise client", async function () {
const client = createPromiseClient(TestService, transport());
try {
for await (const response of client.streamingOutputCall(
request,
(response) => {
fail(
`expecting no response from sleeping server, got: ${response.toJsonString()}`
);
},
(err: ConnectError | undefined) => {
expect(err?.code).toBe(Code.DeadlineExceeded);
done();
},
options
);
});
}
);
)) {
fail(
`expecting no response from sleeping server, got: ${response.toJsonString()}`
);
}
fail("expected to catch an error");
} catch (e) {
expect(e).toBeInstanceOf(ConnectError);
expect(connectErrorFromReason(e).code).toBe(Code.DeadlineExceeded);
}
});
it("with callback client", function (done) {
const client = createCallbackClient(TestService, transport());
client.streamingOutputCall(
request,
(response) => {
fail(
`expecting no response from sleeping server, got: ${response.toJsonString()}`
);
},
(err: ConnectError | undefined) => {
expect(err?.code).toBe(Code.DeadlineExceeded);
done();
},
options
);
});
});
});
7 changes: 6 additions & 1 deletion packages/connect/src/implementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ export interface HandlerContext {
*/
readonly service: ServiceType;

// TODO
readonly deadline?: AbortSignal;

/**
* Incoming request headers.
*/
Expand All @@ -95,13 +98,15 @@ export interface HandlerContext {
*/
export function createHandlerContext(
spec: { service: ServiceType; method: MethodInfo },
deadline: AbortSignal | undefined, // TODO
requestHeader: HeadersInit,
responseHeader: HeadersInit,
responseTrailer?: HeadersInit
responseTrailer: HeadersInit
): HandlerContext {
return {
method: spec.method,
service: spec.service,
deadline,
requestHeader: new Headers(requestHeader),
responseHeader: new Headers(responseHeader),
responseTrailer: new Headers(responseTrailer),
Expand Down
102 changes: 101 additions & 1 deletion packages/connect/src/protocol-connect/handler-factory.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ import { Int32Value, MethodKind, StringValue } from "@bufbuild/protobuf";
import { createHandlerFactory } from "./handler-factory.js";
import type { MethodImpl } from "../implementation.js";
import { createMethodImplSpec } from "../implementation.js";
import type { UniversalHandlerOptions } from "../protocol/index.js";
import type {
UniversalHandlerOptions,
UniversalServerResponse,
} from "../protocol/index.js";
import {
createAsyncIterable,
createUniversalHandlerClient,
encodeEnvelope,
pipeTo,
sinkAll,
transformSplitEnvelope,
} from "../protocol/index.js";
import { ConnectError } from "../connect-error.js";
import {
Expand All @@ -33,6 +38,7 @@ import { Code } from "../code.js";
import { errorFromJsonBytes } from "./error-json.js";
import { endStreamFromJson } from "./end-stream.js";
import { createTransport } from "./transport.js";
import { requestHeader } from "./request-header.js";

describe("createHandlerFactory()", function () {
const testService = {
Expand Down Expand Up @@ -234,4 +240,98 @@ describe("createHandlerFactory()", function () {
});
});
});

describe("deadlines", function () {
describe("unary", function () {
it("should raise an error with code DEADLINE_EXCEEDED if exceeded", async function () {
const timeoutMs = 1;
const { handler, service, method } = setupTestHandler(
testService.methods.foo,
{},
async (req, ctx) => {
await new Promise((r) => setTimeout(r, timeoutMs + 50));
ctx.deadline?.throwIfAborted();
return { value: req.value.toString(10) };
}
);
const res = await handler({
httpVersion: "2.0",
method: "POST",
url: new URL(
`https://example.com/${service.typeName}/${method.name}`
),
header: requestHeader(method.kind, true, timeoutMs, undefined),
body: createAsyncIterable([new Uint8Array(0)]),
});
expect(res.status).toBe(408);
expect(res.body).toBeDefined();
if (res.body !== undefined) {
const bodyBytes =
res.body instanceof Uint8Array
? res.body
: await readAllBytes(res.body);
const err = errorFromJsonBytes(
bodyBytes,
undefined,
new ConnectError("error parse failed")
);
expect(err.code).toBe(Code.DeadlineExceeded);
expect(err.message).toBe(
"[deadline_exceeded] the operation timed out"
);
}
});
});
describe("streaming", function () {
async function getLastEnvelope(res: UniversalServerResponse) {
expect(res.body).toBeDefined();
expect(res.body).not.toBeInstanceOf(Uint8Array);
if (res.body !== undefined && Symbol.asyncIterator in res.body) {
const envelopes = await pipeTo(
res.body,
transformSplitEnvelope(0xffffff),
sinkAll()
);
const last = envelopes.pop();
expect(last).toBeDefined();
return last;
}
return undefined;
}

it("should raise an error with code DEADLINE_EXCEEDED if exceeded", async function () {
const timeoutMs = 1;
const { handler, service, method } = setupTestHandler(
testService.methods.bar,
{},
async function* (req, ctx) {
await new Promise((r) => setTimeout(r, timeoutMs + 50));
ctx.deadline?.throwIfAborted();
yield { value: req.value.toString(10) };
}
);
const res = await handler({
httpVersion: "2.0",
method: "POST",
url: new URL(
`https://example.com/${service.typeName}/${method.name}`
),
header: requestHeader(method.kind, true, timeoutMs, undefined),
body: createAsyncIterable([encodeEnvelope(0, new Uint8Array(0))]),
});
expect(res.status).toBe(200);
expect(res.body).toBeDefined();
if (res.body !== undefined) {
const lastEnv = await getLastEnvelope(res);
if (lastEnv !== undefined) {
const end = endStreamFromJson(lastEnv.data);
expect(end.error?.code).toBe(Code.DeadlineExceeded);
expect(end.error?.message).toBe(
"[deadline_exceeded] the operation timed out"
);
}
}
});
});
});
});
Loading