-
Notifications
You must be signed in to change notification settings - Fork 3k
/
Copy pathshareReplay.ts
45 lines (42 loc) · 1.26 KB
/
shareReplay.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import { Observable } from '../Observable';
import { ReplaySubject } from '../ReplaySubject';
import { IScheduler } from '../Scheduler';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction } from '../interfaces';
/**
 * Creates an operator function that shares one subscription to the source
 * among all subscribers and replays buffered notifications to late
 * subscribers through an internal `ReplaySubject`.
 *
 * Lifecycle of the shared source subscription:
 * - The source is subscribed when the first subscriber arrives, or when a
 *   subscriber arrives after the previous source subscription errored (a
 *   fresh `ReplaySubject` is created in that case, so the error is not
 *   replayed to the new subscriber).
 * - If the source completes, the `ReplaySubject` is kept, so later
 *   subscribers receive the buffered values followed by the completion.
 * - If the number of subscribers drops to zero before the source has
 *   completed, the source subscription is unsubscribed and the internal
 *   state is cleared; the next subscriber starts over with a new
 *   `ReplaySubject` and a new source subscription.
 *
 * Note that the state lives in the closure created by this call, so every
 * Observable produced by the returned function shares it.
 *
 * @param bufferSize forwarded unchanged as the first `ReplaySubject` constructor argument.
 * @param windowTime forwarded unchanged as the second `ReplaySubject` constructor argument.
 * @param scheduler forwarded unchanged as the third `ReplaySubject` constructor argument.
 * @returns a function that maps a source Observable to the shared, replaying Observable.
 * @method shareReplay
 * @owner Observable
 */
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: IScheduler ): MonoTypeOperatorFunction<T> {
  let subject: ReplaySubject<T> | undefined;
  let refCount = 0;
  let subscription: Subscription | undefined;
  let hasError = false;
  // Must start out false: it only becomes true once the source has actually
  // completed. Starting at true would make the teardown guard below
  // unreachable and leave the source subscribed forever.
  let isComplete = false;

  return (source: Observable<T>) => new Observable<T>(observer => {
    refCount++;

    if (!subject || hasError) {
      hasError = false;
      // Capture the subject locally so this particular source subscription
      // always feeds the subject it was created for, even if the shared
      // `subject` variable is cleared or replaced later.
      const relay = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
      subject = relay;
      subscription = source.subscribe({
        next(value) { relay.next(value); },
        error(err) {
          hasError = true;
          relay.error(err);
        },
        complete() {
          isComplete = true;
          relay.complete();
        },
      });
    }

    const innerSub = subject.subscribe(observer);

    return () => {
      refCount--;
      innerSub.unsubscribe();
      if (subscription && refCount === 0 && !isComplete) {
        // Last subscriber left while the source was still live: release the
        // source and drop the subject so that the next subscriber reconnects
        // instead of attaching to a subject that has no upstream anymore.
        subscription.unsubscribe();
        subscription = undefined;
        subject = undefined;
      }
    };
  });
}