diff --git a/doc/decision-tree-widget/tree.yml b/doc/decision-tree-widget/tree.yml index 471da19038..856f33fc46 100644 --- a/doc/decision-tree-widget/tree.yml +++ b/doc/decision-tree-widget/tree.yml @@ -349,6 +349,9 @@ children: - label: using custom logic children: - label: Observable.create + - label: using a state machine similar to a for loop + children: + - label: Observable.generate - label: that throws an error children: - label: Observable.throw diff --git a/doc/operators.md b/doc/operators.md index 186f5303cb..a63c08ee97 100644 --- a/doc/operators.md +++ b/doc/operators.md @@ -120,6 +120,7 @@ There are operators for different purposes, and they may be categorized as: crea - [`fromEvent`](../class/es6/Observable.js~Observable.html#static-method-fromEvent) - [`fromEventPattern`](../class/es6/Observable.js~Observable.html#static-method-fromEventPattern) - [`fromPromise`](../class/es6/Observable.js~Observable.html#static-method-fromPromise) +- [`generate`](../class/es6/Observable.js~Observable.html#static-method-generate) - [`interval`](../class/es6/Observable.js~Observable.html#static-method-interval) - [`never`](../class/es6/Observable.js~Observable.html#static-method-never) - [`of`](../class/es6/Observable.js~Observable.html#static-method-of) diff --git a/spec/observables/generate-spec.ts b/spec/observables/generate-spec.ts new file mode 100644 index 0000000000..52edaa8ae4 --- /dev/null +++ b/spec/observables/generate-spec.ts @@ -0,0 +1,168 @@ +import * as Rx from '../../dist/cjs/Rx.KitchenSink'; +import '../../dist/cjs/add/observable/generate'; +import {TestScheduler} from '../../dist/cjs/testing/TestScheduler'; +import {expect} from 'chai'; +declare const {asDiagram, expectObservable}; +declare const rxTestScheduler: TestScheduler; + +const Observable = Rx.Observable; + +function err(): any { + throw 'error'; +} + +describe('Observable.generate', () => { + asDiagram('generate(1, x => false, x => x + 1)') + ('should complete if condition does not meet', () => { + const source = Observable.generate(1, x => false, x => x + 1); + const expected = '|'; + + expectObservable(source).toBe(expected); + }); + + asDiagram('generate(1, x => x == 1, x => x + 1)') + ('should produce first value immediately', () => { + const source = Observable.generate(1, x => x == 1, x => x + 1); + const expected = '(1|)'; + + expectObservable(source).toBe(expected, { '1': 1 }); + }); + + asDiagram('generate(1, x => x < 3, x => x + 1)') + ('should produce all values synchronously', () => { + const source = Observable.generate(1, x => x < 3, x => x + 1); + const expected = '(12|)'; + + expectObservable(source).toBe(expected, { '1': 1, '2': 2 }); + }); + + it('should use result selector', () => { + const source = Observable.generate(1, x => x < 3, x => x + 1, x => (x + 1).toString()); + const expected = '(23|)'; + + expectObservable(source).toBe(expected); + }); + + it('should allow omit condition', () => { + const source = Observable.generate({ + initialState: 1, + iterate: x => x + 1, + resultSelector: x => x.toString() + }).take(5); + const expected = '(12345|)'; + + expectObservable(source).toBe(expected); + }); + + it('should stop producing when unsubscribed', () => { + const source = Observable.generate(1, x => x < 4, x => x + 1); + let count = 0; + const subscriber = new Rx.Subscriber( + x => { + count++; + if (x == 2) { + subscriber.unsubscribe(); + } + } + ); + source.subscribe(subscriber); + expect(count).to.be.equal(2); + }); + + it('should accept a scheduler', () => { + const source = Observable.generate({ + initialState: 1, + condition: x => x < 4, + iterate: x => x + 1, + resultSelector: x => x, + scheduler: rxTestScheduler + }); + const expected = '(123|)'; + + let count = 0; + source.subscribe(x => count++); + + expect(count).to.be.equal(0); + rxTestScheduler.flush(); + expect(count).to.be.equal(3); + + expectObservable(source).toBe(expected, { '1': 1, '2': 2, '3': 3 }); + }); + + it('should allow minimal possible options', () => { + const source = Observable.generate({ + initialState: 1, + iterate: x => x * 2 + }).take(3); + const expected = '(124|)'; + + expectObservable(source).toBe(expected, { '1': 1, '2': 2, '4': 4 }); + }); + + it('should emit error if result selector throws', () => { + const source = Observable.generate({ + initialState: 1, + iterate: x => x * 2, + resultSelector: err + }); + const expected = '(#)'; + + expectObservable(source).toBe(expected); + }); + + it('should emit error if result selector throws on scheduler', () => { + const source = Observable.generate({ + initialState: 1, + iterate: x => x * 2, + resultSelector: err, + scheduler: rxTestScheduler + }); + const expected = '(#)'; + + expectObservable(source).toBe(expected); + }); + + it('should emit error after first value if iterate function throws', () => { + const source = Observable.generate({ + initialState: 1, + iterate: err + }); + const expected = '(1#)'; + + expectObservable(source).toBe(expected, { '1': 1 }); + }); + + it('should emit error after first value if iterate function throws on scheduler', () => { + const source = Observable.generate({ + initialState: 1, + iterate: err, + scheduler: rxTestScheduler + }); + const expected = '(1#)'; + + expectObservable(source).toBe(expected, { '1': 1 }); + }); + + it('should emit error if condition function throws', () => { + const source = Observable.generate({ + initialState: 1, + iterate: x => x + 1, + condition: err + }); + const expected = '(#)'; + + expectObservable(source).toBe(expected); + }); + + it('should emit error if condition function throws on scheduler', () => { + const source = Observable.generate({ + initialState: 1, + iterate: x => x + 1, + condition: err, + scheduler: rxTestScheduler + }); + const expected = '(#)'; + + expectObservable(source).toBe(expected); + }); +}); \ No newline at end of file diff --git a/src/Rx.ts b/src/Rx.ts index 73e55132b3..80a9cf3473 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -19,6 +19,7 @@ import './add/observable/from'; import './add/observable/fromEvent'; import './add/observable/fromEventPattern'; import './add/observable/fromPromise'; +import './add/observable/generate'; import './add/observable/interval'; import './add/observable/merge'; import './add/observable/race'; diff --git a/src/add/observable/generate.ts b/src/add/observable/generate.ts new file mode 100644 index 0000000000..ed042e96d0 --- /dev/null +++ b/src/add/observable/generate.ts @@ -0,0 +1,10 @@ +import {Observable} from '../../Observable'; +import {GenerateObservable} from '../../observable/GenerateObservable'; + +Observable.generate = GenerateObservable.create; + +declare module '../../Observable' { + namespace Observable { + export let generate: typeof GenerateObservable.create; + } +} \ No newline at end of file diff --git a/src/observable/GenerateObservable.ts b/src/observable/GenerateObservable.ts new file mode 100644 index 0000000000..71ff576907 --- /dev/null +++ b/src/observable/GenerateObservable.ts @@ -0,0 +1,296 @@ +import {Observable} from '../Observable' ; +import {Scheduler} from '../Scheduler'; +import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; +import {Action} from '../scheduler/Action'; + +import {isScheduler} from '../util/isScheduler'; + +const selfSelector = (value: T) => value; + +export type ConditionFunc = (state: S) => boolean; +export type IterateFunc = (state: S) => S; +export type ResultFunc = (state: S) => T; + +interface SchedulerState { + needIterate?: boolean; + state: S; + subscriber: Subscriber; + condition?: ConditionFunc; + iterate: IterateFunc; + resultSelector: ResultFunc; +} + +export interface GenerateBaseOptions { + /** + * Inital state. + */ + initialState: S; + /** + * Condition function that accepts state and returns boolean. + * When it returns false, the generator stops. + * If not specified, a generator never stops. + */ + condition?: ConditionFunc; + /** + * Iterate function that accepts state and returns new state. + */ + iterate: IterateFunc; + /** + * Scheduler to use for generation process. + * By default, a generator starts immediately. + */ + scheduler?: Scheduler; +} + +export interface GenerateOptions extends GenerateBaseOptions { + /** + * Result selection function that accepts state and returns a value to emit. + */ + resultSelector: ResultFunc; +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @extends {Ignored} + * @hide true + */ +export class GenerateObservable extends Observable { + constructor(private initialState: S, + private condition: ConditionFunc, + private iterate: IterateFunc, + private resultSelector: ResultFunc, + private scheduler?: Scheduler) { + super(); + } + + /** + * Generates an observable sequence by running a state-driven loop + * producing the sequence's elements, using the specified scheduler + * to send out observer messages. + * + * + * + * @example Produces sequence of 0, 1, 2, ... 9, then completes. + * var res = Rx.Observable.generate(0, x => x < 10, x => x + 1, x => x); + * + * @example Using asap scheduler, produces sequence of 2, 3, 5, then completes. + * var res = Rx.Observable.generate(1, x => x < 5, x => x * 2, x => x + 1, Rx.Scheduler.asap); + * + * @see {@link from} + * @see {@link create} + * + * @param {S} initialState Initial state. + * @param {function (state: S): boolean} condition Condition to terminate generation (upon returning false). + * @param {function (state: S): S} iterate Iteration step function. + * @param {function (state: S): T} resultSelector Selector function for results produced in the sequence. + * @param {Scheduler} [scheduler] A {@link Scheduler} on which to run the generator loop. If not provided, defaults to emit immediately. + * @returns {Observable} The generated sequence. + */ + static create(initialState: S, + condition: ConditionFunc, + iterate: IterateFunc, + resultSelector: ResultFunc, + scheduler?: Scheduler): Observable + + /** + * Generates an observable sequence by running a state-driven loop + * producing the sequence's elements, using the specified scheduler + * to send out observer messages. + * The overload uses state as an emitted value. + * + * + * + * @example Produces sequence of 0, 1, 2, ... 9, then completes. + * var res = Rx.Observable.generate(0, x => x < 10, x => x + 1); + * + * @example Using asap scheduler, produces sequence of 1, 2, 4, then completes. + * var res = Rx.Observable.generate(1, x => x < 5, x => x * 2, Rx.Scheduler.asap); + * + * @see {@link from} + * @see {@link create} + * + * @param {S} initialState Initial state. + * @param {function (state: S): boolean} condition Condition to terminate generation (upon returning false). + * @param {function (state: S): S} iterate Iteration step function. + * @param {Scheduler} [scheduler] A {@link Scheduler} on which to run the generator loop. If not provided, defaults to emit immediately. + * @returns {Observable} The generated sequence. + */ + static create(initialState: S, + condition: ConditionFunc, + iterate: IterateFunc, + scheduler?: Scheduler): Observable + + /** + * Generates an observable sequence by running a state-driven loop + * producing the sequence's elements, using the specified scheduler + * to send out observer messages. + * The overload accepts options object that might contain inital state, iterate, + * condition and scheduler. + * + * + * + * @example Produces sequence of 0, 1, 2, ... 9, then completes. + * var res = Rx.Observable.generate({ + * initialState: 0, + * condition: x => x < 10, + * iterate: x => x + 1 + * }); + * + * @see {@link from} + * @see {@link create} + * + * @param {GenerateBaseOptions} options Object that must contain initialState, iterate and might contain condition and scheduler. + * @returns {Observable} The generated sequence. + */ + static create(options: GenerateBaseOptions): Observable + + /** + * Generates an observable sequence by running a state-driven loop + * producing the sequence's elements, using the specified scheduler + * to send out observer messages. + * The overload accepts options object that might contain inital state, iterate, + * condition, result selector and scheduler. + * + * + * + * @example Produces sequence of 0, 1, 2, ... 9, then completes. + * var res = Rx.Observable.generate({ + * initialState: 0, + * condition: x => x < 10, + * iterate: x => x + 1, + * resultSelector: x => x + * }); + * + * @see {@link from} + * @see {@link create} + * + * @param {GenerateOptions} options Object that must contain initialState, iterate, resultSelector and might contain condition and scheduler. + * @returns {Observable} The generated sequence. + */ + static create(options: GenerateOptions): Observable + + static create(initialStateOrOptions: S | GenerateOptions, + condition?: ConditionFunc, + iterate?: IterateFunc, + resultSelectorOrObservable?: (ResultFunc) | Scheduler, + scheduler?: Scheduler): Observable { + if (arguments.length == 1) { + return new GenerateObservable( + (>initialStateOrOptions).initialState, + (>initialStateOrOptions).condition, + (>initialStateOrOptions).iterate, + (>initialStateOrOptions).resultSelector || selfSelector, + (>initialStateOrOptions).scheduler); + } + + if (resultSelectorOrObservable === undefined || isScheduler(resultSelectorOrObservable)) { + return new GenerateObservable( + initialStateOrOptions, + condition, + iterate, + selfSelector, + resultSelectorOrObservable); + } + + return new GenerateObservable( + initialStateOrOptions, + condition, + iterate, + >resultSelectorOrObservable, + scheduler); + } + + protected _subscribe(subscriber: Subscriber): Subscription | Function | void { + let state = this.initialState; + if (this.scheduler) { + return this.scheduler.schedule>(GenerateObservable.dispatch, 0, { + subscriber, + iterate: this.iterate, + condition: this.condition, + resultSelector: this.resultSelector, + state }); + } + const { condition, resultSelector, iterate } = this; + do { + if (condition) { + let conditionResult: boolean; + try { + conditionResult = condition(state); + } catch (err) { + subscriber.error(err); + return; + } + if (!conditionResult) { + subscriber.complete(); + break; + } + } + let value: T; + try { + value = resultSelector(state); + } catch (err) { + subscriber.error(err); + return; + } + subscriber.next(value); + if (subscriber.isUnsubscribed) { + break; + } + try { + state = iterate(state); + } catch (err) { + subscriber.error(err); + return; + } + } while (true); + } + + private static dispatch(state: SchedulerState) { + const { subscriber, condition } = state; + if (subscriber.isUnsubscribed) { + return; + } + if (state.needIterate) { + try { + state.state = state.iterate(state.state); + } catch (err) { + subscriber.error(err); + return; + } + } else { + state.needIterate = true; + } + if (condition) { + let conditionResult: boolean; + try { + conditionResult = condition(state.state); + } catch (err) { + subscriber.error(err); + return; + } + if (!conditionResult) { + subscriber.complete(); + return; + } + if (subscriber.isUnsubscribed) { + return; + } + } + let value: T; + try { + value = state.resultSelector(state.state); + } catch (err) { + subscriber.error(err); + return; + } + if (subscriber.isUnsubscribed) { + return; + } + subscriber.next(value); + if (subscriber.isUnsubscribed) { + return; + } + return (>>this).schedule(state); + } +} \ No newline at end of file