Drop recursion in Fetcher _unpack_message_set #755

Merged 3 commits on Jul 15, 2016
73 changes: 56 additions & 17 deletions kafka/consumer/fetcher.py
@@ -39,6 +39,7 @@ class Fetcher(six.Iterator):
         'fetch_max_wait_ms': 500,
         'max_partition_fetch_bytes': 1048576,
         'check_crcs': True,
+        'skip_double_compressed_messages': False,
         'iterator_refetch_records': 1, # undocumented -- interface may change
         'api_version': (0, 8, 0),
     }
@@ -71,6 +72,13 @@ def __init__(self, client, subscriptions, metrics, metric_group_prefix,
                 consumed. This ensures no on-the-wire or on-disk corruption to
                 the messages occurred. This check adds some overhead, so it may
                 be disabled in cases seeking extreme performance. Default: True
+            skip_double_compressed_messages (bool): A bug in KafkaProducer
+                caused some messages to be corrupted via double-compression.
+                By default, the fetcher will return the messages as a compressed
+                blob of bytes with a single offset, i.e. how the message was
+                actually published to the cluster. If you prefer to have the
+                fetcher automatically detect corrupt messages and skip them,
+                set this option to True. Default: False.
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -352,33 +360,64 @@ def fetched_records(self):
                               position)
             return dict(drained)
 
-    def _unpack_message_set(self, tp, messages, relative_offset=0):
+    def _unpack_message_set(self, tp, messages):
         try:
             for offset, size, msg in messages:
                 if self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)
                 elif msg.is_compressed():
-                    mset = msg.decompress()
-                    # new format uses relative offsets for compressed messages
+                    # If relative offset is used, we need to decompress the entire message first to compute
+                    # the absolute offset.
+                    inner_mset = msg.decompress()
+
+                    # There should only ever be a single layer of compression
+                    if inner_mset[0][-1].is_compressed():
+                        log.warning('MessageSet at %s offset %d appears '
+                                    ' double-compressed. This should not'
+                                    ' happen -- check your producers!',
+                                    tp, offset)
+                        if self.config['skip_double_compressed_messages']:
+                            log.warning('Skipping double-compressed message at'
+                                        ' %s %d', tp, offset)
+                            continue
+
                     if msg.magic > 0:
-                        last_offset, _, _ = mset[-1]
-                        relative = offset - last_offset
+                        last_offset, _, _ = inner_mset[-1]
+                        absolute_base_offset = offset - last_offset
                     else:
-                        relative = 0
-                    for record in self._unpack_message_set(tp, mset, relative):
-                        yield record
+                        absolute_base_offset = -1
+
+                    for inner_offset, inner_size, inner_msg in inner_mset:
+                        if msg.magic > 0:
+                            # When magic value is greater than 0, the timestamp
+                            # of a compressed message depends on the
+                            # timestamp type of the wrapper message:
+
+                            if msg.timestamp_type == 0: # CREATE_TIME (0)
+                                inner_timestamp = inner_msg.timestamp
+
+                            elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
+                                inner_timestamp = msg.timestamp
+
+                            else:
+                                raise ValueError('Unknown timestamp type: {}'.format(msg.timestamp_type))
+                        else:
+                            inner_timestamp = msg.timestamp
+
+                        if absolute_base_offset >= 0:
+                            inner_offset += absolute_base_offset
+
+                        key, value = self._deserialize(inner_msg)
+                        yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
+                                             inner_timestamp, msg.timestamp_type,
+                                             key, value)
+
                 else:
-                    # Message v1 adds timestamp
-                    if msg.magic > 0:
-                        timestamp = msg.timestamp
-                        timestamp_type = msg.timestamp_type
-                    else:
-                        timestamp = timestamp_type = None
                     key, value = self._deserialize(msg)
-                    yield ConsumerRecord(tp.topic, tp.partition,
-                                         offset + relative_offset,
-                                         timestamp, timestamp_type,
+                    yield ConsumerRecord(tp.topic, tp.partition, offset,
+                                         msg.timestamp, msg.timestamp_type,
                                          key, value)
 
         # If unpacking raises StopIteration, it is erroneously
        # caught by the generator. We want all exceptions to be raised
        # back to the user. See Issue 545
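Aside (not part of the diff): a minimal sketch of the offset arithmetic the new inner loop performs. For message format v1 (magic > 0), the wrapper message carries the absolute offset of its last inner message while the inner messages carry relative offsets, so the base is recovered by subtraction and added back to each inner offset; the timestamp is likewise taken per inner message (the inner message's own timestamp for CREATE_TIME wrappers, the wrapper's for LOG_APPEND_TIME). The helper name and list-based message set below are illustrative only.

    def absolute_offsets(wrapper_offset, inner_relative_offsets):
        """Map relative inner offsets to absolute offsets (v1 wrapper message)."""
        # The wrapper's offset is the absolute offset of the *last* inner message.
        base = wrapper_offset - inner_relative_offsets[-1]
        return [base + rel for rel in inner_relative_offsets]

    # A wrapper published at absolute offset 102 holding three inner messages
    # with relative offsets 0, 1, 2 unpacks to absolute offsets 100, 101, 102.
    assert absolute_offsets(102, [0, 1, 2]) == [100, 101, 102]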
8 changes: 8 additions & 0 deletions kafka/consumer/group.py
@@ -123,6 +123,13 @@ class KafkaConsumer(six.Iterator):
         consumer_timeout_ms (int): number of milliseconds to block during
             message iteration before raising StopIteration (i.e., ending the
             iterator). Default -1 (block forever).
+        skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
+            caused some messages to be corrupted via double-compression.
+            By default, the fetcher will return these messages as a compressed
+            blob of bytes with a single offset, i.e. how the message was
+            actually published to the cluster. If you prefer to have the
+            fetcher automatically detect corrupt messages and skip them,
+            set this option to True. Default: False.
         security_protocol (str): Protocol used to communicate with brokers.
             Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
         ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
@@ -189,6 +196,7 @@ class KafkaConsumer(six.Iterator):
         'send_buffer_bytes': None,
         'receive_buffer_bytes': None,
         'consumer_timeout_ms': -1,
+        'skip_double_compressed_messages': False,
         'security_protocol': 'PLAINTEXT',
         'ssl_context': None,
         'ssl_check_hostname': True,
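Aside (not part of the diff): how the option documented above would typically be enabled on a consumer. The broker address and topic name are placeholders.

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',                          # placeholder topic
        bootstrap_servers='localhost:9092',  # placeholder broker
        # Skip messages that an affected KafkaProducer double-compressed,
        # instead of returning them as an opaque compressed blob.
        skip_double_compressed_messages=True,
    )

    for record in consumer:
        print(record.topic, record.partition, record.offset, record.value)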