Skip to content

Commit

Permalink
RxJS 5 support, rewrite WikiStream example with RxJS 5, iterator-base…
Browse files Browse the repository at this point in the history
…d selectors
  • Loading branch information
bsideup committed Jan 16, 2017
1 parent fb67528 commit 5b9b5ad
Show file tree
Hide file tree
Showing 13 changed files with 4,833 additions and 832 deletions.
6 changes: 5 additions & 1 deletion examples/wikistream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
"serve": "webpack-dev-server --inline"
},
"dependencies": {
"babel-polyfill": "^6.20.0",
"react": "~15.3.0",
"react-dom": "~15.3.0",
"rx": "~4.1.0",
"rx-connect": "^0.1.5",
"rxjs": "~5.0.0",
"semantic-ui-css": "~2.2.2"
},
"devDependencies": {
Expand All @@ -19,7 +20,10 @@
"babel-preset-es2015": "~6.9.0",
"babel-preset-react": "~6.11.1",
"babel-preset-stage-0": "~6.5.0",
"css-loader": "^0.26.1",
"file-loader": "^0.9.0",
"html-webpack-plugin": "~2.22.0",
"style-loader": "^0.13.1",
"webpack": "~1.13.1",
"webpack-dev-server": "~1.14.1"
}
Expand Down
49 changes: 25 additions & 24 deletions examples/wikistream/src/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import "semantic-ui-css/semantic.css";
import "babel-polyfill";

import React from "react";
import ReactDOM from "react-dom";
import { rxConnect, ofActions } from "rx-connect";
import Rx from "rx";
import Rx from "rxjs";

import { ofWikiChanges } from "./wikiChangesObservable";

Expand All @@ -14,44 +15,44 @@ const CHANNELS = [
"commons.wikimedia.org",
];

@rxConnect(() => {
@rxConnect(function* () {
const actions = {
changeChannel$: new Rx.Subject(),
pause$: new Rx.Subject(),
resume$: new Rx.Subject(),
};

yield Rx.Observable::ofActions(actions);

const channel$ = actions.changeChannel$
.pluck(0)
.startWith(CHANNELS[0])
.distinctUntilChanged()
.shareReplay(1);
.publishReplay(1)
.refCount();

yield channel$.map(channel => ({ channel }));

const active$ = Rx.Observable
.merge(
actions.pause$.map(false),
actions.resume$.map(true),
actions.pause$.mapTo(false),
actions.resume$.mapTo(true),
)
.startWith(true)
.shareReplay(1);

return Rx.Observable.merge(
Rx.Observable::ofActions(actions),

channel$.map(channel => ({ channel })),

active$.map(active => ({ active })),

channel$
.flatMapLatest(channel =>
Rx.Observable::ofWikiChanges(channel)
.pausable(active$)
.sample(200) // Use sampling, otherwise Wikipedia might provide a lot of data :)
.scan((acc, next) => [next, ...acc].slice(0, 20), []) // Last 20
.startWith(undefined)
)
.map(items => ({ items }))
);
.publishReplay(1)
.refCount();

yield active$.map(active => ({ active }));

yield channel$
.switchMap(channel =>
Rx.Observable::ofWikiChanges(channel)
.let(o => active$.switchMap(active => active ? o : Rx.Observable.never()))
.sample(Rx.Observable.interval(200)) // Use sampling, otherwise Wikipedia might provide a lot of data :)
.scan((acc, next) => [next, ...acc].slice(0, 20), []) // Last 20
.startWith(undefined)
)
.map(items => ({ items }));
})
class WikiStream extends React.PureComponent {

Expand Down
4 changes: 2 additions & 2 deletions examples/wikistream/src/wikiChangesObservable.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ export function ofWikiChanges(channel) {

socket.on("connect", () => socket.emit("subscribe", channel));

socket.on("change", data => observer.onNext(data));
socket.on("change", data => observer.next(data));

return () => {
socket.removeAllListeners("connect");
socket.removeAllListeners("change");
socket.disconnect();
observer.onCompleted();
observer.complete();
}
});
}
1 change: 1 addition & 0 deletions examples/wikistream/webpack.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ module.exports = {
],
devServer: {
port: 3000,
inline: true,
contentBase: targetPath,
historyApiFallback: true
}
Expand Down
Loading

0 comments on commit 5b9b5ad

Please sign in to comment.