Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streams/unstable): toLines() #5121

Merged
merged 7 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions streams/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"./to-array-buffer": "./to_array_buffer.ts",
"./to-blob": "./to_blob.ts",
"./to-json": "./to_json.ts",
"./to-lines": "./to_lines.ts",
"./to-text": "./to_text.ts",
"./to-transform-stream": "./to_transform_stream.ts",
"./zip-readable-streams": "./zip_readable_streams.ts"
Expand Down
1 change: 1 addition & 0 deletions streams/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export * from "./text_line_stream.ts";
export * from "./to_array_buffer.ts";
export * from "./to_blob.ts";
export * from "./to_json.ts";
export * from "./to_lines.ts";
export * from "./to_text.ts";
export * from "./to_transform_stream.ts";
export * from "./zip_readable_streams.ts";
76 changes: 76 additions & 0 deletions streams/to_lines.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

/**
 * Converts a `ReadableStream` of `Uint8Array` or `string` chunks into an
 * `AsyncGenerator` of lines. Lines are split at `\n` or `\r\n`; the line
 * terminator is not included in the yielded value. A trailing empty line
 * (i.e. input that ends with a newline) is not yielded.
 *
 * `Uint8Array` chunks are decoded as UTF-8 in streaming mode, so a multi-byte
 * character that is split across two chunks is decoded correctly.
 *
 * The stream is locked while the generator is running. When the generator
 * finishes, fails, or is abandoned early (for example by `break`ing out of a
 * `for await` loop) the lock is released; in the latter two cases the stream
 * is cancelled as well. Errors raised while reading or decoding are rethrown
 * to the consumer rather than silently ending the iteration.
 *
 * @param readable A `ReadableStream` of `Uint8Array` or `string`.
 * @returns An `AsyncGenerator<string>` yielding one line at a time.
 *
 * @example JSON Lines
 * ```ts
 * import { toLines } from "@std/streams/to-lines";
 * import { assertEquals } from "@std/assert/assert-equals";
 *
 * const readable = ReadableStream.from([
 *   '{"name": "Alice", "age": ',
 *   '30}\r\n{"name": "Bob", "age"',
 *   ": 25}\n",
 * ]);
 *
 * type Person = { name: string, age: number };
 *
 * const people: Person[] = [];
 * for await (const line of toLines(readable)) {
 *   people.push(JSON.parse(line) as Person);
 * }
 *
 * assertEquals(
 *   people,
 *   [{ "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }],
 * );
 * ```
 */
export async function* toLines(
  readable: ReadableStream<Uint8Array> | ReadableStream<string>,
): AsyncGenerator<string> {
  // Widen the union to a single stream type so `getReader()` resolves to one
  // call signature; each chunk is narrowed again with `typeof` below.
  const reader = (readable as ReadableStream<Uint8Array | string>).getReader();
  const decoder = new TextDecoder();
  // Text received so far that has not been yielded yet (no "\n" in it once a
  // chunk has been fully processed).
  let buffer = "";
  // Offset in `buffer` up to which we already know there is no "\n".
  let scanFrom = 0;
  let exhausted = false;
  let failure: unknown;
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) {
        exhausted = true;
        break;
      }
      const chunk = result.value;
      buffer += typeof chunk === "string"
        ? chunk
        // `stream: true` keeps an incomplete multi-byte sequence inside the
        // decoder until the next chunk completes it.
        : decoder.decode(chunk, { stream: true });

      let lineStart = 0;
      let newline = buffer.indexOf("\n", scanFrom);
      while (newline !== -1) {
        // Drop the "\r" of a "\r\n" terminator; a lone "\r" is kept as data.
        const lineEnd = newline > lineStart && buffer[newline - 1] === "\r"
          ? newline - 1
          : newline;
        yield buffer.slice(lineStart, lineEnd);
        lineStart = newline + 1;
        newline = buffer.indexOf("\n", lineStart);
      }
      buffer = buffer.slice(lineStart);
      scanFrom = buffer.length;
    }
    // Flush whatever the streaming decoder is still holding on to.
    buffer += decoder.decode();
    if (buffer.length) {
      yield buffer;
    }
  } catch (reason) {
    failure = reason;
    throw reason;
  } finally {
    if (!exhausted) {
      // Either an error occurred or the consumer stopped early: tell the
      // source that no more data is wanted.
      try {
        await reader.cancel(failure);
      } catch {
        // The stream is already errored; the original error is rethrown above.
      }
    }
    reader.releaseLock();
  }
}

// NOTE(review): this looks like an ad-hoc debugging demo left in the module;
// it runs (and prints) every time the module is imported. Confirm whether it
// should be removed before release.
const demoChunks = [
  "qwertzu",
  "iopasd\r\nmnbvc",
  "xylk\rjhgfds\napoiuzt\r",
  "qwr\r09ei\rqwrjiowqr\r",
  "\nrewq0987\n\n654321",
  "\nrewq0987\r\n\r\n654321\r",
];
const demoLines = await Array.fromAsync(
  toLines(ReadableStream.from(demoChunks)),
);
console.log(demoLines);
BlackAsLight marked this conversation as resolved.
Show resolved Hide resolved
53 changes: 53 additions & 0 deletions streams/to_lines_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

import { toLines } from "./to_lines.ts";
import { assertEquals } from "@std/assert";

Deno.test("toLines parses simple input", async () => {
  // Chunk boundaries fall mid-line and between "\r" and "\n"; lone "\r"
  // characters are expected to be preserved as line content.
  const chunks = [
    "qwertzu",
    "iopasd\r\nmnbvc",
    "xylk\rjhgfds\napoiuzt\r",
    "qwr\r09ei\rqwrjiowqr\r",
    "\nrewq0987\n\n654321",
    "\nrewq0987\r\n\r\n654321\r",
  ];
  const expected = [
    "qwertzuiopasd",
    "mnbvcxylk\rjhgfds",
    "apoiuzt\rqwr\r09ei\rqwrjiowqr",
    "rewq0987",
    "",
    "654321",
    "rewq0987",
    "",
    "654321\r",
  ];
  const lines = await Array.fromAsync(toLines(ReadableStream.from(chunks)));
  assertEquals(lines, expected);

  // Input ending in a newline must not produce an extra empty last line.
  const terminated = ReadableStream.from("rewq0987\r\n\r\n654321\n");
  const terminatedLines = await Array.fromAsync(toLines(terminated));
  assertEquals(terminatedLines, ["rewq0987", "", "654321"]);
});

Deno.test("toLines parses large chunks", async () => {
  // Many consecutive newlines should yield exactly one empty line each.
  const totalLines = 20_000;
  const input = "\n".repeat(totalLines);
  const expected = Array.from({ length: totalLines }, () => "");

  const lines = await Array.fromAsync(toLines(ReadableStream.from(input)));

  assertEquals(lines.length, totalLines);
  assertEquals(lines, expected);
});

Deno.test("toLines converts Uint8Array chunks to strings", async () => {
  // ASCII bytes for "Hello" followed by "\nWorld!\n".
  const hello = new Uint8Array([72, 101, 108, 108, 111]);
  const world = new Uint8Array([10, 87, 111, 114, 108, 100, 33, 10]);

  const lines = await Array.fromAsync(
    toLines(ReadableStream.from([hello, world])),
  );

  assertEquals(lines, ["Hello", "World!"]);
});