-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
make decoding unknown message versions error #940
make decoding unknown message versions error #940
Conversation
message.go
Outdated
@@ -144,6 +144,10 @@ func (m *Message) decode(pd packetDecoder) (err error) { | |||
m.Timestamp = timestamp | |||
} | |||
|
|||
if m.Version > 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might as well put this check immediately after extracting the version
message_test.go
Outdated
167, 236, 104, 3, // CRC | ||
0x01, // magic version byte | ||
0x00, // attribute flags | ||
0x00, 0x00, 0x00, 0x00, // timestamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timestamp is an int64, aka 8 bytes not 4
@eapache thanks! I rebased the changes into this branch. |
a194c39
to
0eb6982
Compare
@eapache the test failure now is due to a v1 message failing with a crc mismatch. Do you have a good way of recomputing the CRC? |
I think the last time I had to do one of these I just stuck a print into the test and ran it locally. Are you still unable to run the local tests? |
@eapache yeah, still confused about my local state. Instead, I pushed an |
crc32_field.go
Outdated
return PacketDecodingError{"CRC didn't match"} | ||
expected := binary.BigEndian.Uint32(buf[c.startOffset:]) | ||
if crc != expected { | ||
return PacketDecodingError{fmt.Sprintf("CRC didn't match expected %v got %v", expected, crc)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 you probably want to print these as hex though for convenience
749486f
to
60dc446
Compare
If sarama encounters an unknown message version somehow (e.g. due to a bug in Kafka 0.11's downconversion), right now sarama will carry on decoding the message, resulting in much harder to understand errors. Instead, bail out early with a friendly error message.
60dc446
to
f5ef216
Compare
@eapache ok, all ready! I got tests working locally eventually. |
Thanks! |
Hello, @tcrayford @eapache When my process running with kafka0.11, I catch this error. If I ignore this error, some messages are missing. But I can't found any reason about this. Can you give me a favor? Maybe some archives about a |
I am not aware of any bugs in the downconversion, but it's possible. I'd also note that Sarama master branch just got support for talking in the 0.11 record format, so you might be able to avoid the whole problem with that. |
@eapache @catlittlechen you're likely hitting a thing that is not a bug, but a documented fact about 0.11 - if your "max message size" on the consumer's |
Thank you. @eapache @tcrayford |
If sarama encounters an unknown message version somehow (e.g. due to a
bug in Kafka 0.11's downconversion), right now sarama will carry on
decoding the message, resulting in much harder to understand errors.
Instead, bail out early with a friendly error message.
I added a test, but struggled running tests on my laptop due to (I think) out of date dependencies, so will rely on CI running here. I also added a test for decoding a message with the
1
magic byte, which sarama supports but I could not see a test inmessage_test.go
for.