Skip to content

Commit

Permalink
Rewrite connection pool exception handling
Browse files Browse the repository at this point in the history
The connection pool now uses a much simpler try/except/finally block in
the context manager function to detect network/Thrift errors. The pool
will now only refresh connections when Thrift or socket errors occur,
and will not react to unrelated application errors anymore.

With this approach, the _ClientProxy hack is not needed anymore, so it
has been completely eliminated.

See issue #25.
  • Loading branch information
wbolster committed Jun 4, 2013
1 parent 48b33d7 commit 615e2eb
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 86 deletions.
117 changes: 35 additions & 82 deletions happybase/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import contextlib
import logging
import Queue
import socket
import threading

from thrift.Thrift import TException

from .connection import Connection

logger = logging.getLogger(__name__)
Expand All @@ -19,44 +22,6 @@
#


class _ClientProxy(object):
"""
Proxy class to silently notice Thrift client exceptions.
This class proxies all requests a Connection makes to the underlying
Thrift client, and sets a flag when the client raised an exception,
e.g. socket errors or Thrift protocol errors.
The connection pool replaces tainted connections with fresh ones.
"""
def __init__(self, connection, client):
self.connection = connection
self.client = client
self._cache = {}

def __getattr__(self, name):
"""
Hook into attribute lookup and return wrapped methods.
Since the client is only used for method calls, just treat
every attribute access as a method to wrap.
"""
wrapped = self._cache.get(name)

if wrapped is None:
def wrapped(*args, **kwargs):
method = getattr(self.client, name)
assert callable(method)
try:
return method(*args, **kwargs)
except:
self.connection._tainted = True
raise
self._cache[name] = wrapped

return wrapped


class NoConnectionsAvailable(RuntimeError):
"""
Exception raised when no connections are available.
Expand Down Expand Up @@ -99,11 +64,11 @@ def __init__(self, size, **kwargs):
self._queue = Queue.LifoQueue(maxsize=size)
self._thread_connections = threading.local()

self._connection_kwargs = kwargs
self._connection_kwargs['autoconnect'] = False
connection_kwargs = kwargs
connection_kwargs['autoconnect'] = False

for i in xrange(size):
connection = self._create_connection()
connection = Connection(**connection_kwargs)
self._queue.put(connection)

# The first connection is made immediately so that trivial
Expand All @@ -112,12 +77,6 @@ def __init__(self, size, **kwargs):
with self.connection():
pass

def _create_connection(self):
"""Create a new connection with monkey-patched Thrift client."""
connection = Connection(**self._connection_kwargs)
connection.client = _ClientProxy(connection, connection.client)
return connection

def _acquire_connection(self, timeout=None):
"""Acquire a connection from the pool."""
try:
Expand Down Expand Up @@ -152,27 +111,23 @@ def connection(self, timeout=None):
:rtype: :py:class:`happybase.Connection`
"""

# If this thread already holds a connection, just return it.
# This is the short path for nested calls from the same thread.
connection = getattr(self._thread_connections, 'current', None)
if connection is not None:
yield connection
return

# If this point is reached, this is the outermost connection
# request for a thread. Obtain a new connection from the pool
# and keep a reference in a thread local so that nested
# connection requests from the same thread can return the same
# connection instance.
#
# Note: this code acquires a lock before assigning to the
# thread local; see
# http://emptysquare.net/blog/another-thing-about-pythons-
# threadlocals/

connection = self._acquire_connection(timeout)
with self._lock:
self._thread_connections.current = connection

return_after_use = False
if connection is None:
# This is the outermost connection request for this thread.
# Obtain a new connection from the pool and keep a reference
# in a thread local so that nested connection requests from
# the same thread can return the same connection instance.
#
# Note: this code acquires a lock before assigning to the
# thread local; see
# http://emptysquare.net/blog/another-thing-about-pythons-
# threadlocals/
return_after_use = True
connection = self._acquire_connection(timeout)
with self._lock:
self._thread_connections.current = connection

try:
# Open connection, because connections are opened lazily.
Expand All @@ -182,23 +137,21 @@ def connection(self, timeout=None):
# Return value from the context manager's __enter__()
yield connection

finally:

# Remove thread local reference, since the thread no longer
# owns it.
del self._thread_connections.current

except (TException, socket.error):
# Refresh the underlying Thrift client if an exception
# occurred in the Thrift layer, since we don't know whether
# the connection is still usable.
if getattr(connection, '_tainted', False):
logger.info("Replacing tainted pool connection")

try:
connection.close()
except:
pass
logger.info("Replacing tainted pool connection")
connection._refresh_thrift_client()
connection.open()

connection = self._create_connection()
# Reraise to caller; see contextlib.contextmanager() docs
raise

This comment has been minimized.

Copy link
@rogerhu

rogerhu Jun 5, 2013

Contributor

If we're raising back to the caller, this means we're not continuing with the rest of the with statement? Wondering whether things like socket.timeout's should be explicitly caught and returned back to the pool without reraising...

This comment has been minimized.

Copy link
@rogerhu

rogerhu Jun 5, 2013

Contributor

Also I'm wondering if the connection pool can be depleted and not added back into the pool if all connections go bad when hitting these exceptions...still checking..


self._return_connection(connection)
finally:
# Remove thread local reference after the outermost 'with'
# block ends. Afterwards the thread no longer owns the
# connection.
if return_after_use:
del self._thread_connections.current
self._return_connection(connection)
19 changes: 15 additions & 4 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ def test_connection_pool_construction():

def test_connection_pool():

from thrift.transport.TTransport import TTransportException

def run():
name = threading.current_thread().name
print "Thread %s starting" % name
Expand All @@ -464,15 +466,24 @@ def inner_function():
with pool.connection() as another_connection:
assert connection is another_connection

# Fake an exception once in a while
if random.random() < .001:
connection.transport.close()
raise TTransportException("Fake transport exception")

for i in xrange(100):
with pool.connection() as connection:
connection.tables()

# Fake an exception once in a while
if random.random() < .001:
connection._tainted = True
try:
inner_function()
except TTransportException:
# This error should have been picked up by the
# connection pool, and the connection should have
# been replaced by a fresh one
pass

inner_function()
connection.tables()

print "Thread %s done" % name

Expand Down

0 comments on commit 615e2eb

Please sign in to comment.