Skip to content

Commit

Permalink
fix: add async iterable symbol to Stream Wrapper implementations (#4195)
Browse files Browse the repository at this point in the history
* async iterables

* changelog and version

* add seed

* Revert "add seed"

This reverts commit d2fb203.

* update

* seed

* Revert "seed"

This reverts commit 7fb99f7.

* unify types

* add seed
  • Loading branch information
RohinBhargava authored Aug 5, 2024
1 parent 8d7d799 commit ce30266
Show file tree
Hide file tree
Showing 499 changed files with 10,089 additions and 2,417 deletions.
8 changes: 6 additions & 2 deletions generators/typescript/sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.38.3] - 2024-08-02

- Fix: Adds async iterable to StreamWrapper implementation for easier use with downstream dependencies.

## [0.38.2] - 2024-08-01

- Fix: Refactors the `noScripts` feature flag to make sure that no `yarn install` commands
can be accidentally triggered.
- Fix: Refactors the `noScripts` feature flag to make sure that no `yarn install` commands
can be accidentally triggered.

## [0.38.1] - 2024-08-01

Expand Down
2 changes: 1 addition & 1 deletion generators/typescript/sdk/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.38.2
0.38.3
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ describe("Node18UniversalStreamWrapper", () => {
it("should write to dest when calling pipe to writable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand All @@ -53,6 +54,7 @@ describe("Node18UniversalStreamWrapper", () => {
it("should write to dest when calling pipe to node writable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand All @@ -71,6 +73,7 @@ describe("Node18UniversalStreamWrapper", () => {
it("should write nothing when calling pipe and unpipe", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand Down Expand Up @@ -116,20 +119,21 @@ describe("Node18UniversalStreamWrapper", () => {
it("should read the stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
const stream = new Node18UniversalStreamWrapper(rawStream);

const data = await stream.read();

expect(data).toEqual(new TextEncoder().encode("test"));
expect(await stream.read()).toEqual(new TextEncoder().encode("test"));
expect(await stream.read()).toEqual(new TextEncoder().encode("test"));
});

it("should read the stream as text", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand All @@ -138,7 +142,7 @@ describe("Node18UniversalStreamWrapper", () => {

const data = await stream.text();

expect(data).toEqual("test");
expect(data).toEqual("testtest");
});

it("should read the stream as json", async () => {
Expand All @@ -154,4 +158,21 @@ describe("Node18UniversalStreamWrapper", () => {

expect(data).toEqual({ test: "test" });
});

it("should allow use with async iteratable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
let data = "";
const stream = new Node18UniversalStreamWrapper(rawStream);
for await (const chunk of stream) {
data += new TextDecoder().decode(chunk);
}

expect(data).toEqual("testtest");
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { NodePre18StreamWrapper } from "../../stream-wrappers/NodePre18StreamWra

describe("NodePre18StreamWrapper", () => {
it("should set encoding to utf-8", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const setEncodingSpy = jest.spyOn(stream, "setEncoding");

Expand All @@ -12,7 +12,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should register an event listener for readable", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const onSpy = jest.spyOn(stream, "on");

Expand All @@ -22,7 +22,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should remove an event listener for data", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const offSpy = jest.spyOn(stream, "off");

Expand All @@ -34,7 +34,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should write to dest when calling pipe to node writable stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const dest = new (await import("stream")).Writable({
write(chunk, encoding, callback) {
Expand All @@ -47,7 +47,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should write nothing when calling pipe and unpipe", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const buffer: Uint8Array[] = [];
const dest = new (await import("stream")).Writable({
Expand All @@ -63,7 +63,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should destroy the stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const destroySpy = jest.spyOn(stream, "destroy");

Expand All @@ -73,7 +73,7 @@ describe("NodePre18StreamWrapper", () => {
});

it("should pause the stream and resume", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);
const pauseSpy = jest.spyOn(stream, "pause");

Expand All @@ -86,21 +86,20 @@ describe("NodePre18StreamWrapper", () => {
});

it("should read the stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);

const data = await stream.read();

expect(data).toEqual("test");
expect(await stream.read()).toEqual("test");
expect(await stream.read()).toEqual("test");
});

it("should read the stream as text", async () => {
const rawStream = (await import("stream")).Readable.from(["test"]);
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
const stream = new NodePre18StreamWrapper(rawStream);

const data = await stream.text();

expect(data).toEqual("test");
expect(data).toEqual("testtest");
});

it("should read the stream as json", async () => {
Expand All @@ -111,4 +110,15 @@ describe("NodePre18StreamWrapper", () => {

expect(data).toEqual({ test: "test" });
});

it("should allow use with async iteratable stream", async () => {
const rawStream = (await import("stream")).Readable.from(["test", "test"]);
let data = "";
const stream = new NodePre18StreamWrapper(rawStream);
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual("testtest");
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ describe("UndiciStreamWrapper", () => {
it("should write to dest when calling pipe to writable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand Down Expand Up @@ -93,20 +94,21 @@ describe("UndiciStreamWrapper", () => {
it("should read the stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
const stream = new UndiciStreamWrapper(rawStream);

const data = await stream.read();

expect(data).toEqual(new TextEncoder().encode("test"));
expect(await stream.read()).toEqual(new TextEncoder().encode("test"));
expect(await stream.read()).toEqual(new TextEncoder().encode("test"));
});

it("should read the stream as text", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
Expand All @@ -115,7 +117,7 @@ describe("UndiciStreamWrapper", () => {

const data = await stream.text();

expect(data).toEqual("test");
expect(data).toEqual("testtest");
});

it("should read the stream as json", async () => {
Expand All @@ -131,4 +133,21 @@ describe("UndiciStreamWrapper", () => {

expect(data).toEqual({ test: "test" });
});

it("should allow use with async iteratable stream", async () => {
const rawStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("test"));
controller.enqueue(new TextEncoder().encode("test"));
controller.close();
}
});
let data = "";
const stream = new UndiciStreamWrapper(rawStream);
for await (const chunk of stream) {
data += new TextDecoder().decode(chunk);
}

expect(data).toEqual("testtest");
});
});
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import type { Writable } from "stream";
import { EventCallback, StreamWrapper } from "./chooseStreamWrapper";

export class Node18UniversalStreamWrapper
implements StreamWrapper<Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array>, Uint8Array>
export class Node18UniversalStreamWrapper<ReadFormat extends Uint8Array | Uint16Array | Uint32Array>
implements
StreamWrapper<Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat>, ReadFormat>
{
private readableStream: ReadableStream<Uint8Array>;
private reader: ReadableStreamDefaultReader<Uint8Array>;
private readableStream: ReadableStream<ReadFormat>;
private reader: ReadableStreamDefaultReader<ReadFormat>;
private events: Record<string, EventCallback[] | undefined>;
private paused: boolean;
private resumeCallback: ((value?: unknown) => void) | null;
private encoding: string | null;

constructor(readableStream: ReadableStream<Uint8Array>) {
constructor(readableStream: ReadableStream<ReadFormat>) {
this.readableStream = readableStream;
this.reader = this.readableStream.getReader();
this.events = {
Expand All @@ -37,8 +38,8 @@ export class Node18UniversalStreamWrapper
}

public pipe(
dest: Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array>
): Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array> {
dest: Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat>
): Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat> {
this.on("data", async (chunk) => {
if (dest instanceof Node18UniversalStreamWrapper) {
dest._write(chunk);
Expand Down Expand Up @@ -78,12 +79,12 @@ export class Node18UniversalStreamWrapper
}

public pipeTo(
dest: Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array>
): Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array> {
dest: Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat>
): Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat> {
return this.pipe(dest);
}

public unpipe(dest: Node18UniversalStreamWrapper | Writable | WritableStream<Uint8Array>): void {
public unpipe(dest: Node18UniversalStreamWrapper<ReadFormat> | Writable | WritableStream<ReadFormat>): void {
this.off("data", async (chunk) => {
if (dest instanceof Node18UniversalStreamWrapper) {
dest._write(chunk);
Expand Down Expand Up @@ -149,7 +150,7 @@ export class Node18UniversalStreamWrapper
return this.paused;
}

public async read(): Promise<Uint8Array | undefined> {
public async read(): Promise<ReadFormat | undefined> {
if (this.paused) {
await new Promise((resolve) => {
this.resumeCallback = resolve;
Expand All @@ -168,7 +169,7 @@ export class Node18UniversalStreamWrapper
}

public async text(): Promise<string> {
const chunks: Uint8Array[] = [];
const chunks: ReadFormat[] = [];

while (true) {
const { done, value } = await this.reader.read();
Expand All @@ -185,7 +186,7 @@ export class Node18UniversalStreamWrapper
return JSON.parse(text);
}

private _write(chunk: Uint8Array): void {
private _write(chunk: ReadFormat): void {
this._emit("data", chunk);
}

Expand Down Expand Up @@ -228,4 +229,24 @@ export class Node18UniversalStreamWrapper
this._emit("error", error);
}
}

[Symbol.asyncIterator](): AsyncIterableIterator<ReadFormat> {
return {
next: async () => {
if (this.paused) {
await new Promise((resolve) => {
this.resumeCallback = resolve;
});
}
const { done, value } = await this.reader.read();
if (done) {
return { done: true, value: undefined };
}
return { done: false, value };
},
[Symbol.asyncIterator]() {
return this;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,20 @@ export class NodePre18StreamWrapper implements StreamWrapper<Writable, Buffer> {
const text = await this.text();
return JSON.parse(text);
}

public [Symbol.asyncIterator](): AsyncIterableIterator<Buffer> {
const readableStream = this.readableStream;
const iterator = readableStream[Symbol.asyncIterator]();

// Create and return an async iterator that yields buffers
return {
async next(): Promise<IteratorResult<Buffer>> {
const { value, done } = await iterator.next();
return { value: value as Buffer, done };
},
[Symbol.asyncIterator]() {
return this;
}
};
}
}
Loading

0 comments on commit ce30266

Please sign in to comment.