Skip to content

Commit

Permalink
Drop recursion in _unpack_message_set
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp committed Jul 11, 2016
1 parent 1eb7e05 commit 5d2c792
Showing 1 changed file with 36 additions and 17 deletions.
53 changes: 36 additions & 17 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,33 +352,52 @@ def fetched_records(self):
position)
return dict(drained)

def _unpack_message_set(self, tp, messages, relative_offset=0):
def _unpack_message_set(self, tp, messages):
try:
for offset, size, msg in messages:
if self.config['check_crcs'] and not msg.validate_crc():
raise Errors.InvalidMessageError(msg)
elif msg.is_compressed():
mset = msg.decompress()
# new format uses relative offsets for compressed messages
# If relative offset is used, we need to decompress the entire message first to compute
# the absolute offset.
inner_mset = msg.decompress()
if msg.magic > 0:
last_offset, _, _ = mset[-1]
relative = offset - last_offset
last_offset, _, _ = inner_mset[-1]
absolute_base_offset = offset - last_offset
else:
relative = 0
for record in self._unpack_message_set(tp, mset, relative):
yield record
absolute_base_offset = -1

for inner_offset, inner_size, inner_msg in inner_mset:
if msg.magic > 0:
# When magic value is greater than 0, the timestamp
# of a compressed message depends on the
# typestamp type of the wrapper message:

if msg.timestamp_type == 0: # CREATE_TIME (0)
inner_timestamp = inner_msg.timestamp

elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
inner_timestamp = msg.timestamp

else:
raise ValueError('Unknown timestamp type: {}'.format(msg.timestamp_type))
else:
inner_timestamp = msg.timestamp

if absolute_base_offset >= 0:
inner_offset += absolute_base_offset

key, value = self._deserialize(inner_msg)
yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
inner_timestamp, msg.timestamp_type,
key, value)

else:
# Message v1 adds timestamp
if msg.magic > 0:
timestamp = msg.timestamp
timestamp_type = msg.timestamp_type
else:
timestamp = timestamp_type = None
key, value = self._deserialize(msg)
yield ConsumerRecord(tp.topic, tp.partition,
offset + relative_offset,
timestamp, timestamp_type,
yield ConsumerRecord(tp.topic, tp.partition, offset,
msg.timestamp, msg.timestamp_type,
key, value)

# If unpacking raises StopIteration, it is erroneously
# caught by the generator. We want all exceptions to be raised
# back to the user. See Issue 545
Expand Down

0 comments on commit 5d2c792

Please sign in to comment.