Stream processing #431

Closed
bvssvni opened this issue Jan 12, 2017 · 1 comment

Comments

@bvssvni
Member

bvssvni commented Jan 12, 2017

When thinking about bin in #430, I started to think about stream processing for binary data, and how Nile does this in parallel.

So far, Dyon has focused on sequential programming. I believe parallel stream processing could be interesting: it would allow a nice syntax for setting up streams without spawning threads or communicating over channels.

The basic idea is that bin/link/[] and a new kind of function, declared with a double arrow =>, can be composed to do parallel stream processing. In the long term, support could be added for processing large files, web sockets and compression streams.

In Nile there are only binary streams, but in Dyon you might also want to use link or [] (array).

Here is an example of a stream that increases every byte by a value:

fn main() {
    a := unwrap(load_bin(file: "test.bin"))
    // Streams run in parallel whenever possible.
    // Starts immediately, but you do not need to wait for the result.
    b := a => inc(2)
    // `join__stream` returns the result of stream processing.
    _ := unwrap(save(bin: unwrap(join(stream: b)), file: "test2.bin"))
}

fn inc(val: f64) => bin[u8] >> bin[u16] {
    // Code that runs before `proc` is called the "prologue".
    println(val)
    proc {
        // `proc` is the "body" of the stream.
        // This part is called multiple times, often in parallel with other streams.
        a := << u8 // read `u8` from input stream.
        >> u16 a + val // write `u16` to output stream.
    }
    // Code that runs after `proc` is called the "epilogue".
    println(val)
}
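For comparison, here is a rough Rust model of what `a => inc(2)` would compute. The Dyon syntax above is only a proposal, so this is not how Dyon would implement it. The sketch assumes the input is split into chunks that are processed on scoped threads and joined back in order, which is one way to get the "run in parallel" behavior without explicit channels. The names `inc_stream` and `chunk_size` are made up for this sketch.

```rust
use std::thread;

/// Hypothetical model of `a => inc(val)` for `bin[u8] >> bin[u16]`:
/// each input byte is widened to `u16` and offset by `val`.
/// Assumption: the input is split into chunks processed on separate threads.
fn inc_stream(input: &[u8], val: f64, chunk_size: usize) -> Vec<u16> {
    // "Prologue": runs once before any item is processed.
    println!("{}", val);
    let chunk_size = chunk_size.max(1); // `chunks(0)` would panic
    let output = thread::scope(|s| {
        // One worker per chunk; each applies the `proc` body to every item.
        let handles: Vec<_> = input
            .chunks(chunk_size)
            .map(|chunk| {
                s.spawn(move || {
                    chunk
                        .iter()
                        .map(|&a| (a as f64 + val) as u16) // `>> u16 a + val`
                        .collect::<Vec<u16>>()
                })
            })
            .collect();
        // Joining in spawn order preserves the original item order.
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect::<Vec<u16>>()
    });
    // "Epilogue": runs once after all items are processed.
    println!("{}", val);
    output
}

fn main() {
    let a: Vec<u8> = vec![0, 1, 254, 255];
    let b = inc_stream(&a, 2.0, 2);
    println!("{:?}", b); // [2, 3, 256, 257]
}
```

Widening to `u16` is what lets `255 + 2` come out as `257` instead of wrapping.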

The inner stream type is a special kind of type that "rotates" when using the >> operator. At each end point, the type must have rotated a whole round, or else the type checker will complain. This is a bit more flexible than Nile and more suitable for a dynamically typed scripting language. If you leave out the inner stream type, the behavior can be decided at run time.

For example, if you have bin[u8, u16], you need to use >> u8 and >> u16 in that order. Dyon will complain if you do >> u8 but no >> u16. Zero, one or many whole rounds are OK, but stopping in the middle is not allowed.
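The "whole rounds" rule can be pictured as a position counter that wraps around. The following Rust sketch checks the rule at run time, which loosely corresponds to leaving out the inner stream type. `Rotation`, `Kind`, `write` and `finish` are hypothetical names, and the pattern is assumed to be non-empty.

```rust
/// Element kinds that can appear in an inner stream type such as `bin[u8, u16]`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Kind {
    U8,
    U16,
}

/// Hypothetical run-time model of a "rotating" inner stream type.
struct Rotation {
    pattern: Vec<Kind>, // e.g. [U8, U16] for `bin[u8, u16]`; assumed non-empty
    pos: usize,         // index of the kind expected next
}

impl Rotation {
    fn new(pattern: Vec<Kind>) -> Self {
        Rotation { pattern, pos: 0 }
    }

    /// Models `>> kind`: the written kind must be the next one in the pattern.
    fn write(&mut self, kind: Kind) -> Result<(), String> {
        let expected = self.pattern[self.pos];
        if kind != expected {
            return Err(format!("expected {:?}, found {:?}", expected, kind));
        }
        // Advance and wrap around: this is the "rotation".
        self.pos = (self.pos + 1) % self.pattern.len();
        Ok(())
    }

    /// Models an end point: zero, one or many whole rounds are allowed.
    fn finish(&self) -> Result<(), String> {
        if self.pos == 0 {
            Ok(())
        } else {
            Err(format!("stopped in the middle, expected {:?}", self.pattern[self.pos]))
        }
    }
}

fn main() {
    let mut ok = Rotation::new(vec![Kind::U8, Kind::U16]);
    ok.write(Kind::U8).unwrap();
    ok.write(Kind::U16).unwrap();
    assert!(ok.finish().is_ok()); // one whole round

    let mut bad = Rotation::new(vec![Kind::U8, Kind::U16]);
    bad.write(Kind::U8).unwrap();
    assert!(bad.finish().is_err()); // `>> u8` without `>> u16`
}
```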

The inner stream type must have a precise memory layout. One idea is to declare inner stream types with struct, like in Rust.

// Declare an inner stream type.
struct Point { x: f32, y: f32 }

fn add(p: vec4) => bin[Point] >> bin[Point] {
    proc {
        pos := << Point
        >> Point pos + p
    }
}
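As a rough illustration of what a precise memory layout buys, here is a Rust version of `add` working on raw bytes. It assumes each `Point` is stored as two little-endian `f32` values (8 bytes, matching `#[repr(C)]`) and uses a `Point` offset instead of a `vec4`. `read_point`, `write_point` and `POINT_SIZE` are hypothetical helpers.

```rust
/// Rust counterpart of `struct Point { x: f32, y: f32 }`.
/// `#[repr(C)]` fixes the field order, giving a precise 8-byte layout.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq)]
struct Point {
    x: f32,
    y: f32,
}

const POINT_SIZE: usize = std::mem::size_of::<Point>();

/// `<< Point`: read one point (two little-endian f32 values) from the stream.
fn read_point(b: &[u8]) -> Point {
    Point {
        x: f32::from_le_bytes([b[0], b[1], b[2], b[3]]),
        y: f32::from_le_bytes([b[4], b[5], b[6], b[7]]),
    }
}

/// `>> Point`: write one point to the stream.
fn write_point(p: Point, out: &mut Vec<u8>) {
    out.extend_from_slice(&p.x.to_le_bytes());
    out.extend_from_slice(&p.y.to_le_bytes());
}

/// Hypothetical model of `add(p)` on `bin[Point] >> bin[Point]`.
fn add(input: &[u8], p: Point) -> Vec<u8> {
    let mut out = Vec::with_capacity(input.len());
    // `proc` body: one iteration per complete point; a trailing partial point is ignored.
    for chunk in input.chunks_exact(POINT_SIZE) {
        let pos = read_point(chunk);
        write_point(Point { x: pos.x + p.x, y: pos.y + p.y }, &mut out);
    }
    out
}

fn main() {
    let mut input = Vec::new();
    write_point(Point { x: 1.0, y: 2.0 }, &mut input);
    let out = add(&input, Point { x: 0.5, y: 0.5 });
    println!("{:?}", read_point(&out)); // Point { x: 1.5, y: 2.5 }
}
```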

A link stream accepts only bool, f64 and str.

fn inc(val: f64) => link[f64] >> link[f64] {
    proc {
        a := << f64
        >> f64 a + val
    }
}
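A link stream could be modelled as a sequence of tagged values restricted to those three types. This Rust sketch uses a hypothetical `LinkItem` enum and, as an assumption about the behavior, reports an error when `<< f64` meets an item of another type.

```rust
/// Hypothetical `link` stream item: only bool, f64 and str are allowed.
#[derive(Clone, Debug, PartialEq)]
enum LinkItem {
    Bool(bool),
    F64(f64),
    Str(String),
}

/// Model of `inc(val)` on `link[f64] >> link[f64]`.
/// Collecting into `Result` stops at the first item of the wrong type.
fn inc_link(input: &[LinkItem], val: f64) -> Result<Vec<LinkItem>, String> {
    input
        .iter()
        .map(|item| match item {
            // `a := << f64` followed by `>> f64 a + val`
            LinkItem::F64(a) => Ok(LinkItem::F64(a + val)),
            other => Err(format!("expected f64, found {:?}", other)),
        })
        .collect()
}

fn main() {
    let items = vec![LinkItem::F64(1.0), LinkItem::F64(2.5)];
    println!("{:?}", inc_link(&items, 2.0)); // Ok([F64(3.0), F64(4.5)])
    println!("{:?}", inc_link(&[LinkItem::Bool(true)], 2.0).is_err()); // true
}
```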

A [] stream deep-clones its items.

fn inc(val: f64) => [f64] >> [f64] {
    proc {
        a := << f64
        >> f64 a + val
    }
}

fn main() {
    a := [0, 1, 2] => inc(2)
    println(unwrap(join(stream: a))) // prints `[2, 3, 4]`
}
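A sketch of the deep-clone rule in Rust, using a generic and hypothetical `array_stream` helper: each item is cloned before the body sees it, so the caller's array is never modified. For nested `Vec`s, `Clone` copies the whole structure, which is the deep-clone behavior described above.

```rust
/// Hypothetical model of a `[]` stream: every item is cloned before the
/// `proc` body sees it, so the body can never change the caller's array.
fn array_stream<T: Clone, U>(input: &[T], mut body: impl FnMut(T) -> U) -> Vec<U> {
    input.iter().map(|item| body(item.clone())).collect()
}

/// Counterpart of `inc(val) => [f64] >> [f64]`.
fn inc(val: f64) -> impl Fn(f64) -> f64 {
    move |a| a + val
}

fn main() {
    let a = vec![0.0, 1.0, 2.0];
    let b = array_stream(&a, inc(2.0));
    println!("{:?}", b); // [2.0, 3.0, 4.0]

    // Nested items are deep-cloned: mutating the copy leaves `rows` intact.
    let rows = vec![vec![1.0, 2.0], vec![3.0]];
    let pushed = array_stream(&rows, |mut row: Vec<f64>| {
        row.push(0.0);
        row
    });
    assert_eq!(rows, vec![vec![1.0, 2.0], vec![3.0]]);
    assert_eq!(pushed, vec![vec![1.0, 2.0, 0.0], vec![3.0, 0.0]]);
}
```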

You can also convert from one stream to another:

fn foo() => bin[u8] >> link[bool] { ... }
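The body of `foo` is left open above. Purely as an illustration, this Rust sketch picks one possible conversion, expanding each byte into eight bools (most significant bit first). `bytes_to_bits` is a hypothetical name and the bit order is an assumption; the point is only that one read of `u8` can produce several writes of `bool`.

```rust
/// Hypothetical body for `foo() => bin[u8] >> link[bool]`:
/// each input byte becomes 8 bools, most significant bit first.
fn bytes_to_bits(input: &[u8]) -> Vec<bool> {
    let mut out = Vec::with_capacity(input.len() * 8);
    for &byte in input {
        // One `<< u8` produces eight `>> bool` writes.
        for shift in (0..8u32).rev() {
            out.push((byte >> shift) & 1 == 1);
        }
    }
    out
}

fn main() {
    println!("{:?}", bytes_to_bits(&[0b1010_0001]));
    // [true, false, true, false, false, false, false, true]
}
```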

You can call another stream function from within a stream function. Such calls can happen in the middle, and will "rotate" the inner stream type:

fn foo() => bin[u8, u16] >> bin[u16] {
    proc {
        a := << u8
        bar(a)
        _ := << u16
    }
}

fn bar(a: f64) => bin[u16, u8] >> bin[u16] {
    proc i a {
        b := << u16
        _ := << u8
        >> u16 b + i
    }
}

This pauses the caller's stream until the substream completes. By default, proc { ... } runs until there is no more data, but proc i n { ... } sets a limit, generating an error if the stream is closed before the limit is reached.
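The limit and the pausing can be approximated in Rust with a single iterator shared by caller and callee: because the call is synchronous, the caller only continues once the substream has consumed its items. `proc_limited` is a hypothetical helper, and for simplicity the sketch reads one `u16` per iteration instead of the `u16, u8` pair used by `bar`.

```rust
/// Hypothetical model of `proc i n { ... }`: run the body exactly `n` times,
/// passing the index `i`, and fail if the input closes before `n` items.
fn proc_limited<I, F>(input: &mut I, n: usize, mut body: F) -> Result<(), String>
where
    I: Iterator<Item = u16>,
    F: FnMut(usize, u16),
{
    for i in 0..n {
        match input.next() {
            Some(item) => body(i, item),
            None => return Err(format!("stream closed after {} of {} items", i, n)),
        }
    }
    Ok(())
}

fn main() {
    let mut input = vec![3u16, 10, 20, 30, 99].into_iter();
    let mut output = Vec::new();
    // The caller reads `a`, then hands the same stream to the substream.
    let a = input.next().unwrap() as usize;
    // The caller is paused here until the substream has read `a` items.
    proc_limited(&mut input, a, |i, b| output.push(b + i as u16)).unwrap();
    // The caller resumes with the remaining data.
    assert_eq!(input.next(), Some(99));
    assert_eq!(output, vec![10, 21, 32]);
}
```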

You can also use streams as first-class values:

fn foo() => [f64] >> [f64] { ... }

fn main() {
    a := foo()
    bar(a)
}

fn bar(a: [f64] >> [f64]) {
    b := [0, 1, 2] => a
    println(unwrap(join(stream: b)))
}
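In Rust terms, a first-class stream is close to a boxed closure that can be stored, passed to another function and applied later. This sketch assumes each stream maps one `f64` to one `f64`, which is simpler than the proposal (a stream may read and write different numbers of items). `F64Stream` is a hypothetical alias, and the body of `foo` is a hypothetical stand-in for the elided `{ ... }`.

```rust
/// Hypothetical model of a `[f64] >> [f64]` stream as a first-class value.
type F64Stream = Box<dyn Fn(f64) -> f64>;

fn foo() -> F64Stream {
    // Stand-in body: the original leaves `foo` as `{ ... }`.
    Box::new(|a| a * 10.0)
}

/// Counterpart of `b := [0, 1, 2] => a` followed by `join(stream: b)`.
fn bar(a: &F64Stream) -> Vec<f64> {
    [0.0, 1.0, 2.0].iter().map(|&x| a(x)).collect()
}

fn main() {
    let a = foo();
    println!("{:?}", bar(&a)); // [0.0, 10.0, 20.0]
}
```

Because `bar` only depends on the stream's type, any other value of that type can be passed in its place.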
@bvssvni
Member Author

bvssvni commented Jun 16, 2018

Closed in favor of in-types #495

@bvssvni bvssvni closed this as completed Jun 16, 2018