-
Notifications
You must be signed in to change notification settings - Fork 104
Make kafka-cluster chunk save messages partition aware. #472
Conversation
c0a9918
to
b0226dc
Compare
log.Info("kafka-mdm: available partitions %v", availParts) | ||
if partitionStr == "*" { | ||
partitions = availParts | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like the whole block in the if
on lines 111 - 118
could be moved in here and that if
could be dropped, that would safe an if
. On the other hand it would instantiate sarama
even if the defined partitionStr
has an invalid format, but I'd just assume the format should be right in the "normal" case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good call. i have made that change.
return nil, fmt.Errorf("No partitions returned for topic %s", topic) | ||
} | ||
if i > 0 { | ||
if len(partitions) != partitionCount { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't there also be a check to verify that all topics are using the same set of partitions? otherwise GetPartitions()
will always just return the partitions of the last topic in topics
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kafka uses sequential partition numbers. So if the partitionCount is the same, then the partitions will also be the same.
log.Debug("kafka-notifier sending %d batch metricPersist messages", len(msg.SavedChunks)) | ||
|
||
data, err := json.Marshal(&msg) | ||
// In order to correctly routes the saveMessages to the correct partition, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
route
without s
binary.Write(buf, binary.LittleEndian, uint8(mdata.PersistMessageBatchV1)) | ||
buf.Write(data) | ||
encoder := json.NewEncoder(buf) | ||
pMsg = mdata.PersistMessageBatch{Instance: c.instance, SavedChunks: c.buf[i:i]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm confused about [i:i]
, wouldn't that always return an empty list? should that be [i:i+1]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that should definitely be [i:i+1]
2ea6951
to
c428c3d
Compare
- Move partition fetching/validation to shared kafka lib - Refactor sending MetricPersist messages to set the correct PartitionKey. - Update consumer to only consume from configured partitions
c428c3d
to
a5e0f31
Compare
PartitionKey.
Issue #452