feat: prefetch #125

Open

kavorite wants to merge 3 commits into main

Conversation

kavorite

This request concerns a new pipeable operator: prefetch. Initially, it was just a utility I put together for a neural network data loader. However, it may also make a good PR.

The utility can help prevent pipeline starvation by eagerly fetching items before they're needed. I inserted mine before a bottleneck in my training loop (the forward/backward pass).

This addition ensures that my data loader is already preparing the next batch while the model is processing the current one: each batch starts loading before the previous one finishes training, which keeps my accelerators from starving.
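
For illustration, here is a minimal usage sketch, assuming the operator is exposed as `pipe.prefetch` (the `load_batch` and `train_step` coroutines below are placeholders, not part of this PR):

    import asyncio
    from aiostream import stream, pipe

    async def load_batch(i: int) -> int:
        await asyncio.sleep(0.1)  # simulate I/O-bound data loading
        return i

    async def train_step(batch: int) -> int:
        await asyncio.sleep(0.4)  # simulate the forward/backward pass
        return batch

    async def main() -> None:
        ys = (
            stream.range(8)
            | pipe.map(load_batch, task_limit=1)
            | pipe.prefetch()  # eagerly load the next batch while training
            | pipe.map(train_step, task_limit=1)
        )
        await ys  # loading of batch i+1 overlaps with training on batch i

    asyncio.run(main())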

codecov bot commented Dec 17, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 100.00%. Comparing base (8201a79) to head (26eeac1).

Additional details and impacted files
@@            Coverage Diff            @@
##              main      #125   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files           15        15           
  Lines         1167      1190   +23     
=========================================
+ Hits          1167      1190   +23     


@vxgmichel
Owner

Thanks for the PR @kavorite, and for adding tests 🙂

I had a similar idea at some point, so I'm in favor of adding it to the library.

I think prefetch is a decent name; I also had buffer in mind. What do you think?

I would also like to have the same behavior as a trio memory channel where the size can be 0. It could even be the default, as this corresponds to the behavior of a handshake between the producer and the consumer. This way, several pipe.buffer() stages can be inserted into a pipeline in order to split it into concurrent working units. What do you think?
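
To make the handshake idea concrete, here is a minimal sketch of the trio behavior referred to above (plain trio, independent of aiostream): with a zero-size memory channel, send() only completes once a receiver is waiting.

    import trio

    async def main() -> None:
        # Size 0 means send() blocks until a receive() is pending: a handshake.
        send_channel, receive_channel = trio.open_memory_channel(0)

        async def producer() -> None:
            async with send_channel:
                for i in range(3):
                    # Completes only once the consumer is ready to receive.
                    await send_channel.send(i)

        async def consumer() -> None:
            async with receive_channel:
                async for item in receive_channel:
                    print("received", item)

        async with trio.open_nursery() as nursery:
            nursery.start_soon(producer)
            nursery.start_soon(consumer)

    trio.run(main)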

I don't have much time to spend on aiostream at the moment, so sorry if my reaction time is a bit slow :)

@kavorite
Author

kavorite commented Dec 30, 2024

Hello! I spent a lot of time last week tying up loose ends, but now I have some time.

What would be the desired behavior when buffer size = 0 exactly?

I tried my hand at writing one, but this doesn't seem to be the right approach, because the task IDs are different regardless of whether one prefetches or not, indicating some level of simultaneity consistent with trio-style handshaking either way (my local branch implementing this uses the name buffer as an alias):

    # Excerpt from a test coroutine using aiostream's assert_run / assert_cleanup
    # pytest fixtures; `loader` is assumed to record the producer task and
    # simulate a 0.1 s load, roughly:
    producer_task_id = consumer_task_id = None

    async def loader(x: int) -> int:
        nonlocal producer_task_id
        producer_task_id = id(asyncio.current_task())
        await asyncio.sleep(0.1)
        return x

    async def bottleneck(x: int) -> int:
        nonlocal consumer_task_id
        consumer_task_id = id(asyncio.current_task())
        await asyncio.sleep(0.4)
        return x

    with assert_cleanup() as loop:
        # Without buffering (sequential):
        xs = stream.range(3) | pipe.map(loader, task_limit=1)
        ys = xs | pipe.map(bottleneck, task_limit=1)  # Process time
        await assert_run(ys, [0, 1, 2])
        assert loop.steps == pytest.approx([0.1, 0.4, 0.1, 0.4, 0.1, 0.4])
        assert producer_task_id == consumer_task_id

    with assert_cleanup() as loop:
        # With no buffering (size 0), we still expect sequential timing,
        # but the producer and consumer should run in separate tasks:
        xs = stream.range(3) | pipe.map(loader, task_limit=1)
        ys = xs | pipe.buffer() | pipe.map(bottleneck, task_limit=1)
        await assert_run(ys, [0, 1, 2])
        assert loop.steps == pytest.approx([0.1, 0.4, 0.1, 0.4, 0.1, 0.4])
        assert producer_task_id != consumer_task_id

The timing also appears to be consistent with simply not using the new functionality at all, so I struggle to grasp the underlying motivation for this.
