Streams in Encore
This is a discussion of the design of Functional Future Streams for Encore. They are functional in that no operations are available (in principle) to modify the stream, so a stream can have multiple readers. The producer and the consumer of the stream run concurrently, and the consumer accesses values via futures (because the producer may not yet have produced them). The design is semi-suspended for the moment while it awaits input from Oslo, but it nevertheless aims to provide a coherent starting point.
In Quasi-Haskell notation, a stream has the following type:
```
type Stream a = Fut (Maybe (St a))
data St a = St a (Stream a)
```
As Encore does not yet have sum types, there are two ways of realising this: one uses null pointers; the other is the following:
```
passive class E
  payload : int
  next : Fut E -- that is, stream
  ended : bool

  def init(p : int, n : Fut E) : E {
    this.payload = p;
    this.next = n;
    this.ended = false;
    this
  }

  def eos() : E {
    this.ended = true;
    this
  }

  def getPayload() : int {
    this.payload;
  }

  def getNext() : Fut E {
    this.next;
  }

  def isEos() : bool {
    this.ended
  }
```
Here one dreams that `Stream = Fut E`, but there is no way of stating this in the language. If the `ended` field is `false`, class `E` stores the payload and the pointer to the tail of the stream; otherwise, the payload and tail are ignored.
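For comparison, here is a minimal Python sketch of the null-pointer variant mentioned above. It assumes `concurrent.futures.Future` as a stand-in for Encore's `Fut` and uses `None` in the role of `Nothing`. The names `Cell`, `cons`, `end` and `to_list` are illustrative and are not part of Encore.

```python
from concurrent.futures import Future
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Cell:
    """Plays the role of St a: a payload plus the (future) tail of the stream."""
    payload: int
    next: Future  # Future[Optional[Cell]]; a result of None means end of stream


# Stream a = Fut (Maybe (St a)), with None standing in for Nothing.
Stream = Future


def end() -> Stream:
    """A stream that is already closed."""
    f: Stream = Future()
    f.set_result(None)
    return f


def cons(payload: int, tail: Stream) -> Stream:
    """A stream whose first element is already available."""
    f: Stream = Future()
    f.set_result(Cell(payload, tail))
    return f


def to_list(s: Stream) -> List[int]:
    """Read every element; reading never modifies the stream."""
    out = []
    cell = s.result()
    while cell is not None:
        out.append(cell.payload)
        cell = cell.next.result()
    return out


s = cons(1, cons(2, end()))
print(to_list(s), to_list(s))  # the same stream can be read twice: [1, 2] [1, 2]
```

Because cells are immutable and reading only follows `next` pointers, any number of readers can traverse the same stream.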
The following class gives a pattern for stream producers (which can possibly be improved thanks to some recent compiler fixes).
```
class StreamProducer
  def computeStream(state : int) : E {
    let res = new E in {
      if (state > 110) then {
        res.eos()
      } else {
        -- previously I was doing a recursive call to this, but that didn't work
        -- then I tried indirect recursive call to this (via an alias), but that
        -- didn't work either
        let sf = new StreamProducer in
          res.init(state, sf.computeStream(state + 1))
      };
      res;
    };
  }
```
The stream producer should not really create a new `StreamProducer` object for each element of the stream. Doing so works around a bug.
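The same producer pattern can be sketched in Python. This assumes that a `ThreadPoolExecutor` task stands in for an asynchronous call to a fresh `StreamProducer`. Each step returns a cell whose tail is the future of the next step, so the producer never waits for the consumer. `Cell`, `compute_stream` and the `limit` parameter are hypothetical names.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Cell:
    payload: int
    next: Future  # Future[Optional[Cell]]; None marks end of stream


def compute_stream(pool: ThreadPoolExecutor, state: int, limit: int = 110) -> Optional[Cell]:
    """One producer step: close the stream past `limit`, otherwise emit
    `state` and schedule the rest of the stream asynchronously."""
    if state > limit:
        return None  # corresponds to res.eos()
    # Like sf.computeStream(state + 1): a future is returned immediately.
    tail = pool.submit(compute_stream, pool, state + 1, limit)
    return Cell(state, tail)


with ThreadPoolExecutor(max_workers=4) as pool:
    head = pool.submit(compute_stream, pool, 100)
    values = []
    cell = head.result()
    while cell is not None:
        values.append(cell.payload)
        cell = cell.next.result()

print(values)  # eleven values, 100 up to 110
```

Each task only submits the next step and then returns, so no task ever blocks on another task and a small pool cannot deadlock.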
The next class gives the pattern for a stream consumer. It iteratively/recursively gets elements off the stream until the end of the stream is reached.
```
class StreamConsumer
  def consume(s : Fut E) : void {
    let res = get s in
      if (res.isEos()) then {
        () -- we're done
      } else {
        let d = res.getPayload() in
        let n = res.getNext() in {
          print d;
          this.consume(n);
          ();
        }
      };
  }
```
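Here is a Python sketch of the consumer. It again assumes `concurrent.futures.Future` for `Fut`, and the hypothetical dataclass `E` mirrors the Encore class, including its `ended` flag. The sketch loops where the Encore version recurses, which keeps the call stack flat on long streams. It collects values instead of printing them.

```python
from concurrent.futures import Future
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class E:
    """Mirrors class E: payload and next are ignored when ended is True."""
    payload: int = 0
    next: Optional[Future] = None
    ended: bool = False


def consume(s: Future) -> List[int]:
    """Get elements off the stream until the end-of-stream marker is reached."""
    seen = []
    res = s.result()              # get s
    while not res.ended:          # res.isEos()
        seen.append(res.payload)  # stands in for `print d`
        res = res.next.result()   # continue with getNext()
    return seen


def ready(value: E) -> Future:
    """Wrap an E in an already-completed future (test helper)."""
    f: Future = Future()
    f.set_result(value)
    return f


stream = ready(E(1, ready(E(2, ready(E(ended=True))))))
print(consume(stream))  # [1, 2]
```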
```
class Main
  def main() : void {
    let producer = new StreamProducer in
    let s = producer.computeStream(100)
        c = new StreamConsumer
        c2 = new StreamConsumer in {
      c.consume(s);
      c2.consume(s);
    };
    ()
  }
```
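The following Python sketch of `main` shows that a stream can have several readers. Two consumer threads read the same stream while a producer thread is still filling it in. Threads and `concurrent.futures.Future` are assumptions standing in for Encore's active objects and futures. Here the producer completes each future directly, in the promise style discussed further on, rather than through the recursive pattern above.

```python
import threading
from concurrent.futures import Future
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Cell:
    payload: int
    next: Future  # Future[Optional[Cell]]; None marks end of stream


def produce(first: Future, start: int, stop: int) -> None:
    """Producer thread: fulfil the stream one element at a time."""
    current = first
    for i in range(start, stop + 1):
        nxt: Future = Future()
        current.set_result(Cell(i, nxt))
        current = nxt
    current.set_result(None)  # end of stream


def consume(s: Future, out: List[int]) -> None:
    cell = s.result()  # blocks until the producer has got this far
    while cell is not None:
        out.append(cell.payload)
        cell = cell.next.result()


s: Future = Future()
seen1: List[int] = []
seen2: List[int] = []
threads = [
    threading.Thread(target=consume, args=(s, seen1)),
    threading.Thread(target=consume, args=(s, seen2)),
    threading.Thread(target=produce, args=(s, 100, 110)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(seen1 == seen2 == list(range(100, 111)))  # True
```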
The approach described above heavily depends upon futures and it is a little awkward to generate streams. The first problem can be solved by hiding the implementation behind some abstraction, the second is addressed below.
The idea is that if a method has return type `Stream A`, then the semantics of the method changes. Two new statements, `yield a` and `eos`, become available:
- `yield a` puts an element on the stream.
- `eos` closes the stream.

Note that after calling `eos`, any further call to `yield` or `eos` results in an error.
In addition, returning from the method has no effect. No result is returned and no one can detect that the method has completed.
This version of streams can more easily be used inside a while or for loop, for instance.
The design borrows from C# iterators, but writes `yield a` instead of `yield return a`. It is less similar to Scala's and Python's `yield`, though this could be a misunderstanding on my part, and it is completely unrelated to Ruby's `yield`.
When the result type of a method is `Stream a`, the method is implemented differently. The precise implementation will depend upon the stream data type.
- Firstly, a future is created and returned as usual, but its type is different (`Stream`) and its internals are available inside the method: within the method, the promise interface of the future is available. This future is referred to as the current future.
- Each call to `yield` does the following:
  - create a new future;
  - create an `E` object containing the value and the new future;
  - store a reference to this object in the current future;
  - make the new future the current future.
- `eos` stores the end-of-stream `E` object into the current future, thereby ending the stream, and sets the current future to `null`.
- Returning from the method has no effect beyond finishing the method.
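The steps above can be sketched in Python. A hypothetical `StreamWriter` class plays the role of the compiler-generated machinery. It holds the current future, uses `Future.set_result` as the promise interface, and rejects `yield` or `eos` once the stream has been closed. Since `yield` is a Python keyword, the method is called `yield_`, and `None` stands in for the eos object.

```python
from concurrent.futures import Future
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class E:
    payload: int
    next: Future  # Future[Optional[E]]; None plays the role of the eos object


class StreamWriter:
    def __init__(self) -> None:
        self.stream: Future = Future()  # returned to the caller as usual
        self._current: Optional[Future] = self.stream

    def yield_(self, value: int) -> None:
        if self._current is None:
            raise RuntimeError("yield after eos")
        new: Future = Future()              # 1. create a new future
        cell = E(value, new)                # 2. E object with the value and new future
        self._current.set_result(cell)      # 3. store it in the current future
        self._current = new                 # 4. new future becomes current

    def eos(self) -> None:
        if self._current is None:
            raise RuntimeError("eos after eos")
        self._current.set_result(None)      # end the stream
        self._current = None                # current future becomes null


def squares(n: int) -> Future:
    """Plays a method with return type Stream int, written with a for loop."""
    w = StreamWriter()
    for i in range(n):
        w.yield_(i * i)
    w.eos()
    return w.stream


def to_list(s: Future) -> List[int]:
    out, cell = [], s.result()
    while cell is not None:
        out.append(cell.payload)
        cell = cell.next.result()
    return out


print(to_list(squares(4)))  # [0, 1, 4, 9]
```

In this sketch `squares` runs to completion before it returns the stream. In the proposed design, the method body would run concurrently with a caller that already holds the stream.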
It might be possible to make calling `return`, or otherwise exiting the method, result in `eos`. While this seems like a nice unification, I think this solution has limitations.
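The idea that exiting the method implies `eos` can be approximated in Python with a context manager. In this sketch, the hypothetical `open_stream` helper closes the stream when the block is left, whether that happens by falling off the end, by an early `return`, or through an exception. `StreamWriter` and `until_negative` are likewise illustrative names.

```python
from concurrent.futures import Future
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator, List, Optional


@dataclass(frozen=True)
class E:
    payload: int
    next: Future  # Future[Optional[E]]; None marks end of stream


class StreamWriter:
    """Holds the current future; set_result is the promise interface."""

    def __init__(self) -> None:
        self.stream: Future = Future()
        self._current: Optional[Future] = self.stream

    @property
    def closed(self) -> bool:
        return self._current is None

    def yield_(self, value: int) -> None:
        if self._current is None:
            raise RuntimeError("yield after eos")
        new: Future = Future()
        self._current.set_result(E(value, new))
        self._current = new

    def eos(self) -> None:
        if self._current is None:
            raise RuntimeError("eos after eos")
        self._current.set_result(None)
        self._current = None


@contextmanager
def open_stream() -> Iterator[StreamWriter]:
    """Hypothetical helper: leaving the block by any route implies eos."""
    w = StreamWriter()
    try:
        yield w
    finally:
        if not w.closed:
            w.eos()


def until_negative(xs: List[int]) -> Future:
    """Stream the elements of xs up to, but excluding, the first negative one."""
    with open_stream() as w:
        for x in xs:
            if x < 0:
                return w.stream  # early return: the stream is still closed
            w.yield_(x)
        return w.stream  # no explicit eos needed


def to_list(s: Future) -> List[int]:
    out, cell = [], s.result()
    while cell is not None:
        out.append(cell.payload)
        cell = cell.next.result()
    return out


print(to_list(until_negative([3, 5, -1, 7])))  # [3, 5]
```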
- Delegate the stream generation responsibility to another method. This could occur midstream, and control could possibly return to the current method (that's tricky, though, given the constraints mentioned above).
```
def producer1() : Stream A {
  yield 0;
  yield 1;
  eos;
}

def producer2() : Stream A {
  yield 1;
  yield 2;
  delegate producer1();
}
```
This produces stream 1, 2, 0, 1.
Alternatively, though I'm less convinced by this:
```
def producer1() : Stream A {
  yield 0;
  yield 1;
}

def producer2() : Stream A {
  yield 1;
  yield 2;
  delegate producer1();
  -- control returns, stream is open
  yield 3;
  eos;
}
```
This produces stream 1, 2, 0, 1, 3.
- Combining streams, for instance those passed in as parameters or returned by other methods. In principle, one could return a parameter stream as the current stream, interleave parameter streams, or return one stream followed by another. (Unfortunately, as streams are functional, many of these operations probably require creating a new stream, though some sharing will be possible.) One way to avoid some of the problems is to give stream producers access to the open tail end (where there is no EOS yet) of the current stream, though one would need to ensure that it is accessed linearly. Then one could add to the end of a stream in O(1) time.
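Both delegation variants can be sketched in Python. The sketch assumes `concurrent.futures.Future` for `Fut`, `None` for the eos object, and a hypothetical `StreamWriter` that holds the current future.

- **First variant.** The rest of the stream simply *is* the delegated stream. The current future is completed with whatever the delegated stream's head holds, so the two streams share cells and the hand-over costs O(1).
- **Second variant.** This is modelled by handing the writer, that is, the open tail end of the current stream, to the delegated method. That method yields into the writer and returns without closing it.

The `_take` method hands out the current future only once. The sketch does not otherwise enforce linear use. `delegate`, `producer1_open` and `producer2_returning` are illustrative names.

```python
from concurrent.futures import Future
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class E:
    payload: int
    next: Future  # Future[Optional[E]]; None marks end of stream


class StreamWriter:
    def __init__(self) -> None:
        self.stream: Future = Future()
        self._current: Optional[Future] = self.stream

    def _take(self) -> Future:
        """Hand out the current future exactly once."""
        if self._current is None:
            raise RuntimeError("stream already closed or delegated")
        cur, self._current = self._current, None
        return cur

    def yield_(self, value: int) -> None:
        new: Future = Future()
        self._take().set_result(E(value, new))
        self._current = new

    def eos(self) -> None:
        self._take().set_result(None)

    def delegate(self, other: Future) -> None:
        """Variant 1: the rest of this stream is `other`; cells are shared, O(1)."""
        cur = self._take()
        other.add_done_callback(lambda f: cur.set_result(f.result()))


def producer1() -> Future:
    w = StreamWriter()
    w.yield_(0)
    w.yield_(1)
    w.eos()
    return w.stream


def producer2() -> Future:
    w = StreamWriter()
    w.yield_(1)
    w.yield_(2)
    w.delegate(producer1())  # the stream continues as producer1's stream
    return w.stream


def producer1_open(w: StreamWriter) -> None:
    """Variant 2: yields into the caller's open tail and leaves it open."""
    w.yield_(0)
    w.yield_(1)


def producer2_returning() -> Future:
    w = StreamWriter()
    w.yield_(1)
    w.yield_(2)
    producer1_open(w)  # control returns here; the stream is still open
    w.yield_(3)
    w.eos()
    return w.stream


def to_list(s: Future) -> List[int]:
    out, cell = [], s.result()
    while cell is not None:
        out.append(cell.payload)
        cell = cell.next.result()
    return out


print(to_list(producer2()))            # [1, 2, 0, 1]
print(to_list(producer2_returning()))  # [1, 2, 0, 1, 3]
```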
Other possibilities: splits, joins, interleaves, feedback, filters.
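Filters and interleaves illustrate the earlier remark that combinators on functional streams build new streams. In the Python sketch below, threads and `concurrent.futures.Future` again stand in for Encore's active objects and futures, and all function names are hypothetical. Each combinator runs on its own thread and writes a fresh stream. Its input streams stay untouched and can still be read by others.

```python
import threading
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator


@dataclass(frozen=True)
class E:
    payload: int
    next: Future  # Future[Optional[E]]; None marks end of stream


Emit = Callable[[int], None]


def spawn_stream(body: Callable[[Emit], None]) -> Future:
    """Run body(emit) on its own thread; the stream is closed when body returns."""
    head: Future = Future()
    current = head

    def emit(value: int) -> None:
        nonlocal current
        new: Future = Future()
        current.set_result(E(value, new))
        current = new

    def run() -> None:
        body(emit)
        current.set_result(None)  # eos

    threading.Thread(target=run, daemon=True).start()
    return head


def elements(s: Future) -> Iterator[int]:
    cell = s.result()
    while cell is not None:
        yield cell.payload
        cell = cell.next.result()


def from_iterable(xs: Iterable[int]) -> Future:
    def body(emit: Emit) -> None:
        for x in xs:
            emit(x)
    return spawn_stream(body)


def filter_stream(pred: Callable[[int], bool], s: Future) -> Future:
    def body(emit: Emit) -> None:
        for x in elements(s):
            if pred(x):
                emit(x)
    return spawn_stream(body)


def interleave(a: Future, b: Future) -> Future:
    """Alternate between a and b; once one ends, continue with the other."""
    def body(emit: Emit) -> None:
        live = [elements(a), elements(b)]
        while live:
            for it in list(live):
                try:
                    emit(next(it))
                except StopIteration:
                    live.remove(it)
    return spawn_stream(body)


nums = from_iterable(range(10))
evens = filter_stream(lambda x: x % 2 == 0, nums)
mixed = interleave(evens, from_iterable([100, 200]))
print(list(elements(mixed)))  # [0, 100, 2, 200, 4, 6, 8]
print(list(elements(nums)))   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```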
TODO TODO TODO
Chunking: storing several parts of the stream together in the same data structure, rather than releasing elements of the stream one by one. (External abstraction remains the same.)
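Here is a minimal chunking sketch in Python, using the same stand-in assumptions as before. The hypothetical `ChunkedWriter` buffers `chunk_size` elements before completing a future, so it fulfils one future per chunk rather than one per element. The `elements` function flattens the chunks, so readers still see the stream element by element.

```python
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Iterator, List, Optional, Tuple


@dataclass(frozen=True)
class Chunk:
    payloads: Tuple[int, ...]  # several elements released together
    next: Future               # Future[Optional[Chunk]]; None marks end of stream


class ChunkedWriter:
    def __init__(self, chunk_size: int) -> None:
        self.stream: Future = Future()
        self._current: Optional[Future] = self.stream
        self._buffer: List[int] = []
        self._chunk_size = chunk_size

    def yield_(self, value: int) -> None:
        if self._current is None:
            raise RuntimeError("yield after eos")
        self._buffer.append(value)
        if len(self._buffer) == self._chunk_size:
            self._flush()

    def _flush(self) -> None:
        """Release the buffered elements as one chunk."""
        new: Future = Future()
        self._current.set_result(Chunk(tuple(self._buffer), new))
        self._buffer = []
        self._current = new

    def eos(self) -> None:
        if self._current is None:
            raise RuntimeError("eos after eos")
        if self._buffer:
            self._flush()  # release a final, partial chunk
        self._current.set_result(None)
        self._current = None


def elements(s: Future) -> Iterator[int]:
    """Readers see individual elements, exactly as with unchunked streams."""
    chunk = s.result()
    while chunk is not None:
        yield from chunk.payloads
        chunk = chunk.next.result()


w = ChunkedWriter(chunk_size=3)
for i in range(7):
    w.yield_(i)
w.eos()
print(list(elements(w.stream)))  # [0, 1, 2, 3, 4, 5, 6]
```

The trade-off is latency: a reader sees nothing from a chunk until the whole chunk, or the `eos`, has been written.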
Controlling producer runaway.