Skip to content

Commit

Permalink
feat(io): iterateReader[Sync]() (denoland#4247)
Browse files Browse the repository at this point in the history
  • Loading branch information
iuioiua authored Feb 19, 2024
1 parent 8205edc commit a828744
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 29 deletions.
104 changes: 104 additions & 0 deletions io/iterate_reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.

import { DEFAULT_BUFFER_SIZE } from "./_constants.ts";
import type { Reader, ReaderSync } from "./types.ts";

export type { Reader, ReaderSync };

/**
* Turns a {@linkcode Reader} into an async iterator.
*
* @example
* ```ts
* import { iterateReader } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts";
*
* using file = await Deno.open("/etc/passwd");
* for await (const chunk of iterateReader(file)) {
* console.log(chunk);
* }
* ```
*
* Second argument can be used to tune size of a buffer.
* Default size of the buffer is 32kB.
*
* @example
* ```ts
* import { iterateReader } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts";
*
* using file = await Deno.open("/etc/passwd");
* const iter = iterateReader(file, {
* bufSize: 1024 * 1024
* });
* for await (const chunk of iter) {
* console.log(chunk);
* }
* ```
*/
export async function* iterateReader(
reader: Reader,
options?: {
bufSize?: number;
},
): AsyncIterableIterator<Uint8Array> {
const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE;
const b = new Uint8Array(bufSize);
while (true) {
const result = await reader.read(b);
if (result === null) {
break;
}

yield b.slice(0, result);
}
}

/**
* Turns a {@linkcode ReaderSync} into an iterator.
*
* ```ts
* import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts";
*
* using file = Deno.openSync("/etc/passwd");
* for (const chunk of iterateReaderSync(file)) {
* console.log(chunk);
* }
* ```
*
* Second argument can be used to tune size of a buffer.
* Default size of the buffer is 32kB.
*
* ```ts
* import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts";
* using file = await Deno.open("/etc/passwd");
* const iter = iterateReaderSync(file, {
* bufSize: 1024 * 1024
* });
* for (const chunk of iter) {
* console.log(chunk);
* }
* ```
*
* Iterator uses an internal buffer of fixed size for efficiency; it returns
* a view on that buffer on each iteration. It is therefore caller's
* responsibility to copy contents of the buffer if needed; otherwise the
* next iteration will overwrite contents of previously returned chunk.
*/
export function* iterateReaderSync(
reader: ReaderSync,
options?: {
bufSize?: number;
},
): IterableIterator<Uint8Array> {
const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE;
const b = new Uint8Array(bufSize);
while (true) {
const result = reader.readSync(b);
if (result === null) {
break;
}

yield b.slice(0, result);
}
}
111 changes: 111 additions & 0 deletions io/iterate_reader_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import { assertEquals } from "../assert/assert_equals.ts";
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

import { iterateReader, iterateReaderSync } from "./iterate_reader.ts";
import { readerFromIterable } from "../streams/reader_from_iterable.ts";
import { delay } from "../async/delay.ts";
import type { Reader, ReaderSync } from "./types.ts";

Deno.test("iterateReader()", async () => {
// ref: https://github.com/denoland/deno/issues/2330
const encoder = new TextEncoder();

class TestReader implements Reader {
#offset = 0;
#buf: Uint8Array;

constructor(s: string) {
this.#buf = new Uint8Array(encoder.encode(s));
}

read(p: Uint8Array): Promise<number | null> {
const n = Math.min(p.byteLength, this.#buf.byteLength - this.#offset);
p.set(this.#buf.slice(this.#offset, this.#offset + n));
this.#offset += n;

if (n === 0) {
return Promise.resolve(null);
}

return Promise.resolve(n);
}
}

const reader = new TestReader("hello world!");

let totalSize = 0;
await Array.fromAsync(
iterateReader(reader),
(buf) => totalSize += buf.byteLength,
);

assertEquals(totalSize, 12);
});

Deno.test("iterateReader() works with slow consumer", async () => {
const a = new Uint8Array([97]);
const b = new Uint8Array([98]);
const iter = iterateReader(readerFromIterable([a, b]));
const promises = [];
for await (const bytes of iter) {
promises.push(delay(10).then(() => bytes));
}
assertEquals([a, b], await Promise.all(promises));
});

Deno.test("iterateReaderSync()", () => {
// ref: https://github.com/denoland/deno/issues/2330
const encoder = new TextEncoder();

class TestReader implements ReaderSync {
#offset = 0;
#buf: Uint8Array;

constructor(s: string) {
this.#buf = new Uint8Array(encoder.encode(s));
}

readSync(p: Uint8Array): number | null {
const n = Math.min(p.byteLength, this.#buf.byteLength - this.#offset);
p.set(this.#buf.slice(this.#offset, this.#offset + n));
this.#offset += n;

if (n === 0) {
return null;
}

return n;
}
}

const reader = new TestReader("hello world!");

let totalSize = 0;
for (const buf of iterateReaderSync(reader)) {
totalSize += buf.byteLength;
}

assertEquals(totalSize, 12);
});

Deno.test("iterateReaderSync() works with slow consumer", async () => {
const a = new Uint8Array([97]);
const b = new Uint8Array([98]);
const data = [a, b];
const readerSync = {
readSync(u8: Uint8Array) {
const bytes = data.shift();
if (bytes) {
u8.set(bytes);
return bytes.length;
}
return null;
},
};
const iter = iterateReaderSync(readerSync);
const promises = [];
for (const bytes of iter) {
promises.push(delay(10).then(() => bytes));
}
assertEquals([a, b], await Promise.all(promises));
});
1 change: 1 addition & 0 deletions io/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export * from "./buf_writer.ts";
export * from "./buffer.ts";
export * from "./copy.ts";
export * from "./copy_n.ts";
export * from "./iterate_reader.ts";
export * from "./limited_reader.ts";
export * from "./multi_reader.ts";
export * from "./read_all.ts";
Expand Down
10 changes: 6 additions & 4 deletions io/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ export interface Reader {
*
* Implementations should not retain a reference to `p`.
*
* Use iterateReader() from https://deno.land/std@$STD_VERSION/streams/iterate_reader.ts to turn a Reader into an
* AsyncIterator.
* Use
* {@linkcode https://deno.land/std@$STD_VERSION/io/to_iterator.ts?s=toIterator}
* to turn a {@linkcode Reader} into an {@linkcode AsyncIterableIterator}.
*/
read(p: Uint8Array): Promise<number | null>;
}
Expand All @@ -52,8 +53,9 @@ export interface ReaderSync {
*
* Implementations should not retain a reference to `p`.
*
* Use iterateReaderSync() from https://deno.land/std@$STD_VERSION/streams/iterate_reader.ts to turn a ReaderSync
* into an Iterator.
* Use
* {@linkcode https://deno.land/std@$STD_VERSION/io/to_iterator.ts?s=toIteratorSync}
* to turn a {@linkcode ReaderSync} into an {@linkcode IterableIterator}.
*/
readSync(p: Uint8Array): number | null;
}
Expand Down
35 changes: 10 additions & 25 deletions streams/iterate_reader.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.

import { DEFAULT_BUFFER_SIZE } from "./_common.ts";
import {
iterateReader as _iterateReader,
iterateReaderSync as _iterateReaderSync,
} from "../io/iterate_reader.ts";
import type { Reader, ReaderSync } from "../io/types.ts";

export type { Reader, ReaderSync };
Expand Down Expand Up @@ -35,24 +38,15 @@ export type { Reader, ReaderSync };
* }
* ```
*
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStreamDefaultReader} instead.
* @deprecated (will be removed in 1.0.0) Import from {@link https://deno.land/std/io/iterate_reader.ts} instead.
*/
export async function* iterateReader(
export function iterateReader(
r: Reader,
options?: {
bufSize?: number;
},
): AsyncIterableIterator<Uint8Array> {
const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE;
const b = new Uint8Array(bufSize);
while (true) {
const result = await r.read(b);
if (result === null) {
break;
}

yield b.slice(0, result);
}
return _iterateReader(r, options);
}

/**
Expand Down Expand Up @@ -87,22 +81,13 @@ export async function* iterateReader(
* responsibility to copy contents of the buffer if needed; otherwise the
* next iteration will overwrite contents of previously returned chunk.
*
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} instead.
* @deprecated (will be removed in 1.0.0) Import from {@link https://deno.land/std/io/iterate_reader.ts} instead.
*/
export function* iterateReaderSync(
export function iterateReaderSync(
r: ReaderSync,
options?: {
bufSize?: number;
},
): IterableIterator<Uint8Array> {
const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE;
const b = new Uint8Array(bufSize);
while (true) {
const result = r.readSync(b);
if (result === null) {
break;
}

yield b.slice(0, result);
}
return _iterateReaderSync(r, options);
}

0 comments on commit a828744

Please sign in to comment.