Skip to content

Commit

Permalink
transport propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
turbocrime committed Mar 14, 2024
1 parent 4940581 commit 67d6b95
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 66 deletions.
24 changes: 4 additions & 20 deletions packages/transport-chrome/session-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import {
isTransportError,
isTransportEvent,
isTransportMessage,
isTransportStream,
TransportStream,
Expand All @@ -25,7 +24,7 @@ import { ChannelLabel, nameConnection } from './channel-names';
import { isTransportInitChannel, TransportInitChannel } from './message';
import { PortStreamSink, PortStreamSource } from './stream';
import { Code, ConnectError } from '@connectrpc/connect';
import { errorFromJson, errorToJson } from '@connectrpc/connect/protocol-connect';
import { errorToJson } from '@connectrpc/connect/protocol-connect';

export class CRSessionClient {
private static singleton?: CRSessionClient;
Expand Down Expand Up @@ -62,15 +61,10 @@ export class CRSessionClient {

private disconnect = () => {
this.clientPort.removeEventListener('message', this.clientListener);
this.clientPort.addEventListener('message', (ev: MessageEvent<unknown>) => {
if (isTransportEvent(ev.data)) {
const { requestId } = ev.data;
this.clientPort.postMessage({ requestId, error: 'Connection closed' });
}
});
this.clientPort.postMessage({
error: errorToJson(new ConnectError('Connection closed', Code.Unavailable), undefined),
});
this.clientPort.close();
};

private clientListener = (ev: MessageEvent<unknown>) => {
Expand All @@ -96,11 +90,7 @@ export class CRSessionClient {
};

private acceptChannelStreamResponse = ({ requestId, channel: name }: TransportInitChannel) => {
const stream = new ReadableStream(
new PortStreamSource(chrome.runtime.connect({ name }), r =>
errorFromJson(r, undefined, ConnectError.from(r)),
),
);
const stream = new ReadableStream(new PortStreamSource(chrome.runtime.connect({ name })));
return [{ requestId, stream }, [stream]] satisfies [TransportStream, [Transferable]];
};

Expand All @@ -109,13 +99,7 @@ export class CRSessionClient {
const sinkListener = (p: chrome.runtime.Port) => {
if (p.name !== channel) return;
chrome.runtime.onConnect.removeListener(sinkListener);
stream
.pipeTo(
new WritableStream(
new PortStreamSink(p, r => errorToJson(ConnectError.from(r), undefined)),
),
)
.catch(() => null);
void stream.pipeTo(new WritableStream(new PortStreamSink(p)));
};
chrome.runtime.onConnect.addListener(sinkListener);
return { requestId, channel } satisfies TransportInitChannel;
Expand Down
17 changes: 3 additions & 14 deletions packages/transport-chrome/session-manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ConnectError } from '@connectrpc/connect';
import { errorFromJson, errorToJson } from '@connectrpc/connect/protocol-connect';
import { errorToJson } from '@connectrpc/connect/protocol-connect';
import { ChannelLabel, nameConnection, parseConnectionName } from './channel-names';
import { isTransportInitChannel, TransportInitChannel } from './message';
import { PortStreamSink, PortStreamSource } from './stream';
Expand Down Expand Up @@ -154,14 +154,7 @@ export class CRSessionManager {
const sinkListener = (p: chrome.runtime.Port) => {
if (p.name !== channel) return;
chrome.runtime.onConnect.removeListener(sinkListener);
stream
.pipeTo(
new WritableStream(
new PortStreamSink(p, r => errorToJson(ConnectError.from(r), undefined)),
),
{ signal },
)
.catch(() => null);
void stream.pipeTo(new WritableStream(new PortStreamSink(p)), { signal });
};
chrome.runtime.onConnect.addListener(sinkListener);
return { requestId, channel };
Expand All @@ -172,10 +165,6 @@ export class CRSessionManager {
channel: name,
}: TransportInitChannel): TransportStream => ({
requestId,
stream: new ReadableStream(
new PortStreamSource(chrome.runtime.connect({ name }), r =>
errorFromJson(r, undefined, ConnectError.from(r)),
),
),
stream: new ReadableStream(new PortStreamSource(chrome.runtime.connect({ name }))),
});
}
37 changes: 22 additions & 15 deletions packages/transport-chrome/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,34 @@
*/

import type { JsonValue } from '@bufbuild/protobuf';
import { Code, ConnectError } from '@connectrpc/connect';
import { errorFromJson, errorToJson } from '@connectrpc/connect/protocol-connect';

export class PortStreamSource implements UnderlyingDefaultSource<JsonValue> {
constructor(
private incoming: chrome.runtime.Port,
private jsonToReason = (r: JsonValue) => r as unknown,
) {}
constructor(private incoming: chrome.runtime.Port) {}

// A port can't pull like a normal source, so handlers are attached at start
start(cont: ReadableStreamDefaultController<JsonValue>) {
this.incoming.onDisconnect.addListener(() => cont.error('Disconnect'));
this.incoming.onDisconnect.addListener(port =>
cont.error(new ConnectError('Source disconnected', Code.Aborted, undefined, undefined, port)),
);
this.incoming.onMessage.addListener(chunk => {
if (isStreamAbort(chunk)) cont.error(this.jsonToReason(chunk.abort));
if (isStreamAbort(chunk))
cont.error(errorFromJson(chunk.abort, undefined, ConnectError.from(chunk.abort)));
else if (isStreamValue(chunk)) cont.enqueue(chunk.value);
else if (isStreamEnd(chunk)) {
this.incoming.disconnect();
cont.close();
} else cont.error('Unexpected subchannel transport');
} else
cont.error(
new ConnectError(
'Unexpected subchannel transport',
Code.Unimplemented,
undefined,
undefined,
chunk,
),
);
});
}

Expand All @@ -35,10 +46,7 @@ export class PortStreamSink implements UnderlyingSink<JsonValue> {
* @param outgoing port to write to
* @param reasonToJson abort reason to jsonifiable
*/
constructor(
private outgoing: chrome.runtime.Port,
private reasonToJson = (r: unknown): JsonValue => String(r),
) {}
constructor(private outgoing: chrome.runtime.Port) {}

write(chunk: JsonValue) {
this.outgoing.postMessage({
Expand All @@ -54,10 +62,9 @@ export class PortStreamSink implements UnderlyingSink<JsonValue> {
}

abort(reason?: unknown) {
if (reason !== 'Disconnect')
this.outgoing.postMessage({
abort: this.reasonToJson(reason),
} satisfies StreamAbort);
this.outgoing.postMessage({
abort: errorToJson(ConnectError.from(reason), undefined),
} satisfies StreamAbort);
this.outgoing.disconnect();
}
}
Expand Down
47 changes: 47 additions & 0 deletions packages/transport-dom/src/create.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,51 @@ describe('createChannelClient', () => {
const res = Array.fromAsync(streamResponse);
await expect(res).resolves.toBeTruthy();
});

it('should require streaming requests to contain at least one message', async () => {
const { port2 } = new MessageChannel();
const transportOptions = {
getPort: () => Promise.resolve(port2),
defaultTimeoutMs: 5000,
jsonOptions: { typeRegistry },
};

const transport = createChannelTransport(transportOptions);

const streamRequest = transport.stream(
ElizaService,
ElizaService.methods.introduce,
undefined,
undefined,
undefined,
// eslint-disable-next-line @typescript-eslint/no-empty-function
(async function* () {})(),
);

await expect(streamRequest).rejects.toThrowError('missing request message');
});

it('should require server-streaming requests to contain only one message', async () => {
const { port2 } = new MessageChannel();
const transportOptions = {
getPort: () => Promise.resolve(port2),
defaultTimeoutMs: 5000,
jsonOptions: { typeRegistry },
};

const transport = createChannelTransport(transportOptions);

const input = new IntroduceRequest({ name: 'Prax' });

const streamRequest = transport.stream(
ElizaService,
ElizaService.methods.introduce,
undefined,
undefined,
undefined,
ReadableStream.from([input, input, input]),
);

await expect(streamRequest).rejects.toThrow();
});
});
42 changes: 25 additions & 17 deletions packages/transport-dom/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ import {
PartialMessage,
ServiceType,
} from '@bufbuild/protobuf';
import {
Code as ConnectErrorCode,
ConnectError,
StreamResponse,
UnaryResponse,
} from '@connectrpc/connect';
import { Code, ConnectError, StreamResponse, UnaryResponse } from '@connectrpc/connect';
import { CommonTransportOptions } from '@connectrpc/connect/protocol';
import { errorFromJson } from '@connectrpc/connect/protocol-connect';
import {
Expand Down Expand Up @@ -81,7 +76,7 @@ export const createChannelTransport = ({
setTimeout(
reject,
defaultTimeoutMs,
new ConnectError('Channel connection request timed out', ConnectErrorCode.Unavailable),
new ConnectError('Channel connection request timed out', Code.Unavailable),
),
);

Expand All @@ -95,13 +90,28 @@ export const createChannelTransport = ({

const transportListener = ({ data }: MessageEvent<unknown>) => {
if (isTransportEvent(data)) {
// this is a response to a specific request. the port may be shared, so it
// may contain a requestId we don't know about. the response may be
// successful, or contain an error conveyed only to the caller.
const respond = pending.get(data.requestId);
if (respond) respond(data);
} else if (isTransportError(data)) {
// this is a channel-level error, corresponding to no specific request.
// this will fail this transport, and every client using this transport.
// every transport sharing this port will fail independently.
listenerError.reject(
errorFromJson(data.error, data.metadata, new ConnectError('Transport failed')),
);
} else
listenerError.reject(
errorFromJson(data.error, data.metadata, new ConnectError('Response failed')),
new ConnectError(
'Unknown item in transport',
Code.Unimplemented,
undefined,
undefined,
data,
),
);
} else listenerError.reject(ConnectError.from(data));
};

return {
Expand Down Expand Up @@ -164,17 +174,15 @@ export const createChannelTransport = ({

if (method.kind === MethodKind.ServerStreaming) {
const iter = input[Symbol.asyncIterator]();
const [{ value }, { done }] = [
(await iter.next()) as IteratorYieldResult<PartialMessage<I>>,
await iter.next(),
];
if (!done)
const [{ value } = { value: null }, { done }] = [await iter.next(), await iter.next()];
if (done && typeof value === 'object' && value != null) {
const message = Any.pack(new method.I(value as object)).toJson(jsonOptions);
port.postMessage({ requestId, message, header } satisfies TransportMessage);
} else
throw new ConnectError(
'MethodKind.ServerStreaming expects a single request message',
ConnectErrorCode.OutOfRange,
Code.OutOfRange,
);
const message = Any.pack(new method.I(value)).toJson(jsonOptions);
port.postMessage({ requestId, message, header } satisfies TransportMessage);
} else {
const stream = ReadableStream.from(input).pipeThrough(
new TransformStream({
Expand Down

0 comments on commit 67d6b95

Please sign in to comment.