From 3c124b2da2e99beec08a10dffd51ae3274b84e7e Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 19 Mar 2024 22:36:02 -0400 Subject: [PATCH] KIP-345 Static membership implementation (#137) * KIP-345 Add static consumer membership support * KIP-345 Add examples to docs * KIP-345 Add leave_group_on_close flag https://issues.apache.org/jira/browse/KAFKA-6995 * KIP-345 Add tests for static membership * KIP-345 Update docs for leave_group_on_close option * Update changelog.rst * remove six from base.py * Update base.py * Update base.py * Update base.py * Update changelog.rst * Update README.rst --------- Co-authored-by: Denis Kazakov Co-authored-by: Denis Kazakov --- CHANGES.md | 5 ++ README.rst | 6 +- docs/changelog.rst | 7 ++ docs/usage.rst | 12 +++ kafka/consumer/group.py | 12 ++- kafka/coordinator/base.py | 140 +++++++++++++++++++++++++++------- kafka/coordinator/consumer.py | 17 ++++- kafka/protocol/group.py | 119 +++++++++++++++++++++++++++-- test/test_consumer.py | 5 ++ test/test_consumer_group.py | 20 +++++ 10 files changed, 302 insertions(+), 41 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ccec6b5c3..ba40007f9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,8 @@ +# 2.0.3 (under development) + +Consumer +* KIP-345: Implement static membership support + # 2.0.2 (Sep 29, 2020) Consumer diff --git a/README.rst b/README.rst index b7acfc8a2..ce82c6d3b 100644 --- a/README.rst +++ b/README.rst @@ -64,8 +64,12 @@ that expose basic message attributes: topic, partition, offset, key, and value: .. code-block:: python + # join a consumer group for dynamic partition assignment and offset commits from kafka import KafkaConsumer - consumer = KafkaConsumer('my_favorite_topic') + consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') + # or as a static member with a fixed group member name + # consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group', + # group_instance_id='consumer-1', leave_group_on_close=False) for msg in consumer: print (msg) diff --git a/docs/changelog.rst b/docs/changelog.rst index 9d3cb6512..67013247b 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,13 @@ Changelog ========= +2.2.0 +#################### + +Consumer +-------- +* KIP-345: Implement static membership support + 2.0.2 (Sep 29, 2020) #################### diff --git a/docs/usage.rst b/docs/usage.rst index 047bbad77..dbc8813f0 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -47,6 +47,18 @@ KafkaConsumer group_id='my-group', bootstrap_servers='my.server.com') + # Use multiple static consumers w/ 2.3.0 kafka brokers + consumer1 = KafkaConsumer('my-topic', + group_id='my-group', + group_instance_id='process-1', + leave_group_on_close=False, + bootstrap_servers='my.server.com') + consumer2 = KafkaConsumer('my-topic', + group_id='my-group', + group_instance_id='process-2', + leave_group_on_close=False, + bootstrap_servers='my.server.com') + There are many configuration options for the consumer class. See :class:`~kafka.KafkaConsumer` API documentation for more details. diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 0d613e71e..fc04c4bd6 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -52,6 +52,12 @@ class KafkaConsumer: committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None + group_instance_id (str): the unique identifier to distinguish + each client instance. If set and leave_group_on_close is + False consumer group rebalancing won't be triggered until + sessiont_timeout_ms is met. Requires 2.3.0+. + leave_group_on_close (bool or None): whether to leave a consumer + group or not on consumer shutdown. key_deserializer (callable): Any callable that takes a raw message key and returns a deserialized key. value_deserializer (callable): Any callable that takes a @@ -241,6 +247,7 @@ class KafkaConsumer: sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances + coordinator (callable): Custom class / callable for creating ConsumerCoordinator instances Note: Configuration parameters are described in more detail at @@ -250,6 +257,8 @@ class KafkaConsumer: 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__, 'group_id': None, + 'group_instance_id': '', + 'leave_group_on_close': None, 'key_deserializer': None, 'value_deserializer': None, 'fetch_max_wait_ms': 500, @@ -304,6 +313,7 @@ class KafkaConsumer: 'sasl_oauth_token_provider': None, 'legacy_iterator': False, # enable to revert to < 1.4.7 iterator 'kafka_client': KafkaClient, + 'coordinator': ConsumerCoordinator, } DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000 @@ -379,7 +389,7 @@ def __init__(self, *topics, **configs): self._subscription = SubscriptionState(self.config['auto_offset_reset']) self._fetcher = Fetcher( self._client, self._subscription, self._metrics, **self.config) - self._coordinator = ConsumerCoordinator( + self._coordinator = self.config['coordinator']( self._client, self._subscription, self._metrics, assignors=self.config['partition_assignment_strategy'], **self.config) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 62773e330..d5ec4c720 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -78,6 +78,8 @@ class BaseCoordinator: DEFAULT_CONFIG = { 'group_id': 'kafka-python-default-group', + 'group_instance_id': '', + 'leave_group_on_close': None, 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, 'max_poll_interval_ms': 300000, @@ -92,6 +94,12 @@ def __init__(self, client, metrics, **configs): group_id (str): name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. Default: 'kafka-python-default-group' + group_instance_id (str): the unique identifier to distinguish + each client instance. If set and leave_group_on_close is + False consumer group rebalancing won't be triggered until + sessiont_timeout_ms is met. Requires 2.3.0+. + leave_group_on_close (bool or None): whether to leave a consumer + group or not on consumer shutdown. session_timeout_ms (int): The timeout used to detect failures when using Kafka's group management facilities. Default: 30000 heartbeat_interval_ms (int): The expected time in milliseconds @@ -117,6 +125,11 @@ def __init__(self, client, metrics, **configs): "different values for max_poll_interval_ms " "and session_timeout_ms") + if self.config['group_instance_id'] and self.config['api_version'] < (2, 3, 0): + raise Errors.KafkaConfigurationError( + 'Broker version %s does not support static membership' % (self.config['api_version'],), + ) + self._client = client self.group_id = self.config['group_id'] self.heartbeat = Heartbeat(**self.config) @@ -451,30 +464,48 @@ def _send_join_group_request(self): if self.config['api_version'] < (0, 9): raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers') elif (0, 9) <= self.config['api_version'] < (0, 10, 1): - request = JoinGroupRequest[0]( + version = 0 + args = ( self.group_id, self.config['session_timeout_ms'], self._generation.member_id, self.protocol_type(), - member_metadata) + member_metadata, + ) elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0): - request = JoinGroupRequest[1]( + version = 1 + args = ( self.group_id, self.config['session_timeout_ms'], self.config['max_poll_interval_ms'], self._generation.member_id, self.protocol_type(), - member_metadata) + member_metadata, + ) + elif self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: + version = 5 + args = ( + self.group_id, + self.config['session_timeout_ms'], + self.config['max_poll_interval_ms'], + self._generation.member_id, + self.config['group_instance_id'], + self.protocol_type(), + member_metadata, + ) else: - request = JoinGroupRequest[2]( + version = 2 + args = ( self.group_id, self.config['session_timeout_ms'], self.config['max_poll_interval_ms'], self._generation.member_id, self.protocol_type(), - member_metadata) + member_metadata, + ) # create the request for the coordinator + request = JoinGroupRequest[version](*args) log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) future = Future() _f = self._client.send(self.coordinator_id, request) @@ -558,12 +589,25 @@ def _handle_join_group_response(self, future, send_time, response): def _on_join_follower(self): # send follower's sync group with an empty assignment - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - {}) + if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: + version = 3 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + self.config['group_instance_id'], + {}, + ) + else: + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + {}, + ) + + request = SyncGroupRequest[version](*args) log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) return self._send_sync_group_request(request) @@ -586,15 +630,30 @@ def _on_join_leader(self, response): except Exception as e: return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - [(member_id, - assignment if isinstance(assignment, bytes) else assignment.encode()) - for member_id, assignment in group_assignment.items()]) + group_assignment = [ + (member_id, assignment if isinstance(assignment, bytes) else assignment.encode()) + for member_id, assignment in group_assignment.items() + ] + + if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: + version = 3 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + self.config['group_instance_id'], + group_assignment, + ) + else: + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + group_assignment, + ) + request = SyncGroupRequest[version](*args) log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) return self._send_sync_group_request(request) @@ -760,15 +819,22 @@ def close(self): def maybe_leave_group(self): """Leave the current group and reset local generation/memberId.""" with self._client._lock, self._lock: - if (not self.coordinator_unknown() + if ( + not self.coordinator_unknown() and self.state is not MemberState.UNJOINED - and self._generation is not Generation.NO_GENERATION): - + and self._generation is not Generation.NO_GENERATION + and self._leave_group_on_close() + ): # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. log.info('Leaving consumer group (%s).', self.group_id) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) + if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: + version = 3 + args = (self.group_id, [(self._generation.member_id, self.config['group_instance_id'])]) + else: + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + args = self.group_id, self._generation.member_id + request = LeaveGroupRequest[version](*args) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) future.add_errback(log.error, "LeaveGroup request failed: %s") @@ -795,10 +861,23 @@ def _send_heartbeat_request(self): e = Errors.NodeNotReadyError(self.coordinator_id) return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = HeartbeatRequest[version](self.group_id, - self._generation.generation_id, - self._generation.member_id) + if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: + version = 2 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + self.config['group_instance_id'], + ) + else: + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + ) + + request = HeartbeatRequest[version](*args) log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) @@ -845,6 +924,9 @@ def _handle_heartbeat_response(self, future, send_time, response): log.error("Heartbeat failed: Unhandled error: %s", error) future.failure(error) + def _leave_group_on_close(self): + return self.config['leave_group_on_close'] is None or self.config['leave_group_on_close'] + class GroupCoordinatorMetrics: def __init__(self, heartbeat, metrics, prefix, tags=None): diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index d9a67860b..cf82b69fe 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -25,6 +25,8 @@ class ConsumerCoordinator(BaseCoordinator): """This class manages the coordination process with the consumer coordinator.""" DEFAULT_CONFIG = { 'group_id': 'kafka-python-default-group', + 'group_instance_id': '', + 'leave_group_on_close': None, 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': None, @@ -45,6 +47,12 @@ def __init__(self, client, subscription, metrics, **configs): group_id (str): name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. Default: 'kafka-python-default-group' + group_instance_id (str): the unique identifier to distinguish + each client instance. If set and leave_group_on_close is + False consumer group rebalancing won't be triggered until + sessiont_timeout_ms is met. Requires 2.3.0+. + leave_group_on_close (bool or None): whether to leave a consumer + group or not on consumer shutdown. enable_auto_commit (bool): If true the consumer's offset will be periodically committed in the background. Default: True. auto_commit_interval_ms (int): milliseconds between automatic @@ -304,10 +312,15 @@ def _perform_assignment(self, leader_id, assignment_strategy, members): assert assignor, f'Invalid assignment protocol: {assignment_strategy}' member_metadata = {} all_subscribed_topics = set() - for member_id, metadata_bytes in members: + + for member in members: + if len(member) == 3: + member_id, group_instance_id, metadata_bytes = member + else: + member_id, metadata_bytes = member metadata = ConsumerProtocol.METADATA.decode(metadata_bytes) member_metadata[member_id] = metadata - all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member + all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member # the leader will begin watching for changes to any of the topics # the group is interested in, which ensures that all metadata changes diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index 68efdc8f9..9e698c21f 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -40,6 +40,23 @@ class JoinGroupResponse_v2(Response): ) +class JoinGroupResponse_v5(Response): + API_KEY = 11 + API_VERSION = 5 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('generation_id', Int32), + ('group_protocol', String('utf-8')), + ('leader_id', String('utf-8')), + ('member_id', String('utf-8')), + ('members', Array( + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('member_metadata', Bytes))), + ) + + class JoinGroupRequest_v0(Request): API_KEY = 11 API_VERSION = 0 @@ -81,11 +98,30 @@ class JoinGroupRequest_v2(Request): UNKNOWN_MEMBER_ID = '' +class JoinGroupRequest_v5(Request): + API_KEY = 11 + API_VERSION = 5 + RESPONSE_TYPE = JoinGroupResponse_v5 + SCHEMA = Schema( + ('group', String('utf-8')), + ('session_timeout', Int32), + ('rebalance_timeout', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('protocol_type', String('utf-8')), + ('group_protocols', Array( + ('protocol_name', String('utf-8')), + ('protocol_metadata', Bytes))), + ) + UNKNOWN_MEMBER_ID = '' + + + JoinGroupRequest = [ - JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2 + JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, None, None, JoinGroupRequest_v5, ] JoinGroupResponse = [ - JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2 + JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2, None, None, JoinGroupResponse_v5, ] @@ -116,6 +152,16 @@ class SyncGroupResponse_v1(Response): ) +class SyncGroupResponse_v3(Response): + API_KEY = 14 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('member_assignment', Bytes) + ) + + class SyncGroupRequest_v0(Request): API_KEY = 14 API_VERSION = 0 @@ -137,8 +183,23 @@ class SyncGroupRequest_v1(Request): SCHEMA = SyncGroupRequest_v0.SCHEMA -SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1] -SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1] +class SyncGroupRequest_v3(Request): + API_KEY = 14 + API_VERSION = 3 + RESPONSE_TYPE = SyncGroupResponse_v3 + SCHEMA = Schema( + ('group', String('utf-8')), + ('generation_id', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('group_assignment', Array( + ('member_id', String('utf-8')), + ('member_metadata', Bytes))), + ) + + +SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1, None, SyncGroupRequest_v3] +SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, None, SyncGroupResponse_v3] class MemberAssignment(Struct): @@ -186,8 +247,29 @@ class HeartbeatRequest_v1(Request): SCHEMA = HeartbeatRequest_v0.SCHEMA -HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1] -HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1] +class HeartbeatResponse_v2(Response): + API_KEY = 12 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16) + ) + + +class HeartbeatRequest_v2(Request): + API_KEY = 12 + API_VERSION = 2 + RESPONSE_TYPE = HeartbeatResponse_v2 + SCHEMA = Schema( + ('group', String('utf-8')), + ('generation_id', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ) + + +HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1, HeartbeatRequest_v2] +HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1, HeartbeatResponse_v2] class LeaveGroupResponse_v0(Response): @@ -207,6 +289,15 @@ class LeaveGroupResponse_v1(Response): ) +class LeaveGroupResponse_v3(Response): + API_KEY = 13 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16) + ) + + class LeaveGroupRequest_v0(Request): API_KEY = 13 API_VERSION = 0 @@ -224,5 +315,17 @@ class LeaveGroupRequest_v1(Request): SCHEMA = LeaveGroupRequest_v0.SCHEMA -LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1] -LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1] +class LeaveGroupRequest_v3(Request): + API_KEY = 13 + API_VERSION = 3 + RESPONSE_TYPE = LeaveGroupResponse_v3 + SCHEMA = Schema( + ('group', String('utf-8')), + ('member_identity_list', Array( + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')))), + ) + + +LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1, None, LeaveGroupRequest_v3] +LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1, None, LeaveGroupResponse_v3] diff --git a/test/test_consumer.py b/test/test_consumer.py index 436fe55c0..0c6110517 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -24,3 +24,8 @@ def test_subscription_copy(self): assert sub == set(['foo']) sub.add('fizz') assert consumer.subscription() == set(['foo']) + + def test_version_for_static_membership(self): + KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(2, 3, 0), group_instance_id='test') + with pytest.raises(KafkaConfigurationError): + KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(2, 2, 0), group_instance_id='test') diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 53222b6fc..ed6863fa2 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -180,3 +180,23 @@ def test_heartbeat_thread(kafka_broker, topic): consumer.poll(timeout_ms=100) assert consumer._coordinator.heartbeat.last_poll > last_poll consumer.close() + + +@pytest.mark.skipif(env_kafka_version() < (2, 3, 0), reason="Requires KAFKA_VERSION >= 2.3.0") +@pytest.mark.parametrize('leave, result', [ + (False, True), + (True, False), +]) +def test_kafka_consumer_rebalance_for_static_members(kafka_consumer_factory, leave, result): + GROUP_ID = random_string(10) + + consumer1 = kafka_consumer_factory(group_id=GROUP_ID, group_instance_id=GROUP_ID, leave_group_on_close=leave) + consumer1.poll() + generation1 = consumer1._coordinator.generation().generation_id + consumer1.close() + + consumer2 = kafka_consumer_factory(group_id=GROUP_ID, group_instance_id=GROUP_ID, leave_group_on_close=leave) + consumer2.poll() + generation2 = consumer2._coordinator.generation().generation_id + consumer2.close() + assert (generation1 == generation2) is result