diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 5f3eb1d98..34ff4cb28 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -39,6 +39,7 @@ class Fetcher(six.Iterator):
         'fetch_max_wait_ms': 500,
         'max_partition_fetch_bytes': 1048576,
         'check_crcs': True,
+        'skip_double_compressed_messages': False,
         'iterator_refetch_records': 1,  # undocumented -- interface may change
         'api_version': (0, 8, 0),
     }
@@ -71,6 +72,13 @@ def __init__(self, client, subscriptions, metrics, metric_group_prefix,
                 consumed. This ensures no on-the-wire or on-disk corruption to
                 the messages occurred. This check adds some overhead, so it may
                 be disabled in cases seeking extreme performance. Default: True
+            skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
+                caused some messages to be corrupted via double-compression.
+                By default, the fetcher will return these messages as a compressed
+                blob of bytes with a single offset, i.e. how the message was
+                actually published to the cluster. If you prefer to have the
+                fetcher automatically detect corrupt messages and skip them,
+                set this option to True. Default: False.
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -352,33 +360,64 @@ def fetched_records(self):
                                position)
         return dict(drained)

-    def _unpack_message_set(self, tp, messages, relative_offset=0):
+    def _unpack_message_set(self, tp, messages):
         try:
             for offset, size, msg in messages:
                 if self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)
                 elif msg.is_compressed():
-                    mset = msg.decompress()
-                    # new format uses relative offsets for compressed messages
+                    # If relative offsets are used, we need to decompress the
+                    # entire message set first to compute the absolute offsets.
+                    inner_mset = msg.decompress()
+
+                    # There should only ever be a single layer of compression
+                    if inner_mset[0][-1].is_compressed():
+                        log.warning('MessageSet at %s offset %d appears'
+                                    ' double-compressed. This should not'
+                                    ' happen -- check your producers!',
+                                    tp, offset)
+                        if self.config['skip_double_compressed_messages']:
+                            log.warning('Skipping double-compressed message at'
+                                        ' %s %d', tp, offset)
+                            continue
+
                     if msg.magic > 0:
-                        last_offset, _, _ = mset[-1]
-                        relative = offset - last_offset
+                        last_offset, _, _ = inner_mset[-1]
+                        absolute_base_offset = offset - last_offset
                     else:
-                        relative = 0
-                    for record in self._unpack_message_set(tp, mset, relative):
-                        yield record
+                        absolute_base_offset = -1
+
+                    for inner_offset, inner_size, inner_msg in inner_mset:
+                        if msg.magic > 0:
+                            # When magic value is greater than 0, the timestamp
+                            # of a compressed message depends on the
+                            # timestamp type of the wrapper message:
+
+                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
+                                inner_timestamp = inner_msg.timestamp
+
+                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
+                                inner_timestamp = msg.timestamp
+
+                            else:
+                                raise ValueError('Unknown timestamp type: {}'.format(msg.timestamp_type))
+                        else:
+                            inner_timestamp = msg.timestamp
+
+                        if absolute_base_offset >= 0:
+                            inner_offset += absolute_base_offset
+
+                        key, value = self._deserialize(inner_msg)
+                        yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
+                                             inner_timestamp, msg.timestamp_type,
+                                             key, value)
+
                 else:
-                    # Message v1 adds timestamp
-                    if msg.magic > 0:
-                        timestamp = msg.timestamp
-                        timestamp_type = msg.timestamp_type
-                    else:
-                        timestamp = timestamp_type = None
                     key, value = self._deserialize(msg)
-                    yield ConsumerRecord(tp.topic, tp.partition,
-                                         offset + relative_offset,
-                                         timestamp, timestamp_type,
+                    yield ConsumerRecord(tp.topic, tp.partition, offset,
+                                         msg.timestamp, msg.timestamp_type,
                                          key, value)
+
         # If unpacking raises StopIteration, it is erroneously
         # caught by the generator. We want all exceptions to be raised
         # back to the user. See Issue 545
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 85099993f..7fe509a52 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -123,6 +123,13 @@ class KafkaConsumer(six.Iterator):
         consumer_timeout_ms (int): number of milliseconds to block during
             message iteration before raising StopIteration (i.e., ending the
             iterator). Default -1 (block forever).
+        skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
+            caused some messages to be corrupted via double-compression.
+            By default, the fetcher will return these messages as a compressed
+            blob of bytes with a single offset, i.e. how the message was
+            actually published to the cluster. If you prefer to have the
+            fetcher automatically detect corrupt messages and skip them,
+            set this option to True. Default: False.
         security_protocol (str): Protocol used to communicate with brokers.
             Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
         ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
@@ -189,6 +196,7 @@ class KafkaConsumer(six.Iterator):
         'send_buffer_bytes': None,
         'receive_buffer_bytes': None,
         'consumer_timeout_ms': -1,
+        'skip_double_compressed_messages': False,
         'security_protocol': 'PLAINTEXT',
         'ssl_context': None,
         'ssl_check_hostname': True,
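Reviewer note: the absolute-offset arithmetic in _unpack_message_set can be hard to follow from the diff alone. The sketch below shows the same calculation for a magic > 0 wrapper message whose inner messages carry relative offsets; the offsets are invented for illustration and the snippet is not part of the patch.

# Offset math used for compressed (magic > 0) message sets, with made-up values:
# a wrapper message stored at broker offset 104 containing five inner messages
# whose relative offsets are 0..4.
wrapper_offset = 104
inner_offsets = [0, 1, 2, 3, 4]

last_offset = inner_offsets[-1]                      # 4
absolute_base_offset = wrapper_offset - last_offset  # 100

# Each ConsumerRecord then gets relative offset + base, restoring 100..104.
assert [o + absolute_base_offset for o in inner_offsets] == [100, 101, 102, 103, 104]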
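A minimal usage sketch for the new option, assuming a local broker; the topic name and bootstrap server are placeholders, and only the skip_double_compressed_messages flag itself comes from this patch.

from kafka import KafkaConsumer

# 'my-topic' and the bootstrap server below are illustrative placeholders.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    # Added by this patch: when True, messages detected as double-compressed
    # are skipped instead of being returned as a raw compressed blob with a
    # single offset (the default behaviour).
    skip_double_compressed_messages=True,
)

for record in consumer:
    print(record.offset, record.timestamp, record.timestamp_type, record.value)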