
utils is a collection of useful utilities:

Slicing

utils/skip.js

skip(n) skips n items from the beginning of a stream.

// const skip = require('stream-json/utils/skip.js');
import skip from 'stream-json/utils/skip.js';

// chain() is assumed to come from the stream-chain package
chain([
  dataStream, // any source of items
  skip(5) // skip 5 items
]);

utils/skipWhile.js

skipWhile(fn) skips items from the beginning of a stream while fn returns a truthy value. fn is called with a single argument: the item. As soon as fn returns a falsy value, all subsequent items go through and fn is not consulted anymore. fn can be an asynchronous function.

// const skipWhile = require('stream-json/utils/skipWhile.js');
import skipWhile from 'stream-json/utils/skipWhile.js';

chain([
  dataStream,
  skipWhile(item => item.foo === 'bar') // skip items until `foo` is not `bar`
]);
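
Since fn can be asynchronous, the decision may involve I/O. A hypothetical sketch (isObsolete() is an assumed async helper, not part of the library):

chain([
  dataStream,
  skipWhile(async item => await isObsolete(item)) // skip leading obsolete items
]);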

utils/take.js

take(n, finalValue) takes n items from the beginning of a stream, then returns finalValue, which defaults to none (see defs for more details). If you want to stop a stream early, use stop as the finalValue.

// const take = require('stream-json/utils/take.js');
import take from 'stream-json/utils/take.js';

chain([
  function* () { for (let i = 0;; ++i) yield i; },
  take(5, stop) // take 5 items and stop the stream
]);

utils/takeWhile.js

takeWhile(fn, finalValue) takes items from the beginning of a stream while fn returns a truthy value. fn is called with a single argument: the item. As soon as fn returns a falsy value, takeWhile() starts returning finalValue, which defaults to none (see defs for more details), and fn is not consulted anymore. fn can be an asynchronous function. If you want to stop a stream early, use stop as the finalValue.

// const takeWhile = require('stream-json/utils/takeWhile.js');
import takeWhile from 'stream-json/utils/takeWhile.js';

chain([
  function* () { for (let i = 0;; ++i) yield i; },
  x => x * x,
  takeWhile(item => item < 55, stop) // take items less than 55 and stop the stream
]);

utils/takeWithSkip.js

takeWithSkip(n, skip, finalValue) skips skip items from the beginning of a stream, then takes n items. See notes on finalValue in the take() section above. skip defaults to 0.

// const takeWithSkip = require('stream-json/utils/takeWithSkip.js');
import takeWithSkip from 'stream-json/utils/takeWithSkip.js';

chain([
  function* () { for (let i = 0;; ++i) yield i; },
  takeWithSkip(5, 2, stop) // skip 2 items, take 5 items
]);

Folding AKA reducing

utils/fold.js

fold(fn, initialValue) reduces a stream into a single value. fn is called with two arguments: the accumulator and the current item. The accumulator is the result of the previous call to fn or initialValue for the very first call. fn can be an asynchronous function.

// const fold = require('stream-json/utils/fold.js');
import fold from 'stream-json/utils/fold.js';

chain([
  function* () { for (let i = 0; i < 4; ++i) yield i; },
  fold((acc, item) => acc + item, 0)
]);
// produces: 6

utils/reduce.js

reduce() is an alias for fold().
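
It can be used interchangeably with fold():

// const reduce = require('stream-json/utils/reduce.js');
import reduce from 'stream-json/utils/reduce.js';

chain([
  function* () { for (let i = 0; i < 4; ++i) yield i; },
  reduce((acc, item) => acc + item, 0)
]);
// produces: 6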

utils/reduceStream.js

reduceStream() is a stream version of reduce(). Unlike reduce(), it returns a Writable stream. The function supports two signatures:

  • reduceStream(fn, initialValue) — this signature corresponds to reduce() above.
  • reduceStream(options), where options is an object used to initialize Writable with the following custom options:
    • objectMode is set to true by default.
    • initial is an initial value of an accumulator. Default: 0.
    • reducer is a function or an asynchronous function, which takes a current accumulator value and a current item and returns a new accumulator value. Default: (acc, value) => value.

reducer is called in the context of the created stream, i.e., this is set to the stream itself. It takes two arguments: the current accumulator value and the current item.

The returned Writable stream has a special property, accumulator, which contains the current accumulator value.

// const reduceStream = require('stream-json/utils/reduceStream.js');
import reduceStream from 'stream-json/utils/reduceStream.js';

const r = reduceStream((acc, x) => acc + x, 0);

chain([
  function* () { for (let i = 0; i < 4; ++i) yield i; },
  r
]);

// after the stream has finished
console.log(r.accumulator); // 6
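
The same reduction can be expressed with the options signature (a sketch based on the options listed above):

const r2 = reduceStream({
  initial: 0,                           // the initial accumulator value
  reducer: (acc, value) => acc + value  // combines the accumulator with each item
});

chain([
  function* () { for (let i = 0; i < 4; ++i) yield i; },
  r2
]);

// after the stream has finished
console.log(r2.accumulator); // 6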

utils/scan.js

scan(fn, initialValue) is a companion to fold(): unlike fold(), it returns the current accumulator value for every incoming item. fn is called with two arguments: the accumulator and the current item. fn can be an asynchronous function.

// const scan = require('stream-json/utils/scan.js');
import scan from 'stream-json/utils/scan.js';

chain([
  function* () { for (let i = 0; i < 4; ++i) yield i; },
  scan((acc, item) => acc + item, 0) // produces: 0, 1, 3, 6
]);

Adapting

utils/readableFrom.js

readableFrom(options) adapts an iterable/iterator to a Readable stream.

options is an object used to initialize Readable with the following custom options:

  • objectMode is set to true by default.
  • iterable is an iterable/iterator, which will be a source of items.

// const readableFrom = require('stream-json/utils/readableFrom.js');
import readableFrom from 'stream-json/utils/readableFrom.js';

chain([
  readableFrom({iterable: [1, 2, 3]}),
  x => console.log(x) // 1, 2, 3
]);
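
Any iterable or iterator works as a source, including generators. A sketch:

function* counter(n) { for (let i = 0; i < n; ++i) yield i; }

chain([
  readableFrom({iterable: counter(3)}),
  x => console.log(x) // 0, 1, 2
]);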

Stream helpers

utils/fixUtf8Stream.js

fixUtf8Stream() is a function that emits valid UTF-8 strings from a stream of bytes by repartitioning chunks so that multi-byte characters are never split.

It solves a simple problem: streams are not aware of encodings and can split a read in the middle of a multi-byte character. This is fine for ASCII, but it corrupts international characters and symbols.

Usually it goes in the pipeline before any other text processing. Note that the included JSONL parsers already use it, so there is no need to add it before them. See the jsonl module for more details.

// const fixUtf8Stream = require('stream-json/utils/fixUtf8Stream.js');
import fixUtf8Stream from 'stream-json/utils/fixUtf8Stream.js';

chain([
  fixUtf8Stream(),
  // ...
]);
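
For example, it can be placed right after a raw byte source such as a file stream (a sketch assuming Node's fs module and a hypothetical file name):

import fs from 'node:fs';

chain([
  fs.createReadStream('data.txt'), // raw bytes, may split multi-byte characters
  fixUtf8Stream(),                 // repartitions chunks into valid strings
  // ...text processing...
]);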

utils/lines.js

lines() is a function that splits a stream of bytes into lines and emits them one by one.

// const lines = require('stream-json/utils/lines.js');
import lines from 'stream-json/utils/lines.js';

chain([
  lines(),
  // ...
]);
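
Combined with fixUtf8Stream() and fold(), it can, for example, count lines in a file (a sketch assuming Node's fs module and a hypothetical file name):

import fs from 'node:fs';
import fixUtf8Stream from 'stream-json/utils/fixUtf8Stream.js';
import fold from 'stream-json/utils/fold.js';

chain([
  fs.createReadStream('data.txt'),
  fixUtf8Stream(),                     // make sure multi-byte characters are intact
  lines(),                             // emit individual lines
  fold((count, line) => count + 1, 0)  // count them
]);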

utils/batch.js

batch(size) is a function that emits batches of size items as arrays. The last batch may be smaller than size.

// const batch = require('stream-json/utils/batch.js');
import batch from 'stream-json/utils/batch.js';

chain([
  function* () { for (let i = 0; i < 10; ++i) yield i; },
  batch(3) // produces: [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]
]);
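
Batching is handy for amortizing the cost of asynchronous operations. A hypothetical sketch (saveToDatabase() is an assumed helper, not part of the library):

chain([
  dataStream,
  batch(100),                           // group items into arrays of up to 100
  async items => saveToDatabase(items)  // process a whole batch in one call
]);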