Eugene Lazutkin edited this page Sep 17, 2024 · 12 revisions

Just examples. Think about how you would do them without stream-chain.

Simple pipeline

import chain from 'stream-chain';

import fs from 'node:fs';
import zlib from 'node:zlib';
import {Transform} from 'node:stream';

// the chain will work on a stream of number objects
const pipeline = chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i >= 0; --i) {
      yield i;
    }
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, x.toString());
    }
  }),
  // compress
  zlib.createGzip()
]);
// log errors
pipeline.on('error', error => console.log(error));
// use the chain, and save the result to a file
dataSource.pipe(pipeline).pipe(fs.createWriteStream('output.txt.gz'));
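For comparison, the synchronous stages above (minus the database lookup and the compression steps) can be sketched without streams at all, using plain array methods:

```javascript
// a plain-array sketch of the synchronous stages above;
// the async database call and gzip steps are omitted
const countdown = x => {
  const out = [];
  for (let i = x; i >= 0; --i) out.push(i);
  return out;
};

const result = [2]
  .map(x => x * x)                  // transforms a value
  .flatMap(x => [x - 1, x, x + 1])  // returns several values
  .flatMap(countdown)               // multiple values per input
  .filter(x => x % 2);              // filters out even values

console.log(result); // [3, 1, 3, 1, 5, 3, 1]
```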

Using asynchronous generators

import chain from 'stream-chain';

import fs from 'node:fs';
import zlib from 'node:zlib';
import {Transform} from 'node:stream';

const family = chain([
  async function*(person) {
    yield person;
    // asynchronously retrieve parents
    if (person.father) {
      yield await getPersonFromDB(person.father);
    }
    if (person.mother) {
      yield await getPersonFromDB(person.mother);
    }
    // asynchronously retrieve children, if any
    for (let i = 0; i < person.children.length; ++i) {
      yield await getPersonFromDB(person.children[i]);
    }
  },
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, JSON.stringify(x));
    }
  }),
  zlib.createGzip(),
  fs.createWriteStream('families.json-stream.gz')
]);

people.pipe(family);

Combined functions

A block of regular functions can be grouped in an array or combined with gen(). Functions in such a block are composed directly, without intermediate streams, which improves performance.

import chain from 'stream-chain';
import gen from 'stream-chain/gen.js';

const somePipe = [
  x => x * x,
  x => 2 * x
  // ... more stages
];

const otherPipe = gen(
  x => x + 1,
  x => Math.sqrt(x)
  // ... more stages
);

const myPipe = chain([
  somePipe,
  otherPipe
  // ... more stages
]);
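For plain synchronous functions, combining a block amounts to ordinary function composition; a minimal sketch:

```javascript
// minimal left-to-right function composition, roughly what
// combining a block of synchronous functions amounts to
const compose = (...fns) => x => fns.reduce((acc, fn) => fn(acc), x);

const myFn = compose(
  x => x * x,
  x => 2 * x,
  x => x + 1,
  x => Math.sqrt(x)
);

console.log(myFn(2)); // 2*4 + 1 = 9, sqrt(9) -> 3
```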

Filter out values

Returning none stops processing of the current value: nothing is passed downstream for it. In the example below, it is used to filter out all even values.

import chain, {none, finalValue} from 'stream-chain';

const pipeline = chain([
  [
    x => x % 2 ? x : none,
    x => x * x,
    x => 2 * x
  ]
  // ... more stages
]);

// input: 1, 2, 3, 4
// output: 2, 18
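Without streams, the same semantics can be sketched with a sentinel value that short-circuits the remaining stages:

```javascript
// a plain-JS sketch of the none semantics: a sentinel that
// stops processing of the current value
const NONE = Symbol('none');
const stages = [
  x => x % 2 ? x : NONE,
  x => x * x,
  x => 2 * x
];

const run = input => {
  const out = [];
  for (let x of input) {
    let dropped = false;
    for (const stage of stages) {
      x = stage(x);
      if (x === NONE) { dropped = true; break; }
    }
    if (!dropped) out.push(x);
  }
  return out;
};

console.log(run([1, 2, 3, 4])); // [2, 18]
```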

Return a final value prematurely

Wrapping a value in finalValue() skips the remaining stages for the current value and passes the wrapped value downstream as the result. The example below does not double odd values.

const pipeline = chain([
  [
    x => x * x,
    x => x % 2 ? finalValue(x) : x,
    x => 2 * x
  ]
  // ... more stages
]);

// input: 1, 2, 3, 4
// output: 1, 8, 9, 32
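The short-circuiting can likewise be sketched without streams, using a wrapper object as the marker:

```javascript
// a sketch of the finalValue() semantics: a wrapper that skips
// the remaining stages and uses the wrapped value as the result
const FINAL = Symbol('final');
const finalValue = value => ({[FINAL]: value});
const stages = [
  x => x * x,
  x => x % 2 ? finalValue(x) : x,
  x => 2 * x
];

const run = input => input.map(x => {
  for (const stage of stages) {
    x = stage(x);
    if (x && x[FINAL] !== undefined) return x[FINAL];
  }
  return x;
});

console.log(run([1, 2, 3, 4])); // [1, 8, 9, 32]
```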

Create Transform out of functions

Sometimes all we need is to wrap a function into a Transform. It works with combined functions as well.

Wrap a function

import asStream from 'stream-chain/asStream.js';

const stream = asStream(x => x + 1);

Wrap and combine functions

const stream = asStream(gen(
  x => x * x,
  x => 2 * x
));

Slicing streams

Take

This example processes only 5 items from the beginning of a stream.

import take from 'stream-chain/utils/take.js';

const pipeline = chain([
  take(5)
  // ... more stages
]);

Skip

This example skips 5 items from the beginning of a stream.

import skip from 'stream-chain/utils/skip.js';

const pipeline = chain([
  skip(5)
  // ... more stages
]);

Take & skip together

This example skips 5 items from the beginning of a stream and takes the next 5.

import take from 'stream-chain/utils/take.js';
import skip from 'stream-chain/utils/skip.js';
import takeWithSkip from 'stream-chain/utils/takeWithSkip.js';

const lessEfficient = chain([
  skip(5),
  take(5)
  // ... more stages
]);

const moreEfficient = chain([
  takeWithSkip(5, 5)
  // ... more stages
]);
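On a plain array, the same skip-then-take is a single slice:

```javascript
// array equivalent of skipping 5 items and taking the next 5
const items = Array.from({length: 20}, (_, i) => i); // 0..19
const sliced = items.slice(5, 10);
console.log(sliced); // [5, 6, 7, 8, 9]
```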

Conditional take

Takes while a condition is true.

import takeWhile from 'stream-chain/utils/takeWhile.js';

const pipeline = chain([
  takeWhile(item => item !== 'separator')
  // ... more stages
]);

Conditional skip

Skips while a condition is true.

import skipWhile from 'stream-chain/utils/skipWhile.js';

const pipeline = chain([
  skipWhile(item => item !== 'separator')
  // ... more stages
]);

Conditional take & skip together

Processes data between first two separators.

const pipeline = chain([
  skipWhile(item => item !== 'separator'),
  skip(1), // skip the separator
  takeWhile(item => item !== 'separator')
  // ... more stages
]);
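The same "between the first two separators" logic can be sketched on a plain array:

```javascript
// a plain-array sketch of taking items between the first two separators
const between = items => {
  const first = items.indexOf('separator');
  if (first < 0) return [];
  const second = items.indexOf('separator', first + 1);
  return items.slice(first + 1, second < 0 ? items.length : second);
};

console.log(between(['a', 'separator', 'b', 'c', 'separator', 'd']));
// ['b', 'c']
```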

Folding AKA reducing

It is the same as reduce() in JavaScript's arrays.

Folding

import fold from 'stream-chain/utils/fold.js';

const pipeline = chain([
  fold((acc, x) => acc + x, 0)
  // ... more stages
]);

// input: 1, 2, 3
// output: 6
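On an array this is exactly reduce():

```javascript
// the array equivalent of the fold() example
const total = [1, 2, 3].reduce((acc, x) => acc + x, 0);
console.log(total); // 6
```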

Scanning

scan() is like fold() but outputs all intermediate values of its accumulator.

import scan from 'stream-chain/utils/scan.js';

const pipeline = chain([
  scan((acc, x) => acc + x, 0)
  // ... more stages
]);

// input: 1, 2, 3
// output: 1, 3, 6
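A scan over an array is a reduce that records every intermediate accumulator:

```javascript
// a plain-JS sketch of scan(): reduce that keeps all
// intermediate accumulator values
const scanArray = (items, fn, acc) => {
  const out = [];
  for (const x of items) {
    acc = fn(acc, x);
    out.push(acc);
  }
  return out;
};

console.log(scanArray([1, 2, 3], (acc, x) => acc + x, 0)); // [1, 3, 6]
```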

Dedicated reducer

reduceStream() creates a Writable stream, which is used at the end of a pipeline to accumulate items. The accumulated value is available as its accumulator property. It can be used like fold() and scan().

import reduceStream from 'stream-chain/utils/reduceStream.js';

const toArray = reduceStream((acc, x) => {
  acc.push(x);
  return acc;
}, []);

const pipeline = chain([
  // ... more stages
  toArray
]);

// input: 1, 2, 3
// toArray.accumulator is [1, 2, 3]

Advanced use cases

Using many()

many() wraps an array of values so that each element is passed downstream individually.

import chain, {many} from 'stream-chain';

const pipeline = chain([
  x => many([x, 10 * x]),
  x => 2 * x
  // ... more stages
]);

// input: 1, 2, 3
// output: 2, 20, 4, 40, 6, 60
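On arrays the same fan-out is a flatMap:

```javascript
// the array equivalent of the many() example
const out = [1, 2, 3]
  .flatMap(x => [x, 10 * x])
  .map(x => 2 * x);
console.log(out); // [2, 20, 4, 40, 6, 60]
```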

Using asynchronous functions

const pipeline = chain([
  async x => await getItemNumberFromDB(x),
  x => 2 * x
  // ... more stages
]);