-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore(file sink): Upgrade file sink for tokio-compat #1988
Conversation
920a2d5
to
147b53c
Compare
Aaand just after I created a PR noticed a problem. The |
147b53c
to
88a14b6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This LOOOKS great! Left some comments inline, I'd like to see how we could clean up the giant select statement. I think we should try to use the tokio select macro.
mod bytes_path; | ||
use bytes_path::BytesPath; | ||
|
||
mod streaming_sink; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could probably place this stuff in sinks/mod.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we end up going with it - definitely! I think we can extract bytes_path
into global utils too. I'll do a final pass on the locations of things before the merge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving it upper now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aborting, leaving it as-is for now, we'll correct it later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking great, left some comments :)
@@ -0,0 +1,83 @@ | |||
use super::StreamingSink; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reasoning for moving this out of being on the streamingsink type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rationale is this code is more of a compatibility layer to the pre-async topology. This is partially outlined in the comments to the units inside the mod already. The idea is we can support this pull-based operation mode as a first-class at the topology layer, and it then we will be able to simply remove this mod. The StreamingSink
type will remain though. For this reason, I think it's a good idea to separate them from the start by design.
If you are asking in regards to why use separate files - I just feel like it's cleaner to have multiple smaller files rather than a big one, especially if they have different concerns, like in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a couple comments about testing, happy to chat over slack about any deeper questions :)
src/expiring_hash_map/tests.rs
Outdated
rt.block_on_std(async { | ||
let mut map = ExpiringHashMap::<String, String>::new(); | ||
|
||
let a_minute_ago = Instant::now() - Duration::from_secs(60); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also be using the built in test-util functionality to test timers, refer to the tokio time tests where we pause the time and advance it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw those at the tokio
repo, but, unfortunately, everything is pub(create)
there. It would be nice to use that indeed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created tokio-rs/tokio#2313
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What API's from clock do you need?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally all of them. I haven't looked thoroughly, but I think I'll need a clock-based Instance
override there, and the calls to freeze time (pause
, advance
, etc).
My idea is that instead of practically waiting, we should freeze time, then shift time into the future. This, in turn, should trigger timer - and then we poll again, and the test should pass.
There should be a better way than doing delay
s in tests, cause it's a problematic design for a whole lot of reasons. I'm already used to time freezing in more mature test ecosystems of other languages, so it really catches my attention. 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking pretty good to me! Seems like we'll be blocked on adding a runtime to SinkContext
, but otherwise seems pretty close to mergeable?
We're also blocked by #1922. In addition to that, there's a API change to the |
@LucioFranco I've implemented the Please re-review! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still looking good to me.
src/expiring_hash_map/mod.rs
Outdated
/// # }); | ||
/// ``` | ||
pub async fn next_expired(&mut self) -> Option<Result<ExpiredItem<K, V>, Error>> { | ||
let key = self.expiration_queue.next().await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the error here just a timer error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. This is a little bit of a bummer to expose since it's something we'd usually unwrap, but I'm fine with either as long as we make the nature of the possible error clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right! I forgot to add the errors section 😦
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
huh this is odd I thought we removed errors for timers from tokio 0.2 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do timers just panic now?
It might make sense to return errors from timers IMO, but I'm not aware of how the development unfolds in tokio
. 😄 Either way - it doesn't make sense to go ahead of the tokio
changes. We'll just remove errors here if/when relevant tokio
update arrives.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking good, left some comments inline.
src/expiring_hash_map/mod.rs
Outdated
use tokio02::time::{delay_queue, DelayQueue, Error}; | ||
|
||
#[cfg(test)] | ||
mod tests; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not-minor nit: the file isn't big enough to split these can we just have the tests and the map in the same file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll change it since you asked. However, I personally think this style of keeping tests mod in an adjacent file is easier to maintain.
I don't think I should have a strong opinion on how to organize files yet, since I'm still relatively new in this project, and haven't touched/seen the majority of the code. But these minor details usually matter to me. On this particular topic, I'll just say I like how I did it more. 😄
src/expiring_hash_map/tests.rs
Outdated
use tokio02 as tokio; | ||
use tokio_test::{assert_pending, assert_ready, task}; | ||
|
||
fn unwrap_ready<T>(poll: Poll<T>) -> T { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like using https://docs.rs/tokio-test/0.2.0/tokio_test/macro.assert_ready_eq.html would work well here, happy to add more asserts to tokio-test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ready eq requires specifying a type for expected Ok
. I don't know how to work around that, but I'm happy to correct it if you show me the way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It shouldn't be working with result types? You can peek the internal impl here https://docs.rs/tokio-test/0.2.0/src/tokio_test/macros.rs.html#174 it just does basically what this does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do assert_eq!(unwrap_ready(fut.poll()).unwrap().unwrap().0, "val");
I can't find a way to do the same with assert_ready_eq
.
For instance:
assert_ready_eq!(fut.unwrap().unwrap().0, "val");
- down't work, cause the first argument has to be the future;assert_ready_ok!(fut.unwrap().0, "val");
- down't work either for the same reason;assert_ready_ok!(fut, Some(("val", ?));
- I'd love to write it in this form, but I don't know the value of theKey
here, so it doesn't work either.
I'd be happy to improve anything there. The solution I settled on works, but I'm not very pleased with it.
@@ -0,0 +1,66 @@ | |||
use super::StreamingSink; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes sense to just expose a struct that handles all of this instead of a bunch of functions for now. Lets try to keep it simple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean? Something like struct Compat
? Sorry, I don't understand what you're proposing here.
09eea32
to
6cd58d6
Compare
2ea0709
to
e6428be
Compare
Great PR! Please pay attention to the following items before merging: Files matching
Files matching
This is an automatically generated QA checklist based on modified files |
This reverts commit dd5c922. Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
- switch StreamingSink to impl Stream - provides a significant boot to composability; - simpler compat layer - this compat approach should be much easier to understand, it consists of a set of simple and documented components that are composed together in a meaningful way; - switch tests to directly working with FileSink instead of futures compat layer - this build upon on StreamingSink taking impl Stream instead of mpsc::Receiver. Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
…expired Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
1a5b49d
to
d5b8f44
Compare
Merging this to unlock the hotfix for the Pending questions to be solved after the merge. |
This is the file sink upgraded for
tokio-compat
support. It's being developed atop of the #1922 and will be rebased tomaster
when then branch is merged. For now, the base of the PR is set to thetokio-compat
branch.It relies heavily on the solution proposed by @LucioFranco at #1945.
I tried to preserve the tests as much as possible.
This PR also introduces the
ExpiringHashMap
- a component that's designed to be reusable. I decided to introduce it because I saw this logic implemented multiple times across the codebase. This should make it easier to add expiration functionality to the existing code where we just useHashMap
s - for instance in places where we need per stream buffers, sockets, files and so on.To do:
ExpiringHashMap
(don't forget: chore(file sink): Upgrade file sink for tokio-compat #1988 (comment))ExpiringHashMap
interfaceExpiringHashMap
- explain howcontinuing
at theNone
case may result in a spin loop, and that to prevent this users MUST dois_empty
check