Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaProducer produces corrupt "double-compressed" messages on retry when compression is enabled. KafkaConsumer gets "stuck" consuming them #718

Closed
zoltan-fedor opened this issue Jun 3, 2016 · 41 comments · Fixed by #1677

Comments

@zoltan-fedor
Copy link

This is an interesting one.

In all of our topics every day a handful of partitions get "stuck".
Basically the reading of the partition stops at a given message and kafka-python reports that there are no more messages in the given partition (just like it would have consumed all messages), while there are unconsumed messages.
The only way to get the consumers moving again is to manually seek the offset forward by stepping over the "stuck" messages and then works again for a few million records and then get stuck again at some later offset.

I have multiple consumers consuming from the same topic and they all get stuck at the same messages of the same topics. Random number of partitions are affected day-to-day.

We are using Kafka broker version 0.9.0.1, kafka-python 1.2.1 (had the same issue with 1.1.1).

The consumer code is very simple (the below code is trying to read only partition #1, which is currently "stuck"):

kafka_consumer = KafkaConsumer(
    group_id=kafka_group_id,
    bootstrap_servers=kafka_servers,
    enable_auto_commit=False,
    consumer_timeout_ms=10000,
    fetch_max_wait_ms=10*1000,
    request_timeout_ms=10*1000
)
topics = [TopicPartition(topic, 1)]
kafka_consumer.assign(topics)

for message in kafka_consumer:
    print(message)

print("Completed")

The above code prints "Completed", but not the messages, while there is a 5M offset lag in partition 1, so there would be plenty of messages to read.
After seeking the consumer offset forward the code works again until it doesn't get "stuck" again.

@dpkp
Copy link
Owner

dpkp commented Jun 3, 2016

This can happen to a kafka system if there is a message larger than your consumer's max_partition_fetch_bytes . Have you tried tuning that?

@zoltan-fedor
Copy link
Author

Thanks. The max_partition_fetch_bytes is already set to pretty high (10*1048576), so I don't think that would be the issue, but I will try to increase it even further to verify.

@zoltan-fedor
Copy link
Author

Unfortunately as expected, the max_partition_fetch_bytes did not make any difference, even increasing it 1000-fold did make the consumer read the message from the partition.

So with having:
max_partition_fetch_bytes=1000*1048576

The issue is the same as above.

@dpkp
Copy link
Owner

dpkp commented Jun 3, 2016

Can you get debug logs for a consumer in this "stuck" state?

@zoltan-fedor
Copy link
Author

zoltan-fedor commented Jun 3, 2016

Sure, here comes the debug log of the consumer when it is in this "stuck" state.
I am only trying to read one partition (partition 11), which I know that it is currently "stuck".

It seems it cannot find the message it is looking for (message # 812899910).

DEBUG:kafka.client:Attempting to bootstrap via node at 10.1.1.2:9092
DEBUG:kafka.conn:: creating new socket
DEBUG:kafka.conn:: established TCP connection
DEBUG:kafka.client:Node bootstrap connected
DEBUG:kafka.conn: Request 1: MetadataRequest_v0(topics=[])
DEBUG:kafka.conn: Response 1: MetadataResponse_v0(brokers=[(node_id=0, host='fr.amers1.cloud', port=9092), (node_id=1, host='fr2.amers1.cloud', port=9092)], topics=[(error_code=0, topic='opsonsole', partitions=[(error_code=0, partition=2, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=3, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])]), (error_code=0, topic='test', partitions=[(error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])]), (error_code=0, topic='_schemas', partitions=[(error_code=0, partition=0, leader=0, replicas=[0, 1], isr=[0, 1])]), (error_code=0, topic='opsconsole', partitions=[(error_code=0, partition=8, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=11, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=2, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=5, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=4, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=13, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=7, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=10, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=9, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=3, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=6, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=15, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])]), (error_code=0, topic='opsconsole_test', partitions=[(error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=23, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=41, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=32, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=8, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=44, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=17, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=35, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=26, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=11, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=38, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=29, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=47, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=20, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=2, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=5, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=46, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=40, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=49, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=13, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=4, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=22, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=31, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=16, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=7, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=43, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=25, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=34, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=10, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=37, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=28, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=19, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=45, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=36, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=27, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=9, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=18, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=21, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=48, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=3, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=30, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=39, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=15, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=42, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=33, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=24, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=6, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])]), (error_code=0, topic='_schemas2', partitions=[(error_code=0, partition=0, leader=0, replicas=[0, 1], isr=[0, 1])]), (error_code=0, topic='apphits_test', partitions=[(error_code=0, partition=1, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=0, leader=0, replicas=[0, 1], isr=[0, 1])]), (error_code=0, topic='apphits', partitions=[(error_code=0, partition=8, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=11, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=2, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=5, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=13, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=4, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=7, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=10, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=9, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=3, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=15, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=6, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])])])
DEBUG:kafka.cluster:Updated cluster metadata to Cluster(brokers: 2, topics: 9, groups: 0)
DEBUG:kafka.client:No connected nodes found. Trying disconnected nodes.
DEBUG:kafka.client:No luck. Trying all broker metadata
DEBUG:kafka.client:Initiating connection to node 0 at fr.amers1.cloud:9092
DEBUG:kafka.conn:: creating new socket
DEBUG:kafka.conn:: established TCP connection
DEBUG:kafka.client:Node 0 connected
INFO:kafka.conn:Broker is not v0.10 -- it did not recognize ApiVersionRequest_v0
DEBUG:kafka.conn:: creating new socket
DEBUG:kafka.conn:: established TCP connection
DEBUG:kafka.client:Node 0 connected
DEBUG:kafka.conn: Response 3: ListGroupsResponse_v0(error_code=0, groups=[])
INFO:kafka.conn:Broker version identifed as 0.9
INFO:kafka.conn:Set configuration api_version='0.9' to skip auto check_version requests on startup
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name records-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-latency
DEBUG:kafka.metrics.metrics:Added sensor with name records-lag
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-throttle-time
DEBUG:kafka.metrics.metrics:Added sensor with name commit-latency
2016-06-03 14:09:28,682 (5605) [0:00:13.194239] - INFO - Uploaded the uuid-hashed uuid dict with 808642 records. - [module:opsconsole, line:334, functionname:]
INFO:Opsconsole ETL: Uploaded the uuid-hashed uuid dict with 808642 records.
2016-06-03 14:09:28,722 (5605) [0:00:00.040302] - INFO - Uploaded the statcodes set with 658 records. - [module:opsconsole, line:348, functionname:]
INFO:Opsconsole ETL: Uploaded the statcodes set with 658 records.
DEBUG:kafka.coordinator:Sending group coordinator request for group biapp_opsconsole_etl to broker 0
DEBUG:kafka.conn: Request 5: GroupCoordinatorRequest_v0(consumer_group='biapp_opsconsole_etl')
DEBUG:kafka.client:Sending metadata request MetadataRequest_v0(topics=['opsconsole']) to node 0
DEBUG:kafka.conn: Request 6: MetadataRequest_v0(topics=['opsconsole'])
WARNING:kafka.conn: timed out after 10000 ms. Closing connection.
WARNING:kafka.client:Node 0 connection failed -- refreshing metadata
ERROR:kafka.coordinator:Error sending GroupCoordinatorRequest_v0 to node 0 [RequestTimedOutError - 7 - This error is thrown if the request exceeds the user-specified time limit in the request.]
DEBUG:kafka.client:Initializing connection to node 0 for metadata request
DEBUG:kafka.conn:: creating new socket
DEBUG:kafka.conn:: established TCP connection
DEBUG:kafka.client:Node 0 connected
DEBUG:kafka.client:Sending metadata request MetadataRequest_v0(topics=['opsconsole']) to node 0
DEBUG:kafka.conn: Request 7: MetadataRequest_v0(topics=['opsconsole'])
DEBUG:kafka.conn: Response 7: MetadataResponse_v0(brokers=[(node_id=0, host='fr.amers1.cloud', port=9092), (node_id=1, host='fr2.amers1.cloud', port=9092)], topics=[(error_code=0, topic='opsconsole', partitions=[(error_code=0, partition=8, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=11, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=2, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=5, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=4, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=13, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=7, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=10, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=9, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=3, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=6, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=15, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])])])
DEBUG:kafka.cluster:Updated cluster metadata to Cluster(brokers: 2, topics: 1, groups: 0)
DEBUG:kafka.coordinator:Sending group coordinator request for group biapp_opsconsole_etl to broker 0
DEBUG:kafka.conn: Request 8: GroupCoordinatorRequest_v0(consumer_group='biapp_opsconsole_etl')
DEBUG:kafka.conn: Response 8: GroupCoordinatorResponse_v0(error_code=0, coordinator_id=1, host='fr2.amers1.cloud', port=9092)
DEBUG:kafka.coordinator:Received group coordinator response GroupCoordinatorResponse_v0(error_code=0, coordinator_id=1, host='fr2.amers1.cloud', port=9092)
DEBUG:kafka.cluster:Updating coordinator for biapp_opsconsole_etl: GroupCoordinatorResponse_v0(error_code=0, coordinator_id=1, host='fr2.amers1.cloud', port=9092)
INFO:kafka.cluster:Group coordinator for biapp_opsconsole_etl is BrokerMetadata(nodeId=1, host='fr2.amers1.cloud', port=9092)
INFO:kafka.coordinator:Discovered coordinator 1 for group biapp_opsconsole_etl
DEBUG:kafka.client:Initiating connection to node 1 at fr2.amers1.cloud:9092
DEBUG:kafka.conn:: creating new socket
DEBUG:kafka.coordinator.consumer:Node 1 not ready -- failing offset fetch request
DEBUG:kafka.conn:: established TCP connection
DEBUG:kafka.client:Node 1 connected
DEBUG:kafka.coordinator.consumer:Group biapp_opsconsole_etl fetching committed offsets for partitions: {TopicPartition(topic='opsconsole', partition=11)}
DEBUG:kafka.conn: Request 1: OffsetFetchRequest_v1(consumer_group='biapp_opsconsole_etl', topics=[(topic='opsconsole', partitions=[11])])
DEBUG:kafka.conn: Response 1: OffsetFetchResponse_v1(topics=[(topic='opsconsole', partitions=[(partition=11, offset=812899910, metadata='', error_code=0)])])
DEBUG:kafka.consumer.fetcher:Resetting offset for partition TopicPartition(topic='opsconsole', partition=11) to the committed offset 812899910
DEBUG:kafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='opsconsole', partition=11) at offset 812899910
DEBUG:kafka.consumer.fetcher:Sending FetchRequest to node 0
DEBUG:kafka.conn: Request 9: FetchRequest_v1(replica_id=-1, max_wait_time=10000, min_bytes=1, topics=[(topic='opsconsole', partitions=[(partition=11, offset=812899910, max_bytes=1048576)])])
DEBUG:kafka.conn: Response 9: FetchResponse_v1(throttle_time_ms=0, topics=[(topics='opsconsole', partitions=[(partition=11, error_code=0, highwater_offset=819289683, message_set=['(offset=812899910, message=373)',
...
'(offset=812905264, message=1226)', '(offset=812905265, message=325)', '(offset=812905278, message=1478)', '(offset=812905279, message=292)', '(offset=None, message=None)'])])])
DEBUG:kafka.consumer.fetcher:Adding fetched record for partition TopicPartition(topic='opsconsole', partition=11) with offset 812899910 to buffered record list
DEBUG:kafka.metrics.metrics:Added sensor with name topic.opsconsole.bytes-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name topic.opsconsole.records-fetched
DEBUG:kafka.consumer.fetcher:Skipping message offset: 0 (expecting 812899910)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 0 (expecting 812899910)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 812899912 (expecting 812899910)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 812899913 (expecting 812899910)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 812899914 (expecting 812899910)
...
DEBUG:kafka.consumer.fetcher:Skipping message offset: 812905278 (expecting 812899910)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 812905279 (expecting 812899910)
DEBUG:kafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='opsconsole', partition=11) at offset 812899910
...

@dpkp
Copy link
Owner

dpkp commented Jun 3, 2016

This is useful, thanks! I'll investigate.

@dpkp
Copy link
Owner

dpkp commented Jun 4, 2016

This is strange -- it appears that the compressed messageset that we get back from the broker has packed incorrect message offsets.

The logs suggest that the outer offset is correct:

DEBUG:kafka.conn: Response 9: FetchResponse_v1(throttle_time_ms=0, topics=[(topics='opsconsole',
partitions=[(partition=11, error_code=0, highwater_offset=819289683,
message_set=['(offset=812899910, message=373)', 
...

But the inner offsets appear to be 0:

DEBUG:kafka.consumer.fetcher:Skipping message offset: 0 (expecting 812899910)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 0 (expecting 812899910)

You are running 0.9 brokers with the v0 message format, correct? If so, compressed messages should always include absolute offsets as calculated by the broker. What are you using to produce these messages?

@zoltan-fedor
Copy link
Author

Yes, I am having 0.9.0.1 brokers from Confluent (confluent-kafka-2.11.7-0.9.0.1-1.noarch).

Not sure about the v0 message format. How could I check that?

I use kafka-python to produce the messages, see (simplified) code below:

from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer, Util
from kafka import KafkaProducer

# Initialize the client by connecting to the Schema Registry
client = CachedSchemaRegistryClient(url='http://'+schema_registry_server)

# Load the current schema file
avro_schema = Util.parse_schema_from_file(current_folder+'/jobs/'+avro_schema_file)

# get the schema_id for the given schema file and topic
schema_id = client.register(kafka_topic, avro_schema)

# Avro message serialized
serializer = MessageSerializer(client)

producer = KafkaProducer(
        bootstrap_servers=kafka_servers.split(','),
        compression_type='snappy',
        retries=5
    )

for line in f:
    data = json.loads(line)
    avro_encoded = serializer.encode_record_with_schema_id(schema_id, data)  # encode the data into avro according to the Schema
    producer.send(kafka_topic, avro_encoded)  # upload to Kafka

producer.flush()

@dpkp
Copy link
Owner

dpkp commented Jun 4, 2016

Can you try grabbing the specific message using something like this:

https://gist.github.com/dpkp/452ea2080d54bb615ae4779851ada689

Point that at the host / port for the leader of the topic-partition that has the "stuck" message and send me the output.

@zoltan-fedor
Copy link
Author

I hope you don't mind but I have emailed the output to you.
The error behavior has change somewhat, now it simply cannot read the message - just runs the CPU on 100% in kafa-python trying to read it.
I used your snippet to export the message into a file and the funny thing that when I try to open it in a text editor (pluma) on my linux desktop, it has the same problem, CPU 100% trying to open the file.

The same happening across multiple topics now - CPU is at 100%, but kafka-python gets stuck reading the messages at a given point and can't move any further.
It is like the compression would have hit a bug, causing CPU 100%.

@dpkp
Copy link
Owner

dpkp commented Jun 4, 2016

Thanks -- specifically, which offset are you seeing as "stuck" ?

@zoltan-fedor
Copy link
Author

It is 813721754

@iiivnv
Copy link

iiivnv commented Jun 8, 2016

I have the same issue and as I can see all requests goes out with default group_id kafka-python-default-group, despite I set another one - zinc_group:
2016-06-07 14:11:51,965.965.569019318:kafka.coordinator.consumer:140668873398080:DEBUG:11571:Group kafka-python-default-group fetching committed offsets for partitions: set([TopicPartition(topic='catalog-test1', partition=0)])

May this be the reason why we don't see any new messages?

I use kafka-python 1.2.1
Broker version: 0.8.1

@zoltan-fedor
Copy link
Author

I believe my issue was related to snappy compression. Since I turned snappy compression off in the producers a few days ago, the issue is gone.

@iiivnv
Copy link

iiivnv commented Jun 8, 2016

Great, I also found the issue. Now it works. Thanks.

@dpkp
Copy link
Owner

dpkp commented Jun 8, 2016

I haven't been able to identify the root cause here. The debug logs show that it is related to the fetcher discarding compressed messages that have offsets that appear too low. In normal operation this can happen if you request a message offset that falls in the middle of a compressed message set. In that case the fetch correctly scans through the compressed message set, dropping messages until it gets to the requested offset. If messages are not compressed, this code is not necessary because kafka can always return the exact message offset requested. But something is happening here that is causing the compressed message set scan to fail. I assume this bug is on the client side, but I have been unable to track it down. Help appreciated! If anyone can provide a reproducible test case, that would also be very useful.

@wushujames
Copy link

Could it be related to this? https://issues.apache.org/jira/browse/KAFKA-3789

Although that seems to only apply to Kafka 0.10, so maybe isn't applicable.

@benauthor
Copy link
Contributor

I saw this behavior today. I'm using snappy compression. In my case, the problem message was not decoded during Message.decompress -- the call succeeded but the message.value appeared unchanged. I'm not clear on whether the data was corrupt or there was a failure in the decoding caused by some bug. After this, since kafka-python was not seeing the offset as it scanned over the messages in the set, it was stuck in a loop making the same request and scanning it over and over.

kafka-python 1.2.1, python-snappy 0.5, kafka 0.9.0.1

@benauthor
Copy link
Contributor

Please disregard what I said about problems in decompression -- the message eventually show up decompressed. I haven't quite wrapped my head around the recursive _unpack_message_set calls and the relative vs. absolute offsets, but my stuck message ends up being yielded as a ConsumerRecord with a (relative) offset 0, and the scan at https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L440 is checking for an absolute:

DEBUG:kafka.consumer.fetcher:Adding fetched record for partition TopicPartition(topic=u'topic_name_redacted', partition=98) with offset 1057322 to buffered record list
DEBUG:kafka.consumer.fetcher:Skipping message offset: 0 (expecting 1057322)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 1057323 (expecting 1057322)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 1057324 (expecting 1057322)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 1057325 (expecting 1057322)

Hope this is useful.

@dpkp
Copy link
Owner

dpkp commented Jun 21, 2016

Thanks for the details. The _unpack_message_set should only be recursive when dealing with compressed messages. To encode "compressed" messages, kafka writes the messages into a message set structure, encodes the message set into bytes, compresses those bytes, writes the compressed bytes as a new, single message with the compression flag set, and then writes this new message as the only item in a new message set. It is this new "compressed" message set that is sent to and received from the broker. So in practice decoding a compressed messageset requires decoding the wrapper message set, getting the underlying message, decompressing the message into a new messageset, and decoding the new messageset into a list of uncompressed messages.

The relative / absolute offset change happened in kafka 0.10 clients/brokers. Older messages had encoded the underlying messages with absolute offsets. But this required the broker to decompress / recompress every compressed messageset as it was produced. The new approach only writes the absolute offset to the outer messageset, and uses relative offsets in the inner messageset in order to avoid the broker decompress/recompress step when handling produced messages.

I'm not sure whether that leaves you more confused or less, but hopefully this adds some useful background.

@benauthor
Copy link
Contributor

Yes, that helps -- explains why I had a compressed message even after I decompressed it once. I can poke around in the debugger again to try to understand how that 0 offset is getting out to that level.

@dpkp
Copy link
Owner

dpkp commented Jun 21, 2016

Thanks -- it could be a bug in _unpack_message_set or it could be that there is an edge case where a compressed message set gets returned with 0 as the first offset and maybe the java client handles this case? I tried to look at raw data from zoltan to see if it was the 0 offset case, but the data looked normal and I was unable to reproduce in a repl. If you can get this to happen in a debug session, please post details! I have not been able to reproduce so far.

@benauthor
Copy link
Contributor

Here's a narrative from my debug session:

# we get to the top level _unpack_message_set with the right offset 
> msg.is_compressed()
True
> offset
1057322
> relative_offset
0
# after mset = msg.decompress()
> mset[0][0]
1057322
# call _unpack_message_set a second time,
>msg.is_compressed()
True
> offset
1057322
> relative_offset
0
# after  msg.decompress()
ipdb> mset[0][0]
0
# _unpack_message_set third time around
> msg.is_compressed()
False
> offset
0
> relative_offset
0
# finally we yield ConsumerRecord(... offset + relative_offset)  <-- with the undesirable zero

Whereas in the normal, working case:

# _unpack_message_set
> msg.is_compressed()
True
> offset
1057321
> mset[0][0]
1057321
# _unpack_message_set again
ipdb> msg.is_compressed()
False
> offset
1057321
> relative_offset
0
# yield ConsumerRecord(... offset + relative_offset)  <-- 1057321

I haven't busted into the raw response yet...

@rediceb
Copy link

rediceb commented Jul 1, 2016

I got the same issue. after I changed " log.cleanup.policy = compact" in the server.properties to "log.cleanup.policy = delete" . Work well now!

@ayiis
Copy link

ayiis commented Jul 7, 2016

Hey, seems I got a same problem.

kafka-python (1.0.2)
kafka-0.9.0.0

Here's my consumer

consumer = KafkaConsumer(
    bootstrap_servers = 'xxx', 
    group_id = 'my.group', 
    max_partition_fetch_bytes = 1048576*1000,
)

consumer.assign([
    TopicPartition(topic=my_topic, partition=0),
])

consumer.seek(TopicPartition(topic=my_topic, partition=0), 706) 

for msg in consumer: 
    print msg.topic, msg.partition, msg.offset, msg.key

The "stuck" offset is 707, so I read from 706.

And here comes the log (I replaced the value with none sense here, and the original value is in gzip compression if it matters)

https://gist.github.com/ayiis/b5d5738722a0bfb184bf21c8230f4776

~PS

I upgrade kafka-python to 1.2.3 and seems it makes a little deferences

https://gist.github.com/ayiis/45e7489f7e8792888f3f799fc9652ad0

~PS

I upgrade kafka-python to 1.2.4 and it turns out that

The offset 707 had been ignored and stepped over, with no exception

First I got 706 and the next one is 708 and 709 and goes on.

@dpkp
Copy link
Owner

dpkp commented Jul 11, 2016

Are any of you consuming from topics that are being produced by MirrorMaker ?

@dpkp
Copy link
Owner

dpkp commented Jul 11, 2016

I'm still unable to reproduce this issue, but it would be great if anyone that can could test against PR #755

@ayiis
Copy link

ayiis commented Jul 11, 2016

I tried PR 755 and it worked fine!

Now I can consume 707 as well as the other.

btw, I use kafka-python (1.0.2) as my producer.

Thank you.

~PS

I looked into the raw_bytes in Message.decompress(). And I found that

The logs suggest that the outer offset is correct

But the inner offsets appear to be 0

is exactly what has happened.

Still dont know why. but i guess my message in kafka is somehow broken or invalid.

@dpkp dpkp added the bug label Jul 12, 2016
@dpkp dpkp changed the title Consumers get stuck with no new data, while there are unread messages KafkaConsumer can get "stuck" on compressed topics Jul 12, 2016
@ayiis
Copy link

ayiis commented Jul 13, 2016

I reproduced this one.

In my case, it was caused by producer in a bad network.

kafka-python (1.2.4)
kafka-0.9.0.0
for x in xrange(100):
    try:
        producer = KafkaProducer(
            bootstrap_servers = 'xxx',
            compression_type = 'gzip',
            retries = 5,
            retry_backoff_ms = 1000,
        )

        print 'create producer success'
        # Start sending RST to this producer whatever producer.send(), to cause a network RESET.
        # To cause producer retries
        # PS: This will not cause [BrokerConnection reset] Exception
        time.sleep(5)

        record_metadata = producer.send(topic = 'test.ay.1', value = 'Hello kafka!' + str(x)).get()

        producer.flush()
        producer.close()

    except Exception, e:
        print 'Exception:', e

Keep responsing RST to producer.send().get() to cause producer retries.

Stop sending RST after the producer have retried a few times, to allow producer sending message to kafka. Then, one of those messages may "stuck" the KafkaConsumer.

@dpkp
Copy link
Owner

dpkp commented Jul 13, 2016

aha! that makes sense. KafkaProducer may actually be the culprit here by double-compressing messages if there is a failure + retry.

@dpkp
Copy link
Owner

dpkp commented Jul 13, 2016

I can reproduce -- excellent work, @ayiis !

@dpkp
Copy link
Owner

dpkp commented Jul 13, 2016

I was able to force this behavior in KafkaProducer by hacking the retry code (this forces retries even on success):

diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 958e165..2b56da1 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -215,7 +215,7 @@ class Sender(threading.Thread):
         if error is Errors.NoError:
             error = None

-        if error is not None and self._can_retry(batch, error):
+        if self._can_retry(batch, error):
             # retry
             log.warning("Got error produce response on topic-partition %s,"
                         " retrying (%d attempts left). Error: %s",
@@ -243,8 +243,8 @@ class Sender(threading.Thread):
         We can retry a send if the error is transient and the number of
         attempts taken is fewer than the maximum allowed
         """
-        return (batch.attempts < self.config['retries']
-                and getattr(error, 'retriable', False))
+        return batch.attempts < self.config['retries']
+                #and getattr(error, 'retriable', False))

using a producer configured like this: KafkaProducer(retries=1, compression_type='gzip') I am able to produce messages and cause KafkaConsumer to get "stuck" (prior to commit 003bb0a) or skip the message (after commit 003bb0a).

I will submit a patch to fix KafkaProducer. The current behavior of KafkaConsumer is to skip these double-compressed messages. I will think about whether we can or should attempt to decompress them now that we know how they are constructed. Unfortunately these messages are likely incompatible with other clients and so I'm actually hesitant to add code to handle them here. So I am currently leaning towards skip w/ warning. Thoughts?

@benauthor
Copy link
Contributor

Yeah, if a message is malformed, discarding while making noise about it seems appropriate.

@ayiis
Copy link

ayiis commented Jul 14, 2016

Quite agree with benauthor.

@dpkp dpkp added the producer label Jul 15, 2016
@dpkp dpkp changed the title KafkaConsumer can get "stuck" on compressed topics KafkaProducer produces corrupt "double-compressed" messages on retry when compression is enabled. KafkaConsumer gets "stuck" consuming them Jul 15, 2016
@dpkp
Copy link
Owner

dpkp commented Jul 15, 2016

I made some changes to the consumer side of compressed message sets in #755 . I have tried a few different approaches and at this point I am leaning towards making the default behavior to return the corrupt message to the user. I added a configuration parameter named skip_double_compressed_messages that can be set to skip the messages.

Does this sound like a reasonable solution?

@benauthor
Copy link
Contributor

I can abide that; returning the inner compressed data would be what you'd expect kafka-python to do if the producer bug was in some other library's producer.

@zoltan-fedor
Copy link
Author

For these double-compressed messages, sure the consumer could return the corrupt message or has a setting to suppress them (step them over).
In my case I actually use Kafka-python as the producer and still saw this issue on the consumer side, so we should also look at the Producer side of Kafka-python to prevent this double-compression occurring at the first place, next to making the Consumer handle it in the above described manner.

@dpkp
Copy link
Owner

dpkp commented Jul 15, 2016 via email

@ayiis
Copy link

ayiis commented Jul 15, 2016

That would be great!

@dpkp
Copy link
Owner

dpkp commented Jul 16, 2016

Fixes to both KafkaProducer and KafkaConsumer have been released in 1.2.5 -- please reopen if this issue resurfaces!

@dpkp dpkp closed this as completed Jul 16, 2016
@dpkp
Copy link
Owner

dpkp commented Jul 16, 2016

Thanks again to everyone for all the hard work tracking this one down!!

jeffwidman added a commit that referenced this issue Dec 13, 2018
This `skip_double_compressed_messages` flag was added in #755 in
order to fix #718.

However, grep'ing through the code, it looks like it this is no longer
used anywhere and doesn't do anything.

So removing it
jeffwidman added a commit that referenced this issue Dec 13, 2018
This `skip_double_compressed_messages` flag was added in #755 in
order to fix #718.

However, grep'ing through the code, it looks like it this is no longer
used anywhere and doesn't do anything.

So removing it.
jeffwidman added a commit that referenced this issue Jan 13, 2019
This `skip_double_compressed_messages` flag was added in #755 in
order to fix #718.

However, grep'ing through the code, it looks like it this is no longer
used anywhere and doesn't do anything.

So removing it.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

7 participants