Buffer reuse and race condition in client.IncomingContent.stream() #498

Open
ohait opened this issue May 29, 2024 · 1 comment

ohait commented May 29, 2024

We've been debugging an issue with BlackSheep v1 where a reverse proxy was intermittently returning truncated responses, and we narrowed it down to:
https://github.com/Neoteroi/BlackSheep/blob/main/blacksheep/client/connection.py#L69-L70

I believe this code needs to be changed to something like:

            buf = bytes(self._body)  # copy the buffer before clearing it
            self._body.clear()
            yield buf  # buf is already an independent bytes copy

I don't have an easy way to reproduce this, but this is the sequence I observed:
task 1) data comes in via extend_body(), which signals self._chunk
task 2) wakes up and reads the body
task 2) yields the body
task 2) stays suspended at the yield while the code consuming the chunk awaits
task 1) receives more data and calls extend_body() again (signalling self._chunk again)
task 2) resumes after the yield and calls _body.clear(), which discards the data that was appended in the meantime
task 2) loops back and does not wait on the event, because it was already signalled by the previous notify
task 2) finds the body empty, so it breaks out assuming there is no more data (while in fact there was more)

I also expect there could be other cases where the buffer changes while the generator is suspended at the yield.
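
The interleaving listed above can be sketched without BlackSheep at all. The snippet below is only an illustration under assumptions: `ToyContent`, `finish()`, `stream_clear_after_yield`, `stream_copy_first` and `collect` are hypothetical names made up for this example, `_chunk` is assumed to behave like an `asyncio.Event`, and the body of the consuming loop stands in for task 1 running while the generator is suspended at the yield.

```python
import asyncio


class ToyContent:
    """Hypothetical stand-in for IncomingContent; not BlackSheep's real class."""

    def __init__(self):
        self._body = bytearray()
        self._chunk = asyncio.Event()
        self.complete = asyncio.Event()

    def extend_body(self, chunk: bytes) -> None:
        self._body.extend(chunk)
        self._chunk.set()

    def finish(self) -> None:
        # Assumption for this toy model: completion also wakes the reader.
        self.complete.set()
        self._chunk.set()

    async def stream_clear_after_yield(self):
        while not self.complete.is_set():
            await self._chunk.wait()
            self._chunk.clear()
            if not self._body:
                break
            yield bytes(self._body)
            # Runs only after the consumer resumes us: this also drops
            # anything that was appended while we were suspended.
            self._body.clear()

    async def stream_copy_first(self):
        while not self.complete.is_set():
            await self._chunk.wait()
            self._chunk.clear()
            if not self._body:
                break
            buf = bytes(self._body)  # copy, then clear, then yield
            self._body.clear()
            yield buf


async def collect(stream) -> list:
    content = ToyContent()
    content.extend_body(b"first,")
    received = []
    async for chunk in stream(content):
        received.append(chunk)
        if len(received) == 1:
            # "task 1" delivers more data while the generator is suspended
            content.extend_body(b"second")
        else:
            content.finish()
    return received


lossy = asyncio.run(collect(ToyContent.stream_clear_after_yield))
safe = asyncio.run(collect(ToyContent.stream_copy_first))
print(lossy)  # [b'first,']
print(safe)   # [b'first,', b'second']
```

In the first variant the second chunk is appended and then wiped by `_body.clear()`, the event is still set, and the empty body triggers the early `break`. Copying and clearing before the yield keeps late data in the buffer for the next iteration.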

HTH

ohait commented May 29, 2024

There is also a race on complete, which might be set AFTER the yield, while the generator is still suspended.
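
A minimal sketch of that second race, again on a toy model rather than the real class: `ToyContent`, `stream`, `snapshot_complete` and `collect` are hypothetical names, and both variants already copy and clear the buffer before yielding, so the only difference is when `complete` is read.

```python
import asyncio


class ToyContent:
    """Hypothetical stand-in; not BlackSheep's real IncomingContent."""

    def __init__(self):
        self._body = bytearray()
        self._chunk = asyncio.Event()
        self.complete = asyncio.Event()

    def extend_body(self, chunk: bytes) -> None:
        self._body.extend(chunk)
        self._chunk.set()


async def stream(content: ToyContent, snapshot_complete: bool):
    completed = False
    while not completed:
        await content._chunk.wait()
        content._chunk.clear()
        if not content._body:
            break
        buf = bytes(content._body)
        content._body.clear()
        if snapshot_complete:
            # Read the flag together with the data it refers to.
            completed = content.complete.is_set()
        yield buf
        if not snapshot_complete:
            # Read after resuming: the flag may now describe data that
            # arrived during the yield and has not been read yet.
            completed = content.complete.is_set()


async def collect(snapshot_complete: bool) -> list:
    content = ToyContent()
    content.extend_body(b"first,")
    received = []
    async for chunk in stream(content, snapshot_complete):
        received.append(chunk)
        if len(received) == 1:
            # The last chunk and the completion signal both arrive
            # while the generator is suspended at the yield.
            content.extend_body(b"last")
            content.complete.set()
    return received


late_check = asyncio.run(collect(snapshot_complete=False))
early_check = asyncio.run(collect(snapshot_complete=True))
print(late_check)   # [b'first,']
print(early_check)  # [b'first,', b'last']
```

With the late check the loop exits while `b"last"` is still sitting unread in the buffer; taking the snapshot before the yield forces one more iteration that drains it.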

Monkey patching as follows solves all the problems:

from blacksheep.client.connection import IncomingContent

async def _blacksheep_client_incoming_content_stream(self):
    completed = False
    while not completed:
        await self._chunk.wait()
        self._chunk.clear()

        if not self._body:
            break

        buf = bytes(self._body)  # create a copy of the buffer
        self._body.clear()  # clear before yielding, so data added during the yield is kept
        completed = self.complete.is_set()  # we must check for EOD before yielding, or it will race

        yield buf  # use the copy (no second bytes() call needed)

        if self._exc:
            raise self._exc

IncomingContent.stream = _blacksheep_client_incoming_content_stream
