Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Jason3S/rx-stream

Folders and files

NameName
Last commit message
Last commit date

Latest commit

2f87be7 · Jun 20, 2023
Jun 7, 2022
Jun 3, 2019
May 11, 2021
Jul 18, 2021
Apr 1, 2021
May 11, 2021
May 11, 2021
May 11, 2021
Apr 27, 2018
Apr 2, 2021
Apr 1, 2021
Jun 7, 2022
Feb 10, 2017
Jun 20, 2023
Apr 1, 2021
Jan 31, 2023
Jan 31, 2023
Feb 12, 2017

Repository files navigation

rxjs-stream

Note With the addition of Async Iternables, this library is no longer necessary.

This is a simple library for converting to and from NodeJS stream and RxJS 7.

This was created to fill the gap left by rx-node, which only works with rxjs 4.

Installation

npm install --save rxjs rxjs-stream

Usage

Writing to a stream.

import { rxToStream } from 'rxjs-stream';

let data = 'This is a bit of text to have some fun with';
let src = Rx.Observable.from(data.split(' '));
rxToStream(src).pipe(process.stdout);

Writing objects to a stream

To write objects, you must pass in the ReadableOptions with objectMode to be true: { objectMode: true }

import { rxToStream } from 'rxjs-stream';

let data = 'This is a bit of text to have some fun with';
let wordObj = data.split(' ').map((text) => ({ text }));
let src = Rx.Observable.from(wordObj);
let stream = rxToStream(src, { objectMode: true });

Read from a stream

import { rxToStream, streamToStringRx } from 'rxjs-stream';

// Read stdin and make it upper case then send it to stdout
let ob = streamToStringRx(process.stdin).map((text) => text.toUpperCase());

rxToStream(ob).pipe(process.stdout);

Performance

It is recommended to buffer observable values before sending them to the stream. Node streams work better with fewer calls of a large amount of data than with many calls with a small amount of data.

Example:

import * as loremIpsum from 'lorem-ipsum';
import { rxToStream } from 'rxjs-stream';

let book = loremIpsum({ count: 1000, format: 'plain', units: 'paragraphs' });
let words = Rx.Observable.from(book.split(/\b/));
let wordsBuffered = words.bufferCount(1000).map((words) => words.join(''));
let stream = rxToStream(wordsBuffered);

stream.pipe(process.stdout);

Compatibility

This library is tested with Node 12 and above.

rx-stream RxJS Node
4.x 7.x >=12
3.x 6.x >=10