From a828744083e2c0d7bca7e1b764f352ac1b7f7922 Mon Sep 17 00:00:00 2001 From: Asher Gomez Date: Mon, 19 Feb 2024 21:24:22 +1100 Subject: [PATCH] feat(io): `iterateReader[Sync]()` (#4247) --- io/iterate_reader.ts | 104 +++++++++++++++++++++++++++++++++++ io/iterate_reader_test.ts | 111 ++++++++++++++++++++++++++++++++++++++ io/mod.ts | 1 + io/types.ts | 10 ++-- streams/iterate_reader.ts | 35 ++++-------- 5 files changed, 232 insertions(+), 29 deletions(-) create mode 100644 io/iterate_reader.ts create mode 100644 io/iterate_reader_test.ts diff --git a/io/iterate_reader.ts b/io/iterate_reader.ts new file mode 100644 index 000000000000..d9d370127405 --- /dev/null +++ b/io/iterate_reader.ts @@ -0,0 +1,104 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +// This module is browser compatible. + +import { DEFAULT_BUFFER_SIZE } from "./_constants.ts"; +import type { Reader, ReaderSync } from "./types.ts"; + +export type { Reader, ReaderSync }; + +/** + * Turns a {@linkcode Reader} into an async iterator. + * + * @example + * ```ts + * import { iterateReader } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts"; + * + * using file = await Deno.open("/etc/passwd"); + * for await (const chunk of iterateReader(file)) { + * console.log(chunk); + * } + * ``` + * + * Second argument can be used to tune size of a buffer. + * Default size of the buffer is 32kB. + * + * @example + * ```ts + * import { iterateReader } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts"; + * + * using file = await Deno.open("/etc/passwd"); + * const iter = iterateReader(file, { + * bufSize: 1024 * 1024 + * }); + * for await (const chunk of iter) { + * console.log(chunk); + * } + * ``` + */ +export async function* iterateReader( + reader: Reader, + options?: { + bufSize?: number; + }, +): AsyncIterableIterator { + const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE; + const b = new Uint8Array(bufSize); + while (true) { + const result = await reader.read(b); + if (result === null) { + break; + } + + yield b.slice(0, result); + } +} + +/** + * Turns a {@linkcode ReaderSync} into an iterator. + * + * ```ts + * import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts"; + * + * using file = Deno.openSync("/etc/passwd"); + * for (const chunk of iterateReaderSync(file)) { + * console.log(chunk); + * } + * ``` + * + * Second argument can be used to tune size of a buffer. + * Default size of the buffer is 32kB. + * + * ```ts + * import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts"; + + * using file = await Deno.open("/etc/passwd"); + * const iter = iterateReaderSync(file, { + * bufSize: 1024 * 1024 + * }); + * for (const chunk of iter) { + * console.log(chunk); + * } + * ``` + * + * Iterator uses an internal buffer of fixed size for efficiency; it returns + * a view on that buffer on each iteration. It is therefore caller's + * responsibility to copy contents of the buffer if needed; otherwise the + * next iteration will overwrite contents of previously returned chunk. + */ +export function* iterateReaderSync( + reader: ReaderSync, + options?: { + bufSize?: number; + }, +): IterableIterator { + const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE; + const b = new Uint8Array(bufSize); + while (true) { + const result = reader.readSync(b); + if (result === null) { + break; + } + + yield b.slice(0, result); + } +} diff --git a/io/iterate_reader_test.ts b/io/iterate_reader_test.ts new file mode 100644 index 000000000000..b2adea11051b --- /dev/null +++ b/io/iterate_reader_test.ts @@ -0,0 +1,111 @@ +import { assertEquals } from "../assert/assert_equals.ts"; +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +import { iterateReader, iterateReaderSync } from "./iterate_reader.ts"; +import { readerFromIterable } from "../streams/reader_from_iterable.ts"; +import { delay } from "../async/delay.ts"; +import type { Reader, ReaderSync } from "./types.ts"; + +Deno.test("iterateReader()", async () => { + // ref: https://github.com/denoland/deno/issues/2330 + const encoder = new TextEncoder(); + + class TestReader implements Reader { + #offset = 0; + #buf: Uint8Array; + + constructor(s: string) { + this.#buf = new Uint8Array(encoder.encode(s)); + } + + read(p: Uint8Array): Promise { + const n = Math.min(p.byteLength, this.#buf.byteLength - this.#offset); + p.set(this.#buf.slice(this.#offset, this.#offset + n)); + this.#offset += n; + + if (n === 0) { + return Promise.resolve(null); + } + + return Promise.resolve(n); + } + } + + const reader = new TestReader("hello world!"); + + let totalSize = 0; + await Array.fromAsync( + iterateReader(reader), + (buf) => totalSize += buf.byteLength, + ); + + assertEquals(totalSize, 12); +}); + +Deno.test("iterateReader() works with slow consumer", async () => { + const a = new Uint8Array([97]); + const b = new Uint8Array([98]); + const iter = iterateReader(readerFromIterable([a, b])); + const promises = []; + for await (const bytes of iter) { + promises.push(delay(10).then(() => bytes)); + } + assertEquals([a, b], await Promise.all(promises)); +}); + +Deno.test("iterateReaderSync()", () => { + // ref: https://github.com/denoland/deno/issues/2330 + const encoder = new TextEncoder(); + + class TestReader implements ReaderSync { + #offset = 0; + #buf: Uint8Array; + + constructor(s: string) { + this.#buf = new Uint8Array(encoder.encode(s)); + } + + readSync(p: Uint8Array): number | null { + const n = Math.min(p.byteLength, this.#buf.byteLength - this.#offset); + p.set(this.#buf.slice(this.#offset, this.#offset + n)); + this.#offset += n; + + if (n === 0) { + return null; + } + + return n; + } + } + + const reader = new TestReader("hello world!"); + + let totalSize = 0; + for (const buf of iterateReaderSync(reader)) { + totalSize += buf.byteLength; + } + + assertEquals(totalSize, 12); +}); + +Deno.test("iterateReaderSync() works with slow consumer", async () => { + const a = new Uint8Array([97]); + const b = new Uint8Array([98]); + const data = [a, b]; + const readerSync = { + readSync(u8: Uint8Array) { + const bytes = data.shift(); + if (bytes) { + u8.set(bytes); + return bytes.length; + } + return null; + }, + }; + const iter = iterateReaderSync(readerSync); + const promises = []; + for (const bytes of iter) { + promises.push(delay(10).then(() => bytes)); + } + assertEquals([a, b], await Promise.all(promises)); +}); diff --git a/io/mod.ts b/io/mod.ts index 393a7856f16a..56750e0df945 100644 --- a/io/mod.ts +++ b/io/mod.ts @@ -14,6 +14,7 @@ export * from "./buf_writer.ts"; export * from "./buffer.ts"; export * from "./copy.ts"; export * from "./copy_n.ts"; +export * from "./iterate_reader.ts"; export * from "./limited_reader.ts"; export * from "./multi_reader.ts"; export * from "./read_all.ts"; diff --git a/io/types.ts b/io/types.ts index 5bf45f057bf5..5e0fac6eae93 100644 --- a/io/types.ts +++ b/io/types.ts @@ -24,8 +24,9 @@ export interface Reader { * * Implementations should not retain a reference to `p`. * - * Use iterateReader() from https://deno.land/std@$STD_VERSION/streams/iterate_reader.ts to turn a Reader into an - * AsyncIterator. + * Use + * {@linkcode https://deno.land/std@$STD_VERSION/io/to_iterator.ts?s=toIterator} + * to turn a {@linkcode Reader} into an {@linkcode AsyncIterableIterator}. */ read(p: Uint8Array): Promise; } @@ -52,8 +53,9 @@ export interface ReaderSync { * * Implementations should not retain a reference to `p`. * - * Use iterateReaderSync() from https://deno.land/std@$STD_VERSION/streams/iterate_reader.ts to turn a ReaderSync - * into an Iterator. + * Use + * {@linkcode https://deno.land/std@$STD_VERSION/io/to_iterator.ts?s=toIteratorSync} + * to turn a {@linkcode ReaderSync} into an {@linkcode IterableIterator}. */ readSync(p: Uint8Array): number | null; } diff --git a/streams/iterate_reader.ts b/streams/iterate_reader.ts index 131f95fdbd84..2057ce0dbdf3 100644 --- a/streams/iterate_reader.ts +++ b/streams/iterate_reader.ts @@ -1,7 +1,10 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. // This module is browser compatible. -import { DEFAULT_BUFFER_SIZE } from "./_common.ts"; +import { + iterateReader as _iterateReader, + iterateReaderSync as _iterateReaderSync, +} from "../io/iterate_reader.ts"; import type { Reader, ReaderSync } from "../io/types.ts"; export type { Reader, ReaderSync }; @@ -35,24 +38,15 @@ export type { Reader, ReaderSync }; * } * ``` * - * @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStreamDefaultReader} instead. + * @deprecated (will be removed in 1.0.0) Import from {@link https://deno.land/std/io/iterate_reader.ts} instead. */ -export async function* iterateReader( +export function iterateReader( r: Reader, options?: { bufSize?: number; }, ): AsyncIterableIterator { - const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE; - const b = new Uint8Array(bufSize); - while (true) { - const result = await r.read(b); - if (result === null) { - break; - } - - yield b.slice(0, result); - } + return _iterateReader(r, options); } /** @@ -87,22 +81,13 @@ export async function* iterateReader( * responsibility to copy contents of the buffer if needed; otherwise the * next iteration will overwrite contents of previously returned chunk. * - * @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} instead. + * @deprecated (will be removed in 1.0.0) Import from {@link https://deno.land/std/io/iterate_reader.ts} instead. */ -export function* iterateReaderSync( +export function iterateReaderSync( r: ReaderSync, options?: { bufSize?: number; }, ): IterableIterator { - const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE; - const b = new Uint8Array(bufSize); - while (true) { - const result = r.readSync(b); - if (result === null) { - break; - } - - yield b.slice(0, result); - } + return _iterateReaderSync(r, options); }