RxJS библиотека для объединения асинхронных и событийно-ориентированных программ использующих потоки наблюдателей. Библиотека предоставляет основной тип Observable, его производные (Observer, Scheduler, Subject), а так же операторы похожие на Array#extras (map, filter, reduce, every и др.) позволяющие обрабатывать асинхронные события как коллекции.
Думайте об RxJS как о Lodash для событий
ReactiveX объединяет паттерн Наблюдатель c паттерном Итератор и функциональное программирование с коллекциями чтобы помочь вам в управлении потоком событий.
Ниже список основных понятий RxJS:
- Observable: представляет неизменяеменяемый поток данных
- Observer: объект, методы которого знают как обрабатывать данные полученные из Observable
- Subscription: содержит данные о выполнении Observable, используется в первую очередь для отмены исполнения потока (как
removeEventListener
) - Операторы: чистые функции позволяющие в функциональном стиле работать со значениями как операторы
map
,filter
,concat
,reduce
и др. - Subject: похож на EventEmmiter, единственный способ передачи значений нескольким наблюдателям
- Schedulers: позволяют откладывать выполнение с помощью
setTimeout
илиrequestAnimationFrame
и др.
Стандартный способ прослушивания событий:
document.addEventListener('click', () => console.log('CНажатоlick!'));
Используя RxJS вы создаете observable:
import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(() => console.log('Click'));
Использование чистых функций, делает код более безопасным, а RxJS очень мощным инструментом. Обычно вы создаете грязную функцию, которая меняет состояние вашего кода и это опасно.
let count = 0;
document.addEventListener('click', () => console.log(`Clicked ${++count} times`));
Используя RxJS вы изолируете ваше состояние:
import { fromEvent } from 'rxjs';
import { scan } from 'rxjs/operators';
fromEvent(document, 'click')
.pipe(
scan(count => count + 1, 0)
)
.subscribe(count => console.log(`Clicked ${count} times`));
Оператор scan
работает как reduce у массивов. Он использует значение пришедшее как аргумент функции и возвращает новое значение, которое станет аргументом для этой же функции, но уже при следующем вызове.
RxJS имеет большой выбор операторов, помогающие контролировать данные потока.
Ниже приведена реализация программы разрешающая только один клик в секунду на чистом Javascript:
let count = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
if (Date.now() - lastClick >= rate) {
console.log(`Нажато ${++count} раз(а)`);
lastClick = Date.now();
}
});
То же самое на RxJS:
import { fromEvent } from 'rxjs';
import { throttleTime, scan } from 'rxjs/operators';
fromEvent(document, 'click')
.pipe(
throttleTime(1000),
scan(count => count + 1, 0)
)
.subscribe(count => console.log(`Нажато ${count} раз(а)`));
Такие операторы как filter, delay, debounceTime, take, takeUntil, distinct, distinctUntilChanged тоже могут контроллировать поток.
RxJS так же предоставляет операторы для трансформирования данных.
Ниже приведен способ сложения координаты по оси X вашего курсора с предыдущимы координатами на чистом JavaScript:
let count = 0;
const rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', event => {
if (Date.now() - lastClick >= rate) {
count += event.clientX;
console.log(count);
lastClick = Date.now();
}
});
То же самое на RxJS:
import { fromEvent } from 'rxjs';
import { throttleTime, map, scan } from 'rxjs/operators';
fromEvent(document, 'click')
.pipe(
throttleTime(1000),
map(event => event.clientX),
scan((count, clientX) => count + clientX, 0)
)
.subscribe(count => console.log(count));
pluck, pairwise, sample и др. операторы так же могут изменять данные