diff --git a/.eslintrc.yaml b/.eslintrc.yaml index 8ce4b4e2a..150ea794d 100644 --- a/.eslintrc.yaml +++ b/.eslintrc.yaml @@ -56,7 +56,7 @@ overrides: parserOptions: sourceType: module project: - - ./{packages{,/ts-type-utils},actions}/*{,/tests{,/helpers}}/tsconfig.json + - ./{packages{,/ts-type-utils},actions}/*{,/tests{,/helpers},/test-d}/tsconfig.json settings: node: # see https://github.com/mysticatea/eslint-plugin-node/blob/v11.1.0/docs/rules/shebang.md diff --git a/packages/stream-transform-from/.github/workflows/publish.sh b/packages/stream-transform-from/.github/workflows/publish.sh new file mode 100755 index 000000000..6495a1d96 --- /dev/null +++ b/packages/stream-transform-from/.github/workflows/publish.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +# https://qiita.com/yudoufu/items/48cb6fb71e5b498b2532#comment-87e291b98f4cabf77138 +readonly DIR_PATH="$(cd "$(dirname "${BASH_SOURCE:-${(%):-%N}}")"; pwd)" + +outputs_tag_name="${outputs_tag_name}" \ + node "${DIR_PATH}/../../scripts/publish-convert-readme.js" + +# see https://stackoverflow.com/a/62675843/4907315 +pnpm publish --access=public --no-git-checks diff --git a/packages/stream-transform-from/README.md b/packages/stream-transform-from/README.md new file mode 100644 index 000000000..9a15e829c --- /dev/null +++ b/packages/stream-transform-from/README.md @@ -0,0 +1,166 @@ +# @sounisi5011/stream-transform-from + +[![Go to the latest release page on npm](https://img.shields.io/npm/v/@sounisi5011/stream-transform-from.svg)](https://www.npmjs.com/package/@sounisi5011/stream-transform-from) +![Supported Node.js version: ^12.17.x || 14.x || 15.x || 16.x](https://img.shields.io/node/v/@sounisi5011/stream-transform-from) +[![Tested with Jest](https://img.shields.io/badge/tested_with-jest-99424f.svg)](https://github.com/facebook/jest) +[![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/) + + +[![Dependencies Status](https://status.david-dm.org/gh/sounisi5011/npm-packages.svg?path=packages%2Fstream-transform-from)](https://david-dm.org/sounisi5011/npm-packages?path=packages/stream-transform-from) +[![Build Status](https://github.com/sounisi5011/npm-packages/actions/workflows/ci.yaml/badge.svg)](https://github.com/sounisi5011/npm-packages/actions/workflows/ci.yaml) +[![Maintainability Status](https://api.codeclimate.com/v1/badges/26495b68302f7ff963c3/maintainability)](https://codeclimate.com/github/sounisi5011/npm-packages/maintainability) + +[`stream.Transform` class]: https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_transform +[`Buffer` object]: https://nodejs.org/api/buffer.html + +Create a [transform stream][`stream.Transform` class] from an async iterator. +This is [the last piece](https://github.com/nodejs/node/issues/27140#issuecomment-533266638) needed to convert between streams and async iterators/generators. + +## Features + +* No dependencies + + This package uses only the Node.js built-in [`stream.Transform` class]. + +* Strict type definition + + The exact type definitions for arguments and return values will be generated based on the `objectMode` option. + +* Encoding arguments can be used + + You can use `encoding`, which is passed as the second argument of the [`transform._transform()` method](https://nodejs.org/docs/latest/api/stream.html#stream_transform_transform_chunk_encoding_callback). + This allows you to safely convert a string to [`Buffer` object]. + +## Installation + +```sh +npm install @sounisi5011/stream-transform-from +``` + +```sh +yarn add @sounisi5011/stream-transform-from +``` + +```sh +pnpm add @sounisi5011/stream-transform-from +``` + +## Usage + +### Convert [`Buffer` objects][`Buffer` object] + +```js +const fs = require('fs'); +const stream = require('stream'); + +const { transformFrom } = require('@sounisi5011/stream-transform-from'); + +stream.pipeline( + fs.createReadStream('input.txt', 'utf8'), + transformFrom(async function*(source) { + for await (const { chunk } of source) { + yield chunk.toString('utf8').toUpperCase(); + } + }), + fs.createWriteStream('output.txt'), + error => { + if (error) { + console.error(error); + } else { + console.log('done!'); + } + } +); +``` + +### Convert any type value + +```js +const stream = require('stream'); + +const { transformFrom } = require('@sounisi5011/stream-transform-from'); + +stream.pipeline( + stream.Readable.from([1, 2, 3]), + transformFrom( + async function*(source) { + for await (const { chunk } of source) { + yield chunk + 2; + } + }, + { objectMode: true } + ), + // ... + error => { + if (error) { + console.error(error); + } else { + console.log('done!'); + } + } +); +``` + +### Convert string to [`Buffer`][`Buffer` object] using encoding + +```js +const stream = require('stream'); + +const { transformFrom } = require('@sounisi5011/stream-transform-from'); + +stream.pipeline( + // ... + transformFrom( + async function*(source) { + for await (const { chunk, encoding } of source) { + if (typeof chunk === 'string') { + yield Buffer.from(chunk, encoding); + } + } + }, + { writableObjectMode: true } + ), + // ... + error => { + if (error) { + console.error(error); + } else { + console.log('done!'); + } + } +); +``` + +## API + +```js +const { transformFrom } = require('@sounisi5011/stream-transform-from'); + +// The return value is a Transform stream. +const transformStream = transformFrom( + async function*(source) { + // `source` is `AsyncIterableIterator<{ chunk: Buffer, encoding: BufferEncoding }>` + // or `AsyncIterableIterator<{ chunk: unknown, encoding: BufferEncoding }>` type + + // The value returned by `yield` keyword will be passed as the first argument of `transform.push()` method. + }, + + // The second argument is an options for the Transform stream. + // The options are passed to the constructor function of the Transform class. + // However, the following fields are not allowed: + // + `construct` + // + `read` + // + `write` + // + `writev` + // + `final` + // + `destroy` + // + `transform` + // + `flush` + // The fields listed above will be ignored if specified. + {} +); +``` + +## Related + +* [generator-transform-stream](https://github.com/bealearts/generator-transform-stream) diff --git a/packages/stream-transform-from/examples/index.js b/packages/stream-transform-from/examples/index.js new file mode 100644 index 000000000..bf972ea50 --- /dev/null +++ b/packages/stream-transform-from/examples/index.js @@ -0,0 +1,44 @@ +const stream = require('stream'); + +const { transformFrom } = require('@sounisi5011/stream-transform-from'); + +stream.pipeline( + stream.Readable.from([1, 2, 3, 4, 5]), + // Convert a number to a Buffer object. + transformFrom( + async function*(source) { + for await (const { chunk: inputChunk } of source) { + console.log({ inputChunk }); + + if (typeof inputChunk === 'number') { + const code = inputChunk; + yield Buffer.from([code]); + } + } + }, + { writableObjectMode: true }, + ), + // Transform a Buffer object. + transformFrom(async function*(source) { + for await (const { chunk } of source) { + yield Buffer.concat([ + Buffer.from([0xF0]), + chunk, + Buffer.from([0xFF]), + ]); + } + }), + new stream.Writable({ + write(outputChunk, _, done) { + console.log({ outputChunk }); + done(); + }, + }), + error => { + if (error) { + console.error(error); + } else { + console.log('done!'); + } + }, +); diff --git a/packages/stream-transform-from/examples/package.json b/packages/stream-transform-from/examples/package.json new file mode 100644 index 000000000..f8ab790c3 --- /dev/null +++ b/packages/stream-transform-from/examples/package.json @@ -0,0 +1,9 @@ +{ + "private": true, + "dependencies": { + "@sounisi5011/stream-transform-from": "link:.." + }, + "engines": { + "node": "^12.3.x" + } +} diff --git a/packages/stream-transform-from/jest.config.js b/packages/stream-transform-from/jest.config.js new file mode 100644 index 000000000..868d6590b --- /dev/null +++ b/packages/stream-transform-from/jest.config.js @@ -0,0 +1,11 @@ +module.exports = { + preset: 'ts-jest', + coverageDirectory: 'coverage', + globals: { + 'ts-jest': { + tsconfig: '/tests/tsconfig.json', + }, + }, + testEnvironment: 'node', + testMatch: ['/tests/**/*.ts'], +}; diff --git a/packages/stream-transform-from/package.json b/packages/stream-transform-from/package.json new file mode 100644 index 000000000..503382a8d --- /dev/null +++ b/packages/stream-transform-from/package.json @@ -0,0 +1,86 @@ +{ + "name": "@sounisi5011/stream-transform-from", + "version": "0.0.0", + "description": "Create a transform stream from an async iterator", + "keywords": [ + "async", + "asyncgenerator", + "asyncgeneratorfunction", + "asyncgeneratorfunctions", + "asyncgenerators", + "asynciterable", + "asynciterables", + "asynciterator", + "asynciterators", + "from", + "generator", + "generatorfunction", + "generatorfunctions", + "generators", + "iterable", + "iterables", + "iterator", + "iterators", + "stream", + "transform", + "util", + "utility" + ], + "homepage": "https://github.com/sounisi5011/npm-packages/tree/main/packages/stream-transform-from#readme", + "bugs": { + "url": "https://github.com/sounisi5011/npm-packages/issues" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/sounisi5011/npm-packages.git", + "directory": "packages/stream-transform-from" + }, + "license": "MIT", + "author": "sounisi5011", + "type": "commonjs", + "exports": { + ".": "./dist/index.js", + "./package.json": "./package.json" + }, + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "directories": { + "lib": "./src/", + "example": "./examples/", + "test": "./tests/" + }, + "files": [ + "dist/", + "src/" + ], + "scripts": { + "build": "tsc", + "lint:tsc": "run-p lint:tsc:*", + "lint:tsc:src": "tsc --noEmit", + "lint:tsc:test": "tsc -p ./tests/ --noEmit", + "lint:tsc:test-d": "tsc -p ./test-d/ --noEmit", + "test": "run-if-supported-node run-p test:*", + "test:examples": "run-s build test:examples:*", + "test:examples:index": "node ./examples/index.js", + "test:jest": "jest", + "test:tsd": "run-s build test:tsd:*", + "test:tsd:exec": "tsd" + }, + "devDependencies": { + "@sounisi5011/scripts--run-if-supported-node": "workspace:^0.0.0", + "@types/node": "15.x", + "tsd": "0.15.1" + }, + "engines": { + "node": "^12.17.x || 14.x || 15.x || 16.x" + }, + "runkitExampleFilename": "./examples/index.js", + "tsd": { + "compilerOptions": { + "lib": [ + "es2019" + ], + "rootDir": "./" + } + } +} diff --git a/packages/stream-transform-from/scripts/publish-convert-readme.js b/packages/stream-transform-from/scripts/publish-convert-readme.js new file mode 100644 index 000000000..5dbcded8a --- /dev/null +++ b/packages/stream-transform-from/scripts/publish-convert-readme.js @@ -0,0 +1,87 @@ +const fs = require('fs'); +const path = require('path'); + +const getPath = pathname => path.resolve(__dirname, pathname); + +const pkgPath = getPath('../package.json'); +const readmePath = getPath('../README.md'); + +const pkg = require(pkgPath); + +/** + * @param {string} name + * @returns {string|undefined} + */ +function lookupEnv(name) { + for (const [envName, value] of Object.entries(process.env)) { + if (envName.toLowerCase() === name.toLowerCase()) { + return value; + } + } + return undefined; +} + +const repoURL = pkg.repository + && (typeof pkg.repository === 'string' ? pkg.repository : pkg.repository.url) + .replace(/^git\+/, '') + .replace(/\.git$/, ''); +const directory = pkg.repository && typeof pkg.repository === 'object' + ? pkg.repository.directory.replace(/\/*$/, '/') + : ''; +const tagName = lookupEnv('outputs_tag_name'); +const rootURL = repoURL ? `${repoURL}/tree/${tagName || 'main'}/${directory}` : ''; + +/** + * @param {string} url + * @returns {string} + */ +function replaceURL(url) { + if (pkg.version) { + if (url.startsWith('https://img.shields.io/bundlephobia/') && !/\/\d+(?:\.\d+){2}$/.test(url)) { + return `${url}/${pkg.version}`; + } + if ( + url.startsWith('https://bundlephobia.com/result?p=') + || url.startsWith('https://packagephobia.com/badge?p=') + || url.startsWith('https://packagephobia.com/result?p=') + ) { + return url.replace(/(?:@\d+(?:\.\d+){2})?$/, `@${pkg.version}`); + } + } + if (rootURL) { + if (url.startsWith('./')) { + return rootURL + url.replace(/^\.\//, ''); + } + } + if (pkg.license) { + if (url.startsWith('https://img.shields.io/npm/l/')) { + return `https://img.shields.io/static/v1?label=license&message=${encodeURIComponent(pkg.license)}&color=green`; + } + } + if (pkg.engines && pkg.engines.node && url.startsWith('https://img.shields.io/node/v/')) { + return `https://img.shields.io/static/v1?label=node&message=${ + encodeURIComponent(pkg.engines.node) + }&color=brightgreen`; + } + return url; +} + +const readmeText = fs.readFileSync(readmePath, 'utf8'); + +const updatedReadmeText = readmeText + .replace(/(?<=\()(?:https?:\/\/|\.{1,2}\/)[^)\s]+/g, url => { + const newUrl = replaceURL(url); + if (newUrl !== url) { + console.log(`replace "${url}"\n to "${newUrl}"`); + } + return newUrl; + }) + .replace(/(?<=\]: *)(?:https?:\/\/|\.{1,2}\/)[^\s]+/g, url => { + const newUrl = replaceURL(url); + if (newUrl !== url) { + console.log(`replace "${url}"\n to "${newUrl}"`); + } + return newUrl; + }); + +fs.writeFileSync(readmePath, updatedReadmeText); diff --git a/packages/stream-transform-from/src/index.ts b/packages/stream-transform-from/src/index.ts new file mode 100644 index 000000000..83c192fe9 --- /dev/null +++ b/packages/stream-transform-from/src/index.ts @@ -0,0 +1,220 @@ +import { version as nodeVersion } from 'process'; +import { Transform } from 'stream'; + +import type * as stream from 'stream'; + +type GetPropValue = K extends (keyof T) ? T[K] : undefined; + +type IfNeverThenUnknown = [T] extends [never] ? unknown : T; + +/** + * If the `objectMode` and `writableObjectMode` options is not `true`, + * the chunk value is always an instance of Buffer. + */ +export type InputChunkType = ( + true extends GetPropValue ? unknown : Buffer +); + +/** + * If the `objectMode` and `readableObjectMode` options is not `true`, + * the chunk value must be of type string or an instance of Buffer or Uint8Array. + * @see https://github.com/nodejs/node/blob/v12.17.0/lib/_stream_readable.js#L226-L244 + */ +export type OutputChunkType = IfNeverThenUnknown< + T extends ({ objectMode: true } | { readableObjectMode: true }) ? never + : string | Buffer | Uint8Array +>; + +export type SourceIterator = ( + AsyncIterableIterator<{ + chunk: InputChunkType; + encoding: BufferEncoding; + }> +); + +export type TransformFunction = ( + (source: SourceIterator) => + | Iterable> + | AsyncIterable> +); + +type ReceivedData = + | { chunk: InputChunkType; encoding: BufferEncoding; done?: false } + | { done: true }; + +/** + * Prior to Node.js v14, there is a bug where the `finish` event is fired before the callback passed to the `transform._flush()` method is called. + * This bug has been fixed in Node.js v15. + * @see https://github.com/nodejs/node/issues/34274 + * @see https://github.com/nodejs/node/pull/34314 + */ +const HAS_FLUSH_BUG = !(Number(nodeVersion.match(/(?<=^v)\d+/)?.[0]) >= 15); + +const DISALLOW_OPTION_NAMES = [ + 'construct', + 'read', + 'write', + 'writev', + 'final', + 'destroy', + 'transform', + 'flush', +] as const; + +function removeProp(obj: T, props: readonly never[]): T; +function removeProp( + obj: T | undefined, + props: readonly K[], +): Omit | undefined; +function removeProp( + obj: T | null, + props: readonly K[], +): Omit | null; +function removeProp( + obj: T | null | undefined, + props: readonly K[], +): Omit | null | undefined; +function removeProp( + obj: Record | null | undefined, + props: readonly PropertyKey[], +): Record | null | undefined { + if (obj === null || obj === undefined) return obj; + return Object.fromEntries( + Object.entries(obj) + .filter(([p]) => !props.includes(p)), + ); +} + +export class TransformFromAsyncIterable< + TOpts extends stream.TransformOptions = Record +> extends Transform { + private transformCallback: stream.TransformCallback | undefined; + private isFinished = false; + private receiveData?: (data: ReceivedData) => void; + private readonly receivedDataList: Array> = []; + + constructor(transformFn: TransformFunction, opts?: TOpts) { + super(removeProp(opts, DISALLOW_OPTION_NAMES)); + + const source = this.createSource(); + const result = transformFn(source); + (async () => { + for await (const chunk of result) { + this.push(chunk); + } + })() + .then(() => this.finish()) + .catch(error => this.finish(error)); + } + + _transform( + chunk: InputChunkType, + encoding: BufferEncoding, + callback: stream.TransformCallback, + ): void { + if (this.isFinished) { + callback(); + } else { + this.transformCallback = callback; + this.emitToSource({ chunk, encoding }); + } + } + + _flush(callback: stream.TransformCallback): void { + if (this.isFinished) { + callback(); + } else { + this.transformCallback = HAS_FLUSH_BUG + ? this.emitFinishEventAfterCallback(callback) + : callback; + this.emitToSource({ done: true }); + } + } + + private finish(error?: Error): void { + this.isFinished = true; + if (error) { + if (!this.callTransformCallback(error)) { + this.destroy(error); + } + } else { + this.push(null); + this.callTransformCallback(); + } + } + + private callTransformCallback( + ...args: Parameters + ): boolean { + const { transformCallback } = this; + if (transformCallback) { + this.transformCallback = undefined; + transformCallback(...args); + return true; + } + return false; + } + + private async *createSource(): SourceIterator { + while (true) { + const data: ReceivedData = ( + this.receivedDataList.shift() + ?? await new Promise(resolve => { + this.receiveData = resolve; + this.callTransformCallback(); + }) + ); + if (data.done) break; + const { done: _, ...chunkData } = data; + yield chunkData; + } + } + + private emitToSource(data: ReceivedData): void { + const { receiveData } = this; + if (receiveData) { + this.receiveData = undefined; + receiveData(data); + } else { + this.receivedDataList.push(data); + } + } + + private emitFinishEventAfterCallback( + flushCallback: stream.TransformCallback, + ): stream.TransformCallback { + const finishEventName = 'finish'; + const finishEventList: unknown[][] = []; + + this.emit = (event: string | symbol, ...args: unknown[]) => { + if (finishEventName === event) { + finishEventList.push(args); + return false; + } + return super.emit(event, ...args); + }; + + return (...args) => { + flushCallback(...args); + + // @ts-expect-error TS2790: The operand of a 'delete' operator must be optional. + delete this.emit; + + const [error] = args; + if (!error) { + for (const args of finishEventList) { + this.emit(finishEventName, ...args); + } + } + }; + } +} + +export function transformFrom< + TOpts extends stream.TransformOptions = Record +>( + transformFn: TransformFunction, + options?: TOpts, +): stream.Transform { + return new TransformFromAsyncIterable(transformFn, options); +} diff --git a/packages/stream-transform-from/test-d/index.test-d.ts b/packages/stream-transform-from/test-d/index.test-d.ts new file mode 100644 index 000000000..a0be40b6e --- /dev/null +++ b/packages/stream-transform-from/test-d/index.test-d.ts @@ -0,0 +1,247 @@ +import { expectType } from 'tsd'; + +import { transformFrom } from '../src'; + +import type * as stream from 'stream'; + +expectType(transformFrom(async function*(source) { + expectType>(source); + for await (const { chunk, encoding } of source) { + expectType(chunk); + expectType(encoding); + } + yield ''; +})); + +/** + * Source type + */ + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { objectMode: false }); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { objectMode: true }); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { objectMode: undefined }); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { readableObjectMode: false }); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { readableObjectMode: true }); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { readableObjectMode: undefined }); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { writableObjectMode: false }); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { writableObjectMode: true }); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { writableObjectMode: undefined }); + +declare const transformOpts: stream.TransformOptions; + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, transformOpts); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { ...transformOpts, objectMode: false, writableObjectMode: false }); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { ...transformOpts, objectMode: undefined, writableObjectMode: undefined }); + +declare const boolTypeObjectMode: { objectMode: boolean }; + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, boolTypeObjectMode); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { ...boolTypeObjectMode, objectMode: false }); + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, { ...boolTypeObjectMode, objectMode: undefined }); + +declare const boolIndexSignature: Record; + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, boolIndexSignature); + +declare const falseIndexSignature: Record; + +transformFrom(async function*(source) { + for await (const { chunk } of source) expectType(chunk); + yield ''; +}, falseIndexSignature); + +/** + * Output type + */ + +// Object values are not allowed because the "objectMode" option has a value of not `true` +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, { objectMode: false }); + +// Object values are allowed because the "objectMode" option has a value of `true` +transformFrom(async function*() { + yield 42; +}, { objectMode: true }); + +// Object values are not allowed because the "objectMode" option has a value of not `true` +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, { objectMode: undefined }); + +// Object values are not allowed because the "readableObjectMode" option has a value of not `true` +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, { readableObjectMode: false }); + +// Object values are allowed because the "readableObjectMode" option has a value of `true` +transformFrom(async function*() { + yield 42; +}, { readableObjectMode: true }); + +// Object values are not allowed because the "readableObjectMode" option has a value of not `true` +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, { readableObjectMode: undefined }); + +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, { writableObjectMode: false }); + +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, { writableObjectMode: true }); + +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, { writableObjectMode: undefined }); + +// Object values are allowed because the "objectMode" option has a value of `true` +transformFrom(async function*() { + yield 42; +}, { objectMode: true, readableObjectMode: false }); + +// Object values are allowed because the "readableObjectMode" option has a value of `true` +transformFrom(async function*() { + yield 42; +}, { objectMode: false, readableObjectMode: true }); + +// Object values are allowed because the "objectMode" and "readableObjectMode" options have a value of `true` +transformFrom(async function*() { + yield 42; +}, { objectMode: true, readableObjectMode: true }); + +// Object values are not allowed because the value may not be `true` +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, transformOpts); + +// Object values are allowed because the "objectMode" option has a value of `true` +transformFrom(async function*() { + yield 42; +}, { ...transformOpts, objectMode: true }); + +// Object values are allowed because the "readableObjectMode" option has a value of `true` +transformFrom(async function*() { + yield 42; +}, { ...transformOpts, readableObjectMode: true }); + +// Object values are allowed because the "objectMode" option has a value of `true` +transformFrom(async function*() { + yield 42; +}, { ...transformOpts, objectMode: true, readableObjectMode: false }); + +// Object values are allowed because the "readableObjectMode" option has a value of `true` +transformFrom(async function*() { + yield 42; +}, { ...transformOpts, objectMode: false, readableObjectMode: true }); + +// Object values are allowed because the "objectMode" and "readableObjectMode" options have a value of `true` +transformFrom(async function*() { + yield 42; +}, { ...transformOpts, objectMode: true, readableObjectMode: true }); + +// Object values are not allowed because the "objectMode" and "readableObjectMode" options have a value of not `true` +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, { ...transformOpts, objectMode: undefined, writableObjectMode: undefined }); + +// Object values are not allowed because the value of the "objectMode" option may not be `true` +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, boolTypeObjectMode); + +// Object values are allowed because the "objectMode" option has a value of `true` +transformFrom(async function*() { + yield 42; +}, { ...boolTypeObjectMode, objectMode: true }); + +// Object values are not allowed because the "objectMode" option has a value of not `true` +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, { ...boolTypeObjectMode, objectMode: undefined }); + +// Object values are not allowed because the value may not be `true` +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, boolIndexSignature); + +declare const trueIndexSignature: Record; + +// Object values are not allowed because the value may be `undefined` +// @ts-expect-error TS2345 +transformFrom(async function*() { + yield 42; +}, trueIndexSignature); diff --git a/packages/stream-transform-from/test-d/tsconfig.json b/packages/stream-transform-from/test-d/tsconfig.json new file mode 100644 index 000000000..e4d199e7e --- /dev/null +++ b/packages/stream-transform-from/test-d/tsconfig.json @@ -0,0 +1,14 @@ +{ + "extends": "../tsconfig.base.json", + "compilerOptions": { + /* Visit https://aka.ms/tsconfig.json to read more about this file */ + + /* Basic Options */ + "rootDir": "../", + "noEmit": true, + + /* Module Resolution Options */ + "types": ["node"] + }, + "include": ["./**/*", "../src/**/*"] +} diff --git a/packages/stream-transform-from/tests/public-api.ts b/packages/stream-transform-from/tests/public-api.ts new file mode 100644 index 000000000..a10ca43c7 --- /dev/null +++ b/packages/stream-transform-from/tests/public-api.ts @@ -0,0 +1,1062 @@ +import events from 'events'; +import * as stream from 'stream'; +import { promisify } from 'util'; + +import { transformFrom } from '../src'; + +function assertType(_: T): void { + // +} + +function createNoopWritable(opts?: Omit): stream.Writable { + return new stream.Writable({ + ...opts, + write(_chunk, _, done) { + done(); + }, + }); +} + +function createOutputWritable( + outputChunkList: unknown[], + opts?: Omit, +): stream.Writable { + return new stream.Writable({ + ...opts, + write(chunk, _, done) { + outputChunkList.push(chunk); + done(); + }, + }); +} + +describe('passes though chunks', () => { + const data = ['first', 'second', 'third']; + const outputData = data.map(str => Buffer.from(str)); + + it.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => + new stream.Transform({ + transform(chunk, _encoding, done) { + this.push(chunk); + done(); + }, + }), + ], + [ + 'transformFrom()', + () => + transformFrom(async function*(source) { + for await (const { chunk } of source) { + yield chunk; + } + }), + ], + ])('%s', async (_, createTransform) => { + const outputChunkList: unknown[] = []; + await promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + createOutputWritable(outputChunkList), + ); + expect(outputChunkList).toStrictEqual(outputData); + }); +}); + +describe('transforms chunks', () => { + const data = ['first', 'second', 'third']; + const outputData = data.map(str => Buffer.from(`(${str})`)); + + it.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => + new stream.Transform({ + transform(chunk, _encoding, done) { + this.push(Buffer.concat([ + Buffer.from('('), + chunk, + Buffer.from(')'), + ])); + done(); + }, + }), + ], + [ + 'transformFrom()', + () => + transformFrom(async function*(source) { + for await (const { chunk } of source) { + yield Buffer.concat([ + Buffer.from('('), + chunk, + Buffer.from(')'), + ]); + } + }), + ], + ])('%s', async (_, createTransform) => { + const outputChunkList: unknown[] = []; + await promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + createOutputWritable(outputChunkList), + ); + expect(outputChunkList).toStrictEqual(outputData); + }); +}); + +describe('passes through objects', () => { + const data = [{ name: 'first' }, { name: 'second' }, { name: 'third' }]; + + it.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => + new stream.Transform({ + transform(obj, _encoding, done) { + this.push(obj); + done(); + }, + objectMode: true, + }), + ], + [ + 'transformFrom()', + () => + transformFrom( + async function*(source) { + for await (const { chunk } of source) { + yield chunk; + } + }, + { objectMode: true }, + ), + ], + ])('%s', async (_, createTransform) => { + const outputChunkList: unknown[] = []; + await promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + createOutputWritable(outputChunkList, { objectMode: true }), + ); + expect(outputChunkList).toStrictEqual(data); + }); +}); + +describe('transforms objects', () => { + const data = [{ name: 'first' }, { name: 'second' }, { name: 'third' }]; + const outputData = [['first'], ['second'], ['third']]; + + // eslint-disable-next-line @typescript-eslint/ban-types + function hasProp(obj: object, propName: T): obj is Record { + return propName in obj; + } + function validateChunk(chunk: unknown): chunk is { name: string } { + if (typeof chunk !== 'object' || chunk === null) return false; + if (!hasProp(chunk, 'name')) return false; + return typeof chunk.name === 'string'; + } + + it.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => + new stream.Transform({ + transform(obj, _encoding, done) { + if (!validateChunk(obj)) { + done(new Error('Invalid chunk!')); + return; + } + this.push([obj.name]); + done(); + }, + objectMode: true, + }), + ], + [ + 'transformFrom()', + () => + transformFrom( + async function*(source) { + for await (const { chunk: obj } of source) { + if (!validateChunk(obj)) { + throw new Error('Invalid chunk!'); + } + yield [obj.name]; + } + }, + { objectMode: true }, + ), + ], + ])('%s', async (_, createTransform) => { + const outputChunkList: unknown[] = []; + await promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + createOutputWritable(outputChunkList, { objectMode: true }), + ); + expect(outputChunkList).toStrictEqual(outputData); + }); +}); + +describe('transforms string with passed encoding', () => { + const data = Buffer.from([0x1F, 0x20]); + /** + * @see https://github.com/nodejs/node/blob/v12.17.0/lib/buffer.js#L601-L719 + */ + const encodingList: BufferEncoding[] = ['utf8', 'ucs2', 'utf16le', 'latin1', 'ascii', 'base64', 'hex']; + const outputData = Array.from({ length: encodingList.length }).fill(data); + + it.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => + new stream.Transform({ + transform(chunk, encoding, done) { + this.push(Buffer.from(chunk, encoding)); + done(); + }, + writableObjectMode: true, + }), + ], + [ + 'transformFrom()', + () => + transformFrom(async function*(source) { + for await (const { chunk, encoding } of source) { + if (typeof chunk === 'string') { + yield Buffer.from(chunk, encoding); + } + } + }, { writableObjectMode: true }), + ], + ])('%s', async (_, createTransform) => { + const transform = createTransform(); + const outputChunkList: unknown[] = []; + + for (const encoding of encodingList) { + transform.write(data.toString(encoding), encoding); + } + transform.end(); + + await promisify(stream.pipeline)( + transform, + createOutputWritable(outputChunkList), + ); + expect(outputChunkList).toStrictEqual(outputData); + }); +}); + +describe('split chunks', () => { + const data = [ + 'line1', + 'line2\nline3', + '', + 'line4\n\nline5\n', + '\nline6\n\n', + ]; + const outputData = [ + Buffer.from('line1'), + Buffer.from('line2'), + Buffer.from('line3'), + Buffer.from('line4'), + Buffer.from('line5'), + Buffer.from('line6'), + ]; + + it.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => + new stream.Transform({ + transform(chunk, _encoding, done) { + const lineList = (chunk.toString('utf8') as string) + .split(/\n+/) + .filter(line => line !== ''); + for (const line of lineList) { + this.push(line); + } + done(); + }, + }), + ], + [ + 'transformFrom()', + () => + transformFrom(async function*(source) { + for await (const { chunk } of source) { + const lineList = chunk.toString('utf8') + .split(/\n+/) + .filter(line => line !== ''); + yield* lineList; + } + }), + ], + ])('%s', async (_, createTransform) => { + const outputChunkList: unknown[] = []; + await promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + createOutputWritable(outputChunkList), + ); + expect(outputChunkList).toStrictEqual(outputData); + }); +}); + +describe('merge chunks', () => { + const data = ['first', 'second', 'third']; + const outputData = [Buffer.from(data.join('\n'))]; + + it.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => { + const chunkList: Buffer[] = []; + return new stream.Transform({ + transform(chunk, _encoding, done) { + chunkList.push(chunk); + done(); + }, + flush(done) { + this.push(chunkList.join('\n')); + done(); + }, + }); + }, + ], + [ + 'transformFrom()', + () => + transformFrom(async function*(source) { + const chunkList: Buffer[] = []; + for await (const { chunk } of source) { + chunkList.push(chunk); + } + yield chunkList.join('\n'); + }), + ], + ])('%s', async (_, createTransform) => { + const outputChunkList: unknown[] = []; + await promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + createOutputWritable(outputChunkList), + ); + expect(outputChunkList).toStrictEqual(outputData); + }); +}); + +describe('break during transform', () => { + const data = ['first', 'second', 'third', 'fourth', 'fifth']; + const outputData = [Buffer.from('first'), Buffer.from('second'), Buffer.from('third')]; + + it.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => { + let finished = false; + return new stream.Transform({ + transform(chunk, _encoding, done) { + if (!finished) { + this.push(chunk); + } + if (chunk.toString('utf8') === 'third') { + finished = true; + } + done(); + }, + }); + }, + ], + [ + 'transformFrom()', + () => + transformFrom(async function*(source) { + for await (const { chunk } of source) { + yield chunk; + if (chunk.toString('utf8') === 'third') { + break; + } + } + }), + ], + ])('%s', async (_, createTransform) => { + const outputChunkList: unknown[] = []; + await promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + createOutputWritable(outputChunkList), + ); + expect(outputChunkList).toStrictEqual(outputData); + }); +}); + +describe('get data only when needed', () => { + interface LogType { + phase: number; + chunk: string; + } + const data = ['first', 'second', 'third', 'fourth', 'fifth']; + const outputData: readonly LogType[] = data.flatMap((chunk, index) => { + const prevChunk = data[index - 1]; + const nextChunk = data[index + 1]; + return [ + ...(prevChunk ? [] : [{ phase: 1, chunk }]), + { phase: 2, chunk }, + ...(nextChunk ? [{ phase: 1, chunk: nextChunk }] : []), + { phase: Infinity, chunk }, + ]; + }); + + it('builtin Transform', async () => { + const loggerList: LogType[] = []; + await promisify(stream.pipeline)( + stream.Readable.from(data), + new stream.Transform({ + transform(chunk, _encoding, done) { + loggerList.push({ phase: 1, chunk: chunk.toString('utf8') }); + setImmediate(() => { + done(null, chunk); + }); + }, + }), + new stream.Transform({ + transform(chunk, _encoding, done) { + loggerList.push({ phase: 2, chunk: chunk.toString('utf8') }); + setImmediate(() => { + done(null, chunk); + }); + }, + }), + new stream.Writable({ + write(chunk, _, done) { + loggerList.push({ phase: Infinity, chunk: chunk.toString('utf8') }); + done(); + }, + }), + ); + expect(loggerList).toStrictEqual(outputData); + }); + + it('transformFrom()', async () => { + const loggerList: LogType[] = []; + await promisify(stream.pipeline)( + stream.Readable.from(data), + transformFrom(async function*(source) { + for await (const { chunk } of source) { + loggerList.push({ phase: 1, chunk: chunk.toString('utf8') }); + yield chunk; + } + }), + transformFrom(async function*(source) { + for await (const { chunk } of source) { + loggerList.push({ phase: 2, chunk: chunk.toString('utf8') }); + yield chunk; + } + }), + new stream.Writable({ + write(chunk, _, done) { + loggerList.push({ phase: Infinity, chunk: chunk.toString('utf8') }); + done(); + }, + }), + ); + expect(loggerList).toStrictEqual(outputData); + }); +}); + +describe('throw error from Readable', () => { + describe.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => + new stream.Transform({ + transform(chunk, _encoding, done) { + this.push(chunk); + done(); + }, + }), + ], + [ + 'builtin Transform (async done)', + () => + new stream.Transform({ + transform(chunk, _encoding, done) { + setImmediate(() => { + this.push(chunk); + done(); + }); + }, + }), + ], + [ + 'transformFrom()', + () => + transformFrom(async function*(source) { + for await (const { chunk } of source) { + yield chunk; + } + }), + ], + ])('%s', (_, createTransform) => { + // eslint-disable-next-line require-yield + const readableInput = function*(): Iterable { + throw new Error('foo'); + }; + + it.each<[string, (() => stream.Writable) | undefined]>([ + [ + 'pipe to WritableStream', + createNoopWritable, + ], + [ + 'not pipe to WritableStream', + undefined, + ], + ])('%s', async (_, createWritable) => { + const resultPromise = promisify(stream.pipeline)( + stream.Readable.from(readableInput()), + createTransform(), + ...createWritable ? [createWritable()] : [], + ); + await expect(resultPromise).rejects.toThrow(Error); + await expect(resultPromise).rejects.toThrow(/^foo$/); + }); + }); +}); + +describe('throw error from Transform', () => { + const data = ['']; + + describe('when transforming', () => { + describe.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => + new stream.Transform({ + transform(_chunk, _encoding, done) { + done(new Error('bar')); + }, + }), + ], + [ + 'builtin Transform (async done)', + () => + new stream.Transform({ + transform(_chunk, _encoding, done) { + setImmediate(() => done(new Error('bar'))); + }, + }), + ], + [ + 'transformFrom()', + () => + // eslint-disable-next-line require-yield + transformFrom(async function*() { + throw new Error('bar'); + }), + ], + ])('%s', (_, createTransform) => { + it.each<[string, (() => stream.Writable) | undefined]>([ + [ + 'pipe to WritableStream', + createNoopWritable, + ], + [ + 'not pipe to WritableStream', + undefined, + ], + ])('%s', async (_, createWritable) => { + const resultPromise = promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + ...createWritable ? [createWritable()] : [], + ); + await expect(resultPromise).rejects.toThrow(Error); + await expect(resultPromise).rejects.toThrow(/^bar$/); + }); + }); + }); + + describe('when flush', () => { + /** + * @see https://github.com/nodejs/node/pull/34314 + */ + const isBugFixed = Number(/^\d+/.exec(process.versions.node)?.[0]) >= 15; + + const table: Array<[string, { createTransform: () => stream.Transform; hasBug: boolean }]> = [ + [ + 'builtin Transform', + { + createTransform: () => + new stream.Transform({ + transform(_chunk, _encoding, done) { + done(); + }, + flush(done) { + done(new Error('baz')); + }, + }), + hasBug: false, + }, + ], + [ + 'builtin Transform (async done)', + { + createTransform: () => + new stream.Transform({ + transform(_chunk, _encoding, done) { + done(); + }, + flush(done) { + setImmediate(() => done(new Error('baz'))); + }, + }), + hasBug: !isBugFixed, + }, + ], + [ + 'transformFrom()', + { + createTransform: () => + // eslint-disable-next-line require-yield + transformFrom(async function*(source) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _ of source); + throw new Error('baz'); + }), + hasBug: false, + }, + ], + ]; + + describe.each(table)('%s', (_, { createTransform }) => { + it('pipe to WritableStream', async () => { + const resultPromise = promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + createNoopWritable(), + ); + await expect(resultPromise).rejects.toThrow(Error); + await expect(resultPromise).rejects.toThrow(/^baz$/); + }); + }); + + const noBugTable = table.filter(([, { hasBug }]) => !hasBug); + if (noBugTable.length >= 1) { + // eslint-disable-next-line jest/no-identical-title + describe.each(noBugTable)('%s', (_, { createTransform }) => { + it('not pipe to WritableStream', async () => { + const resultPromise = promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + ); + await expect(resultPromise).rejects.toThrow(Error); + await expect(resultPromise).rejects.toThrow(/^baz$/); + }); + }); + } + + const hasBugTable = table.filter(([, { hasBug }]) => hasBug); + if (hasBugTable.length >= 1) { + // eslint-disable-next-line jest/no-identical-title + describe.each(hasBugTable)('%s', (_, { createTransform }) => { + it('not pipe to WritableStream (error cannot be detected)', async () => { + const transform = createTransform(); + + const errorPromise = (async () => (await events.once(transform, 'error'))[0])(); + const resultPromise = promisify(stream.pipeline)( + stream.Readable.from(data), + transform, + ); + + // If the flush process is completed asynchronously, the `finish` event will be fired before the `error` event is fired. + // For this reason, `stream.pipeline()` will not get an error and will exit normally. + // see https://github.com/nodejs/node/issues/34274 + await expect(resultPromise).resolves.toBeUndefined(); + + // However, errors are thrown. + // And the `error` event is also fired. + await expect(errorPromise).resolves.toBeInstanceOf(Error); + await expect(errorPromise).resolves.toStrictEqual(expect.objectContaining({ + message: 'baz', + })); + }); + }); + } + }); +}); + +describe('options that affect functionality should be ignored', () => { + const data = ['first', 'second', 'third']; + const outputData = data.map(str => Buffer.from(`:${str}.`)); + + it.each([ + { + read(_size) { + this.push(Buffer.from([255])); + this.push(null); + }, + }, + { + write(_chunk, _encoding, done) { + this.push(Buffer.from([255])); + this.push(null); + done(); + }, + }, + { + writev(_chunks, done) { + this.push(Buffer.from([255])); + this.push(null); + done(); + }, + }, + { + final(done) { + this.push(Buffer.from([255])); + this.push(null); + done(); + }, + }, + { + transform(_chunk, _encoding, done) { + this.push(Buffer.from([255])); + this.push(null); + done(); + }, + }, + { + flush(done) { + this.push(Buffer.from([255])); + this.push(null); + done(); + }, + }, + ])('%p', async options => { + const outputChunkList: unknown[] = []; + await promisify(stream.pipeline)( + stream.Readable.from(data), + transformFrom( + async function*(source) { + for await (const { chunk } of source) { + if (!Buffer.isBuffer(chunk)) continue; + yield Buffer.concat([ + Buffer.from(':'), + chunk, + Buffer.from('.'), + ]); + } + }, + options, + ), + createOutputWritable(outputChunkList), + ); + expect(outputChunkList).toStrictEqual(outputData); + }); + it.each([ + { + // This field has been added in Node v15.0.0 + construct(done) { + console.log({ done }); + done(new Error('!!!')); + }, + }, + { + destroy(_error, done) { + done(new Error('???')); + }, + }, + // eslint-disable-next-line jest/no-identical-title + ])('%p', async options => { + const resultPromise = promisify(stream.pipeline)( + stream.Readable.from(data), + transformFrom( + // eslint-disable-next-line require-yield + async function*() { + throw new Error('qux'); + }, + options, + ), + ); + await expect(resultPromise).rejects.toThrow(Error); + await expect(resultPromise).rejects.toThrow(/^qux$/); + }); +}); + +describe('source iterator contains only Buffer objects', () => { + describe.each( + [ + {}, + { + objectMode: false, + readableObjectMode: false, + writableObjectMode: false, + }, + { + objectMode: false, + readableObjectMode: true, + writableObjectMode: false, + }, + ] as const, + )('options: %p', options => { + const data = ['first', 'second', 'third']; + + it('builtin Transform', async () => { + expect.assertions(data.length); + await promisify(stream.pipeline)( + stream.Readable.from(data), + new stream.Transform({ + transform(chunk, _encoding, done) { + expect(chunk).toBeInstanceOf(Buffer); + done(null, ''); + }, + ...options, + }), + createNoopWritable(), + ); + }); + + it('transformFrom()', async () => { + expect.assertions(data.length); + await promisify(stream.pipeline)( + stream.Readable.from(data), + transformFrom( + // eslint-disable-next-line require-yield + async function*(source) { + for await (const { chunk } of source) { + assertType(chunk); + expect(chunk).toBeInstanceOf(Buffer); + } + }, + options, + ), + createNoopWritable(), + ); + }); + }); +}); + +describe('source iterator contains more than just Buffer objects', () => { + describe.each( + [ + { + objectMode: false, + readableObjectMode: false, + writableObjectMode: true, + }, + { + objectMode: false, + readableObjectMode: true, + writableObjectMode: true, + }, + { + objectMode: true, + readableObjectMode: false, + writableObjectMode: false, + }, + { + objectMode: true, + readableObjectMode: true, + writableObjectMode: false, + }, + { + objectMode: true, + readableObjectMode: false, + writableObjectMode: true, + }, + { + objectMode: true, + readableObjectMode: true, + writableObjectMode: true, + }, + ] as const, + )('options: %p', options => { + const data = ['first', 'second', 'third']; + + it('builtin Transform', async () => { + expect.assertions(data.length); + await promisify(stream.pipeline)( + stream.Readable.from(data), + new stream.Transform({ + transform(chunk, _encoding, done) { + expect(chunk).not.toBeInstanceOf(Buffer); + done(null, ''); + }, + ...options, + }), + createNoopWritable(), + ); + }); + + it('transformFrom()', async () => { + expect.assertions(data.length); + await promisify(stream.pipeline)( + stream.Readable.from(data), + transformFrom( + // eslint-disable-next-line require-yield + async function*(source) { + for await (const chunk of source) { + // @ts-expect-error TS2345: Argument of type 'unknown' is not assignable to parameter of type 'Buffer'. + assertType(chunk); + expect(chunk).not.toBeInstanceOf(Buffer); + } + }, + options, + ), + createNoopWritable(), + ); + }); + }); +}); + +describe('can return non-buffer value', () => { + describe.each( + [ + { + objectMode: false, + readableObjectMode: true, + writableObjectMode: false, + }, + { + objectMode: false, + readableObjectMode: true, + writableObjectMode: true, + }, + { + objectMode: true, + readableObjectMode: false, + writableObjectMode: false, + }, + { + objectMode: true, + readableObjectMode: true, + writableObjectMode: false, + }, + { + objectMode: true, + readableObjectMode: false, + writableObjectMode: true, + }, + { + objectMode: true, + readableObjectMode: true, + writableObjectMode: true, + }, + ] as const, + )('options: %p', options => { + describe.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => + new stream.Transform({ + transform(_chunk, _encoding, done) { + this.push(42); + done(); + }, + ...options, + }), + ], + [ + 'transformFrom()', + () => + transformFrom( + async function*() { + yield 42; + }, + options, + ), + ], + ])('%s', (_, createTransform) => { + const data = ['']; + + it.each<[string, (() => stream.Writable) | undefined]>([ + [ + 'pipe to WritableStream', + () => createNoopWritable({ objectMode: true }), + ], + [ + 'not pipe to WritableStream', + undefined, + ], + ])('%s', async (_, createWritable) => { + const resultPromise = promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + ...createWritable ? [createWritable()] : [], + ); + await expect(resultPromise).resolves.not.toThrow(); + }); + }); + }); +}); + +describe('can not return non-buffer value', () => { + describe.each( + [ + {}, + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + {} as { objectMode: true }, + { + objectMode: false, + readableObjectMode: false, + writableObjectMode: false, + }, + { + objectMode: false, + readableObjectMode: false, + writableObjectMode: true, + }, + ] as const, + )('options: %p', options => { + describe.each<[string, () => stream.Transform]>([ + [ + 'builtin Transform', + () => + new stream.Transform({ + transform(_chunk, _encoding, done) { + this.push(42); + done(); + }, + ...options, + }), + ], + [ + 'transformFrom()', + () => + transformFrom( + // @ts-expect-error TS2345: Argument of type '() => AsyncGenerator' is not assignable to parameter of type '(source: AsyncIterableIterator) => Iterable | AsyncIterable'. + async function*() { + yield 42; + }, + options, + ), + ], + ])('%s', (_, createTransform) => { + const data = ['']; + + it.each<[string, (() => stream.Writable) | undefined]>([ + [ + 'pipe to WritableStream', + () => createNoopWritable({ objectMode: true }), + ], + [ + 'not pipe to WritableStream', + undefined, + ], + ])('%s', async (_, createWritable) => { + const resultPromise = promisify(stream.pipeline)( + stream.Readable.from(data), + createTransform(), + ...createWritable ? [createWritable()] : [], + ); + await expect(resultPromise).rejects.toThrow( + /^The "chunk" argument must be of type string or an instance of Buffer or Uint8Array\. Received type number \(42\)$/, + ); + }); + }); + }); +}); diff --git a/packages/stream-transform-from/tests/tsconfig.json b/packages/stream-transform-from/tests/tsconfig.json new file mode 100644 index 000000000..22dca8aac --- /dev/null +++ b/packages/stream-transform-from/tests/tsconfig.json @@ -0,0 +1,14 @@ +{ + "extends": "../tsconfig.base.json", + "compilerOptions": { + /* Visit https://aka.ms/tsconfig.json to read more about this file */ + + /* Basic Options */ + "rootDir": "../", + "noEmit": true, + + /* Module Resolution Options */ + "types": ["node", "jest"] + }, + "include": ["./**/*", "../src/**/*"] +} diff --git a/packages/stream-transform-from/tsconfig.base.json b/packages/stream-transform-from/tsconfig.base.json new file mode 100644 index 000000000..37c424779 --- /dev/null +++ b/packages/stream-transform-from/tsconfig.base.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + /* Visit https://aka.ms/tsconfig.json to read more about this file */ + + /* Basic Options */ + "declarationMap": false + } +} diff --git a/packages/stream-transform-from/tsconfig.json b/packages/stream-transform-from/tsconfig.json new file mode 100644 index 000000000..bf4e4b036 --- /dev/null +++ b/packages/stream-transform-from/tsconfig.json @@ -0,0 +1,16 @@ +{ + "extends": "./tsconfig.base.json", + "compilerOptions": { + /* Visit https://aka.ms/tsconfig.json to read more about this file */ + + /* Basic Options */ + "outDir": "./dist", + // When the `composite` option is `true` and the `rootDir` option is undefined, + // files will not be created directly under the `outDir` + "rootDir": "./src", + + /* Module Resolution Options */ + "types": ["node"] + }, + "include": ["./src/**/*"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2ea138e58..a6b821642 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -138,6 +138,22 @@ importers: dependencies: '@sounisi5011/encrypted-archive': link:.. + packages/stream-transform-from: + specifiers: + '@sounisi5011/scripts--run-if-supported-node': workspace:^0.0.0 + '@types/node': 15.x + tsd: 0.15.1 + devDependencies: + '@sounisi5011/scripts--run-if-supported-node': link:../../scripts/run-if-supported-node + '@types/node': 15.3.0 + tsd: 0.15.1 + + packages/stream-transform-from/examples: + specifiers: + '@sounisi5011/stream-transform-from': link:.. + dependencies: + '@sounisi5011/stream-transform-from': link:.. + packages/ts-type-utils/has-own-property: specifiers: tsd: 0.15.1 @@ -1025,6 +1041,10 @@ packages: /@types/node/14.14.33: resolution: {integrity: sha512-oJqcTrgPUF29oUP8AsUqbXGJNuPutsetaa9kTQAQce5Lx5dTYWV02ScBiT/k1BX/Z7pKeqedmvp39Wu4zR7N7g==} + /@types/node/15.3.0: + resolution: {integrity: sha512-8/bnjSZD86ZfpBsDlCIkNXIvm+h6wi9g7IqL+kmFkQ+Wvu3JrasgLElfiPgoo8V8vVfnEi0QVS12gbl94h9YsQ==} + dev: true + /@types/normalize-package-data/2.4.0: resolution: {integrity: sha512-f5j5b/Gf71L+dbqxIpQ4Z2WlmI/mPJ0fOkGGmFgtb6sAu97EPczzbS3/tJKxmcYDj55OX6ssqwDAWOHIYDRDGA==} dev: true