-
Notifications
You must be signed in to change notification settings - Fork 1
/
Step10.ts
99 lines (85 loc) · 2.39 KB
/
Step10.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
module Step10 {
/**
 * Consumer-side contract: the three callbacks a producer may invoke.
 * Within this module, Observable.subscribe() wraps the caller's observer in a
 * Subscriber before handing it to the producer function.
 */
interface Observer<T> {
// Receives one emitted value of type T.
next(value: T): void;
// Receives a failure notification; the payload is untyped (`any`).
error(err: any): void;
// Receives the completion signal; the Subscriber wrapper forwards no further
// `next` values once this has been called.
complete(): void;
}
// Zero-argument cleanup callback. A producer function returns one from
// Observable's constructor argument, and Subscription stores these and calls
// them from unsubscribe().
type Teardown = () => void;
/**
 * Wraps a destination observer and enforces the terminal-notification rules:
 *
 * - `next` is forwarded only while the subscriber is open.
 * - `error` and `complete` are terminal: the first one to arrive is forwarded,
 *   the subscriber is closed, and the owning subscription is unsubscribed.
 * - Once closed (by a terminal notification or by an external unsubscribe),
 *   every later `next`, `error` and `complete` call is silently dropped.
 */
class Subscriber<T> implements Observer<T> {
  /** True once the stream has errored, completed, or been unsubscribed. */
  closed = false;

  /**
   * @param destination  The observer that ultimately receives notifications.
   * @param subscription The subscription that owns this subscriber; it is
   *                     unsubscribed when a terminal notification is delivered.
   */
  constructor(private destination: Observer<T>, private subscription: Subscription) {
    // If the subscription is torn down from outside, stop forwarding anything
    // (values, errors and completion alike).
    subscription.add(() => (this.closed = true));
  }

  /** Forwards a value to the destination unless the subscriber is closed. */
  next(value: T): void {
    if (!this.closed) {
      this.destination.next(value);
    }
  }

  /**
   * Forwards the first terminal error and then tears the subscription down.
   * Calls made after the subscriber has closed are ignored.
   */
  error(err: any): void {
    if (this.closed) {
      return;
    }
    // Close before notifying so a re-entrant call from the handler is dropped.
    this.closed = true;
    try {
      this.destination.error(err);
    } finally {
      // Release producer resources even if the destination handler throws.
      this.subscription.unsubscribe();
    }
  }

  /**
   * Forwards the first completion and then tears the subscription down.
   * Calls made after the subscriber has closed are ignored.
   */
  complete(): void {
    if (this.closed) {
      return;
    }
    // Close before notifying so a re-entrant call from the handler is dropped.
    this.closed = true;
    try {
      this.destination.complete();
    } finally {
      // Release producer resources even if the destination handler throws.
      this.subscription.unsubscribe();
    }
  }
}
/**
 * A disposable bag of teardown callbacks.
 *
 * `unsubscribe()` runs every registered teardown exactly once and closes the
 * subscription. A teardown added after the subscription has closed is run
 * immediately instead of being stored, so cleanup registered late (for example
 * when a producer completes synchronously before its teardown is returned) is
 * never lost.
 */
class Subscription {
  private teardowns: Array<Teardown> = [];
  // Set on the first unsubscribe(); makes later unsubscribe() calls no-ops.
  private closed = false;

  /**
   * Registers a cleanup callback.
   * @param aTeardown Callback to run on unsubscribe; run right away if the
   *                  subscription is already closed.
   */
  add(aTeardown: Teardown): void {
    if (this.closed) {
      aTeardown();
      return;
    }
    this.teardowns.push(aTeardown);
  }

  /** Runs all pending teardowns once; repeated or re-entrant calls do nothing. */
  unsubscribe(): void {
    if (this.closed) {
      return;
    }
    this.closed = true;
    // Detach the list before iterating so a teardown that calls unsubscribe()
    // or add() cannot re-run or mutate the collection being walked.
    const pending = this.teardowns;
    this.teardowns = [];
    for (const teardown of pending) {
      teardown();
    }
  }
}
/**
 * A lazy producer of values: the producer function passed to the constructor
 * is not run until `subscribe()` is called, and it runs once per subscription.
 */
class Observable<T> {
  /**
   * @param Init Producer function. It receives the (guarded) observer to push
   *             notifications into and returns the teardown for its resources.
   */
  constructor(private Init: (observer: Observer<T>) => Teardown) {}

  /**
   * Starts the producer for the given observer.
   * @param observer Receives the notifications, via a Subscriber wrapper.
   * @returns The Subscription holding the producer's teardown.
   */
  subscribe(observer: Observer<T>): Subscription {
    const handle = new Subscription();
    // The producer only ever sees the wrapper, never the raw observer.
    const guarded: Observer<T> = new Subscriber(observer, handle);
    const cleanup = this.Init(guarded);
    handle.add(cleanup);
    return handle;
  }
}
// Demo producer: emits 0, 1, 2, 3 at one-second intervals, then completes.
const anObservable = new Observable((observer: Observer<number>) => {
  let counter = 0;

  const tick = (): void => {
    observer.next(counter);
    counter += 1;
    if (counter > 3) {
      observer.complete();
      // Deliberately sent after completion to show that it is not delivered.
      observer.next(999999);
    }
  };
  const timerId = setInterval(tick, 1000);

  // Return a basic teardown that stops the interval.
  return () => {
    console.log('tearing down');
    clearInterval(timerId);
  };
});
// Subscribe with an observer that logs every notification to the console.
// Despite its name, `teardown` holds the Subscription returned by subscribe().
const teardown = anObservable.subscribe({
  next: (value: number) => {
    console.log(value);
  },
  error: (err: any) => {
    console.error(err);
  },
  complete: () => {
    console.log('Complete!');
  },
});

// Unsubscribe after seven seconds. The stream completes on its fourth tick
// (about four seconds in) and tears itself down then, well before this fires.
setTimeout(() => teardown.unsubscribe(), 7000);
}