-
Notifications
You must be signed in to change notification settings - Fork 3
/
StreamExt.java
90 lines (77 loc) · 2.21 KB
/
StreamExt.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package recaf.demo.cps;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import recaf.core.ISupply;
import recaf.core.alg.JavaMethodAlg;
import recaf.core.cps.EvalJavaStmt;
import recaf.core.cps.K;
import recaf.core.cps.K0;
import recaf.core.cps.SD;
import rx.Observable;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
public class StreamExt<R> implements EvalJavaStmt<R>, JavaMethodAlg<Subject<R, R>, SD<R>> {
ReplaySubject<R> result = ReplaySubject.create();
int depth = 0; // light depth tracking
public Subject<R, R> Method(SD<R> body) {
depth++;
body.accept(null,
r -> {
depth--;
if (depth==0) result.onCompleted();
},
() -> {
depth--;
if (depth==0) result.onCompleted();
},
label -> { },
label -> { },
ex -> { result.onError(ex); }) ;
return result;
}
public <T> SD<R> Await(ISupply<CompletableFuture<T>> e, Function<T, SD<R>> body) {
return (label, rho, sigma, brk, contin, err) -> get(e).accept(f -> {
f.whenComplete((a, ex) -> {
if (a == null) {
err.accept(ex);
} else {
body.apply(a).accept(null, rho, sigma, brk, contin, err);
}
});
} , err);
}
public SD<R> Yield(ISupply<R> exp) {
return (label, rho, sigma, brk, contin, err) -> {
get(exp).accept(v -> {
result.onNext(v);
sigma.call();
} , err);
};
}
public <T> SD<R> YieldFrom(ISupply<Observable<R>> exp) {
return (label, rho, sigma, brk, contin, err) -> {
get(exp).accept(v -> {
sigma.call();
} , err);
};
}
private <T> void loop(Iterator<T> iter, Function<T, SD<R>> body, K<R> rho, K0 sigma, K<String> brk, K<String> contin, K<Throwable> err){
body.apply(iter.next()).accept(null,
rho,
() -> {
if (iter.hasNext())
loop(iter, body, rho, sigma, brk, contin, err);
else
sigma.call();
}, brk, contin, err);
}
public <T> SD<R> AwaitFor(ISupply<Observable<T>> coll, Function<T, SD<R>> body){
return (label, rho, sigma, brk, contin, err) -> {
get(coll).accept(v -> {
Iterator<T> it = v.toBlocking().toIterable().iterator();
loop(it, body, rho, sigma, brk, contin, err);
} , err);
};
}
}