Use PyUpgrade for 3.8+ for updating project #171

Merged Mar 18, 2024 (2 commits). The diff below shows changes from 1 commit.
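For context on what the tool does: pyupgrade mechanically rewrites Python 2 era idioms into their modern equivalents (its README documents a `--py38-plus` flag for targeting 3.8+). Below is a minimal, runnable sketch of the rewrite patterns that recur throughout this diff; it is illustrative only, not code from this repository:

```python
# Illustrative pairings of legacy spellings with the forms pyupgrade produces.
# All of these patterns appear in the diff below.

# `from __future__ import absolute_import` is deleted outright: absolute
# imports are already the default on Python 3.

class Legacy(object):  # Python 2 style: explicit `object` base
    pass

class Modern:          # Python 3 style: implicit `object` base
    pass

version = 7
# %-formatting and str.format() both become f-strings:
assert "v%s" % version == "v{}".format(version) == f"v{version}"

# six.iteritems()/six.itervalues() become plain dict view methods:
d = {"a": 1}
assert list(d.items()) == [("a", 1)] and list(d.values()) == [1]

# set([...]) becomes a set comprehension:
assert set([x * x for x in range(3)]) == {x * x for x in range(3)}
```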
2 changes: 0 additions & 2 deletions kafka/__init__.py
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
__title__ = 'kafka'
from kafka.version import __version__
__author__ = 'Dana Powers'
2 changes: 0 additions & 2 deletions kafka/admin/__init__.py
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
from kafka.admin.config_resource import ConfigResource, ConfigResourceType
from kafka.admin.client import KafkaAdminClient
from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
11 changes: 5 additions & 6 deletions kafka/admin/acl_resource.py
@@ -1,4 +1,3 @@
-from __future__ import absolute_import
from kafka.errors import IllegalArgumentError

# enum in stdlib as of py3.4
@@ -69,7 +68,7 @@ class ACLResourcePatternType(IntEnum):
PREFIXED = 4


-class ACLFilter(object):
+class ACLFilter:
"""Represents a filter to use with describing and deleting ACLs

The difference between this class and the ACL class is mainly that
@@ -161,7 +160,7 @@ def __init__(
permission_type,
resource_pattern
):
-super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern)
+super().__init__(principal, host, operation, permission_type, resource_pattern)
self.validate()

def validate(self):
@@ -173,7 +172,7 @@ def validate(self):
raise IllegalArgumentError("resource_pattern must be a ResourcePattern object")


-class ResourcePatternFilter(object):
+class ResourcePatternFilter:
def __init__(
self,
resource_type,
@@ -232,13 +231,13 @@ def __init__(
resource_name,
pattern_type=ACLResourcePatternType.LITERAL
):
-super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type)
+super().__init__(resource_type, resource_name, pattern_type)
self.validate()

def validate(self):
if self.resource_type == ResourceType.ANY:
raise IllegalArgumentError("resource_type cannot be ANY")
if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]:
raise IllegalArgumentError(
"pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name)
f"pattern_type cannot be {self.pattern_type.name} on a concrete ResourcePattern"
)
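The `super(ACL, self).__init__(...)` → `super().__init__(...)` change above uses Python 3's zero-argument `super()`, which resolves the class and instance implicitly. A small self-contained sketch (hypothetical `Base`/`Child` classes, not from this repo):

```python
class Base:
    def __init__(self, name):
        self.name = name

class Child(Base):
    def __init__(self, name):
        # Equivalent to super(Child, self).__init__(name), but does not
        # repeat the class name, so it survives renames and subclassing.
        super().__init__(name)

assert Child("acl").name == "acl"
```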
10 changes: 4 additions & 6 deletions kafka/admin/client.py
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
from collections import defaultdict
import copy
import logging
@@ -32,7 +30,7 @@
log = logging.getLogger(__name__)


-class KafkaAdminClient(object):
+class KafkaAdminClient:
"""A class for administering the Kafka cluster.

Warning:
@@ -194,7 +192,7 @@ def __init__(self, **configs):
log.debug("Starting KafkaAdminClient with configuration: %s", configs)
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
raise KafkaConfigurationError(f"Unrecognized configs: {extra_configs}")

self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
@@ -874,7 +872,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
))
else:
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
f"Support for DescribeConfigs v{version} has not yet been added to KafkaAdminClient.")

self._wait_for_futures(futures)
return [f.value for f in futures]
@@ -1197,7 +1195,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
topics_partitions_dict = defaultdict(set)
for topic, partition in partitions:
topics_partitions_dict[topic].add(partition)
-topics_partitions = list(six.iteritems(topics_partitions_dict))
+topics_partitions = list(topics_partitions_dict.items())
request = OffsetFetchRequest[version](group_id, topics_partitions)
else:
raise NotImplementedError(
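The `six.iteritems` removal above is behavior-preserving: on Python 3, `six.iteritems(d)` simply delegates to `d.items()`, which already returns a lazy view. A quick illustrative check mirroring the `_list_consumer_group_offsets_send_request` pattern:

```python
from collections import defaultdict

topics_partitions_dict = defaultdict(set)
for topic, partition in [("topic-a", 0), ("topic-a", 1)]:
    topics_partitions_dict[topic].add(partition)

# list(six.iteritems(d)) and list(d.items()) yield identical pairs on Python 3.
topics_partitions = list(topics_partitions_dict.items())
assert topics_partitions == [("topic-a", {0, 1})]
```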
4 changes: 1 addition & 3 deletions kafka/admin/config_resource.py
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
@@ -15,7 +13,7 @@ class ConfigResourceType(IntEnum):
TOPIC = 2


-class ConfigResource(object):
+class ConfigResource:
"""A class for specifying config resources.
Arguments:
resource_type (ConfigResourceType): the type of kafka resource
5 changes: 1 addition & 4 deletions kafka/admin/new_partitions.py
@@ -1,7 +1,4 @@
-from __future__ import absolute_import
-
-
-class NewPartitions(object):
+class NewPartitions:
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
must be the difference between the new total number of partitions and the existing number of partitions.
Arguments:
4 changes: 1 addition & 3 deletions kafka/admin/new_topic.py
@@ -1,9 +1,7 @@
-from __future__ import absolute_import
-
from kafka.errors import IllegalArgumentError


-class NewTopic(object):
+class NewTopic:
""" A class for new topic creation
Arguments:
name (string): name of the topic
22 changes: 8 additions & 14 deletions kafka/client_async.py
@@ -1,5 +1,3 @@
-from __future__ import absolute_import, division
-
import collections
import copy
import logging
@@ -32,14 +30,10 @@
from kafka.vendor import socketpair
from kafka.version import __version__

-if six.PY2:
-    ConnectionError = None
-

log = logging.getLogger('kafka.client')


-class KafkaClient(object):
+class KafkaClient:
"""
A network client for asynchronous request/response network I/O.

@@ -374,7 +368,7 @@ def _maybe_connect(self, node_id):

if conn is None:
broker = self.cluster.broker_metadata(node_id)
-assert broker, 'Broker id %s not in current metadata' % (node_id,)
+assert broker, 'Broker id {} not in current metadata'.format(node_id)

Review comment: f-string is easier to read, write, and less computationally expensive than legacy string formatting. Explained here.

log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
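For illustration of the comment above, the three spellings produce identical strings; only their construction differs:

```python
node_id = 3
legacy = 'Broker id %s not in current metadata' % (node_id,)
explicit = 'Broker id {} not in current metadata'.format(node_id)
fstring = f'Broker id {node_id} not in current metadata'
assert legacy == explicit == fstring
```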
@@ -686,7 +680,7 @@ def _poll(self, timeout):
unexpected_data = key.fileobj.recv(1)
if unexpected_data: # anything other than a 0-byte read means protocol issues
log.warning('Protocol out of sync on %r, closing', conn)
-except socket.error:
+except OSError:
pass
conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
continue
@@ -701,7 +695,7 @@ def _poll(self, timeout):
if conn not in processed and conn.connected() and conn._sock.pending():
self._pending_completion.extend(conn.recv())

-for conn in six.itervalues(self._conns):
+for conn in self._conns.values():
if conn.requests_timed_out():
log.warning('%s timed out after %s ms. Closing connection.',
conn, conn.config['request_timeout_ms'])
@@ -941,7 +935,7 @@ def wakeup(self):
except socket.timeout:
log.warning('Timeout to send to wakeup socket!')
raise Errors.KafkaTimeoutError()
-except socket.error as e:
+except OSError as e:
log.warning('Unable to send to wakeup socket!')
if self._raise_upon_socket_err_during_wakeup:
raise e
@@ -951,7 +945,7 @@ def _clear_wake_fd(self):
while True:
try:
self._wake_r.recv(1024)
-except socket.error:
+except OSError:
break

def _maybe_close_oldest_connection(self):
@@ -981,7 +975,7 @@ def bootstrap_connected(self):
OrderedDict = dict


-class IdleConnectionManager(object):
+class IdleConnectionManager:
def __init__(self, connections_max_idle_ms):
if connections_max_idle_ms > 0:
self.connections_max_idle = connections_max_idle_ms / 1000
@@ -1043,7 +1037,7 @@ def poll_expired_connection(self):
return None


-class KafkaClientMetrics(object):
+class KafkaClientMetrics:
def __init__(self, metrics, metric_group_prefix, conns):
self.metrics = metrics
self.metric_group_name = metric_group_prefix + '-metrics'
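The `except socket.error:` → `except OSError:` rewrites in this file are safe because PEP 3151 (Python 3.3) collapsed `socket.error` into a plain alias of `OSError`. A minimal check:

```python
import socket

# socket.error IS OSError on Python 3.3+, so the rewritten except clauses
# catch exactly the same exceptions as before; timeouts remain covered too.
assert socket.error is OSError
assert issubclass(socket.timeout, OSError)
```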
12 changes: 5 additions & 7 deletions kafka/cluster.py
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
import collections
import copy
import logging
@@ -16,7 +14,7 @@
log = logging.getLogger(__name__)


-class ClusterMetadata(object):
+class ClusterMetadata:
"""
A class to manage kafka cluster metadata.

@@ -128,9 +126,9 @@ def available_partitions_for_topic(self, topic):
"""
if topic not in self._partitions:
return None
-return set([partition for partition, metadata
-in six.iteritems(self._partitions[topic])
-if metadata.leader != -1])
+return {partition for partition, metadata
+in self._partitions[topic].items()
+if metadata.leader != -1}

def leader_for_partition(self, partition):
"""Return node_id of leader, -1 unavailable, None if unknown."""
@@ -361,7 +359,7 @@ def add_group_coordinator(self, group, response):

# Use a coordinator-specific node id so that group requests
# get a dedicated connection
-node_id = 'coordinator-{}'.format(response.coordinator_id)
+node_id = f'coordinator-{response.coordinator_id}'
coordinator = BrokerMetadata(
node_id,
response.host,
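The `available_partitions_for_topic` rewrite above swaps `set([...])` for a set comprehension, which builds the set directly instead of materializing an intermediate list first. A sketch with hypothetical partition-to-leader data:

```python
# partition -> leader node id; -1 marks a partition with no available leader
leaders = {0: 5, 1: -1, 2: 7}

old_style = set([p for p, leader in leaders.items() if leader != -1])
new_style = {p for p, leader in leaders.items() if leader != -1}
assert old_style == new_style == {0, 2}
```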
6 changes: 0 additions & 6 deletions kafka/codec.py
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
import gzip
import io
import platform
@@ -149,10 +147,6 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
# buffer... likely a python-snappy bug, so just use a slice copy
chunker = lambda payload, i, size: payload[i:size+i]

-elif six.PY2:
-# Sliced buffer avoids additional copies
-# pylint: disable-msg=undefined-variable
-chunker = lambda payload, i, size: buffer(payload, i, size)
else:
# snappy.compress does not like raw memoryviews, so we have to convert
# tobytes, which is a copy... oh well. it's the thought that counts.