From fcad136c31f4f24c676dbf299a45eeb9abb5b652 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Mon, 18 Mar 2024 11:31:39 -0400 Subject: [PATCH] Run pyupgrade on everything. --- kafka/__init__.py | 2 - kafka/admin/__init__.py | 2 - kafka/admin/acl_resource.py | 11 ++--- kafka/admin/client.py | 10 ++-- kafka/admin/config_resource.py | 4 +- kafka/admin/new_partitions.py | 5 +- kafka/admin/new_topic.py | 4 +- kafka/client_async.py | 22 ++++----- kafka/cluster.py | 12 ++--- kafka/codec.py | 6 --- kafka/conn.py | 44 +++++++---------- kafka/consumer/__init__.py | 2 - kafka/consumer/fetcher.py | 48 +++++++++---------- kafka/consumer/group.py | 12 ++--- kafka/consumer/subscription_state.py | 28 ++++++----- kafka/coordinator/base.py | 14 +++--- kafka/coordinator/consumer.py | 36 +++++++------- kafka/coordinator/heartbeat.py | 4 +- kafka/coordinator/protocol.py | 4 +- kafka/errors.py | 18 ++++--- kafka/future.py | 4 +- kafka/metrics/__init__.py | 2 - kafka/metrics/compound_stat.py | 4 +- kafka/metrics/dict_reporter.py | 10 ++-- kafka/metrics/kafka_metric.py | 4 +- kafka/metrics/measurable.py | 4 +- kafka/metrics/measurable_stat.py | 2 - kafka/metrics/metric_config.py | 4 +- kafka/metrics/metric_name.py | 6 +-- kafka/metrics/metrics.py | 6 +-- kafka/metrics/metrics_reporter.py | 4 +- kafka/metrics/quota.py | 5 +- kafka/metrics/stat.py | 4 +- kafka/oauth/__init__.py | 2 - kafka/oauth/abstract.py | 2 - kafka/partitioner/__init__.py | 2 - kafka/partitioner/default.py | 4 +- kafka/producer/__init__.py | 2 - kafka/producer/buffer.py | 4 +- kafka/producer/future.py | 12 ++--- kafka/producer/kafka.py | 16 +++---- kafka/producer/record_accumulator.py | 12 ++--- kafka/producer/sender.py | 20 ++++---- kafka/protocol/__init__.py | 3 -- kafka/protocol/abstract.py | 4 +- kafka/protocol/admin.py | 2 - kafka/protocol/api.py | 6 +-- kafka/protocol/commit.py | 2 - kafka/protocol/fetch.py | 2 - kafka/protocol/frame.py | 2 +- kafka/protocol/group.py | 2 - kafka/protocol/message.py | 8 ++-- kafka/protocol/metadata.py | 2 - kafka/protocol/offset.py | 4 +- kafka/protocol/parser.py | 4 +- kafka/protocol/pickle.py | 2 - kafka/protocol/produce.py | 2 - kafka/protocol/struct.py | 4 +- kafka/protocol/types.py | 14 +++--- kafka/record/abc.py | 9 ++-- kafka/record/default_records.py | 14 +++--- kafka/record/legacy_records.py | 10 ++-- kafka/record/memory_records.py | 3 +- kafka/sasl/__init__.py | 2 +- kafka/sasl/msk.py | 2 +- kafka/sasl/oauthbearer.py | 2 +- kafka/scram.py | 12 ++--- kafka/serializer/__init__.py | 2 - kafka/serializer/abstract.py | 6 +-- kafka/structs.py | 1 - kafka/util.py | 27 +++++------ kafka/vendor/enum34.py | 42 ++++++++--------- kafka/vendor/selectors34.py | 70 ++++++++++++++-------------- kafka/vendor/six.py | 17 ++++--- kafka/vendor/socketpair.py | 16 ++----- kafka/version.py | 5 +- test/record/test_default_records.py | 6 +-- test/record/test_legacy_records.py | 5 +- test/record/test_records.py | 2 - test/test_consumer_integration.py | 7 +++ 80 files changed, 290 insertions(+), 456 deletions(-) diff --git a/kafka/__init__.py b/kafka/__init__.py index d5e30affa..685593ce3 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - __title__ = 'kafka' from kafka.version import __version__ __author__ = 'Dana Powers' diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index c240fc6d0..c67fb9e6a 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.admin.config_resource import ConfigResource, ConfigResourceType from kafka.admin.client import KafkaAdminClient from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py index fd997a10a..fbc84be60 100644 --- a/kafka/admin/acl_resource.py +++ b/kafka/admin/acl_resource.py @@ -1,4 +1,3 @@ -from __future__ import absolute_import from kafka.errors import IllegalArgumentError # enum in stdlib as of py3.4 @@ -69,7 +68,7 @@ class ACLResourcePatternType(IntEnum): PREFIXED = 4 -class ACLFilter(object): +class ACLFilter: """Represents a filter to use with describing and deleting ACLs The difference between this class and the ACL class is mainly that @@ -161,7 +160,7 @@ def __init__( permission_type, resource_pattern ): - super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern) + super().__init__(principal, host, operation, permission_type, resource_pattern) self.validate() def validate(self): @@ -173,7 +172,7 @@ def validate(self): raise IllegalArgumentError("resource_pattern must be a ResourcePattern object") -class ResourcePatternFilter(object): +class ResourcePatternFilter: def __init__( self, resource_type, @@ -232,7 +231,7 @@ def __init__( resource_name, pattern_type=ACLResourcePatternType.LITERAL ): - super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type) + super().__init__(resource_type, resource_name, pattern_type) self.validate() def validate(self): @@ -240,5 +239,5 @@ def validate(self): raise IllegalArgumentError("resource_type cannot be ANY") if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]: raise IllegalArgumentError( - "pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name) + f"pattern_type cannot be {self.pattern_type.name} on a concrete ResourcePattern" ) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 8eb7504a7..204c47b7c 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from collections import defaultdict import copy import logging @@ -32,7 +30,7 @@ log = logging.getLogger(__name__) -class KafkaAdminClient(object): +class KafkaAdminClient: """A class for administering the Kafka cluster. Warning: @@ -194,7 +192,7 @@ def __init__(self, **configs): log.debug("Starting KafkaAdminClient with configuration: %s", configs) extra_configs = set(configs).difference(self.DEFAULT_CONFIG) if extra_configs: - raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs)) + raise KafkaConfigurationError(f"Unrecognized configs: {extra_configs}") self.config = copy.copy(self.DEFAULT_CONFIG) self.config.update(configs) @@ -874,7 +872,7 @@ def describe_configs(self, config_resources, include_synonyms=False): )) else: raise NotImplementedError( - "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version)) + f"Support for DescribeConfigs v{version} has not yet been added to KafkaAdminClient.") self._wait_for_futures(futures) return [f.value for f in futures] @@ -1197,7 +1195,7 @@ def _list_consumer_group_offsets_send_request(self, group_id, topics_partitions_dict = defaultdict(set) for topic, partition in partitions: topics_partitions_dict[topic].add(partition) - topics_partitions = list(six.iteritems(topics_partitions_dict)) + topics_partitions = list(topics_partitions_dict.items()) request = OffsetFetchRequest[version](group_id, topics_partitions) else: raise NotImplementedError( diff --git a/kafka/admin/config_resource.py b/kafka/admin/config_resource.py index e3294c9c4..0ae3f528e 100644 --- a/kafka/admin/config_resource.py +++ b/kafka/admin/config_resource.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - # enum in stdlib as of py3.4 try: from enum import IntEnum # pylint: disable=import-error @@ -15,7 +13,7 @@ class ConfigResourceType(IntEnum): TOPIC = 2 -class ConfigResource(object): +class ConfigResource: """A class for specifying config resources. Arguments: resource_type (ConfigResourceType): the type of kafka resource diff --git a/kafka/admin/new_partitions.py b/kafka/admin/new_partitions.py index 429b2e190..613fb861e 100644 --- a/kafka/admin/new_partitions.py +++ b/kafka/admin/new_partitions.py @@ -1,7 +1,4 @@ -from __future__ import absolute_import - - -class NewPartitions(object): +class NewPartitions: """A class for new partition creation on existing topics. Note that the length of new_assignments, if specified, must be the difference between the new total number of partitions and the existing number of partitions. Arguments: diff --git a/kafka/admin/new_topic.py b/kafka/admin/new_topic.py index 645ac383a..a50c3a374 100644 --- a/kafka/admin/new_topic.py +++ b/kafka/admin/new_topic.py @@ -1,9 +1,7 @@ -from __future__ import absolute_import - from kafka.errors import IllegalArgumentError -class NewTopic(object): +class NewTopic: """ A class for new topic creation Arguments: name (string): name of the topic diff --git a/kafka/client_async.py b/kafka/client_async.py index 3076c4ba0..0b546c314 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, division - import collections import copy import logging @@ -32,14 +30,10 @@ from kafka.vendor import socketpair from kafka.version import __version__ -if six.PY2: - ConnectionError = None - - log = logging.getLogger('kafka.client') -class KafkaClient(object): +class KafkaClient: """ A network client for asynchronous request/response network I/O. @@ -374,7 +368,7 @@ def _maybe_connect(self, node_id): if conn is None: broker = self.cluster.broker_metadata(node_id) - assert broker, 'Broker id %s not in current metadata' % (node_id,) + assert broker, 'Broker id {} not in current metadata'.format(node_id) log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) @@ -686,7 +680,7 @@ def _poll(self, timeout): unexpected_data = key.fileobj.recv(1) if unexpected_data: # anything other than a 0-byte read means protocol issues log.warning('Protocol out of sync on %r, closing', conn) - except socket.error: + except OSError: pass conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests')) continue @@ -701,7 +695,7 @@ def _poll(self, timeout): if conn not in processed and conn.connected() and conn._sock.pending(): self._pending_completion.extend(conn.recv()) - for conn in six.itervalues(self._conns): + for conn in self._conns.values(): if conn.requests_timed_out(): log.warning('%s timed out after %s ms. Closing connection.', conn, conn.config['request_timeout_ms']) @@ -941,7 +935,7 @@ def wakeup(self): except socket.timeout: log.warning('Timeout to send to wakeup socket!') raise Errors.KafkaTimeoutError() - except socket.error as e: + except OSError as e: log.warning('Unable to send to wakeup socket!') if self._raise_upon_socket_err_during_wakeup: raise e @@ -951,7 +945,7 @@ def _clear_wake_fd(self): while True: try: self._wake_r.recv(1024) - except socket.error: + except OSError: break def _maybe_close_oldest_connection(self): @@ -981,7 +975,7 @@ def bootstrap_connected(self): OrderedDict = dict -class IdleConnectionManager(object): +class IdleConnectionManager: def __init__(self, connections_max_idle_ms): if connections_max_idle_ms > 0: self.connections_max_idle = connections_max_idle_ms / 1000 @@ -1043,7 +1037,7 @@ def poll_expired_connection(self): return None -class KafkaClientMetrics(object): +class KafkaClientMetrics: def __init__(self, metrics, metric_group_prefix, conns): self.metrics = metrics self.metric_group_name = metric_group_prefix + '-metrics' diff --git a/kafka/cluster.py b/kafka/cluster.py index 438baf29d..db0e77818 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import collections import copy import logging @@ -16,7 +14,7 @@ log = logging.getLogger(__name__) -class ClusterMetadata(object): +class ClusterMetadata: """ A class to manage kafka cluster metadata. @@ -128,9 +126,9 @@ def available_partitions_for_topic(self, topic): """ if topic not in self._partitions: return None - return set([partition for partition, metadata - in six.iteritems(self._partitions[topic]) - if metadata.leader != -1]) + return {partition for partition, metadata + in self._partitions[topic].items() + if metadata.leader != -1} def leader_for_partition(self, partition): """Return node_id of leader, -1 unavailable, None if unknown.""" @@ -361,7 +359,7 @@ def add_group_coordinator(self, group, response): # Use a coordinator-specific node id so that group requests # get a dedicated connection - node_id = 'coordinator-{}'.format(response.coordinator_id) + node_id = f'coordinator-{response.coordinator_id}' coordinator = BrokerMetadata( node_id, response.host, diff --git a/kafka/codec.py b/kafka/codec.py index c740a181c..2bdd72185 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import gzip import io import platform @@ -149,10 +147,6 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024): # buffer... likely a python-snappy bug, so just use a slice copy chunker = lambda payload, i, size: payload[i:size+i] - elif six.PY2: - # Sliced buffer avoids additional copies - # pylint: disable-msg=undefined-variable - chunker = lambda payload, i, size: buffer(payload, i, size) else: # snappy.compress does not like raw memoryviews, so we have to convert # tobytes, which is a copy... oh well. it's the thought that counts. diff --git a/kafka/conn.py b/kafka/conn.py index f253cbda1..1bac266e6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, division - import copy import errno import logging @@ -36,11 +34,6 @@ from kafka.version import __version__ -if six.PY2: - ConnectionError = socket.error - TimeoutError = socket.error - BlockingIOError = Exception - log = logging.getLogger(__name__) DEFAULT_KAFKA_PORT = 9092 @@ -97,7 +90,7 @@ class SSLWantWriteError(Exception): } -class ConnectionStates(object): +class ConnectionStates: DISCONNECTING = '' DISCONNECTED = '' CONNECTING = '' @@ -106,7 +99,7 @@ class ConnectionStates(object): AUTHENTICATING = '' -class BrokerConnection(object): +class BrokerConnection: """Initialize a Kafka broker connection Keyword Arguments: @@ -384,7 +377,7 @@ def connect(self): ret = None try: ret = self._sock.connect_ex(self._sock_addr) - except socket.error as err: + except OSError as err: ret = err.errno # Connection succeeded @@ -416,7 +409,7 @@ def connect(self): log.error('Connect attempt to %s returned error %s.' ' Disconnecting.', self, ret) errstr = errno.errorcode.get(ret, 'UNKNOWN') - self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr))) + self.close(Errors.KafkaConnectionError(f'{ret} {errstr}')) return self.state # Needs retry @@ -576,8 +569,7 @@ def _send_bytes(self, data): break raise except BlockingIOError: - if six.PY3: - break + break raise return total_sent @@ -772,7 +764,7 @@ def send_pending_requests(self): except (ConnectionError, TimeoutError) as e: log.exception("Error sending request data to %s", self) - error = Errors.KafkaConnectionError("%s: %s" % (self, e)) + error = Errors.KafkaConnectionError("{}: {}".format(self, e)) self.close(error=error) return False @@ -805,7 +797,7 @@ def send_pending_requests_v2(self): except (ConnectionError, TimeoutError, Exception) as e: log.exception("Error sending request data to %s", self) - error = Errors.KafkaConnectionError("%s: %s" % (self, e)) + error = Errors.KafkaConnectionError("{}: {}".format(self, e)) self.close(error=error) return False @@ -878,8 +870,7 @@ def _recv(self): err = Errors.KafkaConnectionError(e) break except BlockingIOError: - if six.PY3: - break + break # For PY2 this is a catchall and should be re-raised raise @@ -914,10 +905,10 @@ def requests_timed_out(self): def _handle_api_version_response(self, response): error_type = Errors.for_code(response.error_code) assert error_type is Errors.NoError, "API version check failed" - self._api_versions = dict([ - (api_key, (min_version, max_version)) + self._api_versions = { + api_key: (min_version, max_version) for api_key, min_version, max_version in response.api_versions - ]) + } return self._api_versions def get_api_versions(self): @@ -1055,9 +1046,6 @@ def reset_override_configs(): elif (isinstance(f.exception, Errors.CorrelationIdError) and version == (0, 10)): pass - elif six.PY2: - assert isinstance(f.exception.args[0], socket.error) - assert f.exception.args[0].errno in (32, 54, 104) else: assert isinstance(f.exception.args[0], ConnectionError) log.info("Broker is not v%s -- it did not recognize %s", @@ -1075,7 +1063,7 @@ def __str__(self): AFI_NAMES[self._sock_afi], self._sock_addr) -class BrokerConnectionMetrics(object): +class BrokerConnectionMetrics: def __init__(self, metrics, metric_group_prefix, node_id): self.metrics = metrics @@ -1130,7 +1118,7 @@ def __init__(self, metrics, metric_group_prefix, node_id): # if one sensor of the metrics has been registered for the connection, # then all other sensors should have been registered; and vice versa - node_str = 'node-{0}'.format(node_id) + node_str = f'node-{node_id}' node_sensor = metrics.get_sensor(node_str + '.bytes-sent') if not node_sensor: metric_group_name = metric_group_prefix + '-node-metrics.' + node_str @@ -1197,7 +1185,7 @@ def _address_family(address): try: socket.inet_pton(af, address) return af - except (ValueError, AttributeError, socket.error): + except (ValueError, AttributeError, OSError): continue return socket.AF_UNSPEC @@ -1241,7 +1229,7 @@ def get_ip_port_afi(host_and_port_str): log.warning('socket.inet_pton not available on this platform.' ' consider `pip install win_inet_pton`') pass - except (ValueError, socket.error): + except (ValueError, OSError): # it's a host:port pair pass host, port = host_and_port_str.rsplit(':', 1) @@ -1257,7 +1245,7 @@ def collect_hosts(hosts, randomize=True): randomize the returned list. """ - if isinstance(hosts, six.string_types): + if isinstance(hosts, str): hosts = hosts.strip().split(',') result = [] diff --git a/kafka/consumer/__init__.py b/kafka/consumer/__init__.py index e09bcc1b8..5341d5648 100644 --- a/kafka/consumer/__init__.py +++ b/kafka/consumer/__init__.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.consumer.group import KafkaConsumer __all__ = [ diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 7ff9daf7b..954a03505 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import collections import copy import logging @@ -45,7 +43,7 @@ class RecordTooLargeError(Errors.KafkaError): pass -class Fetcher(six.Iterator): +class Fetcher: DEFAULT_CONFIG = { 'key_deserializer': None, 'value_deserializer': None, @@ -120,7 +118,7 @@ def send_fetches(self): List of Futures: each future resolves to a FetchResponse """ futures = [] - for node_id, request in six.iteritems(self._create_fetch_requests()): + for node_id, request in self._create_fetch_requests().items(): if self._client.ready(node_id): log.debug("Sending FetchRequest to node %s", node_id) future = self._client.send(node_id, request, wakeup=False) @@ -209,7 +207,7 @@ def end_offsets(self, partitions, timeout_ms): partitions, OffsetResetStrategy.LATEST, timeout_ms) def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): - timestamps = dict([(tp, timestamp) for tp in partitions]) + timestamps = {tp: timestamp for tp in partitions} offsets = self._retrieve_offsets(timestamps, timeout_ms) for tp in timestamps: offsets[tp] = offsets[tp][0] @@ -244,7 +242,7 @@ def _reset_offset(self, partition): if self._subscriptions.is_assigned(partition): self._subscriptions.seek(partition, offset) else: - log.debug("Could not find offset for partition %s since it is probably deleted" % (partition,)) + log.debug(f"Could not find offset for partition {partition} since it is probably deleted") def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): """Fetch offset for each partition passed in ``timestamps`` map. @@ -296,7 +294,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): log.debug("Stale metadata was raised, and we now have an updated metadata. Rechecking partition existence") unknown_partition = future.exception.args[0] # TopicPartition from StaleMetadata if self._client.cluster.leader_for_partition(unknown_partition) is None: - log.debug("Removed partition %s from offsets retrieval" % (unknown_partition, )) + log.debug(f"Removed partition {unknown_partition} from offsets retrieval") timestamps.pop(unknown_partition) else: time.sleep(self.config['retry_backoff_ms'] / 1000.0) @@ -305,7 +303,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): remaining_ms = timeout_ms - elapsed_ms raise Errors.KafkaTimeoutError( - "Failed to get offsets by timestamps in %s ms" % (timeout_ms,)) + f"Failed to get offsets by timestamps in {timeout_ms} ms") def fetched_records(self, max_records=None, update_offsets=True): """Returns previously fetched records and updates consumed offsets. @@ -522,7 +520,7 @@ def _send_offset_requests(self, timestamps): Future: resolves to a mapping of retrieved offsets """ timestamps_by_node = collections.defaultdict(dict) - for partition, timestamp in six.iteritems(timestamps): + for partition, timestamp in timestamps.items(): node_id = self._client.cluster.leader_for_partition(partition) if node_id is None: self._client.add_topic(partition.topic) @@ -554,7 +552,7 @@ def on_fail(err): if not list_offsets_future.is_done: list_offsets_future.failure(err) - for node_id, timestamps in six.iteritems(timestamps_by_node): + for node_id, timestamps in timestamps_by_node.items(): _f = self._send_offset_request(node_id, timestamps) _f.add_callback(on_success) _f.add_errback(on_fail) @@ -562,7 +560,7 @@ def on_fail(err): def _send_offset_request(self, node_id, timestamps): by_topic = collections.defaultdict(list) - for tp, timestamp in six.iteritems(timestamps): + for tp, timestamp in timestamps.items(): if self.config['api_version'] >= (0, 10, 1): data = (tp.partition, timestamp) else: @@ -570,9 +568,9 @@ def _send_offset_request(self, node_id, timestamps): by_topic[tp.topic].append(data) if self.config['api_version'] >= (0, 10, 1): - request = OffsetRequest[1](-1, list(six.iteritems(by_topic))) + request = OffsetRequest[1](-1, list(by_topic.items())) else: - request = OffsetRequest[0](-1, list(six.iteritems(by_topic))) + request = OffsetRequest[0](-1, list(by_topic.items())) # Client returns a future that only fails on network issues # so create a separate future and attach a callback to update it @@ -713,7 +711,7 @@ def _create_fetch_requests(self): else: version = 0 requests = {} - for node_id, partition_data in six.iteritems(fetchable): + for node_id, partition_data in fetchable.items(): if version < 3: requests[node_id] = FetchRequest[version]( -1, # replica_id @@ -755,9 +753,9 @@ def _handle_fetch_response(self, request, send_time, response): partition, offset = partition_data[:2] fetch_offsets[TopicPartition(topic, partition)] = offset - partitions = set([TopicPartition(topic, partition_data[0]) + partitions = {TopicPartition(topic, partition_data[0]) for topic, partitions in response.topics - for partition_data in partitions]) + for partition_data in partitions} metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions) # randomized ordering should improve balance for short-lived consumers @@ -866,7 +864,7 @@ def _parse_fetched_data(self, completed_fetch): return parsed_records - class PartitionRecords(object): + class PartitionRecords: def __init__(self, fetch_offset, tp, messages): self.fetch_offset = fetch_offset self.topic_partition = tp @@ -910,7 +908,7 @@ def take(self, n=None): return res -class FetchResponseMetricAggregator(object): +class FetchResponseMetricAggregator: """ Since we parse the message data for each partition from each fetch response lazily, fetch-level metrics need to be aggregated as the messages @@ -939,10 +937,10 @@ def record(self, partition, num_bytes, num_records): self.sensors.records_fetched.record(self.total_records) -class FetchManagerMetrics(object): +class FetchManagerMetrics: def __init__(self, metrics, prefix): self.metrics = metrics - self.group_name = '%s-fetch-manager-metrics' % (prefix,) + self.group_name = f'{prefix}-fetch-manager-metrics' self.bytes_fetched = metrics.sensor('bytes-fetched') self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name, @@ -986,15 +984,15 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records): bytes_fetched = self.metrics.sensor(name) bytes_fetched.add(self.metrics.metric_name('fetch-size-avg', self.group_name, - 'The average number of bytes fetched per request for topic %s' % (topic,), + f'The average number of bytes fetched per request for topic {topic}', metric_tags), Avg()) bytes_fetched.add(self.metrics.metric_name('fetch-size-max', self.group_name, - 'The maximum number of bytes fetched per request for topic %s' % (topic,), + f'The maximum number of bytes fetched per request for topic {topic}', metric_tags), Max()) bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate', self.group_name, - 'The average number of bytes consumed per second for topic %s' % (topic,), + f'The average number of bytes consumed per second for topic {topic}', metric_tags), Rate()) bytes_fetched.record(num_bytes) @@ -1007,10 +1005,10 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records): records_fetched = self.metrics.sensor(name) records_fetched.add(self.metrics.metric_name('records-per-request-avg', self.group_name, - 'The average number of records in each request for topic %s' % (topic,), + f'The average number of records in each request for topic {topic}', metric_tags), Avg()) records_fetched.add(self.metrics.metric_name('records-consumed-rate', self.group_name, - 'The average number of records consumed per second for topic %s' % (topic,), + f'The average number of records consumed per second for topic {topic}', metric_tags), Rate()) records_fetched.record(num_records) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a1d1dfa37..53800a1cc 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, division - import copy import logging import socket @@ -23,7 +21,7 @@ log = logging.getLogger(__name__) -class KafkaConsumer(six.Iterator): +class KafkaConsumer: """Consume records from a Kafka cluster. The consumer will transparently handle the failure of servers in the Kafka @@ -315,7 +313,7 @@ def __init__(self, *topics, **configs): # Only check for extra config keys in top-level class extra_configs = set(configs).difference(self.DEFAULT_CONFIG) if extra_configs: - raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,)) + raise KafkaConfigurationError(f"Unrecognized configs: {extra_configs}") self.config = copy.copy(self.DEFAULT_CONFIG) self.config.update(configs) @@ -968,7 +966,7 @@ def metrics(self, raw=False): return self._metrics.metrics.copy() metrics = {} - for k, v in six.iteritems(self._metrics.metrics.copy()): + for k, v in self._metrics.metrics.copy().items(): if k.group not in metrics: metrics[k.group] = {} if k.name not in metrics[k.group]: @@ -1013,7 +1011,7 @@ def offsets_for_times(self, timestamps): raise UnsupportedVersionError( "offsets_for_times API not supported for cluster version {}" .format(self.config['api_version'])) - for tp, ts in six.iteritems(timestamps): + for tp, ts in timestamps.items(): timestamps[tp] = int(ts) if ts < 0: raise ValueError( @@ -1118,7 +1116,7 @@ def _update_fetch_positions(self, partitions): def _message_generator_v2(self): timeout_ms = 1000 * (self._consumer_timeout - time.time()) record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False) - for tp, records in six.iteritems(record_map): + for tp, records in record_map.items(): # Generators are stateful, and it is possible that the tp / records # here may become stale during iteration -- i.e., we seek to a # different offset, pause consumption, or lose assignment. diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 08842d133..31102b8bc 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import abc import logging import re @@ -13,7 +11,7 @@ log = logging.getLogger(__name__) -class SubscriptionState(object): +class SubscriptionState: """ A class for tracking the topics, partitions, and offsets for the consumer. A partition is "assigned" either directly with assign_from_user() (manual @@ -130,16 +128,16 @@ def _ensure_valid_topic_name(self, topic): # https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java if topic is None: raise TypeError('All topics must not be None') - if not isinstance(topic, six.string_types): + if not isinstance(topic, str): raise TypeError('All topics must be strings') if len(topic) == 0: raise ValueError('All topics must be non-empty strings') if topic == '.' or topic == '..': raise ValueError('Topic name cannot be "." or ".."') if len(topic) > self._MAX_NAME_LENGTH: - raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(self._MAX_NAME_LENGTH, topic)) + raise ValueError(f'Topic name is illegal, it can\'t be longer than {self._MAX_NAME_LENGTH} characters, topic: "{topic}"') if not self._TOPIC_LEGAL_CHARS.match(topic): - raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic)) + raise ValueError(f'Topic name "{topic}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"') def change_subscription(self, topics): """Change the topic subscription. @@ -157,7 +155,7 @@ def change_subscription(self, topics): if self._user_assignment: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) - if isinstance(topics, six.string_types): + if isinstance(topics, str): topics = [topics] if self.subscription == set(topics): @@ -247,7 +245,7 @@ def assign_from_subscribed(self, assignments): for tp in assignments: if tp.topic not in self.subscription: - raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,)) + raise ValueError(f"Assigned partition {tp} for non-subscribed topic.") # after rebalancing, we always reinitialize the assignment state self.assignment.clear() @@ -299,13 +297,13 @@ def assigned_partitions(self): def paused_partitions(self): """Return current set of paused TopicPartitions.""" - return set(partition for partition in self.assignment - if self.is_paused(partition)) + return {partition for partition in self.assignment + if self.is_paused(partition)} def fetchable_partitions(self): """Return set of TopicPartitions that should be Fetched.""" fetchable = set() - for partition, state in six.iteritems(self.assignment): + for partition, state in self.assignment.items(): if state.is_fetchable(): fetchable.add(partition) return fetchable @@ -317,7 +315,7 @@ def partitions_auto_assigned(self): def all_consumed_offsets(self): """Returns consumed offsets as {TopicPartition: OffsetAndMetadata}""" all_consumed = {} - for partition, state in six.iteritems(self.assignment): + for partition, state in self.assignment.items(): if state.has_valid_position: all_consumed[partition] = OffsetAndMetadata(state.position, '') return all_consumed @@ -348,7 +346,7 @@ def has_all_fetch_positions(self): def missing_fetch_positions(self): missing = set() - for partition, state in six.iteritems(self.assignment): + for partition, state in self.assignment.items(): if not state.has_valid_position: missing.add(partition) return missing @@ -372,7 +370,7 @@ def _add_assigned_partition(self, partition): self.assignment[partition] = TopicPartitionState() -class TopicPartitionState(object): +class TopicPartitionState: def __init__(self): self.committed = None # last committed OffsetAndMetadata self.has_valid_position = False # whether we have valid position @@ -420,7 +418,7 @@ def is_fetchable(self): return not self.paused and self.has_valid_position -class ConsumerRebalanceListener(object): +class ConsumerRebalanceListener: """ A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes. diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index e71984108..d8f8ed9b0 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, division - import abc import copy import logging @@ -21,13 +19,13 @@ log = logging.getLogger('kafka.coordinator') -class MemberState(object): +class MemberState: UNJOINED = '' # the client is not part of a group REBALANCING = '' # the client has begun rebalancing STABLE = '' # the client has joined and is sending heartbeats -class Generation(object): +class Generation: def __init__(self, generation_id, member_id, protocol): self.generation_id = generation_id self.member_id = member_id @@ -43,7 +41,7 @@ class UnjoinedGroupException(Errors.KafkaError): retriable = True -class BaseCoordinator(object): +class BaseCoordinator: """ BaseCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator). Group @@ -597,7 +595,7 @@ def _on_join_leader(self, response): self._generation.member_id, [(member_id, assignment if isinstance(assignment, bytes) else assignment.encode()) - for member_id, assignment in six.iteritems(group_assignment)]) + for member_id, assignment in group_assignment.items()]) log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) @@ -850,7 +848,7 @@ def _handle_heartbeat_response(self, future, send_time, response): future.failure(error) -class GroupCoordinatorMetrics(object): +class GroupCoordinatorMetrics: def __init__(self, heartbeat, metrics, prefix, tags=None): self.heartbeat = heartbeat self.metrics = metrics @@ -903,7 +901,7 @@ def __init__(self, heartbeat, metrics, prefix, tags=None): class HeartbeatThread(threading.Thread): def __init__(self, coordinator): - super(HeartbeatThread, self).__init__() + super().__init__() self.name = coordinator.group_id + '-heartbeat' self.coordinator = coordinator self.enabled = False diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 971f5e802..1e415fa7a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, division - import collections import copy import functools @@ -78,7 +76,7 @@ def __init__(self, client, subscription, metrics, **configs): True the only way to receive records from an internal topic is subscribing to it. Requires 0.10+. Default: True """ - super(ConsumerCoordinator, self).__init__(client, metrics, **configs) + super().__init__(client, metrics, **configs) self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -129,7 +127,7 @@ def __init__(self, client, subscription, metrics, **configs): def __del__(self): if hasattr(self, '_cluster') and self._cluster: self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) - super(ConsumerCoordinator, self).__del__() + super().__del__() def protocol_type(self): return ConsumerProtocol.PROTOCOL_TYPE @@ -218,7 +216,7 @@ def _on_join_complete(self, generation, member_id, protocol, self._assignment_snapshot = None assignor = self._lookup_assignor(protocol) - assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,) + assert assignor, f'Coordinator selected invalid assignment protocol: {protocol}' assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) @@ -305,7 +303,7 @@ def time_to_next_poll(self): def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) - assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,) + assert assignor, f'Invalid assignment protocol: {assignment_strategy}' member_metadata = {} all_subscribed_topics = set() for member_id, metadata_bytes in members: @@ -336,7 +334,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members): log.debug("Finished assignment for group %s: %s", self.group_id, assignments) group_assignment = {} - for member_id, assignment in six.iteritems(assignments): + for member_id, assignment in assignments.items(): group_assignment[member_id] = assignment return group_assignment @@ -381,13 +379,13 @@ def need_rejoin(self): and self._joined_subscription != self._subscription.subscription): return True - return super(ConsumerCoordinator, self).need_rejoin() + return super().need_rejoin() def refresh_committed_offsets_if_needed(self): """Fetch committed offsets for assigned partitions.""" if self._subscription.needs_fetch_committed_offsets: offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions()) - for partition, offset in six.iteritems(offsets): + for partition, offset in offsets.items(): # verify assignment is still active if self._subscription.is_assigned(partition): self._subscription.assignment[partition].committed = offset @@ -433,7 +431,7 @@ def close(self, autocommit=True): if autocommit: self._maybe_auto_commit_offsets_sync() finally: - super(ConsumerCoordinator, self).close() + super().close() def _invoke_completed_offset_commit_callbacks(self): while self.completed_offset_commits: @@ -568,7 +566,7 @@ def _send_offset_commit_request(self, offsets): # create the offset commit request offset_data = collections.defaultdict(dict) - for tp, offset in six.iteritems(offsets): + for tp, offset in offsets.items(): offset_data[tp.topic][tp.partition] = offset if self._subscription.partitions_auto_assigned(): @@ -593,8 +591,8 @@ def _send_offset_commit_request(self, offsets): partition, offset.offset, offset.metadata - ) for partition, offset in six.iteritems(partitions)] - ) for topic, partitions in six.iteritems(offset_data)] + ) for partition, offset in partitions.items()] + ) for topic, partitions in offset_data.items()] ) elif self.config['api_version'] >= (0, 8, 2): request = OffsetCommitRequest[1]( @@ -605,8 +603,8 @@ def _send_offset_commit_request(self, offsets): offset.offset, -1, offset.metadata - ) for partition, offset in six.iteritems(partitions)] - ) for topic, partitions in six.iteritems(offset_data)] + ) for partition, offset in partitions.items()] + ) for topic, partitions in offset_data.items()] ) elif self.config['api_version'] >= (0, 8, 1): request = OffsetCommitRequest[0]( @@ -616,8 +614,8 @@ def _send_offset_commit_request(self, offsets): partition, offset.offset, offset.metadata - ) for partition, offset in six.iteritems(partitions)] - ) for topic, partitions in six.iteritems(offset_data)] + ) for partition, offset in partitions.items()] + ) for topic, partitions in offset_data.items()] ) log.debug("Sending offset-commit request with %s for group %s to %s", @@ -809,10 +807,10 @@ def _maybe_auto_commit_offsets_async(self): self._commit_offsets_async_on_complete) -class ConsumerCoordinatorMetrics(object): +class ConsumerCoordinatorMetrics: def __init__(self, metrics, metric_group_prefix, subscription): self.metrics = metrics - self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,) + self.metric_group_name = f'{metric_group_prefix}-coordinator-metrics' self.commit_latency = metrics.sensor('commit-latency') self.commit_latency.add(metrics.metric_name( diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py index 2f5930b63..b12159cdd 100644 --- a/kafka/coordinator/heartbeat.py +++ b/kafka/coordinator/heartbeat.py @@ -1,10 +1,8 @@ -from __future__ import absolute_import, division - import copy import time -class Heartbeat(object): +class Heartbeat: DEFAULT_CONFIG = { 'group_id': None, 'heartbeat_interval_ms': 3000, diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py index 56a390159..97efc1c84 100644 --- a/kafka/coordinator/protocol.py +++ b/kafka/coordinator/protocol.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.protocol.struct import Struct from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String from kafka.structs import TopicPartition @@ -26,7 +24,7 @@ def partitions(self): for partition in partitions] -class ConsumerProtocol(object): +class ConsumerProtocol: PROTOCOL_TYPE = 'consumer' ASSIGNMENT_STRATEGIES = ('range', 'roundrobin') METADATA = ConsumerProtocolMemberMetadata diff --git a/kafka/errors.py b/kafka/errors.py index b33cf51e2..cb3ff285f 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import inspect import sys @@ -12,8 +10,8 @@ class KafkaError(RuntimeError): def __str__(self): if not self.args: return self.__class__.__name__ - return '{0}: {1}'.format(self.__class__.__name__, - super(KafkaError, self).__str__()) + return '{}: {}'.format(self.__class__.__name__, + super().__str__()) class IllegalStateError(KafkaError): @@ -68,7 +66,7 @@ class IncompatibleBrokerVersion(KafkaError): class CommitFailedError(KafkaError): def __init__(self, *args, **kwargs): - super(CommitFailedError, self).__init__( + super().__init__( """Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() @@ -96,9 +94,9 @@ class BrokerResponseError(KafkaError): def __str__(self): """Add errno to standard KafkaError str""" - return '[Error {0}] {1}'.format( + return '[Error {}] {}'.format( self.errno, - super(BrokerResponseError, self).__str__()) + super().__str__()) class NoError(BrokerResponseError): @@ -471,7 +469,7 @@ class KafkaTimeoutError(KafkaError): class FailedPayloadsError(KafkaError): def __init__(self, payload, *args): - super(FailedPayloadsError, self).__init__(*args) + super().__init__(*args) self.payload = payload @@ -498,7 +496,7 @@ class QuotaViolationError(KafkaError): class AsyncProducerQueueFull(KafkaError): def __init__(self, failed_msgs, *args): - super(AsyncProducerQueueFull, self).__init__(*args) + super().__init__(*args) self.failed_msgs = failed_msgs @@ -508,7 +506,7 @@ def _iter_broker_errors(): yield obj -kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()]) +kafka_errors = {x.errno: x for x in _iter_broker_errors()} def for_code(error_code): diff --git a/kafka/future.py b/kafka/future.py index d0f3c6658..e9f534611 100644 --- a/kafka/future.py +++ b/kafka/future.py @@ -1,12 +1,10 @@ -from __future__ import absolute_import - import functools import logging log = logging.getLogger(__name__) -class Future(object): +class Future: error_on_callbacks = False # and errbacks def __init__(self): diff --git a/kafka/metrics/__init__.py b/kafka/metrics/__init__.py index 2a62d6334..22427e967 100644 --- a/kafka/metrics/__init__.py +++ b/kafka/metrics/__init__.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.metrics.compound_stat import NamedMeasurable from kafka.metrics.dict_reporter import DictReporter from kafka.metrics.kafka_metric import KafkaMetric diff --git a/kafka/metrics/compound_stat.py b/kafka/metrics/compound_stat.py index ac92480dc..260714256 100644 --- a/kafka/metrics/compound_stat.py +++ b/kafka/metrics/compound_stat.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import abc from kafka.metrics.stat import AbstractStat @@ -20,7 +18,7 @@ def stats(self): raise NotImplementedError -class NamedMeasurable(object): +class NamedMeasurable: def __init__(self, metric_name, measurable_stat): self._name = metric_name self._stat = measurable_stat diff --git a/kafka/metrics/dict_reporter.py b/kafka/metrics/dict_reporter.py index 0b98fe1e4..bc8088ca9 100644 --- a/kafka/metrics/dict_reporter.py +++ b/kafka/metrics/dict_reporter.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import logging import threading @@ -29,10 +27,10 @@ def snapshot(self): } } """ - return dict((category, dict((name, metric.value()) - for name, metric in list(metrics.items()))) + return {category: {name: metric.value() + for name, metric in list(metrics.items())} for category, metrics in - list(self._store.items())) + list(self._store.items())} def init(self, metrics): for metric in metrics: @@ -71,7 +69,7 @@ def get_category(self, metric): prefix = None, group = 'bar', tags = None returns: 'bar' """ - tags = ','.join('%s=%s' % (k, v) for k, v in + tags = ','.join(f'{k}={v}' for k, v in sorted(metric.metric_name.tags.items())) return '.'.join(x for x in [self._prefix, metric.metric_name.group, tags] if x) diff --git a/kafka/metrics/kafka_metric.py b/kafka/metrics/kafka_metric.py index 9fb8d89f1..40d74952a 100644 --- a/kafka/metrics/kafka_metric.py +++ b/kafka/metrics/kafka_metric.py @@ -1,9 +1,7 @@ -from __future__ import absolute_import - import time -class KafkaMetric(object): +class KafkaMetric: # NOTE java constructor takes a lock instance def __init__(self, metric_name, measurable, config): if not metric_name: diff --git a/kafka/metrics/measurable.py b/kafka/metrics/measurable.py index b06d4d789..fd5be1205 100644 --- a/kafka/metrics/measurable.py +++ b/kafka/metrics/measurable.py @@ -1,9 +1,7 @@ -from __future__ import absolute_import - import abc -class AbstractMeasurable(object): +class AbstractMeasurable: """A measurable quantity that can be registered as a metric""" @abc.abstractmethod def measure(self, config, now): diff --git a/kafka/metrics/measurable_stat.py b/kafka/metrics/measurable_stat.py index 4487adf6e..dba887d2b 100644 --- a/kafka/metrics/measurable_stat.py +++ b/kafka/metrics/measurable_stat.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import abc from kafka.metrics.measurable import AbstractMeasurable diff --git a/kafka/metrics/metric_config.py b/kafka/metrics/metric_config.py index 2e55abfcb..39a5b4168 100644 --- a/kafka/metrics/metric_config.py +++ b/kafka/metrics/metric_config.py @@ -1,9 +1,7 @@ -from __future__ import absolute_import - import sys -class MetricConfig(object): +class MetricConfig: """Configuration values for metrics""" def __init__(self, quota=None, samples=2, event_window=sys.maxsize, time_window_ms=30 * 1000, tags=None): diff --git a/kafka/metrics/metric_name.py b/kafka/metrics/metric_name.py index b5acd1662..5739c64f0 100644 --- a/kafka/metrics/metric_name.py +++ b/kafka/metrics/metric_name.py @@ -1,9 +1,7 @@ -from __future__ import absolute_import - import copy -class MetricName(object): +class MetricName: """ This class encapsulates a metric's name, logical group and its related attributes (tags). @@ -102,5 +100,5 @@ def __ne__(self, other): return not self.__eq__(other) def __str__(self): - return 'MetricName(name=%s, group=%s, description=%s, tags=%s)' % ( + return 'MetricName(name={}, group={}, description={}, tags={})'.format( self.name, self.group, self.description, self.tags) diff --git a/kafka/metrics/metrics.py b/kafka/metrics/metrics.py index 2c53488ff..67d609d08 100644 --- a/kafka/metrics/metrics.py +++ b/kafka/metrics/metrics.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import logging import sys import time @@ -11,7 +9,7 @@ logger = logging.getLogger(__name__) -class Metrics(object): +class Metrics: """ A registry of sensors and metrics. @@ -230,7 +228,7 @@ def register_metric(self, metric): for reporter in self._reporters: reporter.metric_change(metric) - class ExpireSensorTask(object): + class ExpireSensorTask: """ This iterates over every Sensor and triggers a remove_sensor if it has expired. Package private for testing diff --git a/kafka/metrics/metrics_reporter.py b/kafka/metrics/metrics_reporter.py index d8bd12b3b..3f0fe189b 100644 --- a/kafka/metrics/metrics_reporter.py +++ b/kafka/metrics/metrics_reporter.py @@ -1,9 +1,7 @@ -from __future__ import absolute_import - import abc -class AbstractMetricsReporter(object): +class AbstractMetricsReporter: """ An abstract class to allow things to listen as new metrics are created so they can be reported. diff --git a/kafka/metrics/quota.py b/kafka/metrics/quota.py index 4d1b0d6cb..5ec5d13d1 100644 --- a/kafka/metrics/quota.py +++ b/kafka/metrics/quota.py @@ -1,7 +1,4 @@ -from __future__ import absolute_import - - -class Quota(object): +class Quota: """An upper or lower bound for metrics""" def __init__(self, bound, is_upper): self._bound = bound diff --git a/kafka/metrics/stat.py b/kafka/metrics/stat.py index 9fd2f01ec..daf01935d 100644 --- a/kafka/metrics/stat.py +++ b/kafka/metrics/stat.py @@ -1,9 +1,7 @@ -from __future__ import absolute_import - import abc -class AbstractStat(object): +class AbstractStat: """ An AbstractStat is a quantity such as average, max, etc that is computed off the stream of updates to a sensor diff --git a/kafka/oauth/__init__.py b/kafka/oauth/__init__.py index 8c8349564..f4d780892 100644 --- a/kafka/oauth/__init__.py +++ b/kafka/oauth/__init__.py @@ -1,3 +1 @@ -from __future__ import absolute_import - from kafka.oauth.abstract import AbstractTokenProvider diff --git a/kafka/oauth/abstract.py b/kafka/oauth/abstract.py index 8d89ff51d..fa5f4eb99 100644 --- a/kafka/oauth/abstract.py +++ b/kafka/oauth/abstract.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import abc # This statement is compatible with both Python 2.7 & 3+ diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py index 21a3bbb66..eed1dca69 100644 --- a/kafka/partitioner/__init__.py +++ b/kafka/partitioner/__init__.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.partitioner.default import DefaultPartitioner, murmur2 diff --git a/kafka/partitioner/default.py b/kafka/partitioner/default.py index d0914c682..13fef6b76 100644 --- a/kafka/partitioner/default.py +++ b/kafka/partitioner/default.py @@ -1,11 +1,9 @@ -from __future__ import absolute_import - import random from kafka.vendor import six -class DefaultPartitioner(object): +class DefaultPartitioner: """Default partitioner. Hashes key to partition using murmur2 hashing (from java client) diff --git a/kafka/producer/__init__.py b/kafka/producer/__init__.py index 576c772a0..869dbb3dc 100644 --- a/kafka/producer/__init__.py +++ b/kafka/producer/__init__.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.producer.kafka import KafkaProducer __all__ = [ diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index 100801700..0e5b57b93 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, division - import collections import io import threading @@ -10,7 +8,7 @@ import kafka.errors as Errors -class SimpleBufferPool(object): +class SimpleBufferPool: """A simple pool of BytesIO objects with a weak memory ceiling.""" def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'): """Create a new buffer pool. diff --git a/kafka/producer/future.py b/kafka/producer/future.py index 07fa4adb4..72a4d3985 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import collections import threading @@ -9,17 +7,17 @@ class FutureProduceResult(Future): def __init__(self, topic_partition): - super(FutureProduceResult, self).__init__() + super().__init__() self.topic_partition = topic_partition self._latch = threading.Event() def success(self, value): - ret = super(FutureProduceResult, self).success(value) + ret = super().success(value) self._latch.set() return ret def failure(self, error): - ret = super(FutureProduceResult, self).failure(error) + ret = super().failure(error) self._latch.set() return ret @@ -30,7 +28,7 @@ def wait(self, timeout=None): class FutureRecordMetadata(Future): def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size): - super(FutureRecordMetadata, self).__init__() + super().__init__() self._produce_future = produce_future # packing args as a tuple is a minor speed optimization self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size) @@ -59,7 +57,7 @@ def _produce_success(self, offset_and_timestamp): def get(self, timeout=None): if not self.is_done and not self._produce_future.wait(timeout): raise Errors.KafkaTimeoutError( - "Timeout after waiting for %s secs." % (timeout,)) + f"Timeout after waiting for {timeout} secs.") assert self.is_done if self.failed(): raise self.exception # pylint: disable-msg=raising-bad-type diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index dd1cc508c..8e19fe27b 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import atexit import copy import logging @@ -28,7 +26,7 @@ PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger() -class KafkaProducer(object): +class KafkaProducer: """A Kafka client that publishes records to the Kafka cluster. The producer is thread safe and sharing a single producer instance across @@ -353,7 +351,7 @@ def __init__(self, **configs): self.config[key] = configs.pop(key) # Only check for extra config keys in top-level class - assert not configs, 'Unrecognized configs: %s' % (configs,) + assert not configs, f'Unrecognized configs: {configs}' if self.config['client_id'] is None: self.config['client_id'] = 'kafka-python-producer-%s' % \ @@ -398,10 +396,10 @@ def __init__(self, **configs): # Check compression_type for library support ct = self.config['compression_type'] if ct not in self._COMPRESSORS: - raise ValueError("Not supported codec: {}".format(ct)) + raise ValueError(f"Not supported codec: {ct}") else: checker, compression_attrs = self._COMPRESSORS[ct] - assert checker(), "Libraries for {} compression codec not found".format(ct) + assert checker(), f"Libraries for {ct} compression codec not found" self.config['compression_attrs'] = compression_attrs message_version = self._max_usable_produce_magic() @@ -453,7 +451,7 @@ def _unregister_cleanup(self): def __del__(self): # Disable logger during destruction to avoid touching dangling references - class NullLogger(object): + class NullLogger: def __getattr__(self, name): return lambda *args: None @@ -703,7 +701,7 @@ def _wait_on_metadata(self, topic, max_wait): elapsed = time.time() - begin if not metadata_event.is_set(): raise Errors.KafkaTimeoutError( - "Failed to update metadata after %.1f secs." % (max_wait,)) + f"Failed to update metadata after {max_wait:.1f} secs.") elif topic in self._metadata.unauthorized_topics: raise Errors.TopicAuthorizationFailedError(topic) else: @@ -743,7 +741,7 @@ def metrics(self, raw=False): return self._metrics.metrics.copy() metrics = {} - for k, v in six.iteritems(self._metrics.metrics.copy()): + for k, v in self._metrics.metrics.copy().items(): if k.group not in metrics: metrics[k.group] = {} if k.name not in metrics[k.group]: diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index a2aa0e8ec..fc4bfeb30 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import collections import copy import logging @@ -16,7 +14,7 @@ log = logging.getLogger(__name__) -class AtomicInteger(object): +class AtomicInteger: def __init__(self, val=0): self._lock = threading.Lock() self._val = val @@ -35,7 +33,7 @@ def get(self): return self._val -class ProducerBatch(object): +class ProducerBatch: def __init__(self, tp, records, buffer): self.max_record_size = 0 now = time.time() @@ -110,7 +108,7 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full) if error: self.records.close() self.done(-1, None, Errors.KafkaTimeoutError( - "Batch for %s containing %s record(s) expired: %s" % ( + "Batch for {} containing {} record(s) expired: {}".format( self.topic_partition, self.records.next_offset(), error))) return True return False @@ -129,7 +127,7 @@ def __str__(self): self.topic_partition, self.records.next_offset()) -class RecordAccumulator(object): +class RecordAccumulator: """ This class maintains a dequeue per TopicPartition that accumulates messages into MessageSets to be sent to the server. @@ -570,7 +568,7 @@ def close(self): self._closed = True -class IncompleteProducerBatches(object): +class IncompleteProducerBatches: """A threadsafe helper class to hold ProducerBatches that haven't been ack'd yet""" def __init__(self): diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 35688d3f1..132b68d47 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, division - import collections import copy import logging @@ -35,7 +33,7 @@ class Sender(threading.Thread): } def __init__(self, client, metadata, accumulator, metrics, **configs): - super(Sender, self).__init__() + super().__init__() self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: if key in configs: @@ -118,7 +116,7 @@ def run_once(self): if self.config['guarantee_message_order']: # Mute all the partitions drained - for batch_list in six.itervalues(batches_by_node): + for batch_list in batches_by_node.values(): for batch in batch_list: self._accumulator.muted.add(batch.topic_partition) @@ -142,7 +140,7 @@ def run_once(self): log.debug("Created %d produce requests: %s", len(requests), requests) # trace poll_timeout_ms = 0 - for node_id, request in six.iteritems(requests): + for node_id, request in requests.items(): batches = batches_by_node[node_id] log.debug('Sending Produce Request: %r', request) (self._client.send(node_id, request, wakeup=False) @@ -190,8 +188,8 @@ def _handle_produce_response(self, node_id, send_time, batches, response): # if we have a response, parse it log.debug('Parsing produce response: %r', response) if response: - batches_by_partition = dict([(batch.topic_partition, batch) - for batch in batches]) + batches_by_partition = {batch.topic_partition: batch + for batch in batches} for topic, partitions in response.topics: for partition_info in partitions: @@ -281,7 +279,7 @@ def _create_produce_requests(self, collated): dict: {node_id: ProduceRequest} (version depends on api_version) """ requests = {} - for node_id, batches in six.iteritems(collated): + for node_id, batches in collated.items(): requests[node_id] = self._produce_request( node_id, self.config['acks'], self.config['request_timeout_ms'], batches) @@ -324,7 +322,7 @@ def _produce_request(self, node_id, acks, timeout, batches): timeout=timeout, topics=[(topic, list(partition_info.items())) for topic, partition_info - in six.iteritems(produce_records_by_partition)], + in produce_records_by_partition.items()], **kwargs ) @@ -336,7 +334,7 @@ def bootstrap_connected(self): return self._client.bootstrap_connected() -class SenderMetrics(object): +class SenderMetrics: def __init__(self, metrics, client, metadata): self.metrics = metrics @@ -434,7 +432,7 @@ def add_metric(self, metric_name, measurable, group_name='producer-metrics', def maybe_register_topic_metrics(self, topic): def sensor_name(name): - return 'topic.{0}.{1}'.format(topic, name) + return f'topic.{topic}.{name}' # if one sensor of the metrics has been registered for the topic, # then all other sensors should have been registered; and vice versa diff --git a/kafka/protocol/__init__.py b/kafka/protocol/__init__.py index 025447f99..ff9c68306 100644 --- a/kafka/protocol/__init__.py +++ b/kafka/protocol/__init__.py @@ -1,6 +1,3 @@ -from __future__ import absolute_import - - API_KEYS = { 0: 'Produce', 1: 'Fetch', diff --git a/kafka/protocol/abstract.py b/kafka/protocol/abstract.py index 2de65c4bb..10eed5649 100644 --- a/kafka/protocol/abstract.py +++ b/kafka/protocol/abstract.py @@ -1,9 +1,7 @@ -from __future__ import absolute_import - import abc -class AbstractType(object): +class AbstractType: __metaclass__ = abc.ABCMeta @abc.abstractmethod diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 0bb1a7acc..6109d90f9 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.protocol.api import Request, Response from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index f12cb972b..24cf61a62 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import abc from kafka.protocol.struct import Struct @@ -15,7 +13,7 @@ class RequestHeader(Struct): ) def __init__(self, request, correlation_id=0, client_id='kafka-python'): - super(RequestHeader, self).__init__( + super().__init__( request.API_KEY, request.API_VERSION, correlation_id, client_id ) @@ -31,7 +29,7 @@ class RequestHeaderV2(Struct): ) def __init__(self, request, correlation_id=0, client_id='kafka-python', tags=None): - super(RequestHeaderV2, self).__init__( + super().__init__( request.API_KEY, request.API_VERSION, correlation_id, client_id, tags or {} ) diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index 31fc23707..0fb2b646f 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.protocol.api import Request, Response from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index f367848ce..3b742b357 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.protocol.api import Request, Response from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String, Bytes diff --git a/kafka/protocol/frame.py b/kafka/protocol/frame.py index 7b4a32bcf..10ebf7c9b 100644 --- a/kafka/protocol/frame.py +++ b/kafka/protocol/frame.py @@ -1,6 +1,6 @@ class KafkaBytes(bytearray): def __init__(self, size): - super(KafkaBytes, self).__init__(size) + super().__init__(size) self._idx = 0 def read(self, nbytes=None): diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index bcb96553b..68efdc8f9 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.protocol.api import Request, Response from kafka.protocol.struct import Struct from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 4c5c031b8..b07d4eb0e 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import io import time @@ -78,7 +76,7 @@ def _encode_self(self, recalc_crc=True): elif version == 0: fields = (self.crc, self.magic, self.attributes, self.key, self.value) else: - raise ValueError('Unrecognized message version: %s' % (version,)) + raise ValueError(f'Unrecognized message version: {version}') message = Message.SCHEMAS[version].encode(fields) if not recalc_crc: return message @@ -94,7 +92,7 @@ def decode(cls, data): data = io.BytesIO(data) # Partial decode required to determine message version base_fields = cls.SCHEMAS[0].fields[0:3] - crc, magic, attributes = [field.decode(data) for field in base_fields] + crc, magic, attributes = (field.decode(data) for field in base_fields) remaining = cls.SCHEMAS[magic].fields[3:] fields = [field.decode(data) for field in remaining] if magic == 1: @@ -147,7 +145,7 @@ def __hash__(self): class PartialMessage(bytes): def __repr__(self): - return 'PartialMessage(%s)' % (self,) + return f'PartialMessage({self})' class MessageSet(AbstractType): diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 414e5b84a..041444100 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.protocol.api import Request, Response from kafka.protocol.types import Array, Boolean, Int16, Int32, Schema, String diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 1ed382b0d..34b00a40b 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -1,12 +1,10 @@ -from __future__ import absolute_import - from kafka.protocol.api import Request, Response from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String UNKNOWN_OFFSET = -1 -class OffsetResetStrategy(object): +class OffsetResetStrategy: LATEST = -1 EARLIEST = -2 NONE = 0 diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index a9e767220..a667105ad 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import collections import logging @@ -12,7 +10,7 @@ log = logging.getLogger(__name__) -class KafkaProtocol(object): +class KafkaProtocol: """Manage the kafka network protocol Use an instance of KafkaProtocol to manage bytes send/recv'd diff --git a/kafka/protocol/pickle.py b/kafka/protocol/pickle.py index d6e5fa74f..cd73b3add 100644 --- a/kafka/protocol/pickle.py +++ b/kafka/protocol/pickle.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - try: import copyreg # pylint: disable=import-error except ImportError: diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 9b3f6bf55..b62430c22 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from kafka.protocol.api import Request, Response from kafka.protocol.types import Int16, Int32, Int64, String, Array, Schema, Bytes diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py index e9da6e6c1..eb08ac8ef 100644 --- a/kafka/protocol/struct.py +++ b/kafka/protocol/struct.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from io import BytesIO from kafka.protocol.abstract import AbstractType @@ -57,7 +55,7 @@ def get_item(self, name): def __repr__(self): key_vals = [] for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields): - key_vals.append('%s=%s' % (name, field.repr(self.__dict__[name]))) + key_vals.append(f'{name}={field.repr(self.__dict__[name])}') return self.__class__.__name__ + '(' + ', '.join(key_vals) + ')' def __hash__(self): diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py index 0e3685d73..0118af11b 100644 --- a/kafka/protocol/types.py +++ b/kafka/protocol/types.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import struct from struct import error @@ -175,7 +173,7 @@ def repr(self, value): field_val = getattr(value, self.names[i]) except AttributeError: field_val = value[i] - key_vals.append('%s=%s' % (self.names[i], self.fields[i].repr(field_val))) + key_vals.append(f'{self.names[i]}={self.fields[i].repr(field_val)}') return '(' + ', '.join(key_vals) + ')' except Exception: return repr(value) @@ -223,7 +221,7 @@ def decode(cls, data): value |= (b & 0x7f) << i i += 7 if i > 28: - raise ValueError('Invalid value {}'.format(value)) + raise ValueError(f'Invalid value {value}') value |= b << i return value @@ -263,7 +261,7 @@ def decode(cls, data): value |= (b & 0x7f) << i i += 7 if i > 63: - raise ValueError('Invalid value {}'.format(value)) + raise ValueError(f'Invalid value {value}') value |= b << i return (value >> 1) ^ -(value & 1) @@ -309,7 +307,7 @@ def decode(cls, data): for i in range(num_fields): tag = UnsignedVarInt32.decode(data) if tag <= prev_tag: - raise ValueError('Invalid or out-of-order tag {}'.format(tag)) + raise ValueError(f'Invalid or out-of-order tag {tag}') prev_tag = tag size = UnsignedVarInt32.decode(data) val = data.read(size) @@ -321,8 +319,8 @@ def encode(cls, value): ret = UnsignedVarInt32.encode(len(value)) for k, v in value.items(): # do we allow for other data types ?? It could get complicated really fast - assert isinstance(v, bytes), 'Value {} is not a byte array'.format(v) - assert isinstance(k, int) and k > 0, 'Key {} is not a positive integer'.format(k) + assert isinstance(v, bytes), f'Value {v} is not a byte array' + assert isinstance(k, int) and k > 0, f'Key {k} is not a positive integer' ret += UnsignedVarInt32.encode(k) ret += v return ret diff --git a/kafka/record/abc.py b/kafka/record/abc.py index 8509e23e5..f45176051 100644 --- a/kafka/record/abc.py +++ b/kafka/record/abc.py @@ -1,8 +1,7 @@ -from __future__ import absolute_import import abc -class ABCRecord(object): +class ABCRecord: __metaclass__ = abc.ABCMeta __slots__ = () @@ -44,7 +43,7 @@ def headers(self): """ -class ABCRecordBatchBuilder(object): +class ABCRecordBatchBuilder: __metaclass__ = abc.ABCMeta __slots__ = () @@ -84,7 +83,7 @@ def build(self): """ -class ABCRecordBatch(object): +class ABCRecordBatch: """ For v2 encapsulates a RecordBatch, for v0/v1 a single (maybe compressed) message. """ @@ -98,7 +97,7 @@ def __iter__(self): """ -class ABCRecords(object): +class ABCRecords: __metaclass__ = abc.ABCMeta __slots__ = () diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index a098c42a9..5045f31ee 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -68,7 +68,7 @@ import kafka.codec as codecs -class DefaultRecordBase(object): +class DefaultRecordBase: __slots__ = () @@ -116,7 +116,7 @@ def _assert_has_codec(self, compression_type): checker, name = codecs.has_zstd, "zstd" if not checker(): raise UnsupportedCodecError( - "Libraries for {} compression codec not found".format(name)) + f"Libraries for {name} compression codec not found") class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch): @@ -247,7 +247,7 @@ def _read_msg( h_key_len, pos = decode_varint(buffer, pos) if h_key_len < 0: raise CorruptRecordException( - "Invalid negative header key size {}".format(h_key_len)) + f"Invalid negative header key size {h_key_len}") h_key = buffer[pos: pos + h_key_len].decode("utf-8") pos += h_key_len @@ -287,7 +287,7 @@ def __next__(self): msg = self._read_msg() except (ValueError, IndexError) as err: raise CorruptRecordException( - "Found invalid record structure: {!r}".format(err)) + f"Found invalid record structure: {err!r}") else: self._next_record_index += 1 return msg @@ -421,10 +421,10 @@ def append(self, offset, timestamp, key, value, headers, raise TypeError(timestamp) if not (key is None or get_type(key) in byte_like): raise TypeError( - "Not supported type for key: {}".format(type(key))) + f"Not supported type for key: {type(key)}") if not (value is None or get_type(value) in byte_like): raise TypeError( - "Not supported type for value: {}".format(type(value))) + f"Not supported type for value: {type(value)}") # We will always add the first message, so those will be set if self._first_timestamp is None: @@ -598,7 +598,7 @@ def estimate_size_in_bytes(cls, key, value, headers): ) -class DefaultRecordMetadata(object): +class DefaultRecordMetadata: __slots__ = ("_size", "_timestamp", "_offset") diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index 2f8523fcb..9ab8873ca 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -55,7 +55,7 @@ from kafka.errors import CorruptRecordException, UnsupportedCodecError -class LegacyRecordBase(object): +class LegacyRecordBase: __slots__ = () @@ -124,7 +124,7 @@ def _assert_has_codec(self, compression_type): checker, name = codecs.has_lz4, "lz4" if not checker(): raise UnsupportedCodecError( - "Libraries for {} compression codec not found".format(name)) + f"Libraries for {name} compression codec not found") class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase): @@ -367,11 +367,11 @@ def append(self, offset, timestamp, key, value, headers=None): if not (key is None or isinstance(key, (bytes, bytearray, memoryview))): raise TypeError( - "Not supported type for key: {}".format(type(key))) + f"Not supported type for key: {type(key)}") if not (value is None or isinstance(value, (bytes, bytearray, memoryview))): raise TypeError( - "Not supported type for value: {}".format(type(value))) + f"Not supported type for value: {type(value)}") # Check if we have room for another message pos = len(self._buffer) @@ -514,7 +514,7 @@ def estimate_size_in_bytes(cls, magic, compression_type, key, value): return cls.LOG_OVERHEAD + cls.record_size(magic, key, value) -class LegacyRecordMetadata(object): +class LegacyRecordMetadata: __slots__ = ("_crc", "_size", "_timestamp", "_offset") diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index fc2ef2d6b..7a604887c 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -18,7 +18,6 @@ # # So we can iterate over batches just by knowing offsets of Length. Magic is # used to construct the correct class for Batch itself. -from __future__ import division import struct @@ -110,7 +109,7 @@ def next_batch(self, _min_slice=MIN_SLICE, return DefaultRecordBatch(next_slice) -class MemoryRecordsBuilder(object): +class MemoryRecordsBuilder: __slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed", "_bytes_written") diff --git a/kafka/sasl/__init__.py b/kafka/sasl/__init__.py index 4a7f21a5f..337c90949 100644 --- a/kafka/sasl/__init__.py +++ b/kafka/sasl/__init__.py @@ -49,6 +49,6 @@ def try_authenticate(conn: BrokerConncetion, future: -> Future): .format(key) ) if key in MECHANISMS: - log.warning('Overriding existing SASL mechanism {}'.format(key)) + log.warning(f'Overriding existing SASL mechanism {key}') MECHANISMS[key] = module diff --git a/kafka/sasl/msk.py b/kafka/sasl/msk.py index 3f2d054e7..83a203270 100644 --- a/kafka/sasl/msk.py +++ b/kafka/sasl/msk.py @@ -76,7 +76,7 @@ def __init__(self, host, access_key, secret_key, region, token=None): self.version = '2020_10_22' self.service = 'kafka-cluster' - self.action = '{}:Connect'.format(self.service) + self.action = f'{self.service}:Connect' now = datetime.datetime.utcnow() self.datestamp = now.strftime('%Y%m%d') diff --git a/kafka/sasl/oauthbearer.py b/kafka/sasl/oauthbearer.py index 2fab7c37b..5d2488baf 100644 --- a/kafka/sasl/oauthbearer.py +++ b/kafka/sasl/oauthbearer.py @@ -74,7 +74,7 @@ def _token_extensions(conn): # Builds up a string separated by \x01 via a dict of key value pairs if (callable(getattr(token_provider, "extensions", None)) and len(token_provider.extensions()) > 0): - msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()]) + msg = "\x01".join([f"{k}={v}" for k, v in token_provider.extensions().items()]) return "\x01" + msg else: return "" diff --git a/kafka/scram.py b/kafka/scram.py index 7f003750c..05a7667d8 100644 --- a/kafka/scram.py +++ b/kafka/scram.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import base64 import hashlib import hmac @@ -8,12 +6,8 @@ from kafka.vendor import six -if six.PY2: - def xor_bytes(left, right): - return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right)) -else: - def xor_bytes(left, right): - return bytes(lb ^ rb for lb, rb in zip(left, right)) +def xor_bytes(left, right): + return bytes(lb ^ rb for lb, rb in zip(left, right)) class ScramClient: @@ -38,7 +32,7 @@ def __init__(self, user, password, mechanism): self.server_signature = None def first_message(self): - client_first_bare = 'n={},r={}'.format(self.user, self.nonce) + client_first_bare = f'n={self.user},r={self.nonce}' self.auth_message += client_first_bare return 'n,,' + client_first_bare diff --git a/kafka/serializer/__init__.py b/kafka/serializer/__init__.py index 90cd93ab2..168277519 100644 --- a/kafka/serializer/__init__.py +++ b/kafka/serializer/__init__.py @@ -1,3 +1 @@ -from __future__ import absolute_import - from kafka.serializer.abstract import Serializer, Deserializer diff --git a/kafka/serializer/abstract.py b/kafka/serializer/abstract.py index 18ad8d69c..529662b07 100644 --- a/kafka/serializer/abstract.py +++ b/kafka/serializer/abstract.py @@ -1,9 +1,7 @@ -from __future__ import absolute_import - import abc -class Serializer(object): +class Serializer: __meta__ = abc.ABCMeta def __init__(self, **config): @@ -17,7 +15,7 @@ def close(self): pass -class Deserializer(object): +class Deserializer: __meta__ = abc.ABCMeta def __init__(self, **config): diff --git a/kafka/structs.py b/kafka/structs.py index bcb023670..4f11259aa 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -1,5 +1,4 @@ """ Other useful structs """ -from __future__ import absolute_import from collections import namedtuple diff --git a/kafka/util.py b/kafka/util.py index e31d99305..474a5e54d 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,28 +1,23 @@ -from __future__ import absolute_import - import binascii import weakref from kafka.vendor import six -if six.PY3: - MAX_INT = 2 ** 31 - TO_SIGNED = 2 ** 32 +MAX_INT = 2 ** 31 +TO_SIGNED = 2 ** 32 - def crc32(data): - crc = binascii.crc32(data) - # py2 and py3 behave a little differently - # CRC is encoded as a signed int in kafka protocol - # so we'll convert the py3 unsigned result to signed - if crc >= MAX_INT: - crc -= TO_SIGNED - return crc -else: - from binascii import crc32 +def crc32(data): + crc = binascii.crc32(data) + # py2 and py3 behave a little differently + # CRC is encoded as a signed int in kafka protocol + # so we'll convert the py3 unsigned result to signed + if crc >= MAX_INT: + crc -= TO_SIGNED + return crc -class WeakMethod(object): +class WeakMethod: """ Callable that weakly references a method and the object it is bound to. It is based on https://stackoverflow.com/a/24287465. diff --git a/kafka/vendor/enum34.py b/kafka/vendor/enum34.py index 5f64bd2d8..363be19b1 100644 --- a/kafka/vendor/enum34.py +++ b/kafka/vendor/enum34.py @@ -39,7 +39,7 @@ def any(iterable): # In Python 3 unicode no longer exists (it's just str) unicode = str -class _RouteClassAttributeToGetattr(object): +class _RouteClassAttributeToGetattr: """Route attribute access on a class to __getattr__. This is a descriptor, used to define attributes that act differently when @@ -103,7 +103,7 @@ class _EnumDict(dict): """ def __init__(self): - super(_EnumDict, self).__init__() + super().__init__() self._member_names = [] def __setitem__(self, key, value): @@ -139,7 +139,7 @@ def __setitem__(self, key, value): # enum overwriting a descriptor? raise TypeError('Key already defined as: %r' % self[key]) self._member_names.append(key) - super(_EnumDict, self).__setitem__(key, value) + super().__setitem__(key, value) # Dummy value for Enum as EnumMeta explicity checks for it, but of course until @@ -170,7 +170,7 @@ def __new__(metacls, cls, bases, classdict): first_enum) # save enum items into separate mapping so they don't get baked into # the new class - members = dict((k, classdict[k]) for k in classdict._member_names) + members = {k: classdict[k] for k in classdict._member_names} for name in classdict._member_names: del classdict[name] @@ -192,16 +192,16 @@ def __new__(metacls, cls, bases, classdict): _order_ += aliases # check for illegal enum names (any others?) - invalid_names = set(members) & set(['mro']) + invalid_names = set(members) & {'mro'} if invalid_names: - raise ValueError('Invalid enum member name(s): %s' % ( - ', '.join(invalid_names), )) + raise ValueError('Invalid enum member name(s): {}'.format( + ', '.join(invalid_names))) # save attributes from super classes so we know if we can take # the shortcut of storing members in the class dict - base_attributes = set([a for b in bases for a in b.__dict__]) + base_attributes = {a for b in bases for a in b.__dict__} # create our new Enum type - enum_class = super(EnumMeta, metacls).__new__(metacls, cls, bases, classdict) + enum_class = super().__new__(metacls, cls, bases, classdict) enum_class._member_names_ = [] # names in random order if OrderedDict is not None: enum_class._member_map_ = OrderedDict() @@ -361,7 +361,7 @@ def __delattr__(cls, attr): if attr in cls._member_map_: raise AttributeError( "%s: cannot delete Enum member." % cls.__name__) - super(EnumMeta, cls).__delattr__(attr) + super().__delattr__(attr) def __dir__(self): return (['__class__', '__doc__', '__members__', '__module__'] + @@ -421,7 +421,7 @@ def __setattr__(cls, name, value): member_map = cls.__dict__.get('_member_map_', {}) if name in member_map: raise AttributeError('Cannot reassign members.') - super(EnumMeta, cls).__setattr__(name, value) + super().__setattr__(name, value) def _create_(cls, class_name, names=None, module=None, type=None, start=1): """Convenience method to create a new Enum class. @@ -663,18 +663,18 @@ def __new__(cls, value): for member in cls._member_map_.values(): if member.value == value: return member - raise ValueError("%s is not a valid %s" % (value, cls.__name__)) + raise ValueError(f"{value} is not a valid {cls.__name__}") temp_enum_dict['__new__'] = __new__ del __new__ def __repr__(self): - return "<%s.%s: %r>" % ( + return "<{}.{}: {!r}>".format( self.__class__.__name__, self._name_, self._value_) temp_enum_dict['__repr__'] = __repr__ del __repr__ def __str__(self): - return "%s.%s" % (self.__class__.__name__, self._name_) + return f"{self.__class__.__name__}.{self._name_}" temp_enum_dict['__str__'] = __str__ del __str__ @@ -719,29 +719,29 @@ def __cmp__(self, other): return 0 return -1 return NotImplemented - raise TypeError("unorderable types: %s() and %s()" % (self.__class__.__name__, other.__class__.__name__)) + raise TypeError(f"unorderable types: {self.__class__.__name__}() and {other.__class__.__name__}()") temp_enum_dict['__cmp__'] = __cmp__ del __cmp__ else: def __le__(self, other): - raise TypeError("unorderable types: %s() <= %s()" % (self.__class__.__name__, other.__class__.__name__)) + raise TypeError(f"unorderable types: {self.__class__.__name__}() <= {other.__class__.__name__}()") temp_enum_dict['__le__'] = __le__ del __le__ def __lt__(self, other): - raise TypeError("unorderable types: %s() < %s()" % (self.__class__.__name__, other.__class__.__name__)) + raise TypeError(f"unorderable types: {self.__class__.__name__}() < {other.__class__.__name__}()") temp_enum_dict['__lt__'] = __lt__ del __lt__ def __ge__(self, other): - raise TypeError("unorderable types: %s() >= %s()" % (self.__class__.__name__, other.__class__.__name__)) + raise TypeError(f"unorderable types: {self.__class__.__name__}() >= {other.__class__.__name__}()") temp_enum_dict['__ge__'] = __ge__ del __ge__ def __gt__(self, other): - raise TypeError("unorderable types: %s() > %s()" % (self.__class__.__name__, other.__class__.__name__)) + raise TypeError(f"unorderable types: {self.__class__.__name__}() > {other.__class__.__name__}()") temp_enum_dict['__gt__'] = __gt__ del __gt__ @@ -804,7 +804,7 @@ def _convert(cls, name, module, filter, source=None): source = vars(source) else: source = module_globals - members = dict((name, value) for name, value in source.items() if filter(name)) + members = {name: value for name, value in source.items() if filter(name)} cls = cls(name, members, module=module) cls.__reduce_ex__ = _reduce_ex_by_name module_globals.update(cls.__members__) @@ -833,7 +833,7 @@ def unique(enumeration): duplicates.append((name, member.name)) if duplicates: duplicate_names = ', '.join( - ["%s -> %s" % (alias, name) for (alias, name) in duplicates] + [f"{alias} -> {name}" for (alias, name) in duplicates] ) raise ValueError('duplicate names found in %r: %s' % (enumeration, duplicate_names) diff --git a/kafka/vendor/selectors34.py b/kafka/vendor/selectors34.py index 787490340..496ad1cd4 100644 --- a/kafka/vendor/selectors34.py +++ b/kafka/vendor/selectors34.py @@ -12,14 +12,13 @@ The following code adapted from trollius.selectors. """ -from __future__ import absolute_import from abc import ABCMeta, abstractmethod from collections import namedtuple try: from collections.abc import Mapping except ImportError: - from collections import Mapping + from collections.abc import Mapping from errno import EINTR import math import select @@ -39,7 +38,7 @@ def _wrap_error(exc, mapping, key): traceback = exc.__traceback__ else: traceback = sys.exc_info()[2] - six.reraise(new_err_cls, new_err, traceback) + raise new_err.with_traceback(traceback) # generic events, that must be mapped to implementation-specific ones @@ -59,16 +58,16 @@ def _fileobj_to_fd(fileobj): Raises: ValueError if the object is invalid """ - if isinstance(fileobj, six.integer_types): + if isinstance(fileobj, int): fd = fileobj else: try: fd = int(fileobj.fileno()) except (AttributeError, TypeError, ValueError): raise ValueError("Invalid file object: " - "{0!r}".format(fileobj)) + "{!r}".format(fileobj)) if fd < 0: - raise ValueError("Invalid file descriptor: {0}".format(fd)) + raise ValueError(f"Invalid file descriptor: {fd}") return fd @@ -91,15 +90,14 @@ def __getitem__(self, fileobj): fd = self._selector._fileobj_lookup(fileobj) return self._selector._fd_to_key[fd] except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) + raise KeyError(f"{fileobj!r} is not registered") def __iter__(self): return iter(self._selector._fd_to_key) # Using six.add_metaclass() decorator instead of six.with_metaclass() because # the latter leaks temporary_class to garbage with gc disabled -@six.add_metaclass(ABCMeta) -class BaseSelector(object): +class BaseSelector(metaclass=ABCMeta): """Selector abstract base class. A selector supports registering file objects to be monitored for specific @@ -211,7 +209,7 @@ def get_key(self, fileobj): try: return mapping[fileobj] except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) + raise KeyError(f"{fileobj!r} is not registered") @abstractmethod def get_map(self): @@ -255,12 +253,12 @@ def _fileobj_lookup(self, fileobj): def register(self, fileobj, events, data=None): if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): - raise ValueError("Invalid events: {0!r}".format(events)) + raise ValueError(f"Invalid events: {events!r}") key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) if key.fd in self._fd_to_key: - raise KeyError("{0!r} (FD {1}) is already registered" + raise KeyError("{!r} (FD {}) is already registered" .format(fileobj, key.fd)) self._fd_to_key[key.fd] = key @@ -270,7 +268,7 @@ def unregister(self, fileobj): try: key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) + raise KeyError(f"{fileobj!r} is not registered") return key def modify(self, fileobj, events, data=None): @@ -278,7 +276,7 @@ def modify(self, fileobj, events, data=None): try: key = self._fd_to_key[self._fileobj_lookup(fileobj)] except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) + raise KeyError(f"{fileobj!r} is not registered") if events != key.events: self.unregister(fileobj) key = self.register(fileobj, events, data) @@ -314,12 +312,12 @@ class SelectSelector(_BaseSelectorImpl): """Select-based selector.""" def __init__(self): - super(SelectSelector, self).__init__() + super().__init__() self._readers = set() self._writers = set() def register(self, fileobj, events, data=None): - key = super(SelectSelector, self).register(fileobj, events, data) + key = super().register(fileobj, events, data) if events & EVENT_READ: self._readers.add(key.fd) if events & EVENT_WRITE: @@ -327,7 +325,7 @@ def register(self, fileobj, events, data=None): return key def unregister(self, fileobj): - key = super(SelectSelector, self).unregister(fileobj) + key = super().unregister(fileobj) self._readers.discard(key.fd) self._writers.discard(key.fd) return key @@ -344,7 +342,7 @@ def select(self, timeout=None): ready = [] try: r, w, _ = self._select(self._readers, self._writers, [], timeout) - except select.error as exc: + except OSError as exc: if exc.args[0] == EINTR: return ready else: @@ -370,11 +368,11 @@ class PollSelector(_BaseSelectorImpl): """Poll-based selector.""" def __init__(self): - super(PollSelector, self).__init__() + super().__init__() self._poll = select.poll() def register(self, fileobj, events, data=None): - key = super(PollSelector, self).register(fileobj, events, data) + key = super().register(fileobj, events, data) poll_events = 0 if events & EVENT_READ: poll_events |= select.POLLIN @@ -384,7 +382,7 @@ def register(self, fileobj, events, data=None): return key def unregister(self, fileobj): - key = super(PollSelector, self).unregister(fileobj) + key = super().unregister(fileobj) self._poll.unregister(key.fd) return key @@ -400,7 +398,7 @@ def select(self, timeout=None): ready = [] try: fd_event_list = self._poll.poll(timeout) - except select.error as exc: + except OSError as exc: if exc.args[0] == EINTR: return ready else: @@ -424,14 +422,14 @@ class EpollSelector(_BaseSelectorImpl): """Epoll-based selector.""" def __init__(self): - super(EpollSelector, self).__init__() + super().__init__() self._epoll = select.epoll() def fileno(self): return self._epoll.fileno() def register(self, fileobj, events, data=None): - key = super(EpollSelector, self).register(fileobj, events, data) + key = super().register(fileobj, events, data) epoll_events = 0 if events & EVENT_READ: epoll_events |= select.EPOLLIN @@ -441,10 +439,10 @@ def register(self, fileobj, events, data=None): return key def unregister(self, fileobj): - key = super(EpollSelector, self).unregister(fileobj) + key = super().unregister(fileobj) try: self._epoll.unregister(key.fd) - except IOError: + except OSError: # This can happen if the FD was closed since it # was registered. pass @@ -468,7 +466,7 @@ def select(self, timeout=None): ready = [] try: fd_event_list = self._epoll.poll(timeout, max_ev) - except IOError as exc: + except OSError as exc: if exc.errno == EINTR: return ready else: @@ -487,7 +485,7 @@ def select(self, timeout=None): def close(self): self._epoll.close() - super(EpollSelector, self).close() + super().close() if hasattr(select, 'devpoll'): @@ -496,14 +494,14 @@ class DevpollSelector(_BaseSelectorImpl): """Solaris /dev/poll selector.""" def __init__(self): - super(DevpollSelector, self).__init__() + super().__init__() self._devpoll = select.devpoll() def fileno(self): return self._devpoll.fileno() def register(self, fileobj, events, data=None): - key = super(DevpollSelector, self).register(fileobj, events, data) + key = super().register(fileobj, events, data) poll_events = 0 if events & EVENT_READ: poll_events |= select.POLLIN @@ -513,7 +511,7 @@ def register(self, fileobj, events, data=None): return key def unregister(self, fileobj): - key = super(DevpollSelector, self).unregister(fileobj) + key = super().unregister(fileobj) self._devpoll.unregister(key.fd) return key @@ -548,7 +546,7 @@ def select(self, timeout=None): def close(self): self._devpoll.close() - super(DevpollSelector, self).close() + super().close() if hasattr(select, 'kqueue'): @@ -557,14 +555,14 @@ class KqueueSelector(_BaseSelectorImpl): """Kqueue-based selector.""" def __init__(self): - super(KqueueSelector, self).__init__() + super().__init__() self._kqueue = select.kqueue() def fileno(self): return self._kqueue.fileno() def register(self, fileobj, events, data=None): - key = super(KqueueSelector, self).register(fileobj, events, data) + key = super().register(fileobj, events, data) if events & EVENT_READ: kev = select.kevent(key.fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) @@ -576,7 +574,7 @@ def register(self, fileobj, events, data=None): return key def unregister(self, fileobj): - key = super(KqueueSelector, self).unregister(fileobj) + key = super().unregister(fileobj) if key.events & EVENT_READ: kev = select.kevent(key.fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE) @@ -623,7 +621,7 @@ def select(self, timeout=None): def close(self): self._kqueue.close() - super(KqueueSelector, self).close() + super().close() # Choose the best implementation, roughly: diff --git a/kafka/vendor/six.py b/kafka/vendor/six.py index 319821353..e7057ee30 100644 --- a/kafka/vendor/six.py +++ b/kafka/vendor/six.py @@ -22,7 +22,6 @@ """Utilities for writing code that runs on Python 2 and 3""" -from __future__ import absolute_import import functools import itertools @@ -59,7 +58,7 @@ MAXSIZE = int((1 << 31) - 1) else: # It's possible to have sizeof(long) != sizeof(Py_ssize_t). - class X(object): + class X: def __len__(self): return 1 << 31 @@ -94,7 +93,7 @@ def _import_module(name): return sys.modules[name] -class _LazyDescr(object): +class _LazyDescr: def __init__(self, name): self.name = name @@ -114,7 +113,7 @@ def __get__(self, obj, tp): class MovedModule(_LazyDescr): def __init__(self, name, old, new=None): - super(MovedModule, self).__init__(name) + super().__init__(name) if PY3: if new is None: new = name @@ -135,7 +134,7 @@ def __getattr__(self, attr): class _LazyModule(types.ModuleType): def __init__(self, name): - super(_LazyModule, self).__init__(name) + super().__init__(name) self.__doc__ = self.__class__.__doc__ def __dir__(self): @@ -150,7 +149,7 @@ def __dir__(self): class MovedAttribute(_LazyDescr): def __init__(self, name, old_mod, new_mod, old_attr=None, new_attr=None): - super(MovedAttribute, self).__init__(name) + super().__init__(name) if PY3: if new_mod is None: new_mod = name @@ -172,7 +171,7 @@ def _resolve(self): return getattr(module, self.attr) -class _SixMetaPathImporter(object): +class _SixMetaPathImporter: """ A meta path importer to import six.moves and its submodules. @@ -526,7 +525,7 @@ def remove_move(name): try: del moves.__dict__[name] except KeyError: - raise AttributeError("no such move, %r" % (name,)) + raise AttributeError(f"no such move, {name!r}") if PY3: @@ -582,7 +581,7 @@ def create_bound_method(func, obj): def create_unbound_method(func, cls): return types.MethodType(func, None, cls) - class Iterator(object): + class Iterator: def next(self): return type(self).__next__(self) diff --git a/kafka/vendor/socketpair.py b/kafka/vendor/socketpair.py index b55e629ee..8099f8aea 100644 --- a/kafka/vendor/socketpair.py +++ b/kafka/vendor/socketpair.py @@ -1,6 +1,5 @@ # pylint: skip-file # vendored from https://github.com/mhils/backports.socketpair -from __future__ import absolute_import import sys import socket @@ -35,17 +34,10 @@ def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0): csock = socket.socket(family, type, proto) try: csock.setblocking(False) - if sys.version_info >= (3, 0): - try: - csock.connect((addr, port)) - except (BlockingIOError, InterruptedError): - pass - else: - try: - csock.connect((addr, port)) - except socket.error as e: - if e.errno != errno.WSAEWOULDBLOCK: - raise + try: + csock.connect((addr, port)) + except (BlockingIOError, InterruptedError): + pass csock.setblocking(True) ssock, _ = lsock.accept() except Exception: diff --git a/kafka/version.py b/kafka/version.py index 8a26a1868..ee5ea98ce 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1,9 +1,6 @@ import sys -if sys.version_info < (3, 8): - from importlib_metadata import version -else: - from importlib.metadata import version +from importlib.metadata import version __version__ = version("kafka-python-ng") diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py index c3a7b02c8..a3f69da6d 100644 --- a/test/record/test_default_records.py +++ b/test/record/test_default_records.py @@ -1,7 +1,5 @@ -# -*- coding: utf-8 -*- -from __future__ import unicode_literals import pytest -from mock import patch +from unittest.mock import patch import kafka.codec from kafka.record.default_records import ( DefaultRecordBatch, DefaultRecordBatchBuilder @@ -197,7 +195,7 @@ def test_unavailable_codec(magic, compression_type, name, checker_name): magic=2, compression_type=compression_type, is_transactional=0, producer_id=-1, producer_epoch=-1, base_sequence=-1, batch_size=1024) - error_msg = "Libraries for {} compression codec not found".format(name) + error_msg = f"Libraries for {name} compression codec not found" with pytest.raises(UnsupportedCodecError, match=error_msg): builder.append(0, timestamp=None, key=None, value=b"M", headers=[]) builder.build() diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py index 43970f7c9..f16c29809 100644 --- a/test/record/test_legacy_records.py +++ b/test/record/test_legacy_records.py @@ -1,6 +1,5 @@ -from __future__ import unicode_literals import pytest -from mock import patch +from unittest.mock import patch from kafka.record.legacy_records import ( LegacyRecordBatch, LegacyRecordBatchBuilder ) @@ -186,7 +185,7 @@ def test_unavailable_codec(magic, compression_type, name, checker_name): # Check that builder raises error builder = LegacyRecordBatchBuilder( magic=magic, compression_type=compression_type, batch_size=1024) - error_msg = "Libraries for {} compression codec not found".format(name) + error_msg = f"Libraries for {name} compression codec not found" with pytest.raises(UnsupportedCodecError, match=error_msg): builder.append(0, timestamp=None, key=None, value=b"M") builder.build() diff --git a/test/record/test_records.py b/test/record/test_records.py index 9f72234ae..adde3ba06 100644 --- a/test/record/test_records.py +++ b/test/record/test_records.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -from __future__ import unicode_literals import pytest from kafka.record import MemoryRecords, MemoryRecordsBuilder from kafka.errors import CorruptRecordException diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 90b7ed203..cfe36b500 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -99,6 +99,7 @@ def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages): assert t.interval >= (TIMEOUT_MS / 1000.0) +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1") def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messages): GROUP_ID = random_string(10) @@ -143,6 +144,7 @@ def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messa assert_message_count(output_msgs1 + output_msgs2, 200) +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages): send_messages(range(100, 200), partition=0) @@ -162,6 +164,7 @@ def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_mes assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)} +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages): # We send to only 1 partition so we don't have parallel requests to 2 @@ -188,6 +191,7 @@ def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages) assert_message_count(fetched_msgs, 10) +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_offsets_for_time(topic, kafka_consumer, kafka_producer): late_time = int(time.time()) * 1000 @@ -237,6 +241,7 @@ def test_kafka_consumer_offsets_for_time(topic, kafka_consumer, kafka_producer): assert offsets == {tp: late_msg.offset + 1} +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_producer, topic): tp0 = TopicPartition(topic, 0) @@ -275,6 +280,7 @@ def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_pro } +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1") def test_kafka_consumer_offsets_for_time_old(kafka_consumer, topic): consumer = kafka_consumer @@ -284,6 +290,7 @@ def test_kafka_consumer_offsets_for_time_old(kafka_consumer, topic): consumer.offsets_for_times({tp: int(time.time())}) +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic): consumer = kafka_consumer_factory(fetch_max_wait_ms=200,