
Request errors on overloaded systems may poison the connection pool #550

Closed

vlaci opened this issue Jun 3, 2022 · 20 comments

Comments

vlaci commented Jun 3, 2022

This is somewhat related to #502, as we first started observing the issue below in httpcore 0.14.5. Upgrading to 0.14.7 seemingly fixed it, but we found that there is still an underlying race condition that can poison the pool, making it unable to serve requests ever again. The issue came up using httpx==0.22.0 and httpcore==0.14.7; it is not present with httpx==0.16.1 and httpcore==0.12.3.

Here is a script that brute-forces the client and reliably triggers the issue on my machine. (The script is written for the sync version, but we are having this issue in production with the async version as well.)

Repro script

PARALLELISM and REQUESTS may need adjustments to reproduce the issue.

import multiprocessing.pool
import socket
import httpx

PARALLELISM = 20
REQUESTS = 200

unavailable_listener = socket.socket()
unavailable_listener.bind(("localhost", 0))
unavailable_listener.listen()

unavailable_url = "http://{}:{}".format(*unavailable_listener.getsockname())

client = httpx.Client(limits=httpx.Limits(max_connections=1, max_keepalive_connections=0))

pool = multiprocessing.pool.ThreadPool(PARALLELISM)

# client.get(...) works well at this point

# Raises httpcore.PoolTimeout as expected because the pool has been exhausted
pool.map(lambda _: client.get(unavailable_url), range(REQUESTS))

# client.get(...) will still raise PoolTimeout even though there are supposed to be no outstanding connections

After running the above code, the pool is no longer operational, containing a stuck connection:

>>> client._transport._pool._pool
[<HTTPConnection [CONNECTING]>]
>>> client._transport._pool._pool[0]._connect_failed
False
>>> client._transport._pool._pool[0]._connection
>>> 
>>> client._transport._pool._pool[0].is_available()
False

My hunch is that RequestStatus.set_connection() will succeed from _attempt_to_acquire_connection, but status.wait_for_connection() can still time out on an overloaded system, leaving the half-created connection there. Subsequent request attempts won't be able to use this connection instance as it isn't available, and it won't be cleaned up as it is neither closed nor failed.

The interesting thing about this situation is that a freshly created connection can be put into the pool without any guarantee that it will actually be used by the current request.
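To illustrate, here is a minimal, self-contained model of that interleaving (a sketch only, using made-up stand-in classes rather than httpcore's actual internals; the repro above is sync, but the same interleaving applies to the async pool):

import asyncio


# Stand-in for a request-status object: a waiter that can be handed a
# connection by the pool, or give up after a timeout.
class RequestStatus:
    def __init__(self):
        self.connection = None
        self._event = asyncio.Event()

    def set_connection(self, connection):
        # The pool assigns a freshly created connection wrapper to this request.
        self.connection = connection
        self._event.set()

    async def wait_for_connection(self, timeout):
        # The requesting task waits here; on an overloaded system this can
        # time out *after* set_connection() has already succeeded.
        await asyncio.wait_for(self._event.wait(), timeout)
        return self.connection


async def main():
    pool = []  # stands in for the pool's internal connection list
    status = RequestStatus()

    async def attempt_to_acquire_connection():
        await asyncio.sleep(0.05)          # lock contention / slow scheduling
        connection = object()              # a new, not-yet-connected wrapper
        pool.append(connection)
        status.set_connection(connection)  # succeeds...

    task = asyncio.create_task(attempt_to_acquire_connection())
    try:
        await status.wait_for_connection(timeout=0.01)  # ...but the waiter already gave up
    except asyncio.TimeoutError:
        pass
    await task

    # The orphaned "CONNECTING" wrapper is still in the pool, and nothing
    # will ever use it or clean it up.
    print(pool)


asyncio.run(main())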

stale bot commented Dec 16, 2022

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale bot added the wontfix (This will not be worked on) label Dec 16, 2022
K900 commented Dec 20, 2022

Not stale.

stale bot removed the wontfix (This will not be worked on) label Dec 20, 2022
@nihilSup

Facing this, or very similar behaviour, under load in a production microservice after upgrading httpx to 0.23.0 and Python to 3.10. After some time and several ConnectTimeout/ReadTimeout errors, all requests finish with PoolTimeout. Downgrading to httpx==0.18.1 solved the issue.

Could reproduce the issue with pure httpcore (adapted @vlaci's script, `httpcore==0.16.3`):
import asyncio
import traceback
from collections import Counter
from socket import socket

import httpcore

extensions = {
    "timeout": {
        "connect": .2,
        "read": .2,
        "pool": .2,
        "write": .2,
    },
    # "trace": log,
}


def report(pool):
    print("-- Report:", f"--- {pool.connections=}", f"--- {pool._requests=}", sep="\n")
    if pool.connections:
        conn = pool.connections[0]
        print(f"--- {conn=}")
        print(f"--- Connection has expired? - {conn.has_expired()}")
        print(f"--- Inner connection is {conn._connection}")


async def main():
    _socket = socket()
    _socket.bind(("localhost", 0))
    _socket.listen()

    async with httpcore.AsyncConnectionPool(
            max_connections=1,
            max_keepalive_connections=0,
    ) as pool:
        # Control
        await pool.request("GET", "https://en.wikipedia.org/wiki/2023")
        print("- Control is OK")
        report(pool)

        # Saturate pool
        num_tasks = 600
        url = "http://{}:{}".format(*_socket.getsockname())
        results = await asyncio.gather(
            *[
                asyncio.create_task(pool.request(
                    "GET",
                    url,
                    extensions=extensions,
                ))
                for _ in range(num_tasks)
            ],
            return_exceptions=True,
        )
        print(f"- Finished saturation\n-- ({num_tasks=}): {dict(Counter(type(res) for res in results))}")
        report(pool)

        # Control
        try:
            await pool.request("GET", "https://en.wikipedia.org/wiki/2023", extensions=extensions)  # PoolTimeout
            # await pool.request("GET", url, extensions=extensions)  Expected ReadTimeout, got PoolTimeout
        except httpcore.PoolTimeout:
            print("- ERROR: Got pool timeout")
            traceback.print_exc(chain=True)
        else:
            print("- No pool timeout!")
asyncio.run(main())

Some observations:

Every task tries to acquire a connection (`AsyncConnectionPool._attempt_to_acquire_connection` in `AsyncConnectionPool.handle_async_request`).

The first one (let's name it t1) succeeds and passes through status.wait_for_connection in AsyncConnectionPool.handle_async_request, while the other tasks hang there.

Then t1 can create a real connection in connection.handle_async_request(request) but fails with ReadTimeout.

This leads to a call to self.response_closed(status): the real connection is closed and the connection wrapper is removed from the pool; the request status is removed from its separate registry as well. After this, a new request status creates another connection wrapper with an empty connection.

Under normal conditions, acquiring a new connection wrapper triggers status.wait_for_connection in some task, and then a new real connection can be established, and so on. But under load the asyncio event loop is not fast enough to switch between coroutines, and a PoolTimeout happens. So no one can establish a real connection (because all tasks have finished with PoolTimeout and been removed), and a connection wrapper without a real connection is left inside the pool.

Now two things can happen.

The first is that the pool receives a new request to some different URL. It tries to handle it but hangs until PoolTimeout on status.wait_for_connection, because the pool can't create a new connection wrapper and the old one can neither process the request nor be removed (the connection wrapper has no real connection, so it is not idle or expired; it is "CONNECTING").

The second is that the pool receives a new request to the same URL as the connection wrapper already in the pool (the "CONNECTING" one). In this case the request can't be processed either, because the connection wrapper can handle the request but is not available at the same time (its _connect_failed is False, its _connection is None, and the http2 flag is False as well).

In short, it looks like there is a case where, under load, a connection wrapper in the pool can be stuck in the CONNECTING state and never obtain an actual connection. You can increase the pool timeout so it is bigger than the other timeouts; even a slight excess greatly improves behaviour, but pool poisoning is still possible.
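For example, reusing the "timeout" extension from the script above but giving the pool timeout some headroom over the per-socket timeouts (values here are only illustrative):

extensions = {
    "timeout": {
        "connect": .2,
        "read": .2,
        "write": .2,
        "pool": .5,  # larger than connect/read/write, so waiters can outlive a failing request
    },
}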

I see two approaches:

  • Introduce CONNECTING expiration to trigger is_expired cleanup (a rough sketch follows this list)
  • Kill one CONNECTING connection if no request statuses need it
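A rough sketch of the first approach (attribute names like _connect_started and CONNECTING_TTL are made up here for illustration; they are not actual httpcore attributes):

import time

CONNECTING_TTL = 5.0  # max seconds a wrapper may stay in CONNECTING before it is reaped


class ConnectionWrapper:
    def __init__(self):
        self._connection = None
        self._connect_failed = False
        self._connect_started = time.monotonic()  # remember when we entered CONNECTING

    def is_connecting(self):
        return self._connection is None and not self._connect_failed

    def has_expired(self):
        # A wrapper that never obtained a real connection also expires after
        # CONNECTING_TTL, so the pool's normal expiry cleanup can remove it.
        if self.is_connecting():
            return time.monotonic() - self._connect_started > CONNECTING_TTL
        return False  # keepalive-based expiry for established connections goes here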

I tried the second approach and it worked well - no pool timeouts, and the "control" requests are successful:

async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
    # ...
    # Attempt to close CONNECTING connections that no one needs
    if self.is_full_pool:
        for idx, connection in enumerate(self._pool):  # Try to check old connections first
            if not connection.is_connecting():
                continue
            for req_status in self._requests:
                if req_status is status:  # skip current request
                    continue
                cur_origin = req_status.request.url.origin
                if connection.can_handle_request(cur_origin):
                    break
            else:  # There are no requests that can be handled by this connection
                await connection.aclose()
                self._pool.pop(idx)
                # print(f"XXX {id(status)} Zombie killed!")
                break

c25vdw commented Jan 13, 2023

I have recently stumbled on this problem, and I think it is a race condition around the cleanup logic during exceptions and the _pool_lock that causes the connection pool to accumulate zombie "CONNECTING" wrappers. As a result, the connection pool grows unbounded with stale, uninitialized connections.

The situation happens when a pending request times out (PoolTimeout) immediately after an active request times out (ReadTimeout, or any other exception from connection.handle_async_request).

Explanation

Imagine an active connection A times out waiting for the server response (ReadTimeout), while at the same time (immediately after), the next pending request B times out waiting for the pool (PoolTimeout). A walkthrough of the events:

  • A timed out reading the response from the server. Raises ReadTimeout at
        try:
            response = await connection.handle_async_request(request)
  • B timed out waiting to enter the pool. Raises PoolTimeout at
        try:
            connection = await status.wait_for_connection(timeout=timeout)
        except BaseException as exc:
            # If we timeout here, or if the task is cancelled, then make
            # sure to remove the request from the queue before bubbling
            # up the exception.
            async with self._pool_lock:
                self._requests.remove(status)
                raise exc
  • In A's exception handling, A obtains _pool_lock and calls response_closed():
    • removes the closed connection (expected)
    • picks a pending request and assigns a connection to it. In this case it grabbed B and created a connection for B in the pool (expected on the happy path and in most situations)
    • this in turn calls B.set_connection()
    • Notice that the connection pool now contains an uninitialized connection prepared for B!
  • In B's exception handling, B waited for the lock and now finally obtains _pool_lock. It removes itself from pool._requests, raises the PoolTimeout, and breaks out early.

Now, since B raised an exception and no longer proceeds, the connection in the pool is never initialized or deleted. Its _connect_failed will always be False, and the underlying HTTP/1.1 connection will always be None. Most fatally, a spot in the connection pool is taken forever and there is no way to clean it up.

Potential Solution

The root cause of the issue is that B's connection-handling logic is skipped due to the PoolTimeout. But in this case the PoolTimeout is almost meaningless: request B was only a few milliseconds, if not less, away from making it in time. With that said, I suggest that B continue to execute if it finds that a connection has been assigned to it in its PoolTimeout exception handling.

That is, for lines 232-234 in async/connection_pool.py, replace with:

                async with self._pool_lock:
                    if status.connection is not None:
                        connection = status.connection
                        # continue handling the connection
                    else:
                        self._requests.remove(status)
                        raise exc

I have personally tested this in my local setup, and it gave the expected improvement. However, I am not totally familiar with the library, so I'm not sure whether this affects anything unrelated; hopefully not. Anyway, I hope this can be considered, or at least helps to clear things up.

Thank you for creating and maintaining this library!

K900 commented Jan 13, 2023

@tomchristie @florimondmanca apologies for the ping, but can you take a look at the comments here? It seems like an important issue.

@nihilSup

That is, for line 232-234 in async/connection_pool.py

A few thoughts. I think it is safer to handle this in a separate except branch for PoolTimeout only; reusing the connection after a more general exception may be unsafe and unexpected. Tested your approach locally and it worked - I like it, it's quite simple.
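Roughly like this, adapting the snippet above (an untested sketch; it assumes PoolTimeout is available in that module):

try:
    connection = await status.wait_for_connection(timeout=timeout)
except PoolTimeout as exc:
    async with self._pool_lock:
        if status.connection is not None:
            # The pool assigned us a connection just as we timed out;
            # continue handling it instead of leaving an orphaned
            # CONNECTING wrapper behind.
            connection = status.connection
        else:
            self._requests.remove(status)
            raise exc
except BaseException as exc:
    # Cancellation or anything else: remove the request and re-raise as before.
    async with self._pool_lock:
        self._requests.remove(status)
        raise exc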

cdeler (Member) commented Jan 14, 2023

Let me check it. I'll try to reproduce it locally first

@tomchristie (Member)

Can confirm that I've also reproduced the issue.

I adapted @nihilSup's example slightly, using `trio`...
import trio
import traceback
from collections import Counter
from socket import socket

import httpcore

extensions = {
    "timeout": {
        "connect": .2,
        "read": .2,
        "pool": .2,
        "write": .2,
    },
    # "trace": log,
}


def report(pool):
    print("-- Report:", f"--- {pool.connections=}", f"--- {pool._requests=}", sep="\n")
    if pool.connections:
        conn = pool.connections[0]
        print(f"--- {conn=}")
        print(f"--- Connection has expired? - {conn.has_expired()}")
        print(f"--- Inner connection is {conn._connection}")


async def make_request(pool, url, extensions):
    try:
        r = await pool.request(
            "GET",
            url,
            extensions=extensions,
        )
    except httpcore.TimeoutException as exc:
        print(type(exc))
    else:
        print(r)


async def main():
    _socket = socket()
    _socket.bind(("localhost", 0))
    _socket.listen()

    async with httpcore.AsyncConnectionPool(
            max_connections=1,
            max_keepalive_connections=0,
    ) as pool:
        # Control
        await pool.request("GET", "https://en.wikipedia.org/wiki/2023")
        print("- Control is OK")
        report(pool)

        # Saturate pool
        num_tasks = 300
        url = "http://{}:{}".format(*_socket.getsockname())
        async with trio.open_nursery() as nursery:
            for _ in range(num_tasks):
                nursery.start_soon(make_request, pool, url, extensions)

        print(f"- Finished saturation\n")
        report(pool)

        # Control
        try:
            await pool.request("GET", "https://en.wikipedia.org/wiki/2023", extensions=extensions)  # PoolTimeout
        except httpcore.PoolTimeout:
            print("- ERROR: Got pool timeout")
            traceback.print_exc(chain=True)
        else:
            print("- No pool timeout!")


trio.run(main)

I was able to confirm the ReadError/PoolTimeout combination caused the error, and that it resulted in a state where we're handling the PoolTimeout, but the connection has been assigned to the waiting request.

I was also able to confirm the suggested fix, although I'm not yet entirely comfortable with it. It seems a little brittle, and I'd prefer to understand first if we're able to avoid this state.

I wonder if we're able to reliably reproduce this state, using a mocked out network backend similar to some of the cases in tests/_async/test_connection_pool.py?

nihilSup commented Jan 16, 2023

I wonder if we're able to reliably reproduce this state, using a mocked out network backend similar to some of the cases in tests/_async/test_connection_pool.py?

@tomchristie Over the weekend I prepared a draft PR with tests for both the sync and async implementations.

httpcore/backends/mock.py
class HangingStream(MockStream):
    def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes:
        if self._closed:
            raise ReadError("Connection closed")
        time.sleep(timeout or 0.1)
        raise ReadTimeout


class MockBackend(NetworkBackend):
    def __init__(
            self,
            buffer: typing.List[bytes],
            http2: bool = False,
            resp_stream_cls: Optional[Type[NetworkStream]] = None,
    ) -> None:
        self._buffer = buffer
        self._http2 = http2
        self._resp_stream_cls: Type[MockStream] = resp_stream_cls or MockStream

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: Optional[float] = None,
        local_address: Optional[str] = None,
    ) -> NetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    def connect_unix_socket(
        self, path: str, timeout: Optional[float] = None
    ) -> NetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    def sleep(self, seconds: float) -> None:
        pass


class AsyncHangingStream(AsyncMockStream):
    async def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes:
        if self._closed:
            raise ReadError("Connection closed")
        await anyio.sleep(timeout or 0.1)
        raise ReadTimeout


class AsyncMockBackend(AsyncNetworkBackend):
    def __init__(
            self,
            buffer: typing.List[bytes],
            http2: bool = False,
            resp_stream_cls: Optional[Type[AsyncNetworkStream]] = None,
    ) -> None:
        self._buffer = buffer
        self._http2 = http2
        self._resp_stream_cls: Type[AsyncMockStream] = resp_stream_cls or AsyncMockStream

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: Optional[float] = None,
        local_address: Optional[str] = None,
    ) -> AsyncNetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    async def connect_unix_socket(
        self, path: str, timeout: Optional[float] = None
    ) -> AsyncNetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    async def sleep(self, seconds: float) -> None:
        pass
tests/_async/test_connection_pool.py
@pytest.mark.trio
async def test_pool_under_load():
    """
    Pool must remain operational after some peak load.
    """
    network_backend = AsyncMockBackend([], resp_stream_cls=AsyncHangingStream)

    async def fetch(_pool: AsyncConnectionPool, *exceptions: Type[BaseException]):
        with contextlib.suppress(*exceptions):
            async with pool.stream(
                    "GET",
                    "http://a.com/",
                    extensions={
                        "timeout": {
                            "connect": 0.1,
                            "read": 0.1,
                            "pool": 0.1,
                            "write": 0.1,
                        },
                    },
            ) as response:
                await response.aread()

    async with AsyncConnectionPool(
        max_connections=1, network_backend=network_backend
    ) as pool:
        async with concurrency.open_nursery() as nursery:
            for _ in range(300):
                # Sending many requests to the same url. All of them but one will have PoolTimeout. One will
                # be finished with ReadTimeout
                nursery.start_soon(fetch, pool, PoolTimeout, ReadTimeout)
        if pool.connections:  # There is one connection in pool in "CONNECTING" state
            assert pool.connections[0].is_connecting()
        with pytest.raises(ReadTimeout):  # ReadTimeout indicates that connection could be retrieved
            await fetch(pool)
tests/_sync/test_connection_pool.py
def test_pool_under_load():
    """
    Pool must remain operational after some peak load.
    """
    network_backend = MockBackend([], resp_stream_cls=HangingStream)

    def fetch(_pool: ConnectionPool, *exceptions: Type[BaseException]):
        with contextlib.suppress(*exceptions):
            with pool.stream(
                    "GET",
                    "http://a.com/",
                    extensions={
                        "timeout": {
                            "connect": 0.1,
                            "read": 0.1,
                            "pool": 0.1,
                            "write": 0.1,
                        },
                    },
            ) as response:
                response.read()

    with ConnectionPool(
        max_connections=1, network_backend=network_backend
    ) as pool:
        with concurrency.open_nursery() as nursery:
            for _ in range(300):
                # Sending many requests to the same url. All of them but one will have PoolTimeout. One will
                # be finished with ReadTimeout
                nursery.start_soon(fetch, pool, PoolTimeout, ReadTimeout)
        if pool.connections:  # There is one connection in pool in "CONNECTING" state
            assert pool.connections[0].is_connecting()
        with pytest.raises(ReadTimeout):  # ReadTimeout indicates that connection could be retrieved
            fetch(pool)

The assert pool.connections[0].is_connecting() line can be skipped, because it relies on a helper method from my branch.
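Roughly, such a helper just checks the CONNECTING state described earlier in the thread (a sketch, not necessarily the exact code from the branch):

# Sketch of an is_connecting() helper on the connection wrapper used by the
# tests above: a wrapper is "connecting" when it has neither failed nor
# obtained a real underlying connection yet.
def is_connecting(self) -> bool:
    return self._connection is None and not self._connect_failed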

c25vdw commented Jan 16, 2023

An observation after I played with your test code locally, @nihilSup:

We can reproduce this zombie situation without ReadTimeout, and even without the hanging backend:

under tests/_async/test_connection_pool.py

@pytest.mark.trio
async def test_pool_timeout_connection_cleanup():
    network_backend = AsyncMockBackend(
        [
            b"HTTP/1.1 200 OK\r\n",
            b"Content-Type: plain/text\r\n",
            b"Content-Length: 13\r\n",
            b"\r\n",
            b"Hello, world!",
        ]
    )

    async with AsyncConnectionPool(
        network_backend=network_backend, max_connections=2
    ) as pool:
        with pytest.raises(PoolTimeout):
            extensions = {"timeout": {"pool": 0}}
            await pool.request("GET", "https://example.com/", extensions=extensions)

        # wait for a considerable amount of time to make sure all requests time out
        await concurrency.sleep(1)

        print(pool.connections) # [<AsyncHTTPConnection [CONNECTING]>]
        print(pool._requests) # [] bad! both should be empty

I think the reason this achieves the same outcome is that, with the timeout set to 0, status.wait_for_connection(timeout=0) will immediately raise PoolTimeout, but before that the request was already assigned a connection in

await self._attempt_to_acquire_connection(status)

This could be a more minimal reproduction of the scenario. The difference is that in the real situation the connection assignment happens from another request's ReadTimeout handling.

@nihilSup

Good catch - I think this clearly shows the "missed state" in the connection's state machine. As for unit tests, I think we want to simulate a realistic situation, e.g. peak load with errors and pool saturation, and then test that the pool remains operational afterwards, so maybe my test is a little more "unbiased".

@nihilSup

Played with the more minimal reproduction by @c25vdw. I think we also need it as a separate unit test to check the corner case of a 0 pool timeout. Also, here is another possible zombie cleanup:

except BaseException as exc:
    # If we timeout here, or if the task is cancelled, then make
    # sure to remove the request from the queue before bubbling
    # up the exception.
    async with self._pool_lock:
        self._requests.remove(status)
        raise exc

Replace with this:
except BaseException as exc:
    # If we timeout here, or if the task is cancelled, then make
    # sure to remove the request from the queue before bubbling
    # up the exception.
    async with self._pool_lock:
        self._requests.remove(status)  # Remove as usual
        if (
                status.connection and status.connection.is_connecting()
                and not any(req.connection is status.connection for req in self._requests)
        ):
            # Zombie cleanup
            for idx, conn in enumerate(self._pool):
                if conn is status.connection:
                    self._pool.pop(idx)
                    break
        raise exc

This cleanup is also based on the same assumption: if there is a CONNECTING connection and no requests that need it, we can safely remove it from the pool.

nihilSup pushed a commit to nihilSup/httpcore that referenced this issue Jan 29, 2023
@nihilSup

Sorry for this mess, I didn't expect it; passing all the linters in CI took a very long time, so I had to convert the PR to a draft beforehand.

camlee commented Mar 7, 2023

I think that I'm running into this issue as well.

@nihilSup : it looks like you've got all the linting issues sorted out now, right? What's left to move this fix forward and is there anything someone else can do to help? I'm looking at the code but am a little out of my depth here.

@nihilSup

@camlee yes

@JacobHenner

@nihilSup : it looks like you've got all the linting issues sorted out now, right? What's left to move this fix forward and is there anything someone else can do to help? I'm looking at the code but am a little out of my depth here.

Indeed - I've run into this issue in two of my prod codebases and would like to fix it ASAP - please let me know what I can do to help.

@tomchristie (Member)

I'm going to take a punt and work on the assumption that #688 has closed this for us.
If that's not the case then we can reopen and reassess this.

@konstantin-baidin-y42

I experimented with versions 0.16.3, 0.17.1 and 0.18.0.

With a long-responding server on localhost:5522, the following code fails with PoolTimeout. You can get and run such a long-responding server from this comment: encode/httpx#1171 (comment)
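For reference, a stand-in for such a long-responding server could look like this (a sketch only; the linked comment has the original): it accepts connections on localhost:5522 but waits far longer than any client timeout before responding.

import asyncio


async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    await reader.read(1024)   # consume the request
    await asyncio.sleep(60)   # respond much later than any client timeout
    writer.write(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok")
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def serve() -> None:
    server = await asyncio.start_server(handle, "localhost", 5522)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(serve())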

I would expect the following code not to raise PoolTimeout, especially since I cancel all the tasks and wait an additional 5 seconds for cancellation to complete.

import asyncio
import httpcore


async def main() -> None:
    async with httpcore.AsyncConnectionPool(max_connections=3) as pool:
        async def do_one_request():
            return await pool.request("GET", "http://localhost:5522/", extensions={"timeout": {"pool": 1}})

        # First, create many requests, then cancel while they are in progress.
        tasks = []
        for i in range(5):
            tasks.append(asyncio.create_task(do_one_request()))
            await asyncio.sleep(0.0001)
            tasks[-1].cancel()

        print("Wait reasonable amount of time")
        await asyncio.sleep(5)
        print("Starting another request will now fail with a `PoolTimeout`")
        await do_one_request()


asyncio.run(main())

My case is probably related to the issue discussed here.

@jhaenchen

I'm still seeing this on httpcore 1.0.5. I'm using the repro from the very first post on this issue:

import multiprocessing.pool
import socket
import httpx

PARALLELISM = 20
REQUESTS = 200

unavailable_listener = socket.socket()
unavailable_listener.bind(("localhost", 0))
unavailable_listener.listen()

unavailable_url = "http://{}:{}".format(*unavailable_listener.getsockname())

client = httpx.Client(limits=httpx.Limits(max_connections=1, max_keepalive_connections=0))

pool = multiprocessing.pool.ThreadPool(PARALLELISM)

pool.map(lambda _: client.get(unavailable_url), range(REQUESTS))

client.get(unavailable_url)

# httpx.PoolTimeout - over and over and over again, as many times as you try

@tomchristie (Member)

Okay, looks like you've got a nice simple reproducible case there, shall we work through that and then consider re-opening?

Next step... reduce the case above to just use the httpcore API, rather than through httpx.
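One possible reduction (an untested sketch): the same non-accepting listener and thread-based concurrency, but with httpcore.ConnectionPool in place of the httpx client.

import concurrent.futures
import socket

import httpcore

listener = socket.socket()
listener.bind(("localhost", 0))
listener.listen()
url = "http://{}:{}".format(*listener.getsockname())

timeouts = {"timeout": {"connect": 1, "read": 1, "write": 1, "pool": 1}}
pool = httpcore.ConnectionPool(max_connections=1, max_keepalive_connections=0)


def fetch(_):
    # Swallow the expected timeout exceptions so the map() completes.
    try:
        return pool.request("GET", url, extensions=timeouts)
    except Exception as exc:
        return exc


with concurrent.futures.ThreadPoolExecutor(20) as executor:
    results = list(executor.map(fetch, range(200)))

# If the pool has been poisoned, this raises httpcore.PoolTimeout even though
# no requests are outstanding any more.
pool.request("GET", url, extensions=timeouts)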
