Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add async iterable symbol to Stream Wrapper implementations #4195

Merged
merged 10 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
8 changes: 6 additions & 2 deletions generators/typescript/sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.38.3] - 2024-08-02

- Fix: Adds async iterable to StreamWrapper implementation for easier use with downstream dependencies.

## [0.38.2] - 2024-08-01

- Fix: Refactors the `noScripts` feature flag to make sure that no `yarn install` commands
can be accidentally triggered.
- Fix: Refactors the `noScripts` feature flag to make sure that no `yarn install` commands
can be accidentally triggered.

## [0.38.1] - 2024-08-01

Expand Down
2 changes: 1 addition & 1 deletion generators/typescript/sdk/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.38.2
0.38.3
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ describe("Node18UniversalStreamWrapper", () => {
it("should write to dest when calling pipe to writable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand All @@ -53,6 +54,7 @@ describe("Node18UniversalStreamWrapper", () => {
it("should write to dest when calling pipe to node writable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand All @@ -71,6 +73,7 @@ describe("Node18UniversalStreamWrapper", () => {
it("should write nothing when calling pipe and unpipe", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand Down Expand Up @@ -116,20 +119,21 @@ describe("Node18UniversalStreamWrapper", () => {
it("should read the stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
const stream = new Node18UniversalStreamWrapper(rawStream);

const data = await stream.read();

expect(data).toEqual(new TextEncoder().encode("test"));
expect(await stream.read()).toEqual(new TextEncoder().encode("test"));
expect(await stream.read()).toEqual(new TextEncoder().encode("test"));
});

it("should read the stream as text", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand All @@ -138,7 +142,7 @@ describe("Node18UniversalStreamWrapper", () => {

const data = await stream.text();

expect(data).toEqual("test");
expect(data).toEqual("testtest");
});

it("should read the stream as json", async () => {
Expand All @@ -154,4 +158,21 @@ describe("Node18UniversalStreamWrapper", () => {

expect(data).toEqual({ test: "test" });
});

it("should allow use with async iteratable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
let data = "";
const stream = new Node18UniversalStreamWrapper(rawStream);
for await (const chunk of stream) {
data += new TextDecoder().decode(chunk);
}

expect(data).toEqual("testtest");
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { NodePre18StreamWrapper } from "../../stream-wrappers/NodePre18StreamWra

describe("NodePre18StreamWrapper", () => {
it("should set encoding to utf-8", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const setEncodingSpy = jest.spyOn(stream, "setEncoding");

Expand All @@ -12,7 +12,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should register an event listener for readable", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const onSpy = jest.spyOn(stream, "on");

Expand All @@ -22,7 +22,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should remove an event listener for data", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const offSpy = jest.spyOn(stream, "off");

Expand All @@ -34,7 +34,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should write to dest when calling pipe to node writable stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const dest = new (await import("stream")).Writable({
write(chunk, encoding, callback) {
Expand All @@ -47,7 +47,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should write nothing when calling pipe and unpipe", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const buffer: Uint8Array[] = [];
const dest = new (await import("stream")).Writable({
Expand All @@ -63,7 +63,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should destroy the stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const destroySpy = jest.spyOn(stream, "destroy");

Expand All @@ -73,7 +73,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should pause the stream and resume", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const pauseSpy = jest.spyOn(stream, "pause");

Expand All @@ -86,21 +86,20 @@ describe("NodePre18StreamWrapper", () => {
});

it("should read the stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);

const data = await stream.read();

expect(data).toEqual("test");
expect(await stream.read()).toEqual("test");
expect(await stream.read()).toEqual("test");
});

it("should read the stream as text", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);

const data = await stream.text();

expect(data).toEqual("test");
expect(data).toEqual("testtest");
});

it("should read the stream as json", async () => {
Expand All @@ -111,4 +110,15 @@ describe("NodePre18StreamWrapper", () => {

expect(data).toEqual({ test: "test" });
});

it("should allow use with async iteratable stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
let data = "";
const stream = new NodePre18StreamWrapper(rawStream);
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual("testtest");
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ describe("UndiciStreamWrapper", () => {
it("should write to dest when calling pipe to writable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand Down Expand Up @@ -93,20 +94,21 @@ describe("UndiciStreamWrapper", () => {
it("should read the stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
const stream = new UndiciStreamWrapper(rawStream);

const data = await stream.read();

expect(data).toEqual(new TextEncoder().encode("test"));
expect(await stream.read()).toEqual(new TextEncoder().encode("test"));
expect(await stream.read()).toEqual(new TextEncoder().encode("test"));
});

it("should read the stream as text", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand All @@ -115,7 +117,7 @@ describe("UndiciStreamWrapper", () => {

const data = await stream.text();

expect(data).toEqual("test");
expect(data).toEqual("testtest");
});

it("should read the stream as json", async () => {
Expand All @@ -131,4 +133,21 @@ describe("UndiciStreamWrapper", () => {

expect(data).toEqual({ test: "test" });
});

it("should allow use with async iteratable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
let data = "";
const stream = new UndiciStreamWrapper(rawStream);
for await (const chunk of stream) {
data += new TextDecoder().decode(chunk);
}

expect(data).toEqual("testtest");
});
});
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import type { Writable } from "stream";
import { EventCallback, StreamWrapper } from "./chooseStreamWrapper";

export class Node18UniversalStreamWrapper
implements StreamWrapper<Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array>, Uint8Array>
export class Node18UniversalStreamWrapper<ReadFormat extends Uint8Array | Uint16Array | Uint32Array>
implements
StreamWrapper<Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat>, ReadFormat>
{
private readableStream: ReadableStream<Uint8Array>;
private reader: ReadableStreamDefaultReader<Uint8Array>;
private readableStream: ReadableStream<ReadFormat>;
private reader: ReadableStreamDefaultReader<ReadFormat>;
private events: Record<string, EventCallback[] | undefined>;
private paused: boolean;
private resumeCallback: ((value?: unknown) => void) | null;
private encoding: string | null;

constructor(readableStream: ReadableStream<Uint8Array>) {
constructor(readableStream: ReadableStream<ReadFormat>) {
this.readableStream = readableStream;
this.reader = this.readableStream.getReader();
this.events = {
Expand All @@ -37,8 +38,8 @@ export class Node18UniversalStreamWrapper
}

public pipe(
dest: Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array>
): Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array> {
dest: Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat>
): Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat> {
this.on("data", async (chunk) => {
if (dest instanceof Node18UniversalStreamWrapper) {
dest._write(chunk);
Expand Down Expand Up @@ -78,12 +79,12 @@ export class Node18UniversalStreamWrapper
}

public pipeTo(
dest: Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array>
): Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array> {
dest: Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat>
): Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat> {
return this.pipe(dest);
}

public unpipe(dest: Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array>): void {
public unpipe(dest: Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat>): void {
this.off("data", async (chunk) => {
if (dest instanceof Node18UniversalStreamWrapper) {
dest._write(chunk);
Expand Down Expand Up @@ -149,7 +150,7 @@ export class Node18UniversalStreamWrapper
return this.paused;
}

public async read(): Promise<Uint8Array | undefined> {
public async read(): Promise<ReadFormat | undefined> {
if (this.paused) {
await new Promise((resolve) => {
this.resumeCallback = resolve;
Expand All @@ -168,7 +169,7 @@ export class Node18UniversalStreamWrapper
}

public async text(): Promise<string> {
const chunks: Uint8Array[] = [];
const chunks: ReadFormat[] = [];

while (true) {
const { done, value } = await this.reader.read();
Expand All @@ -185,7 +186,7 @@ export class Node18UniversalStreamWrapper
return JSON.parse(text);
}

private _write(chunk: Uint8Array): void {
private _write(chunk: ReadFormat): void {
this._emit("data", chunk);
}

Expand Down Expand Up @@ -228,4 +229,24 @@ export class Node18UniversalStreamWrapper
this._emit("error", error);
}
}

[Symbol.asyncIterator](): AsyncIterableIterator<ReadFormat> {
return {
next: async () => {
if (this.paused) {
await new Promise((resolve) => {
this.resumeCallback = resolve;
});
}
const { done, value } = await this.reader.read();
if (done) {
return { done: true, value: undefined };
}
return { done: false, value };
},
[Symbol.asyncIterator]() {
return this;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,20 @@ export class NodePre18StreamWrapper implements StreamWrapper<Writable, Buffer> {
const text = await this.text();
return JSON.parse(text);
}

public [Symbol.asyncIterator](): AsyncIterableIterator<Buffer> {
const readableStream = this.readableStream;
const iterator = readableStream[Symbol.asyncIterator]();

// Create and return an async iterator that yields buffers
return {
async next(): Promise<IteratorResult<Buffer>> {
const { value, done } = await iterator.next();
return { value: value as Buffer, done };
},
[Symbol.asyncIterator]() {
return this;
}
};
}
}
Loading
Loading