Not seeing all messages when syncProducer called from goroutines #1032
Hmm, this is very strange. The symptoms sound a lot like you have multiple partitions for that topic and don't realize it, but it looks like you created it with only one. It may still be worth checking the state of your cluster to ensure it doesn't have multiple for some reason. Also concerning (but not obviously related to your issue) is the "sarama: client.go:655: client/metadata found some partitions to be leaderless" in the logs - sounds like maybe your cluster isn't entirely healthy. The code you provided looks correct to me.
I wonder if the problem is that the main program is being allowed to terminate before sarama has managed to send the messages to Kafka? I've noticed a similar problem in one of my tests, and if I add a sleep before exiting main, this gives sarama's goroutines a chance to do the necessary work.
@glyn I tried adding a sleep for 5 seconds to my code after the waitgroup returns and still no dice :( So I started with a fresh ZooKeeper and Kafka and created the topic with only one partition, to be sure there weren't multiple partitions and to ensure a healthy cluster. Unfortunately, I still saw the issue. A colleague of mine suggested that I try running the sarama producer without specifying the Kafka version. I wanted to see what the maximum version of Kafka I could use and still have it produce all of the messages, and I discovered it to be a version prior to 0.11. This is a stab in the dark, but could it be that the protocol for producing messages changed with the addition of headers in 0.11 and that's causing the producer to return before the message is fully produced?
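For reference, the knob being toggled in that experiment is sarama's `Config.Version`. A rough sketch of the two setups being compared (the broker address and helper name are made up, and this is an illustration, not the reporter's actual code):

```go
package example

import "github.com/Shopify/sarama"

// newProducer builds a SyncProducer either with or without an explicit
// Kafka version, mirroring the "with version" vs "without version" runs.
func newProducer(useV0_11 bool) (sarama.SyncProducer, error) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required by the SyncProducer

	if useV0_11 {
		// Opting in to 0.11 makes sarama send the new record-batch
		// produce format; leaving Version unset keeps the older format.
		config.Version = sarama.V0_11_0_0
	}
	return sarama.NewSyncProducer([]string{"localhost:9092"}, config)
}
```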
The flow of the code is marginally different, but I can't imagine that we wouldn't be putting messages on the wire in the first place. It does seem possible though that we're producing malformed requests and that the broker is silently rejecting them. If you look at the broker-side logs, are there any messages suggesting that it's not accepting some of the messages? Also worth checking: if you take the headers out of the messages, does the problem go away?
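For context, this is roughly what a message with record headers looks like in sarama (the topic comes from the report; the header contents are invented). The experiment suggested above is to drop the `Headers` field and see whether the problem persists:

```go
package example

import "github.com/Shopify/sarama"

// messageWithHeaders builds a message carrying 0.11-style record headers;
// removing the Headers field is the "take the headers out" experiment.
func messageWithHeaders() *sarama.ProducerMessage {
	return &sarama.ProducerMessage{
		Topic: "people",
		Value: sarama.StringEncoder(`{"name":"example"}`),
		Headers: []sarama.RecordHeader{
			{Key: []byte("trace-id"), Value: []byte("abc123")},
		},
	}
}
```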
cc @wladh since this may be a record-batch implementation issue of some sort
@kenschneider18 I met a similar problem, described in #1034
I don't think it's really related, but I'd be curious if #1038 has any effect on this issue.
@eapache So I set the broker log level to debug and tried running my program once as is and once without the version. There wasn't any noteworthy difference in the logs either time. It's worth noting that in both cases, even though I'm producing 5 messages, the logs only seem to show a single update to the high watermark. Here's the log from a run without a specified version (all of the messages are produced):
Here's the log from the run with the version specified:
Tried taking the headers out of the producer requests with the version set to 0.11 and it didn't resolve the issue.
Ya, that's normal; Sarama will collect multiple messages into a single request when possible. Did all five messages make it in the second (version-specified) case? It looks like they did, since the high watermark still gets updated to 5. What would be helpful would be broker logs from a case where not all the messages make it (though I don't know how hard that is to reproduce with a small number of messages).
Actually, in the second (version-specified) case they don't make it, which is weird.
That's really, really weird, since the high watermark is still getting updated, and the value after it (…)
One thing to note: if I put a lock around my call to producer.SendMessage(), my problem is resolved (see the sketch below). I decided to take a look at the actual topic log to see what's there. Obviously it's a lot of gibberish, but from what I can see it looks like it's all there. I did two runs, both with the version. The first run I did without a mutex around SendMessage(); the second run had it. The log from the second run is longer but, again, from the fields I can read, everything including the headers appears to be there. First run (no mutex):
Second run (with mutex):
EDIT: Another thing worth noting, I made the …
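A minimal sketch of the mutex workaround described above (names are made up); it simply serializes the SendMessage calls across goroutines, which shouldn't normally be necessary since the SyncProducer is meant to be safe for concurrent use:

```go
package example

import (
	"sync"

	"github.com/Shopify/sarama"
)

var sendMu sync.Mutex

// sendLocked serializes SendMessage calls so only one goroutine produces at
// a time; it works around the missing-message symptom at the cost of losing
// any batching across goroutines.
func sendLocked(p sarama.SyncProducer, msg *sarama.ProducerMessage) (int32, int64, error) {
	sendMu.Lock()
	defer sendMu.Unlock()
	return p.SendMessage(msg)
}
```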
Tried running with the code from #1038 and no dice.
I'm seeing the same issue
Kafka settings: defaults except … I'm writing messages using the SyncProducer. Adding log.message.format.version=0.10.2 to the Kafka config makes the problem go away.
@superfell Unfortunately that only seems to work if you're not sending record headers. If you try to send headers, an error is returned to sarama and the broker spits out this error:
Still an interesting observation though.
I put together a repro case that spins up a number of goroutines calling a producer, writes a known set of data, consumes that range, and compares the results of what the producer reported with what the consumer saw. With a producer concurrency of 1 it looks ok; as you ramp up concurrency, messages seem to end up with different offsets and the consumer sees holes in the offsets. Code and some example runs are at …
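The repro code itself isn't included here (the link above is cut off). The offset comparison it describes might look something like this rough sketch, where producer-reported offsets are checked against what a partition consumer actually sees (all names are assumptions):

```go
package example

import "github.com/Shopify/sarama"

// findHoles consumes offsets first..last from a partition and returns any
// offsets in that range that the consumer never sees, i.e. "holes" between
// what the SyncProducer reported and what actually landed on the topic.
func findHoles(pc sarama.PartitionConsumer, first, last int64) []int64 {
	var holes []int64
	expected := first
	for msg := range pc.Messages() {
		for expected < msg.Offset {
			holes = append(holes, expected)
			expected++
		}
		expected = msg.Offset + 1
		if msg.Offset >= last {
			break
		}
	}
	return holes
}
```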
I ran with the Go race detector, which didn't report anything. Then I looked at the actual segments on the broker. Here's the LogSegment dump from a good run:
And one from a bad run; perhaps the encoding for multiple records has an issue somewhere. Notice the difference in sizes:
Also the fact that the offsets are no longer consecutive. Unless this is only dumping record-batches and not records? But that would explain the size differences too, sort of.
I was able to capture this produce request message with Wireshark (which unfortunately doesn't seem to understand the v3 API format); I'm still picking it apart.
I decoded the produce message by hand; the only thing that looks possibly suspect to me is that Last Offset Delta isn't set to anything.
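For reference, a rough Go rendering of the v2 ("record batch") header layout that this request uses, with the field in question marked; field names follow the Kafka protocol documentation rather than sarama's own structs:

```go
package example

// Kafka message format v2 (record batch) header, in wire order.
type recordBatchHeader struct {
	BaseOffset           int64
	BatchLength          int32
	PartitionLeaderEpoch int32
	Magic                int8 // 2 for this format
	CRC                  int32
	Attributes           int16
	LastOffsetDelta      int32 // expected to be the OffsetDelta of the last record; left at 0 here
	FirstTimestamp       int64
	MaxTimestamp         int64
	ProducerID           int64
	ProducerEpoch        int16
	BaseSequence         int32
	RecordCount          int32
	// ...followed by the individual records
}
```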
I couldn't see anywhere that was setting LastOffsetDelta, so I added a change to produceSet::buildRequest to set it at the same time it's assigning OffsetDeltas to the records, and that resolved the issue in my test app.
So after looking into the code and the Kafka protocol changes for 0.11 a bit, I understand now that they were pretty significant. I decided to test out my sample program and remove the concurrency but use the …
Looks like @superfell found it; I still need to test it myself.
I think @superfell is correct; looking at my WIP transaction code, I needed to add the … Also, from the …
I tried the tweak below and it seems to be working. The question is whether or not it's a sane fix. In RecordBatch::encode:

```go
//pe.putInt32(b.LastOffsetDelta)
pe.putInt32(int32(len(b.Records) - 1)) // Is it possible to get here with empty b.Records?
for i, r := range b.Records {
	r.OffsetDelta = int64(i)
}
```

If these offsets can be computed on the fly in the encode methods, are the LastOffsetDelta and OffsetDelta fields even necessary?
Scratch that thought - I looked at the impact of removing those fields. I like @superfell's approach of adding the fix in produceSet::buildRequest:

```go
if req.Version >= 3 {
	rb := set.recordsToSend.recordBatch
	rb.LastOffsetDelta = int32(len(rb.Records) - 1) // Is it possible to get here with empty rb.Records?
	for i, record := range rb.Records {
		record.OffsetDelta = int64(i)
	}
	req.AddBatch(topic, partition, rb)
	continue
}
```
@pkedy's solution works in my code, and I agree that it should be set in buildRequest.
As the code exists in master right now, there will also need to be a change to RecordBatch::encode. It was changed at some point to use this for LastOffsetDelta:

```go
pe.putInt32(int32(len(b.Records)))
```

https://github.com/Shopify/sarama/blob/master/record_batch.go#L67

It will need to be changed back to this for the change in produce_set::buildRequest to be picked up:

```go
pe.putInt32(b.LastOffsetDelta)
```
cc @bobrik since you changed that line. Actually, I assume somebody has already reproduced this issue with the code currently in master (including Ivan's tweak to that line), but if not then maybe that already fixed it? It looks like it would be encoding the same value as if you set it in buildRequest.
@eapache - @kenschneider18 and I suspect the off-by-one (len(b.Records) vs len(b.Records) - 1) is the problem. I'm about to submit a PR assuming this is correct.
@eapache I can confirm that the issue still existed in master. The key is the use of len(b.Records) - 1 rather than len(b.Records) for LastOffsetDelta.
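Put concretely (this is just an illustration of the arithmetic, not code from the PR): for a batch of five records, the per-record deltas run 0 through 4, so the last delta is len-1, not len:

```go
package example

import "github.com/Shopify/sarama"

// lastDeltaFor shows why LastOffsetDelta must be len(records)-1: five
// records get OffsetDeltas 0,1,2,3,4, so the last delta is 4, while
// len(records) would be 5, pointing one past the end of the batch.
func lastDeltaFor(records []*sarama.Record) int32 {
	for i, r := range records {
		r.OffsetDelta = int64(i)
	}
	return int32(len(records) - 1)
}
```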
Change LGTM. I don't see how this triggers missing messages, though.
@bobrik - Good point. @kenschneider18 and I were not on the latest master. Along the journey of discovering the issue for ourselves, we noticed the off-by-one. After the PR is merged, we can close this ticket.
Thank you all for your hard work digging into this and finding a solution! I'll let it soak over the weekend, and likely push a new release with Phil's fix on Monday.
Just for the record, this was the root cause of the issue I was trying to fix with #1037
I also logged a bug in Kafka, as ideally the broker shouldn't accept the bad request to start with.
Document recordbatch offset changes from #1032
Versions
Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
Sarama Version: 1.15.0
Kafka Version: 0.11.0
Go Version: 1.9.2 darwin/amd64
Configuration
What configuration values are you using for Sarama and Kafka?
Sarama configuration
Kafka configuration
Logs
When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.
Problem Description
When calling SendMessage on a single instance of syncProducer from multiple goroutines, some messages seem to fail to be produced to Kafka. I've looked at what ends up on the stream using Apache's kafka-console-consumer, and it shows only a fraction of the messages, anywhere from half of them down to none. I wrote my own consumer using sarama and it's the same issue; however, I get the below error message back from sarama. I want to use syncProducer because I need to guarantee that messages will be published to the stream in the order that they're received by my application. Maybe I've just implemented it wrong, but right now I'm out of ideas and I'm hoping someone here can help me out.
Here's how I created my topic:
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic people
I'm running a single broker on my local machine. I've written a sample program that can reproduce the issue (a stripped-down sketch of the same pattern appears below the file names). It's also worth noting that none of the calls to SendMessage() return errors when I run the code.
main.go
streamer.go
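main.go and streamer.go themselves aren't reproduced here. A stripped-down sketch of the pattern described above; only the topic name, the single-partition setup, and the shared SyncProducer across goroutines come from the report, while the broker address, payloads, and goroutine count are assumptions:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required by the SyncProducer
	config.Version = sarama.V0_11_0_0       // the setting that triggers the problem

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			msg := &sarama.ProducerMessage{
				Topic: "people",
				Value: sarama.StringEncoder(fmt.Sprintf("person-%d", i)),
			}
			partition, offset, err := producer.SendMessage(msg)
			if err != nil {
				fmt.Println("send error:", err) // never fires in the reported runs
				return
			}
			fmt.Printf("produced to partition %d at offset %d\n", partition, offset)
		}(i)
	}
	wg.Wait()
}
```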