Skip to content
This repository has been archived by the owner on Dec 16, 2020. It is now read-only.

Commit

Permalink
feat: convert to node stream
Browse files Browse the repository at this point in the history
  • Loading branch information
trs committed Jul 11, 2019
1 parent a486265 commit 6fb844d
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions src/PureStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export interface PureStreamInternal<In, Out> {
}

/**
* Pipes `source` into `destination`, same as native stream pipe.
* Pipes `source` into `destination`, same as a node stream pipe.
* Propogates errors from `source` to `destination`
* Destroys stream on error event
*/
Expand Down Expand Up @@ -97,8 +97,7 @@ function buildFlush<In, Out>(self: PureStream<In, Out>, method?: PureStreamFlush
}

/**
* Simplified stream implementation. Acts as a native PassThrough stream.
*
* Simplified stream implementation.
*/
export class PureStream<In, Out = In> {
private instance: PassThrough;
Expand Down Expand Up @@ -192,10 +191,24 @@ export class PureStream<In, Out = In> {
if (consume) this.instance.resume();
}

/** Convert this PureStream into a node PassThrough stream */
public toNodeStream(): PassThrough {
const stream = new PassThrough();

this.each((value) => stream.write(value)).done((err) => {
if (err) {
stream.destroy(err);
}
stream.end();
});

return stream;
}

public static wrap<T>(source: Readable): PureStream<T>;
public static wrap<T>(source: PassThrough): PureStream<T>;
public static wrap<In, Out>(source: Transform): PureStream<In, Out>;
/** Wrap a native stream in a PureStream */
/** Wrap a node stream in a PureStream */
public static wrap<In, Out>(source: Readable) {
const wrapped = new PureStream<In, Out>();
const stream = pipe(
Expand Down

0 comments on commit 6fb844d

Please sign in to comment.