-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for LogAppend timestamp type #2
Conversation
@@ -57,6 +60,9 @@ func (m *Message) encode(pe packetEncoder) error { | |||
pe.putInt8(m.Version) | |||
|
|||
attributes := int8(m.Codec) & compressionCodecMask | |||
if m.LogAppendTime { | |||
attributes |= timestampTypeMask |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by @mikejquinn:
Does this make sense to be set by a producer? I thought this was a server config setting.
This change is done for the sarama tests to pass. The tests use encode
/decode
to simulate message delivery. So, yes, normally the flag is only set by the Kafka and should not be considered there.
I think I could change the tests to overcome this, but I am not sure it worth the efforts.
message.go
Outdated
@@ -24,6 +24,8 @@ const ( | |||
CompressionLZ4 CompressionCodec = 3 | |||
) | |||
|
|||
const timestampTypeMask = 0x08 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by @mikejquinn:
Move this below the CompressionCodec.String function and CompressionLevelDefault const.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this was just a completely missing feature on their end?
consumer.go
Outdated
@@ -516,13 +520,17 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes | |||
if offset < child.offset { | |||
continue | |||
} | |||
timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not that this computation is particularly expensive but I would probably declare var timestamp time.Time
and then do one or the other in an if/else.
I was surprised myself that such an important feature can be missing, but it seems to be the case. |
36f2440
to
e753d25
Compare
PR as requested in https://github.com/liftoffio/liftoff/pull/4320 . It out local version for IBM#1258 and IBM#1259 .