Skip to content

Commit

Permalink
Used typed stream classes.
Browse files Browse the repository at this point in the history
  • Loading branch information
uhop committed Sep 15, 2024
1 parent 1179931 commit f7827dc
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 21 deletions.
4 changes: 3 additions & 1 deletion src/asStream.d.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
/// <reference types="node" />

import {Duplex, DuplexOptions} from 'node:stream';
import {TypedDuplex} from './typed-streams';
import {Arg0, Ret} from './defs';

export = asStream;

declare function asStream<F extends (chunk: any, encoding?: string) => unknown>(
fn: F,
options?: DuplexOptions
): Duplex;
): TypedDuplex<Arg0<F>, Ret<F>>;
56 changes: 47 additions & 9 deletions src/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
/// <reference types="node" />

import {Duplex, DuplexOptions, Readable, Transform, Writable} from 'node:stream';
import {TypedDuplex, TypedReadable, TypedTransform, TypedWritable} from './typed-streams';

import {
none,
stop,
Expand All @@ -26,9 +29,9 @@ import {
} from './defs';
import gen, {type FnItem} from './gen';
import asStream from './asStream';
import {Duplex, DuplexOptions, Readable, Transform, Writable} from 'node:stream';

export = chain;
export {TypedDuplex, TypedReadable, TypedTransform, TypedWritable};

export type DuplexStream<W = any, R = any> = {
readable: ReadableStream<R>;
Expand All @@ -53,7 +56,16 @@ export interface ChainOutput<W, R> extends Duplex {
output: Readable | Writable | Duplex | Transform;
}

export type Arg0<F> = F extends Writable | Transform | Duplex
export type Arg0<F> =
F extends TypedTransform<infer W, any>
? W
: F extends TypedDuplex<infer W, any>
? W
: F extends TypedReadable
? never
: F extends TypedWritable<infer W>
? W
: F extends Writable | Transform | Duplex
? any
: F extends Readable
? never
Expand All @@ -75,7 +87,16 @@ export type Arg0<F> = F extends Writable | Transform | Duplex
? Parameters<F>[0]
: never;

export type Ret<F> = F extends Readable | Transform | Duplex
export type Ret<F> =
F extends TypedTransform<any, infer R>
? R
: F extends TypedDuplex<any, infer R>
? R
: F extends TypedReadable<infer R>
? R
: F extends TypedWritable<any>
? never
: F extends Readable | Transform | Duplex
? any
: F extends Writable
? never
Expand All @@ -95,28 +116,45 @@ export type Ret<F> = F extends Readable | Transform | Duplex
? OutputType<F>
: never;

export type ChainItem<I, F> = F extends Writable | Transform | Duplex
export type ChainItem<I, F> =
F extends TypedTransform<infer W, infer R>
? I extends W
? F
: TypedTransform<I, R>
: F extends TypedDuplex<infer W, infer R>
? I extends W
? F
: TypedDuplex<I, R>
: F extends TypedReadable
? I extends never
? F
: never
: F extends TypedWritable<infer W>
? I extends W
? F
: TypedWritable<I>
: F extends Writable | Transform | Duplex
? F
: F extends Readable
? I extends never
? F
: never
: F extends TransformStream<infer W, any>
: F extends TransformStream<infer W, infer R>
? I extends W
? F
: never
: F extends DuplexStream<infer W, any>
: TransformStream<I, R>
: F extends DuplexStream<infer W, infer R>
? I extends W
? F
: never
: DuplexStream<I, R>
: F extends ReadableStream
? I extends never
? F
: never
: F extends WritableStream<infer W>
? I extends W
? F
: never
: WritableStream<I>
: F extends readonly [infer F1, ...infer R]
? readonly [ChainItem<I, F1>, ...ChainList<Ret<F1>, R>]
: F extends readonly unknown[]
Expand Down
3 changes: 2 additions & 1 deletion src/jsonl/parserStream.d.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
/// <reference types="node" />

import {Duplex, DuplexOptions} from 'node:stream';
import {TypedDuplex} from '../typed-streams';

export = parserStream;

interface ParserOptions extends DuplexOptions {
reviver?: (this: unknown, key: string, value: unknown) => unknown;
}

declare function parserStream(options: ParserOptions): Duplex;
declare function parserStream<T>(options: ParserOptions): TypedDuplex<string | Uint8Array, T>;
3 changes: 2 additions & 1 deletion src/jsonl/stringerStream.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// <reference types="node" />

import {Transform} from 'node:stream';
import {TypedTransform} from '../typed-streams';

export = stringer;

Expand All @@ -13,4 +14,4 @@ interface StringerOptions {
space?: string | number;
}

declare function stringer(options: any): Transform;
declare function stringer<T>(options: any): TypedTransform<T, string>;
3 changes: 2 additions & 1 deletion src/utils/readableFrom.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// <reference types="node" />

import {Readable, ReadableOptions} from 'node:stream';
import {TypedReadable} from '../typed-streams';

export = readableFrom;

Expand All @@ -10,4 +11,4 @@ interface ReadableFromOptions<T = unknown> extends ReadableOptions {
iterable?: Iter<T>;
}

declare function readableFrom<T>(options: Iter<T> | ReadableFromOptions<T>): Readable;
declare function readableFrom<T>(options: Iter<T> | ReadableFromOptions<T>): TypedReadable<T>;
7 changes: 4 additions & 3 deletions src/utils/reduceStream.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// <reference types="node" />

import {Writable, WritableOptions} from 'stream';
import {TypedWritable} from '../typed-streams';

export = reduceStream;

Expand All @@ -12,14 +13,14 @@ interface ReduceStreamOptions<A = unknown, T = A> extends WritableOptions {
initial?: A;
}

interface ReduceStreamOutput<A = unknown> extends Writable {
interface ReduceStreamOutput<A = unknown, T = A> extends TypedWritable<T, A> {
accumulator: A;
}

declare function reduceStream<A = unknown, T = A>(
options: ReduceStreamOptions<A, T>
): ReduceStreamOutput<A>;
): ReduceStreamOutput<A, T>;
declare function reduceStream<A = unknown, T = A>(
reducer: Reducer<A, T> | ReducerPromise<A, T>,
initial: A
): ReduceStreamOutput<A>;
): ReduceStreamOutput<A, T>;
19 changes: 14 additions & 5 deletions ts-check/demo.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import chain, {many} from 'stream-chain';
import chain, {asStream, many, TypedTransform} from 'stream-chain';
import readableFrom from 'stream-chain/utils/readableFrom.js';

import {Transform} from 'node:stream';
Expand All @@ -16,13 +16,13 @@ const c = chain([
// returns several values
(x: number) => many([x - 1, x, x + 1]),
// waits for an asynchronous operation
(x: number) => getTotalFromDatabaseByKey(x),
async (x: number) => await getTotalFromDatabaseByKey(x),
// or: (x: number) => getTotalFromDatabaseByKey(x),
// returns multiple values with a generator
function* (x: number) {
for (let i = x; i > 0; --i) {
for (let i = x; i >= 0; --i) {
yield i;
}
return 0;
},
// filters out even values
(x: number) => (x % 2 ? x : null),
Expand All @@ -32,7 +32,16 @@ const c = chain([
transform(x, _, callback) {
callback(null, x + 1);
}
})
}),
// uses a typed transform stream
new TypedTransform<number, string>({
objectMode: true,
transform(x, _, callback) {
callback(null, String(x + 1));
}
}),
// uses a wrapped function
asStream((x: string) => !x)
] as const),
output: number[] = [];
c.on('data', (data: number) => output.push(data));
Expand Down

0 comments on commit f7827dc

Please sign in to comment.