Skip to content

Commit

Permalink
[1.26] Don't close ConnectionPools from PoolManager until unreferenced
Browse files Browse the repository at this point in the history
Co-authored-by: Seth Michael Larson <sethmichaellarson@gmail.com>
  • Loading branch information
thomasmaclean and sethmlarson authored Apr 23, 2023
1 parent 6b77c39 commit 710114d
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 35 deletions.
38 changes: 30 additions & 8 deletions src/urllib3/connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@
from .util.url import _normalize_host as normalize_host
from .util.url import get_host, parse_url

try: # Platform-specific: Python 3
import weakref

weakref_finalize = weakref.finalize
except AttributeError: # Platform-specific: Python 2
from .packages.backports.weakref_finalize import weakref_finalize

xrange = six.moves.xrange

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -220,6 +227,16 @@ def __init__(
self.conn_kw["proxy"] = self.proxy
self.conn_kw["proxy_config"] = self.proxy_config

# Do not pass 'self' as callback to 'finalize'.
# Then the 'finalize' would keep an endless living (leak) to self.
# By just passing a reference to the pool allows the garbage collector
# to free self if nobody else has a reference to it.
pool = self.pool

# Close all the HTTPConnections in the pool before the
# HTTPConnectionPool object is garbage collected.
weakref_finalize(self, _close_pool_connections, pool)

def _new_conn(self):
"""
Return a fresh :class:`HTTPConnection`.
Expand Down Expand Up @@ -489,14 +506,8 @@ def close(self):
# Disable access to the pool
old_pool, self.pool = self.pool, None

try:
while True:
conn = old_pool.get(block=False)
if conn:
conn.close()

except queue.Empty:
pass # Done.
# Close all the HTTPConnections in the pool.
_close_pool_connections(old_pool)

def is_same_host(self, url):
"""
Expand Down Expand Up @@ -1108,3 +1119,14 @@ def _normalize_host(host, scheme):
if host.startswith("[") and host.endswith("]"):
host = host[1:-1]
return host


def _close_pool_connections(pool):
"""Drains a queue of connections and closes each one."""
try:
while True:
conn = pool.get(block=False)
if conn:
conn.close()
except queue.Empty:
pass # Done.
155 changes: 155 additions & 0 deletions src/urllib3/packages/backports/weakref_finalize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
"""
backports.weakref_finalize
~~~~~~~~~~~~~~~~~~
Backports the Python 3 ``weakref.finalize`` method.
"""
from __future__ import absolute_import

import itertools
import sys
from weakref import ref

__all__ = ["weakref_finalize"]


class weakref_finalize(object):
"""Class for finalization of weakrefable objects
finalize(obj, func, *args, **kwargs) returns a callable finalizer
object which will be called when obj is garbage collected. The
first time the finalizer is called it evaluates func(*arg, **kwargs)
and returns the result. After this the finalizer is dead, and
calling it just returns None.
When the program exits any remaining finalizers for which the
atexit attribute is true will be run in reverse order of creation.
By default atexit is true.
"""

# Finalizer objects don't have any state of their own. They are
# just used as keys to lookup _Info objects in the registry. This
# ensures that they cannot be part of a ref-cycle.

__slots__ = ()
_registry = {}
_shutdown = False
_index_iter = itertools.count()
_dirty = False
_registered_with_atexit = False

class _Info(object):
__slots__ = ("weakref", "func", "args", "kwargs", "atexit", "index")

def __init__(self, obj, func, *args, **kwargs):
if not self._registered_with_atexit:
# We may register the exit function more than once because
# of a thread race, but that is harmless
import atexit

atexit.register(self._exitfunc)
weakref_finalize._registered_with_atexit = True
info = self._Info()
info.weakref = ref(obj, self)
info.func = func
info.args = args
info.kwargs = kwargs or None
info.atexit = True
info.index = next(self._index_iter)
self._registry[self] = info
weakref_finalize._dirty = True

def __call__(self, _=None):
"""If alive then mark as dead and return func(*args, **kwargs);
otherwise return None"""
info = self._registry.pop(self, None)
if info and not self._shutdown:
return info.func(*info.args, **(info.kwargs or {}))

def detach(self):
"""If alive then mark as dead and return (obj, func, args, kwargs);
otherwise return None"""
info = self._registry.get(self)
obj = info and info.weakref()
if obj is not None and self._registry.pop(self, None):
return (obj, info.func, info.args, info.kwargs or {})

def peek(self):
"""If alive then return (obj, func, args, kwargs);
otherwise return None"""
info = self._registry.get(self)
obj = info and info.weakref()
if obj is not None:
return (obj, info.func, info.args, info.kwargs or {})

@property
def alive(self):
"""Whether finalizer is alive"""
return self in self._registry

@property
def atexit(self):
"""Whether finalizer should be called at exit"""
info = self._registry.get(self)
return bool(info) and info.atexit

@atexit.setter
def atexit(self, value):
info = self._registry.get(self)
if info:
info.atexit = bool(value)

def __repr__(self):
info = self._registry.get(self)
obj = info and info.weakref()
if obj is None:
return "<%s object at %#x; dead>" % (type(self).__name__, id(self))
else:
return "<%s object at %#x; for %r at %#x>" % (
type(self).__name__,
id(self),
type(obj).__name__,
id(obj),
)

@classmethod
def _select_for_exit(cls):
# Return live finalizers marked for exit, oldest first
L = [(f, i) for (f, i) in cls._registry.items() if i.atexit]
L.sort(key=lambda item: item[1].index)
return [f for (f, i) in L]

@classmethod
def _exitfunc(cls):
# At shutdown invoke finalizers for which atexit is true.
# This is called once all other non-daemonic threads have been
# joined.
reenable_gc = False
try:
if cls._registry:
import gc

if gc.isenabled():
reenable_gc = True
gc.disable()
pending = None
while True:
if pending is None or weakref_finalize._dirty:
pending = cls._select_for_exit()
weakref_finalize._dirty = False
if not pending:
break
f = pending.pop()
try:
# gc is disabled, so (assuming no daemonic
# threads) the following is the only line in
# this function which might trigger creation
# of a new finalizer
f()
except Exception:
sys.excepthook(*sys.exc_info())
assert f not in cls._registry
finally:
# prevent any more finalizers from executing during shutdown
weakref_finalize._shutdown = True
if reenable_gc:
gc.enable()
2 changes: 1 addition & 1 deletion src/urllib3/poolmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class PoolManager(RequestMethods):
def __init__(self, num_pools=10, headers=None, **connection_pool_kw):
RequestMethods.__init__(self, headers)
self.connection_pool_kw = connection_pool_kw
self.pools = RecentlyUsedContainer(num_pools, dispose_func=lambda p: p.close())
self.pools = RecentlyUsedContainer(num_pools)

# Locally set the pool classes and keys so other PoolManagers can
# override them.
Expand Down
57 changes: 31 additions & 26 deletions test/test_poolmanager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import gc
import socket
from test import resolvesLocalhostFQDN

import pytest
from mock import patch

from urllib3 import connection_from_url
from urllib3.exceptions import ClosedPoolError, LocationValueError
from urllib3.exceptions import LocationValueError
from urllib3.poolmanager import PoolKey, PoolManager, key_fn_by_scheme
from urllib3.util import retry, timeout

Expand Down Expand Up @@ -60,24 +61,12 @@ def test_many_urls(self):
def test_manager_clear(self):
p = PoolManager(5)

conn_pool = p.connection_from_url("http://google.com")
p.connection_from_url("http://google.com")
assert len(p.pools) == 1

conn = conn_pool._get_conn()

p.clear()
assert len(p.pools) == 0

with pytest.raises(ClosedPoolError):
conn_pool._get_conn()

conn_pool._put_conn(conn)

with pytest.raises(ClosedPoolError):
conn_pool._get_conn()

assert len(p.pools) == 0

@pytest.mark.parametrize("url", ["http://@", None])
def test_nohost(self, url):
p = PoolManager(5)
Expand All @@ -86,19 +75,8 @@ def test_nohost(self, url):

def test_contextmanager(self):
with PoolManager(1) as p:
conn_pool = p.connection_from_url("http://google.com")
p.connection_from_url("http://google.com")
assert len(p.pools) == 1
conn = conn_pool._get_conn()

assert len(p.pools) == 0

with pytest.raises(ClosedPoolError):
conn_pool._get_conn()

conn_pool._put_conn(conn)

with pytest.raises(ClosedPoolError):
conn_pool._get_conn()

assert len(p.pools) == 0

Expand Down Expand Up @@ -397,3 +375,30 @@ def test_e2e_connect_to_ipv6_scoped(self, create_connection, url):
conn.connect()

assert create_connection.call_args[0][0] == ("a::b%zone", 80)

def test_thread_safty(self):
pool_manager = PoolManager(num_pools=2)

# thread 1 gets a pool for host x
pool_1 = pool_manager.connection_from_url("http://host_x:80/")

# thread 2 gets a pool for host y
pool_2 = pool_manager.connection_from_url("http://host_y:80/")

# thread 3 gets a pool for host z
pool_3 = pool_manager.connection_from_url("http://host_z:80")

# None of the pools should be closed, since all of them are referenced.
assert pool_1.pool is not None
assert pool_2.pool is not None
assert pool_3.pool is not None

conn_queue = pool_1.pool
assert conn_queue.qsize() > 0

# thread 1 stops.
del pool_1
gc.collect()

# Connection should be closed, because reference to pool_1 is gone.
assert conn_queue.qsize() == 0

0 comments on commit 710114d

Please sign in to comment.