Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add Stream Wrappers for use with various environments #4118

Merged
merged 14 commits into from
Jul 26, 2024
  •  
  •  
  •  
24 changes: 24 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,27 @@ jobs:

- name: Ensure no changes to git-tracked files
run: git --no-pager diff --exit-code

cloudflare-startup-tests:
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4

- name: Run cloudflare startup test
run: ./scripts/cloudflare-startup-test.sh

- name: Ensure no changes to git-tracked files
run: git --no-pager diff --exit-code

browser-startup-tests:
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4

- name: Run browser startup test
run: ./scripts/browser-startup-test.sh

- name: Ensure no changes to git-tracked files
run: git --no-pager diff --exit-code
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ export class FetcherImpl extends CoreUtility implements Fetcher {
"../getResponseBody": "../../../src/core/fetcher/getResponseBody",
"../makeRequest": "../../../src/core/fetcher/makeRequest",
"../requestWithRetries": "../../../src/core/fetcher/requestWithRetries",
"../signals": "../../../src/core/fetcher/signals"
"../signals": "../../../src/core/fetcher/signals",
"../../stream-wrappers/Node18UniversalStreamWrapper":
"../../../src/core/fetcher/stream-wrappers/Node18UniversalStreamWrapper",
"../../stream-wrappers/NodePre18StreamWrapper":
"../../../src/core/fetcher/stream-wrappers/NodePre18StreamWrapper",
"../../stream-wrappers/UndiciStreamWrapper":
"../../../src/core/fetcher/stream-wrappers/UndiciStreamWrapper",
"../../stream-wrappers/chooseStreamWrapper":
"../../../src/core/fetcher/stream-wrappers/chooseStreamWrapper"
}
},
originalPathOnDocker: AbsoluteFilePath.of("/assets/fetcher/fetcher"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { getRequestBody } from "./getRequestBody";
import { getResponseBody } from "./getResponseBody";
import { makeRequest } from "./makeRequest";
import { requestWithRetries } from "./requestWithRetries";
import { chooseStreamWrapper } from "./stream-wrappers/chooseStreamWrapper";

export type FetchFunction = <R = unknown>(args: Fetcher.Args) => Promise<APIResponse<R, Fetcher.Error>>;

Expand Down Expand Up @@ -90,7 +91,7 @@ export async function fetcherImpl<R = unknown>(args: Fetcher.Args): Promise<APIR

if (response.status >= 200 && response.status < 400) {
if (args.duplex && args.responseType === "streaming") {
responseBody = (await import("stream")).Readable.from(responseBody as any);
responseBody = chooseStreamWrapper(responseBody);
}

return {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import { Node18UniversalStreamWrapper } from "../../stream-wrappers/Node18UniversalStreamWrapper";

describe("Node18UniversalStreamWrapper", () => {
it("should set encoding to utf-8", async () => {
const rawStream = new ReadableStream();
const stream = new Node18UniversalStreamWrapper(rawStream);
const setEncodingSpy = jest.spyOn(stream, "setEncoding");

stream.setEncoding("utf-8");

expect(setEncodingSpy).toHaveBeenCalledWith("utf-8");
});

it("should register an event listener for readable", async () => {
const rawStream = new ReadableStream();
const stream = new Node18UniversalStreamWrapper(rawStream);
const onSpy = jest.spyOn(stream, "on");

stream.on("readable", () => {});

expect(onSpy).toHaveBeenCalledWith("readable", expect.any(Function));
});

it("should remove an event listener for data", async () => {
const rawStream = new ReadableStream();
const stream = new Node18UniversalStreamWrapper(rawStream);
const offSpy = jest.spyOn(stream, "off");

const fn = () => {};
stream.on("data", fn);
stream.off("data", fn);

expect(offSpy).toHaveBeenCalledWith("data", expect.any(Function));
});

it("should write to dest when calling pipe to writable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
const stream = new Node18UniversalStreamWrapper(rawStream);
const dest = new WritableStream({
write(chunk) {
expect(chunk).toEqual(new TextEncoder().encode("test"));
}
});

stream.pipe(dest);
});

it("should write to dest when calling pipe to node writable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
const stream = new Node18UniversalStreamWrapper(rawStream);
const dest = new (await import("stream")).Writable({
write(chunk, encoding, callback) {
expect(chunk.toString()).toEqual("test");
callback();
}
});

stream.pipe(dest);
});

it("should write nothing when calling pipe and unpipe", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
const stream = new Node18UniversalStreamWrapper(rawStream);
const buffer: Uint8Array[] = [];
const dest = new WritableStream({
write(chunk) {
buffer.push(chunk);
}
});

stream.pipe(dest);
stream.unpipe(dest);
expect(buffer).toEqual([]);
});

it("should destroy the stream", async () => {
const rawStream = new ReadableStream();
const stream = new Node18UniversalStreamWrapper(rawStream);
const destroySpy = jest.spyOn(stream, "destroy");

stream.destroy();

expect(destroySpy).toHaveBeenCalled();
});

it("should pause and resume the stream", async () => {
const rawStream = new ReadableStream();
const stream = new Node18UniversalStreamWrapper(rawStream);
const pauseSpy = jest.spyOn(stream, "pause");
const resumeSpy = jest.spyOn(stream, "resume");

expect(stream.isPaused).toBe(false);
stream.pause();
expect(stream.isPaused).toBe(true);
stream.resume();

expect(pauseSpy).toHaveBeenCalled();
expect(resumeSpy).toHaveBeenCalled();
});

it("should read the stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
const stream = new Node18UniversalStreamWrapper(rawStream);

const data = await stream.read();

expect(data).toEqual(new TextEncoder().encode("test"));
});

it("should read the stream as text", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
const stream = new Node18UniversalStreamWrapper(rawStream);

const data = await stream.text();

expect(data).toEqual("test");
});

it("should read the stream as json", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode(JSON.stringify({ test: "test" })));
controller.close();
}
});
const stream = new Node18UniversalStreamWrapper(rawStream);

const data = await stream.json();

expect(data).toEqual({ test: "test" });
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { NodePre18StreamWrapper } from "../../stream-wrappers/NodePre18StreamWrapper";

describe("NodePre18StreamWrapper", () => {
it("should set encoding to utf-8", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const setEncodingSpy = jest.spyOn(stream, "setEncoding");

stream.setEncoding("utf-8");

expect(setEncodingSpy).toHaveBeenCalledWith("utf-8");
});

it("should register an event listener for readable", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const onSpy = jest.spyOn(stream, "on");

stream.on("readable", () => {});

expect(onSpy).toHaveBeenCalledWith("readable", expect.any(Function));
});

it("should remove an event listener for data", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const offSpy = jest.spyOn(stream, "off");

const fn = () => {};
stream.on("data", fn);
stream.off("data", fn);

expect(offSpy).toHaveBeenCalledWith("data", expect.any(Function));
});

it("should write to dest when calling pipe to node writable stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const dest = new (await import("stream")).Writable({
write(chunk, encoding, callback) {
expect(chunk.toString()).toEqual("test");
callback();
}
});

stream.pipe(dest);
});

it("should write nothing when calling pipe and unpipe", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const buffer: Uint8Array[] = [];
const dest = new (await import("stream")).Writable({
write(chunk, encoding, callback) {
buffer.push(chunk);
callback();
}
});
stream.pipe(dest);
stream.unpipe();

expect(buffer).toEqual([]);
});

it("should destroy the stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const destroySpy = jest.spyOn(stream, "destroy");

stream.destroy();

expect(destroySpy).toHaveBeenCalledWith();
});

it("should pause the stream and resume", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const pauseSpy = jest.spyOn(stream, "pause");

stream.pause();
expect(stream.isPaused).toBe(true);
stream.resume();
expect(stream.isPaused).toBe(false);

expect(pauseSpy).toHaveBeenCalledWith();
});

it("should read the stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const stream = new NodePre18StreamWrapper(rawStream);

const data = await stream.read();

expect(data).toEqual("test");
});

it("should read the stream as text", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const stream = new NodePre18StreamWrapper(rawStream);

const data = await stream.text();

expect(data).toEqual("test");
});

it("should read the stream as json", async () => {
const rawStream = (await import("stream")).Readable.from([JSON.stringify({ test: "test" })]);
const stream = new NodePre18StreamWrapper(rawStream);

const data = await stream.json();

expect(data).toEqual({ test: "test" });
});
});
Loading
Loading