Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new future::FutureGroup type for dynamic async concurrency #147

Closed
wants to merge 12 commits into from

Conversation

yoshuawuyts
Copy link
Owner

@yoshuawuyts yoshuawuyts commented Jul 17, 2023

This is similar to FuturesUnordered from the futures library, or JoinSet from tokio.

Example

let mut set = FutureGroup::new();
group.insert(future::ready(2));
group.insert(future::ready(4));

let mut out = 0;
while let Some(num) = group.next().await {
    out += num;
}
assert_eq!(out, 6);
assert_eq!(group.len(), 0);
assert!(set.is_empty());

Tasks

  • Initial implementation
  • Make use of an internal WakerSet to improve performance
  • Figure out a solution to our lending iterator problem

@yoshuawuyts
Copy link
Owner Author

Actually, now that I think of it: we probably should remove the Handle in favor of using a channel.

@yoshuawuyts
Copy link
Owner Author

Okay, done - removed the handle! There is still a real question about how to weave through the handle pattern though. I briefly tried this test case, but it doesn't work out because of the lifetimes:

    #[test]
    fn concurrent_channel() {
        enum Message<T> {
            Future(Pin<Box<dyn Future<Output = T> + 'static>>),
            Output(T),
        }
        futures_lite::future::block_on(async {
            let mut set = FutureSet::new();

            let (sender, receiver) = async_channel::bounded(1);
            sender.try_send(async { 2 + 2 }).unwrap();
            let receiver = receiver.map(|t| Message::Future(Box::pin(t)));

            set.insert(async { Message::Output(1 + 1) });
            let mut stream = (set, receiver).merge();

            let mut out = 0;
            while let Some(msg) = stream.next().await {
                match msg {
                    Message::Future(fut) => set.insert(async move { // `set` has been moved
                        let output = fut.await;
                        Message::Output(output)
                    }),
                    Message::Output(num) => out += num,
                }
            }

            assert_eq!(out, 6);
        });
    }

I'm pretty certain there is some formulation of LendingAsyncIterator which will allow us to pull through the underlying sources on each iteration so they can be modified from inside the loop. I wrote an example of this on nightly a few weeks ago, which is currently broken again. With that we could do something like this:

let mut iter = vec![0; 1]).into_async_iter().lend_mut();

let mut appended = false;
while let Some((item, vec)) = iter.next().await {
    if !appended {
        vec.push(item + 1);
        appended = true;
    }
    println!("{item}");
}

We should explore some options with this:

  • Add a StreamSet type which has both iter and lending_iter sets of methods - the latter of which borrows the underlying iterators and returns them. This would be a new trait, which only operates on Unpin async iterators.
  • Do what we did in the playground and add an AsyncIterator::lend method which derefs the underlying stream into its components.
    • We should combine this with impl Deref{,Mut} for TupleMerge where A..: Unpin {} or whatever. So if you .merge N streams, you can .lend them to extract the individual underlying streams.

@yoshuawuyts yoshuawuyts mentioned this pull request Jul 17, 2023
@yoshuawuyts yoshuawuyts marked this pull request as draft July 17, 2023 15:29
@matheus-consoli
Copy link
Collaborator

@yoshuawuyts, am I missing something, or does adding a 'static bound to the generic AsyncIterator fix the problem on your playground example?

- impl<I: AsyncIterator          > AsyncLendingIterator for LendMut<I>
+ impl<I: AsyncIterator + 'static> AsyncLendingIterator for LendMut<I>

- impl<I: AsyncIterator          > AsyncLendingIterator for Lend<I>
+ impl<I: AsyncIterator + 'static> AsyncLendingIterator for Lend<I>

@yoshuawuyts
Copy link
Owner Author

@matheus-consoli we explicitly want the lifetime to be non-static. I filed rust-lang/rust#113796 which has just been fixed, so this should work again on the next nightly.

@yoshuawuyts
Copy link
Owner Author

yoshuawuyts commented Aug 4, 2023

Okay, updated it! - This now seems to work as expected, and I'm pretty happy with it. Oh CI is probably failing because this requires nightly features. If we want to move forward with this PR we should apply the proper cfgs for this to work.

Examples

Here are two examples of this in action:

//! A basic example showing how to add items to the set

let mut set = FutureSet::new();
set.insert(async { 1 + 1 });
set.insert(async { 2 + 2 });

let mut out = 0;
while let Some((num, _set)) = set.next().await {
    out += num;
}
assert_eq!(out, 6);
assert_eq!(set.len(), 0);
assert!(set.is_empty());
//! A more complex example, showing how to concurrently await both a channel of futures, as well as the futures received on that channel.
let mut set = FutureSet::new();

let (sender, receiver) = async_channel::bounded(1);
sender.try_send(async { 2 + 2 }).unwrap();

set.insert(async { Message::Output(1 + 1) });
set.insert(async move {
    let fut = receiver.recv().await.unwrap();
    Message::Future(Box::pin(fut))
});

let mut out = 0;
while let Some((msg, set)) = set.next().await {
    match msg {
        Message::Future(fut) => set.insert(async move {
            let output = fut.await;
            Message::Output(output)
        }),
        Message::Output(num) => out += num,
    }
}

assert_eq!(out, 6);

Further Improvements

It seems though that directly implementing LendingIterator for FutureSet is a bit clumsy, and instead we'd indeed be better off implementing the generalized .lend method. Like, we're basically hard-coding that now, which we means we're taking an ergonomics penalty every time someone doesn't want to update from the loop. With that we could rewrite the core loop of the first example to not require the inert _set item:

let mut set = FutureSet::new();
set.insert(async { 1 + 1 });
set.insert(async { 2 + 2 });

let mut out = 0;
while let Some(num) = set.next().await {
    out += num;
}

Thoughts on StreamSet

Futures are easily representable as a set because they're short-lived. By the time you get the output the future is invalidated. However async iterators are different - they're long-lived and we need to be able to manage them more directly. Tokio had the good insight that that should probably make use of a map-shape so you can key into it and drop items. However, choosing your own keys can be a bit annoying, so we may want to take care of that for you (as is done in e.g. the slab crate).

@yoshuawuyts
Copy link
Owner Author

async-iterator@2.2.0 now has support for the lend and lend_mut methods which will allow us to simplify some of this.

@yoshuawuyts
Copy link
Owner Author

Updated the implementation to take homogenous future types only. This saves intermediate allocations, and allows users to choose their own strategy for unifying types.

It's not without tradeoffs, but those tradeoffs seem inherent to Rust's handling of heterogenous types, and are shared by e.g. unifying the output types of Stream::merge as well.

@yoshuawuyts yoshuawuyts changed the title Add a new future::FutureSet type for dynamic async concurrency Add a new future::FutureGroup type for dynamic async concurrency Aug 10, 2023
@yoshuawuyts
Copy link
Owner Author

We should reimplement this on top of #149. Either by copying the impl (which should be simple enough), or by using it internally and wrapping types in stream::once. A lot of the hard work has gone into that PR, and we just have to adapt it for our uses.

@yoshuawuyts
Copy link
Owner Author

Closing in favor of #150

@yoshuawuyts yoshuawuyts deleted the futureset branch August 13, 2023 00:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants