Skip to content

Commit

Permalink
Support timeouts in clients (#635)
Browse files Browse the repository at this point in the history
  • Loading branch information
timostamm authored May 16, 2023
1 parent 00a8706 commit 9ba933e
Show file tree
Hide file tree
Showing 14 changed files with 1,082 additions and 731 deletions.
2 changes: 1 addition & 1 deletion packages/connect-web-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ it like a web server would usually do.

| code generator | bundle size | minified | compressed |
|----------------|-------------------:|-----------------------:|---------------------:|
| connect | 109,228 b | 47,812 b | 12,800 b |
| connect | 111,637 b | 49,004 b | 13,163 b |
| grpc-web | 414,906 b | 301,127 b | 53,279 b |
257 changes: 121 additions & 136 deletions packages/connect-web/src/connect-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { Message, MethodIdempotency, MethodKind } from "@bufbuild/protobuf";
import type {
AnyMessage,
BinaryReadOptions,
Expand All @@ -24,34 +23,31 @@ import type {
PartialMessage,
ServiceType,
} from "@bufbuild/protobuf";
import {
appendHeaders,
Code,
connectErrorFromReason,
runStreaming,
runUnary,
} from "@bufbuild/connect";
import { Message, MethodIdempotency, MethodKind } from "@bufbuild/protobuf";
import type {
Interceptor,
StreamResponse,
Transport,
UnaryRequest,
UnaryResponse,
} from "@bufbuild/connect";
import { appendHeaders } from "@bufbuild/connect";
import {
createClientMethodSerializers,
createEnvelopeReadableStream,
createMethodUrl,
encodeEnvelope,
runStreamingCall,
runUnaryCall,
} from "@bufbuild/connect/protocol";
import {
requestHeader,
endStreamFlag,
endStreamFromJson,
errorFromJson,
requestHeader,
trailerDemux,
validateResponse,
transformConnectPostToGetRequest,
validateResponse,
} from "@bufbuild/connect/protocol-connect";
import { assertFetchApi } from "./assert-fetch-api.js";

Expand Down Expand Up @@ -145,78 +141,75 @@ export function createConnectTransport(
options.jsonOptions,
options.binaryOptions
);
try {
return await runUnary<I, O>(
{
stream: false,
service,
method,
url: createMethodUrl(options.baseUrl, service, method),
init: {
method: "POST",
credentials: options.credentials ?? "same-origin",
redirect: "error",
mode: "cors",
},
header: requestHeader(
method.kind,
useBinaryFormat,
timeoutMs,
header
),
message: normalize(message),
signal: signal ?? new AbortController().signal,
return await runUnaryCall<I, O>({
interceptors: options.interceptors,
signal,
timeoutMs,
req: {
stream: false,
service,
method,
url: createMethodUrl(options.baseUrl, service, method),
init: {
method: "POST",
credentials: options.credentials ?? "same-origin",
redirect: "error",
mode: "cors",
},
async (req: UnaryRequest<I, O>): Promise<UnaryResponse<I, O>> => {
const useGet =
options.useHttpGet === true &&
method.idempotency === MethodIdempotency.NoSideEffects;
let body: BodyInit | null = null;
if (useGet) {
req = transformConnectPostToGetRequest(
req,
serialize(req.message),
useBinaryFormat
);
} else {
body = serialize(req.message);
}
const response = await fetch(req.url, {
...req.init,
headers: req.header,
signal: req.signal,
body,
});
const { isUnaryError, unaryError } = validateResponse(
method.kind,
useBinaryFormat,
response.status,
response.headers
header: requestHeader(
method.kind,
useBinaryFormat,
timeoutMs,
header
),
message: normalize(message),
},
next: async (req: UnaryRequest<I, O>): Promise<UnaryResponse<I, O>> => {
const useGet =
options.useHttpGet === true &&
method.idempotency === MethodIdempotency.NoSideEffects;
let body: BodyInit | null = null;
if (useGet) {
req = transformConnectPostToGetRequest(
req,
serialize(req.message),
useBinaryFormat
);
if (isUnaryError) {
throw errorFromJson(
(await response.json()) as JsonValue,
appendHeaders(...trailerDemux(response.headers)),
unaryError
);
}
const [demuxedHeader, demuxedTrailer] = trailerDemux(
response.headers
} else {
body = serialize(req.message);
}
const response = await fetch(req.url, {
...req.init,
headers: req.header,
signal: req.signal,
body,
});
const { isUnaryError, unaryError } = validateResponse(
method.kind,
useBinaryFormat,
response.status,
response.headers
);
if (isUnaryError) {
throw errorFromJson(
(await response.json()) as JsonValue,
appendHeaders(...trailerDemux(response.headers)),
unaryError
);
return <UnaryResponse<I, O>>{
stream: false,
service,
method,
header: demuxedHeader,
message: parse(new Uint8Array(await response.arrayBuffer())),
trailer: demuxedTrailer,
};
},
options.interceptors
);
} catch (e) {
throw connectErrorFromReason(e, Code.Internal);
}
}
const [demuxedHeader, demuxedTrailer] = trailerDemux(
response.headers
);
return <UnaryResponse<I, O>>{
stream: false,
service,
method,
header: demuxedHeader,
message: parse(new Uint8Array(await response.arrayBuffer())),
trailer: demuxedTrailer,
};
},
});
},

async stream<
Expand All @@ -242,32 +235,28 @@ export function createConnectTransport(
trailerTarget: Headers
) {
const reader = createEnvelopeReadableStream(body).getReader();
try {
let endStreamReceived = false;
for (;;) {
const result = await reader.read();
if (result.done) {
break;
}
const { flags, data } = result.value;
if ((flags & endStreamFlag) === endStreamFlag) {
endStreamReceived = true;
const endStream = endStreamFromJson(data);
if (endStream.error) {
throw endStream.error;
}
endStream.metadata.forEach((value, key) =>
trailerTarget.set(key, value)
);
continue;
}
yield parse(data);
let endStreamReceived = false;
for (;;) {
const result = await reader.read();
if (result.done) {
break;
}
if (!endStreamReceived) {
throw "missing EndStreamResponse";
const { flags, data } = result.value;
if ((flags & endStreamFlag) === endStreamFlag) {
endStreamReceived = true;
const endStream = endStreamFromJson(data);
if (endStream.error) {
throw endStream.error;
}
endStream.metadata.forEach((value, key) =>
trailerTarget.set(key, value)
);
continue;
}
} catch (e) {
throw connectErrorFromReason(e);
yield parse(data);
}
if (!endStreamReceived) {
throw "missing EndStreamResponse";
}
}

Expand All @@ -283,9 +272,11 @@ export function createConnectTransport(
}
return encodeEnvelope(0, serialize(r.value));
}

return runStreaming<I, O>(
{
return await runStreamingCall<I, O>({
interceptors: options.interceptors,
timeoutMs,
signal,
req: {
stream: true,
service,
method,
Expand All @@ -296,7 +287,6 @@ export function createConnectTransport(
redirect: "error",
mode: "cors",
},
signal: signal ?? new AbortController().signal,
header: requestHeader(
method.kind,
useBinaryFormat,
Expand All @@ -305,37 +295,32 @@ export function createConnectTransport(
),
message: input,
},
async (req) => {
try {
const fRes = await fetch(req.url, {
...req.init,
headers: req.header,
signal: req.signal,
body: await createRequestBody(req.message),
});
validateResponse(
method.kind,
useBinaryFormat,
fRes.status,
fRes.headers
);
if (fRes.body === null) {
throw "missing response body";
}
const trailer = new Headers();
const res: StreamResponse<I, O> = {
...req,
header: fRes.headers,
trailer,
message: parseResponseBody(fRes.body, trailer),
};
return res;
} catch (e) {
throw connectErrorFromReason(e, Code.Internal);
next: async (req) => {
const fRes = await fetch(req.url, {
...req.init,
headers: req.header,
signal: req.signal,
body: await createRequestBody(req.message),
});
validateResponse(
method.kind,
useBinaryFormat,
fRes.status,
fRes.headers
);
if (fRes.body === null) {
throw "missing response body";
}
const trailer = new Headers();
const res: StreamResponse<I, O> = {
...req,
header: fRes.headers,
trailer,
message: parseResponseBody(fRes.body, trailer),
};
return res;
},
options.interceptors
).catch((e: unknown) => Promise.reject(connectErrorFromReason(e)));
});
},
};
}
Loading

0 comments on commit 9ba933e

Please sign in to comment.