Potential new sink trait design #2122

Closed
LucioFranco opened this issue Mar 23, 2020 · 11 comments
Labels
domain: sinks Anything related to the Vector's sinks domain: topology Anything related to Vector's topology code meta: feedback Anything related to customer/user feedback. meta: idea Anything in the idea phase. Needs further discussion and consensus before work can begin. type: tech debt A code change that does not add user value.

Comments

@LucioFranco
Contributor

LucioFranco commented Mar 23, 2020

I decided not to go with an RFC right off the bat, but I'd like some feedback on a new sink trait design.

Current

Currently, we use the futures01::Sink trait as the main entry point for our sinks. We then build layers on top of this trait to provide the basic batching, partitioning, and streaming implementations that our sinks use.

The current sink trait looks like this, and we force the SinkItem to be an Event. The relevant code is here.

futures01::Sink definition:

trait Sink {
	type SinkItem;
	type SinkError;

	fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>;

	fn poll_complete(&mut self) -> Poll<(), Self::SinkError>;
}

There are a few flaws with this design:

  1. It is complicated. It uses a poll-based API, which is extremely error prone and must be written very carefully. This only becomes worse with futures03::Sink, which 1) is far from stable (compared to Stream and Future; I also don't know of anyone actively working on stabilizing it) and 2) is much more complicated: the new sink trait, which can be found here, introduces pinning and two more required fns.

  2. The sink trait is incompatible with our encode_event model, where we go from one type to another by encoding it. If start_send is not able to send (i.e. it is applying back pressure), we must return the original event. But we need to encode the event in order to send it! So once we have encoded it, we must decode it again to return it in its original form. This is very costly, and it could happen often. The solution is to use an API like tower::Service, where Service::poll_ready doesn't require an item to be sent. We already use this type of API within Vector, but it requires us to buffer at least one event, which introduces overhead.

  3. Most of our sinks return futures instead of submitting items into a "sink" that gets driven. This leads me to think that an API like poll_complete is more low level than what we actually need. It again introduces complexity that isn't required and can lead to IO starvation issues like "BatchSink does not properly drive the inner IO resources" (#299).
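
To make the second point concrete, here is a minimal std-only sketch of the encode/decode round trip. Every name in it (Event, AsyncSink, ByteSink, EncodingSink, send_if_ready) is a hypothetical stand-in that only mirrors the shape of the futures 0.1 types; none of it is Vector's actual code.

```rust
// Hypothetical stand-ins for illustration; not Vector's real types.
#[derive(Debug, Clone, PartialEq)]
struct Event(String);

// Mirrors the shape of futures 0.1's `AsyncSink`: a rejected item is handed back.
#[derive(Debug, PartialEq)]
enum AsyncSink<T> {
    Ready,
    NotReady(T),
}

fn encode_event(event: Event) -> Vec<u8> {
    event.0.into_bytes()
}

fn decode_event(bytes: Vec<u8>) -> Event {
    Event(String::from_utf8(bytes).expect("bytes came from a valid String"))
}

// Inner sink that only understands encoded bytes and has bounded room.
struct ByteSink {
    in_flight: Vec<Vec<u8>>,
    capacity: usize,
}

impl ByteSink {
    fn is_ready(&self) -> bool {
        self.in_flight.len() < self.capacity
    }

    fn start_send(&mut self, item: Vec<u8>) -> AsyncSink<Vec<u8>> {
        if !self.is_ready() {
            return AsyncSink::NotReady(item);
        }
        self.in_flight.push(item);
        AsyncSink::Ready
    }
}

// Outer layer that accepts events and encodes them for the inner sink.
struct EncodingSink {
    inner: ByteSink,
    decodes: usize, // counts how often we had to undo an encode
}

impl EncodingSink {
    // Push style: we must encode before we learn whether there is room,
    // and on back pressure we must decode to hand the event back.
    fn start_send(&mut self, event: Event) -> AsyncSink<Event> {
        match self.inner.start_send(encode_event(event)) {
            AsyncSink::Ready => AsyncSink::Ready,
            AsyncSink::NotReady(bytes) => {
                self.decodes += 1;
                AsyncSink::NotReady(decode_event(bytes))
            }
        }
    }

    // Readiness style (in the spirit of `poll_ready`): check first, encode after.
    fn send_if_ready(&mut self, event: Event) -> Result<(), Event> {
        if !self.inner.is_ready() {
            return Err(event); // returned untouched, no encode or decode
        }
        match self.inner.start_send(encode_event(event)) {
            AsyncSink::Ready => Ok(()),
            AsyncSink::NotReady(bytes) => Err(decode_event(bytes)),
        }
    }
}
```

With a capacity of one, the second start_send call pays for an encode and a decode just to return the event, while send_if_ready hands the event back without touching it.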

Proposal

I suggest that we move to a Stream-based API rather than a push-based Sink API. We have already experimented with this type of API in #1988 and #2096, and so far it has worked quite well.

The key idea with the new design is that, instead of "pushing" items into a Sink, we treat a Vector sink as a Future that is provided some Stream<Item = Event>. The sink can then "pull" events out of the Stream at its own rate. Usually, this stream is either an in-memory mpsc buffer or an on-disk leveldb-backed buffer. In fact, we already have this in place within topology, but we currently use the Stream::forward combinator to stream events into the sink. This code can be found here.

What I suggest is that, instead of calling the Stream::forward combinator, we pass the rx end of the channel into our sink config. This would then allow us to return a Future that processes the stream asynchronously.

This model also fits well with async/await because we currently have generator support for Future but not for Sink. This means we can use the less error-prone API for implementing new sinks. The benefit here is a much better experience writing sinks.

Proposed trait:

#[async_trait]
trait Sink {
	async fn run(&self, rx: Receiver<Event>) -> crate::Result<()>;
}

This would then allow us to write our batching and streaming sinks in a much more readable and maintainable way.
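
As a rough illustration of the pull model, the sketch below uses a blocking std::sync::mpsc::Receiver as a synchronous stand-in for the async Receiver<Event> in the proposed trait. PullSink, BatchingSink, and demo_pull_sink are hypothetical names, and the "flush" only records batches in memory rather than talking to a real service.

```rust
use std::sync::mpsc::{channel, Receiver};

// Hypothetical stand-in for Vector's event type.
#[derive(Debug, Clone, PartialEq)]
struct Event(String);

// Synchronous stand-in for the proposed `async fn run`.
trait PullSink {
    fn run(&mut self, rx: Receiver<Event>) -> Result<(), String>;
}

struct BatchingSink {
    batch_size: usize,
    flushed: Vec<Vec<Event>>, // stands in for requests sent downstream
}

impl BatchingSink {
    fn flush(&mut self, batch: &mut Vec<Event>) {
        if !batch.is_empty() {
            // Move the batch out, leaving an empty Vec behind.
            self.flushed.push(std::mem::take(batch));
        }
    }
}

impl PullSink for BatchingSink {
    fn run(&mut self, rx: Receiver<Event>) -> Result<(), String> {
        let mut batch = Vec::with_capacity(self.batch_size);
        // The sink pulls at its own rate; `recv` returns Err once every
        // sender has been dropped, which ends the loop.
        while let Ok(event) = rx.recv() {
            batch.push(event);
            if batch.len() >= self.batch_size {
                self.flush(&mut batch);
            }
        }
        // Drain whatever is left before exiting.
        self.flush(&mut batch);
        Ok(())
    }
}

fn demo_pull_sink() -> Vec<Vec<Event>> {
    let (tx, rx) = channel();
    for i in 0..5 {
        tx.send(Event(format!("event-{}", i))).unwrap();
    }
    drop(tx); // closing the channel tells the sink to drain and stop
    let mut sink = BatchingSink { batch_size: 2, flushed: Vec::new() };
    sink.run(rx).unwrap();
    sink.flushed
}
```

Because the sink owns the receiving end, batching becomes an ordinary loop with a final drain, with no poll state machine to maintain.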

@LucioFranco LucioFranco added domain: topology Anything related to Vector's topology code meta: idea Anything in the idea phase. Needs further discussion and consensus before work can begin. domain: sinks Anything related to the Vector's sinks meta: feedback Anything related to customer/user feedback. labels Mar 23, 2020
@Hoverbear
Contributor

This sounds good, @LucioFranco :)

Do you see any possible complications in implementation? Will this change anything about our buffering or batching strategies?

@LucioFranco
Contributor Author

It shouldn't change any user facing things, only internal. We should be able to achieve the same buffering/batching as we do now.

@Hoverbear
Contributor

@lukesteensen it'd be good for you to weigh in here. :)

@lukesteensen
Member

Overall I think this seems reasonable. We'll very likely want to have more specific APIs for different types of sinks, but this makes sense as the bottom layer.

One thing that'd be interesting to think about is whether there's a similarly general design that'd make it easier to "drive" sinks from the outside. This would be very useful for testing purposes and could help get rid of some of the sleeping and flaky tests we have now. Maybe something like an async fn submit(&mut self, event: Event)?

@LucioFranco
Contributor Author

@lukesteensen I am not following what you mean by async fn submit. To me this doesn't work because we never have a single future per event; we have a future per chunk of events. That is why I like the streaming API better: it allows you to just work with streams.

@lukesteensen
Member

Basically, the run fn in this proposal is pretty much always going to be structured as a loop:

while let Some(event) = rx.next().await {
    // do stuff with the event here
}

If there's a feasible way to do it, it could be nice for the sink API to just be fn do_stuff(event: Event). It wouldn't need to return a future.

This could be totally wrong, just brainstorming to see if there's something that could help simplify tests, etc.
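
One way this brainstorm could look, purely as a sketch: a per-event handler trait plus a generic driver that owns the loop. EventHandler, run_handler, and RecordingSink are hypothetical names, and a blocking std::sync::mpsc::Receiver stands in for the async channel.

```rust
use std::sync::mpsc::Receiver;

// Hypothetical stand-in for Vector's event type.
#[derive(Debug, Clone, PartialEq)]
struct Event(String);

// The sink only says what to do with one event; it never sees the channel.
trait EventHandler {
    fn handle(&mut self, event: Event);

    // Optional hook for sinks that hold partial batches when input ends.
    fn finish(&mut self) {}
}

// Generic driver that owns the receive loop on behalf of every handler.
fn run_handler<H: EventHandler>(handler: &mut H, rx: Receiver<Event>) {
    while let Ok(event) = rx.recv() {
        handler.handle(event);
    }
    handler.finish();
}

struct RecordingSink {
    seen: Vec<String>,
}

impl EventHandler for RecordingSink {
    fn handle(&mut self, event: Event) {
        self.seen.push(event.0);
    }
}
```

A test can then call handle directly, with no channel, task, or sleep involved. The sketch does not address the "future per chunk of events" concern raised above; a batching sink would still need something like the finish hook, or an async variant of it.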

@LucioFranco
Contributor Author

@lukesteensen well for current sinks that use the HttpSink pattern you could just pass it to BatchedHttpSink with forward(rx).

The loop case would replace the streaming setup. The one thing that isn't solved is the acking. But I think that could be covered with combinator types while still allowing flexibility.

@binarylogic binarylogic added the type: tech debt A code change that does not add user value. label Mar 30, 2020
@LucioFranco
Contributor Author

Update on this: I have spent the last few days endlessly cornering myself with our topology code. There are a couple of issues. We cannot use async fn in our sink trait since that is not object safe. We also have a proliferation of uses of the Sink trait throughout our topology, which is causing issues with this change as well. I will write up a more in-depth update, but we'd like to better understand what the implications of this change are. The reason this matters is that upgrading to the newer sink type is probably not the best idea: 1) it's far from stable and has no one really pushing for it to be stable, and 2) it's much more complex.
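
For context on the object-safety point, one common workaround is a plain method that returns a boxed future, which keeps the trait usable as Box<dyn ...>. The sketch below is std-only and assumes hypothetical names (DynSink, CountingSink, block_on, demo_dyn_sink); it takes a Vec<Event> instead of a channel to stay small, and its block_on is a toy busy-polling executor that is only suitable for futures that complete without waiting.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for Vector's event type.
#[derive(Debug, Clone, PartialEq)]
struct Event(String);

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// Object safe: an ordinary method returning a boxed future, not `async fn`.
trait DynSink: Send {
    fn run(self: Box<Self>, events: Vec<Event>) -> BoxFuture<Result<usize, String>>;
}

struct CountingSink;

impl DynSink for CountingSink {
    fn run(self: Box<Self>, events: Vec<Event>) -> BoxFuture<Result<usize, String>> {
        Box::pin(async move {
            let mut sent: usize = 0;
            for event in events {
                // A real sink would await IO here; this one only counts.
                let _payload_len = event.0.len();
                sent += 1;
            }
            Ok::<usize, String>(sent)
        })
    }
}

// Waker that does nothing; enough for the toy executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn demo_dyn_sink() -> Result<usize, String> {
    // Trait objects work because `run` is a plain method.
    let sinks: Vec<Box<dyn DynSink>> = vec![Box::new(CountingSink)];
    let mut total = 0;
    for sink in sinks {
        total += block_on(sink.run(vec![Event("a".into()), Event("b".into())]))?;
    }
    Ok(total)
}
```

The cost of this shape is one heap allocation per call to run, which is negligible when run is called once per sink for its whole lifetime.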

@LucioFranco
Contributor Author

So here is a quick write-up of some potential things we could do to improve topology. First, I will (as usual) note some problems with our current topology that should be motivation to update things.

Current topology implementation

Currently, our topology implementation is quite complex. After over a year of working on this code base, I still very much struggle to fully understand what is going on. This is partially due to how complex the task is, but also, I believe, to a confusing usage of types. To give a high-level understanding of what our current topology does, we can break it down into three main areas.

Sources are the simplest component. They are made up of two tasks: 1) a source-to-channel task (this is the one returned from the SourceConfig trait), and 2) a task that takes the rx end of the channel and "pumps" events to our Fanout channel. I will talk about Fanout later, but just know that its goal is to fan out events to multiple destinations.

Transforms take input the way sinks do, but they still produce output. They are built from three main pieces: 1) A BufferInputCloner, which is a wrapper around UnboundedSender::clone. This allows us to clone and pass around the input end of our transforms. 2) The transform task itself: since our config trait produces a stream, we have a task that holds onto the rx end of the BufferInputCloner and transforms events. It then sends these events into 3) the output end of the transform, which again uses the Fanout type.

Sinks are similar to transforms except they do not have an output; their output is some downstream service. Instead of three pieces like transforms, sinks are made up of only two: 1) a BufferInputCloner, the same as for transforms, and 2) a Sink implementation (internally type-aliased to RouterSink) that is passed to the Fanout type of the previous transform or sink in the chain.

To sum this part up, it usually looks like this:

Source -> tx -> source pump -> fanout -> BufferInputCloner -> transform stream -> fanout -> sink

In general this is a very good approach and has worked quite well for us. That said, there are problems with this setup when it comes to upgrading our types. Beyond that, we in fact have much more buffer space than what the user may add to their sink config. We currently allocate a size-1000 channel between sources and our fanout pump, and another size-100 channel between the fanout pump and the input to our transforms. This means that with a basic source -> transform -> sink pipeline, we by default buffer up to 1100 events before we even hit our buffer type for the sink. This may not be a problem in the end, but it is still something to note.
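
The 1100 figure can be checked with a small std-only sketch. It assumes bounded std::sync::mpsc::sync_channel queues as stand-ins for the two channels, uses u64 in place of events, and models the pump simply by filling the downstream channel first; the function name is hypothetical.

```rust
use std::sync::mpsc::sync_channel;

// Counts how many items the two intermediate channels accept before
// back pressure reaches the source, when nothing downstream is draining.
fn events_buffered_before_sink() -> usize {
    // Capacities taken from the description above.
    let (source_tx, _source_rx) = sync_channel::<u64>(1000);
    let (transform_tx, _transform_rx) = sync_channel::<u64>(100);

    let mut accepted = 0;
    // The pump would have moved the earliest events into the transform input.
    while transform_tx.try_send(accepted as u64).is_ok() {
        accepted += 1;
    }
    // Once that is full, the source channel fills up behind it.
    while source_tx.try_send(accepted as u64).is_ok() {
        accepted += 1;
    }
    accepted
}
```

Both receivers are kept alive but never read, so try_send fails only when a channel is full, and the count is the sum of the two capacities.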

So, I mentioned I would talk about Fanout. Fanout is a type that lives within our topology code. Its job is to fan out events to multiple destinations. It also supports three actions once it has been created: add, remove, and replace. This allows us to dynamically update where we fan out events to. This works quite well but has a couple of issues. First, we must use a secondary channel to send control messages. This means that within topology we have to keep track of our fanout control channel, the input cloner, and the individual tasks. This becomes complex quite quickly. The other issue is that Fanout ties us directly to the old sink type. While the old type was not super hard to use, the new sink trait is far from stable and is just not a good idea to use here, since we want to do as little low-level futures work as possible.

This problem continues as we end up using the sink type in a lot of places. It becomes most acute with Fanout because its interface accepts Sink everywhere and wants to push items directly. While this works great for us now, it does not support the proposal above, because we want to shift our sinks from working on a push basis to accepting some stream that they can pull from independently.
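
To show why the control channel adds bookkeeping, here is a simplified std-only model of a push-based fanout with add, remove, and replace. It is not Vector's Fanout: the ControlMessage enum, the field names, and the use of std::sync::mpsc senders in place of Sink implementations are all assumptions made for the sketch.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical stand-in for Vector's event type.
#[derive(Debug, Clone, PartialEq)]
struct Event(String);

// The three actions described above, delivered over a secondary channel.
enum ControlMessage {
    Add(String, Sender<Event>),
    Remove(String),
    Replace(String, Sender<Event>),
}

struct Fanout {
    outputs: Vec<(String, Sender<Event>)>,
    control: Receiver<ControlMessage>,
}

impl Fanout {
    // Returns the fanout plus the control handle the topology must keep around.
    fn new() -> (Self, Sender<ControlMessage>) {
        let (control_tx, control) = channel();
        (Fanout { outputs: Vec::new(), control }, control_tx)
    }

    // Apply any pending control messages before forwarding.
    fn apply_control(&mut self) {
        while let Ok(message) = self.control.try_recv() {
            match message {
                ControlMessage::Add(name, tx) => self.outputs.push((name, tx)),
                ControlMessage::Remove(name) => self.outputs.retain(|slot| slot.0 != name),
                ControlMessage::Replace(name, tx) => {
                    if let Some(slot) = self.outputs.iter_mut().find(|slot| slot.0 == name) {
                        slot.1 = tx; // rewire in place, keeping the position
                    }
                }
            }
        }
    }

    // Push model: the fanout clones the event into every output.
    fn send(&mut self, event: Event) {
        self.apply_control();
        for (_, tx) in &self.outputs {
            let _ = tx.send(event.clone());
        }
    }
}
```

Even in this reduced form, the owner has to hold the control sender, the output names, and the fanout itself, and the fanout decides when each output receives data rather than the output pulling it.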

New proposal

My new proposal is a bit more intrusive than I originally expected but I think the outcome is much better. What I suggest is that we shift the entire pipeline to use tokio::sync::broadcast and pass a receiver end of a channel to transforms and sinks. In fact, we already do this within our Transform trait.

The problem here is that all transforms need an extra size-100 channel to buffer events. This is quite unfortunate, and it is a result of our Fanout type. So what I suggest is that we swap Fanout for the tokio broadcast type. This type is internally very similar to Fanout in the sense that it will clone an event and pass it to multiple receivers. The issue with using broadcast directly is that it does not provide the same features that our Fanout does, namely shutting down a destination and adding a new one.

This, though, can be solved by a new type called Valve, which is designed to be shut off remotely and end a stream. This would support our use case where we want to remotely shut off an input to a component. For example, we could hold onto a handle within RunningTopology for use when we want to shut down an output. It would first "close" the Valve, letting the transform/sink drain. This idea coincides with our new shutdown method, where components should attempt to drain and exit upon receiving None. This should solve the remove case.
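
A minimal sketch of the Valve idea, using an Iterator as a synchronous stand-in for a Stream. The Valve, ValveHandle, and valve names and their shapes are assumptions for illustration, not an existing API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Handle kept by the owner (for example, something like RunningTopology).
#[derive(Clone)]
struct ValveHandle {
    closed: Arc<AtomicBool>,
}

impl ValveHandle {
    fn close(&self) {
        self.closed.store(true, Ordering::SeqCst);
    }
}

// Wraps an input and ends it early once the handle is closed.
struct Valve<I> {
    inner: I,
    closed: Arc<AtomicBool>,
}

fn valve<I: Iterator>(inner: I) -> (Valve<I>, ValveHandle) {
    let closed = Arc::new(AtomicBool::new(false));
    let wrapped = Valve { inner, closed: Arc::clone(&closed) };
    (wrapped, ValveHandle { closed })
}

impl<I: Iterator> Iterator for Valve<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // After `close`, the consumer sees `None`, the same signal as a
        // naturally finished input, so it can drain and exit.
        if self.closed.load(Ordering::SeqCst) {
            return None;
        }
        self.inner.next()
    }
}
```

This version only checks the flag between items. An async version would also need to wake a consumer that is already waiting on the inner stream when close is called, which this sketch leaves out.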

For adding a new transform/sink, we can create a new Receiver and spawn the component task with it. That part should be quite straightforward: no need for a control channel to update some running Fanout task.
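
The "just create a new Receiver" flow can be sketched with a tiny broadcast stand-in built on std::sync::mpsc. This is not tokio::sync::broadcast: the Broadcast type below is hypothetical, and it uses one unbounded queue per subscriber, so it does not model the bounded ring buffer or back pressure.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical broadcast stand-in: every subscriber gets its own clone.
struct Broadcast<T> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Broadcast<T> {
    fn new() -> Self {
        Broadcast { subscribers: Vec::new() }
    }

    // Adding a component is just handing it a fresh receiver.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    // Clone the value to every live subscriber and forget the ones
    // whose receiver has been dropped.
    fn send(&mut self, value: T) {
        self.subscribers.retain(|tx| tx.send(value.clone()).is_ok());
    }
}
```

A subscriber that joins late only sees values sent after it subscribed, and a component that drops its receiver disappears from the list on the next send, so neither case needs a control message.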

The main issue with this approach is that we can no longer support the replace control action. This is due to the fact that these components need to own the incoming stream type. Because of this, it's almost impossible to rewire it (we might be able to implement some broadcast type that could support this). That said, I personally don't think it's worth it to support this much more complex replace-in-place behavior. We've seen this be a point of confusion in the past, and I believe it makes our topology much more complicated and makes testing this type of behavior very difficult. It also introduces plenty of surface for race conditions if not done correctly.

So to generally sum up, I think we should simplify our topology implementation, lean on better types like tokio::sync::broadcast and shift our transform/sink implementations to receive a broadcast stream type.

Another major thing I'd like to bring up with this proposal is simplifying our traits. We currently have a few traits all over the place. The big one I can think of is Transform, which lives within transform/mod.rs while lots of other traits live in topology/config/mod.rs. Even though this generally makes sense, I think it's confusing for new developers to find the correct traits. Keeping them all in one spot might make this discovery easier.

TL;DR: Replace our usage of intermediate channels to satisfy Fanout requirements with a proper broadcast channel and better remote shutoff control. Use a ring buffer of a smaller size and bubble up back pressure better.

(also sorry I said this would be short but ended up writing a lot, hopefully this can be useful for future devs that come across this code)

@binarylogic
Contributor

Noting that #2625 will likely address and supersede this.

@lukesteensen
Member

Closing as this is superseded by (and largely implemented as part of) our new VectorSink abstraction.
