Skip to content

Commit

Permalink
Add connection_timeout_ms and reset the timeout counter more often (#132
Browse files Browse the repository at this point in the history
)

* Add connection_timeout_ms and reset the timeout counter more often

* Refactor last_attempt -> last_activity
This semantically reflects the new usage of the variable better

* Make tests work again

* Add unit tests of new BrokerConnection functionality
The test mocks parts of BrokerConnection in order to assert that the connection state machine allows long-lasting connections as long as the state progresses often enough

* Re-introduce last_attempt to avoid breakage

---------

Co-authored-by: Liam S. Crouch <spam@petterroea.com>
  • Loading branch information
wbarnha and petterroea authored Apr 3, 2024
1 parent aba153f commit 6c9eb37
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 13 deletions.
4 changes: 4 additions & 0 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class KafkaClient:
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
connection_timeout_ms (int): Connection timeout in milliseconds.
Default: None, which defaults it to the same value as
request_timeout_ms.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
connections_max_idle_ms: Close idle connections after the number of
Expand Down Expand Up @@ -145,6 +148,7 @@ class KafkaClient:
'bootstrap_servers': 'localhost',
'bootstrap_topics_filter': set(),
'client_id': 'kafka-python-' + __version__,
'connection_timeout_ms': None,
'request_timeout_ms': 30000,
'wakeup_timeout_ms': 3000,
'connections_max_idle_ms': 9 * 60 * 1000,
Expand Down
24 changes: 19 additions & 5 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ class BrokerConnection:
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
connection_timeout_ms (int): Connection timeout in milliseconds.
Default: None, which defaults it to the same value as
request_timeout_ms.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
max_in_flight_requests_per_connection (int): Requests are pipelined
Expand Down Expand Up @@ -188,6 +191,7 @@ class BrokerConnection:
'client_id': 'kafka-python-' + __version__,
'node_id': 0,
'request_timeout_ms': 30000,
'connection_timeout_ms': None,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
Expand Down Expand Up @@ -231,6 +235,9 @@ def __init__(self, host, port, afi, **configs):
for key in self.config:
if key in configs:
self.config[key] = configs[key]

if self.config['connection_timeout_ms'] is None:
self.config['connection_timeout_ms'] = self.config['request_timeout_ms']

self.node_id = self.config.pop('node_id')

Expand Down Expand Up @@ -284,7 +291,10 @@ def __init__(self, host, port, afi, **configs):
if self.config['ssl_context'] is not None:
self._ssl_context = self.config['ssl_context']
self._sasl_auth_future = None
self.last_attempt = 0
self.last_activity = 0
# This value is not used for internal state, but it is left to allow backwards-compatability
# The variable last_activity is now used instead, but is updated more often may therefore break compatability with some hacks.
self.last_attempt= 0
self._gai = []
self._sensors = None
if self.config['metrics']:
Expand Down Expand Up @@ -362,6 +372,7 @@ def connect(self):
self.config['state_change_callback'](self.node_id, self._sock, self)
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
self.last_activity = time.time()

if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
Expand Down Expand Up @@ -394,6 +405,7 @@ def connect(self):
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)
self.last_activity = time.time()

# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
Expand All @@ -419,6 +431,7 @@ def connect(self):
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)
self.last_activity = time.time()

if self.state is ConnectionStates.AUTHENTICATING:
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
Expand All @@ -429,12 +442,13 @@ def connect(self):
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)
self.last_activity = time.time()

if self.state not in (ConnectionStates.CONNECTED,
ConnectionStates.DISCONNECTED):
# Connection timed out
request_timeout = self.config['request_timeout_ms'] / 1000.0
if time.time() > request_timeout + self.last_attempt:
request_timeout = self.config['connection_timeout_ms'] / 1000.0
if time.time() > request_timeout + self.last_activity:
log.error('Connection attempt to %s timed out', self)
self.close(Errors.KafkaConnectionError('timeout'))
return self.state
Expand Down Expand Up @@ -595,7 +609,7 @@ def blacked_out(self):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
if time.time() < self.last_attempt + self._reconnect_backoff:
if time.time() < self.last_activity + self._reconnect_backoff:
return True
return False

Expand All @@ -606,7 +620,7 @@ def connection_delay(self):
the reconnect backoff time. When connecting or connected, returns a very
large number to handle slow/stalled connections.
"""
time_waited = time.time() - (self.last_attempt or 0)
time_waited = time.time() - (self.last_activity or 0)
if self.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited, 0) * 1000
else:
Expand Down
4 changes: 4 additions & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ class KafkaProducer:
brokers or partitions. Default: 300000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
connection_timeout_ms (int): Connection timeout in milliseconds.
Default: None, which defaults it to the same value as
request_timeout_ms.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
receive_buffer_bytes (int): The size of the TCP receive buffer
Expand Down Expand Up @@ -300,6 +303,7 @@ class KafkaProducer:
'max_request_size': 1048576,
'metadata_max_age_ms': 300000,
'retry_backoff_ms': 100,
'connection_timeout_ms': None,
'request_timeout_ms': 30000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
Expand Down
88 changes: 80 additions & 8 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import socket

import pytest
import time

from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
from kafka.protocol.api import RequestHeader
Expand Down Expand Up @@ -61,28 +62,99 @@ def test_connect_timeout(_socket, conn):
# Initial connect returns EINPROGRESS
# immediate inline connect returns EALREADY
# second explicit connect returns EALREADY
# third explicit connect returns EALREADY and times out via last_attempt
# third explicit connect returns EALREADY and times out via last_activity
_socket.connect_ex.side_effect = [EINPROGRESS, EALREADY, EALREADY, EALREADY]
conn.connect()
assert conn.state is ConnectionStates.CONNECTING
conn.connect()
assert conn.state is ConnectionStates.CONNECTING
conn.last_activity = 0
conn.last_attempt = 0
conn.connect()
assert conn.state is ConnectionStates.DISCONNECTED

def test_connect_timeout_slowconn(_socket, conn, mocker):
# Same as test_connect_timeout,
# but we make the connection run longer than the timeout in order to test that
# BrokerConnection resets the timer whenever things happen during the connection
# See https://github.com/dpkp/kafka-python/issues/2386
_socket.connect_ex.side_effect = [EINPROGRESS, EISCONN]

# 0.8 = we guarantee that when testing with three intervals of this we are past the timeout
time_between_connect = (conn.config['connection_timeout_ms']/1000) * 0.8
start = time.time()

# Use plaintext auth for simplicity
last_activity = conn.last_activity
last_attempt = conn.last_attempt
conn.config['security_protocol'] = 'SASL_PLAINTEXT'
conn.connect()
assert conn.state is ConnectionStates.CONNECTING
# Ensure the last_activity counter was updated
# Last_attempt should also be updated
assert conn.last_activity > last_activity
assert conn.last_attempt > last_attempt
last_attempt = conn.last_attempt
last_activity = conn.last_activity

# Simulate time being passed
# This shouldn't be enough time to time out the connection
conn._try_authenticate = mocker.Mock(side_effect=[False, False, True])
with mock.patch("time.time", return_value=start+time_between_connect):
# This should trigger authentication
# Note that an authentication attempt isn't actually made until now.
# We simulate that authentication does not succeed at this point
# This is technically incorrect, but it lets us see what happens
# to the state machine when the state doesn't change for two function calls
conn.connect()
assert conn.last_activity > last_activity
# Last attempt is kept as a legacy variable, should not update
assert conn.last_attempt == last_attempt
last_activity = conn.last_activity

assert conn.state is ConnectionStates.AUTHENTICATING


# This time around we should be way past timeout.
# Now we care about connect() not terminating the attempt,
# because connection state was progressed in the meantime.
with mock.patch("time.time", return_value=start+time_between_connect*2):
# Simulate this one not succeeding as well. This is so we can ensure things don't time out
conn.connect()

# No state change = no activity change
assert conn.last_activity == last_activity
assert conn.last_attempt == last_attempt

# If last_activity was not reset when the state transitioned to AUTHENTICATING,
# the connection state would be timed out now.
assert conn.state is ConnectionStates.AUTHENTICATING


# This time around, the connection should succeed.
with mock.patch("time.time", return_value=start+time_between_connect*3):
# This should finalize the connection
conn.connect()

assert conn.last_activity > last_activity
assert conn.last_attempt == last_attempt
last_activity = conn.last_activity

assert conn.state is ConnectionStates.CONNECTED



def test_blacked_out(conn):
with mock.patch("time.time", return_value=1000):
conn.last_attempt = 0
conn.last_activity = 0
assert conn.blacked_out() is False
conn.last_attempt = 1000
conn.last_activity = 1000
assert conn.blacked_out() is True


def test_connection_delay(conn):
with mock.patch("time.time", return_value=1000):
conn.last_attempt = 1000
conn.last_activity = 1000
assert conn.connection_delay() == conn.config['reconnect_backoff_ms']
conn.state = ConnectionStates.CONNECTING
assert conn.connection_delay() == float('inf')
Expand Down Expand Up @@ -286,7 +358,7 @@ def test_lookup_on_connect():
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.last_activity = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
assert conn._sock_afi == afi2
Expand All @@ -301,11 +373,10 @@ def test_relookup_on_failure():
assert conn.host == hostname
mock_return1 = []
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
last_attempt = conn.last_attempt
last_activity = conn.last_activity
conn.connect()
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
assert conn.disconnected()
assert conn.last_attempt > last_attempt

afi2 = socket.AF_INET
sockaddr2 = ('127.0.0.2', 9092)
Expand All @@ -314,12 +385,13 @@ def test_relookup_on_failure():
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.last_activity = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
assert conn._sock_afi == afi2
assert conn._sock_addr == sockaddr2
conn.close()
assert conn.last_activity > last_activity


def test_requests_timed_out(conn):
Expand Down

0 comments on commit 6c9eb37

Please sign in to comment.