chore(sinks): New StreamingSink trait and file sink spike #1945

Closed
wants to merge 1 commit

Conversation

LucioFranco (Contributor)

This is an initial spike of what a StreamingSink trait would look like with tokio 0.2 and async/await. It's a proposal that could be adopted for our file sink rewrite, which is due since the current implementation is not compatible with our runtime upgrade work.

I'm mainly opening this to get feedback and thoughts; it tries to implement the previous logic in a very intuitive and easy-to-follow way.

cc @lukesteensen @MOZGIII

Signed-off-by: Lucio Franco luciofranco14@gmail.com

Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
Comment on lines +38 to +62
pub struct LazyStreamingSink<T> {
    sink: Option<(Receiver<Event>, T)>,
    inner: CompatSink<Sender<Event>, Event>,
}

impl<T: StreamingSink> Sink for LazyStreamingSink<T> {
    type SinkItem = Event;
    type SinkError = ();

    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        // On the first item, spawn the StreamingSink's run loop as a
        // background task; subsequent calls just forward into the channel.
        if let Some((rx, mut sink)) = self.sink.take() {
            tokio02::spawn(async move {
                if let Err(error) = sink.run(rx).await {
                    error!(message = "Unexpected sink failure.", %error);
                }
            });
        }

        self.inner.start_send(item).map_err(drop)
    }

    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        self.inner.poll_complete().map_err(drop)
    }
}
@LucioFranco (Contributor, Author):

This could be dropped if we had access to an executor from `build` via `SinkContext`. This will just lazily spawn the background task.

Member:

I don't understand why we need to spawn a background task. There's only one task doing work here, right?

@LucioFranco (Contributor, Author):

Because we want to represent the actual logic of the sink as its own task that gets fed items. It's just impossible to mix an async fn and the Sink trait anyway. If this style works, we may just want to feed a receiver to the sinks instead of using the Sink trait. That would avoid the extra spawn, but in the end I don't think this hurts too much anyway.

@MOZGIII (Contributor) commented Mar 4, 2020

I'm thinking that futures::Sink is not a good fit for us. I mean, at the sinks::RouterSink level. We don't really have to use it, and in some of our cases it just doesn't make any sense. I think we should roll our own trait without even looking at futures' Sink. I guess the only similarity between what they have and what we need is the name, which leads to confusion.
This is just an observation for now, and I'd be very happy to discuss it in more detail!

@LucioFranco (Contributor, Author)

@MOZGIII agreed, but I think this is to show that we can always model things in terms of sinks but have a different trait for what our sinks will actually implement.

@MOZGIII (Contributor) commented Mar 4, 2020

> @MOZGIII agreed, but I think this is to show that we can always model things in terms of sinks but have a different trait for what our sinks will actually implement.

I don't get what you mean. However, I can say that since I wrote my comment my mind has changed again, and I'm not so against the futures::Sink design. But I also have a feeling that we could implement a much better internal architecture if we just approached it differently and simplified things.

@LucioFranco (Contributor, Author)

@MOZGIII right, what I mean is that we can always use Sink as the abstraction that topology accepts but within that we can write specific traits like StreamingSink and HttpSink that abstract out the annoying details of sink. They both would have adapters that allow them to return a type that implements sink.

@MOZGIII (Contributor) commented Mar 4, 2020

> we can always

Well, that was my initial point: I don't see why we would. But yeah, we can.

So, what are the benefits of actually using sink there, or anywhere even? Compared to a simple future that can take the value and process it (like run). I see the idea in general, but for our purposes - how is that useful?

@LucioFranco (Contributor, Author)

@MOZGIII Sink is the most flexible API by far, allowing more fine-grained control, so I don't think it makes sense to give that up just yet.

@MOZGIII (Contributor) left a review comment:

We need a way to communicate completion for the processing of a single event. Otherwise we can have race conditions, where a user calls sink.send_all(...).await and expects the processing of all the events to have completed when control returns.
That's effectively what I hit at #1988 (comment).

@binarylogic (Contributor)

This is similar to what I’ve said before:

#1113 (comment)

I’m curious if you see a simple way to make that happen.

@MOZGIII (Contributor) commented Mar 5, 2020

I'm working on a prototype for an AsyncSink - it's a more direct approach to implement a Sink with async traits:

#[async_trait]
pub trait AsyncSink<Item> {
    type Error;

    async fn ready(&mut self) -> Result<(), Self::Error>;

    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;

    async fn flush(&mut self) -> Result<(), Self::Error>;

    async fn close(&mut self) -> Result<(), Self::Error>;
}

So, pretty much like the Sink trait, but with async_trait. It should fulfill all our needs, as long as we don't exceed what async_trait supports (i.e., hit generic associated types or something similar).

@MOZGIII (Contributor) commented Mar 5, 2020

There are some fundamental difficulties, so I might bail on the idea of having the async trait I outlined above. I'd be glad if we could create a task force to discuss the async APIs. Also, looking forward to a presentation that @LucioFranco suggested!

I checked #1113, and what we have here is different. In short, our sink implementation from this PR and #1988 are problematic, and do not uphold the invariants the API users (including myself) expect. I think there's a simple fix to that.

@LucioFranco (Contributor, Author)

@MOZGIII so an issue with going with that API is that it doesn't model the stateless poll fns that Sink has very well.

I think we don't want to follow the async-fn-style sink, but instead have a single future that consumes some sort of stream.

@MOZGIII (Contributor) commented Mar 5, 2020

What if we do it even simpler? Instead of
async fn run(&mut self, input: Receiver<Event>) -> Result<()>;
we could have
async fn process(&mut self, event: Event) -> Result<(), Self::Error>;

This has good properties: per-event completion, composable, easy to implement. We should even be able to properly implement a Sink atop this interface, if we always buffer things at start_send and run processing on them in poll_ready and poll_flush. One downside is it doesn't have an inner loop.
That's the fundamental choice we have here, really: a simple interface can either consume a stream and have an inner loop, but with no ability to communicate per-item completion, or consume single events, in which case we don't have an inner loop. That only applies to the "simple" interface cases, though.

Now, the Sink interface actually works with a single item at a time. It has no support for batching per se, only internal buffering, but progress on the internal buffer is not communicated to the users of the API per item: the only option users have is to poll_flush to ensure everything that was buffered before is processed. Users can also call poll_ready to invoke "some amount of buffer processing" that puts the internal buffer into a state where it can ingest one more item. So, not necessarily a full flush, but some meaningful action, like maybe a flush if the buffer is full enough. Pretty cool stuff if you ask me, but, as I said before, we don't seem to be using much of this advanced functionality of the Sink interface.

Our StreamingSink is the other way around, and has completely different underlying properties. I don't think it's worth trying to implement that.

So, to summarize all of the above, I think a simple async fn process(&mut self, event: Event) -> Result<(), Self::Error>; is the way to go to replace Sink. It's actually trivial to implement a loop that spans multiple events if we see the need. We can always do the buffering internally and spawn the loops.

Alternatively, we could try and build our own more-sink-like interface, but I doubt we really have a need for that. We can start simple! And Sink is kind of not simple - I've had to spend some time to get a grasp of its properties.

P.S. I want to give some more context to my previous comment:

There are some fundamental difficulties, so I might bail on the idea of having the async trait I outlined above.

I concluded we'd need to pin multiple futures somewhere, meaning we either need multiple stacks or a way of swapping futures in and out. Either I'm missing something, or it's currently not possible to implement without unsafe, and I don't want to get into that complexity; I just don't think it'd be good value for us, since we have good workarounds.

@LucioFranco (Contributor, Author)

@MOZGIII one thing that sticks out to me here: if we use a single event-processing fn, it doesn't really need to be async, because if it were, we'd have no choice but to either spawn off the event-sending future or block the next event.

This also means we don't have full control over the full sink. Say I want to expire a cache: with this process model, how do we do that? If we have a stream coming in, we can select against it. To me that model fits much better what we will realistically use it for.

@MOZGIII (Contributor) commented Mar 5, 2020

> @MOZGIII one thing that sticks out to me here: if we use a single event-processing fn, it doesn't really need to be async, because if it were, we'd have no choice but to either spawn off the event-sending future or block the next event.

Idk, having a non-async fn is as good as always blocking for the next event. And really, imo, we shouldn't do spawning to run the topology loops.

> This also means we don't have full control over the full sink. Say I want to expire a cache: with this process model, how do we do that? If we have a stream coming in, we can select against it. To me that model fits much better what we will realistically use it for.

Let's see. So, an example: we want to expire the cache. With the proposed async process fn version, we only have two options: either do it during the process call, or in between process calls. Now, it depends what initiates this cache busting. If it's an internal decision, just keep it in the process fn. If it's external, for instance the topology owner (or whatever) wants to send a signal to expire the cache, we either simply do it right away in between process calls, or register the desire to perform cache expiration for the process fn to take care of.

We can do this elegantly. Consider this:

#[async_trait]
trait EventProcessor {
    type Error;
    async fn process(&mut self, event: Event) -> Result<(), Self::Error>;
}

enum ControlCommandProcessingError { ... }

#[async_trait]
trait ControlCommandProcessor {
    type ControlCommand;

    async fn control(&mut self, cmd: Self::ControlCommand) -> Result<(), ControlCommandProcessingError>;
}

trait TopologyUnit: ControlCommandProcessor + EventProcessor {}

async fn topology_loop(units: &[Box<dyn TopologyUnit>]) {
    // connect topology
    // error handling omitted for simplicity
    for (unit, input) in connected_units {
        // create and expose the control_tx and control_rx somehow
        spawn(async {
            loop {
                // I'm lazy, so it's more pseudocode than real Rust:
                select! {
                    control = control_rx.next() => unit.control(control),
                    event = input.next() => unit.process(event),
                }
            }
        });
    }
}

@LucioFranco (Contributor, Author)

> imo, we shouldn't do spawning to run the topology loops.

Why is that? I don't think we do enough spawning currently.

> If it's external, for instance the topology owner (or whatever) wants to send a signal to expire the cache, we either simply do it right away in between process calls, or register the desire to perform cache expiration for the process fn to take care of.

I'm not following how having the topology send a signal to evict would work. To me this seems more complicated and less isolated than consuming/dropping a stream.

@binarylogic (Contributor)

Closing, superseded by #1988.

@binarylogic binarylogic closed this Mar 9, 2020
@binarylogic binarylogic deleted the lucio/file-sink-spike branch April 24, 2020 20:39