add tool for replicating metrics to second kafka cluster #435
Conversation
@@ -17,6 +17,6 @@ export CGO_ENABLED=0
 # Build binary
 cd $GOPATH/src/github.com/raintank/metrictank/cmd
 for tool in *; do
-	cd $tool
+	cd $GOPATH/src/github.com/raintank/metrictank/cmd/$tool
do we still want this change? b89b20e should fix the same problem. we can probably just remove this commit
showVersion = flag.Bool("version", false, "print version string")
logLevel    = flag.Int("log-level", 2, "log level. 0=TRACE|1=DEBUG|2=INFO|3=WARN|4=ERROR|5=CRITICAL|6=FATAL")

partitionBy = flag.String("partition-by", "byOrg", "method used for partitioning metrics. (byOrg|bySeries)")
same thought here as https://github.com/raintank/tsdb-gw/pull/20#pullrequestreview-14312450
Since we decided not to use sarama-cluster (rather use sarama directly) for input plugins so that we had full control over our offset (#236), why are we using it here? also it's interesting that we decode all data in the consumer, pass the structs to the producer, which encodes them again. we could probably just pass the binary data directly? but i guess decoding might be useful if we want to print what's going on, or something.
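For reference, a minimal sketch of what the pass-through variant could look like, assuming a sarama-cluster consumer and a sarama SyncProducer wired up roughly as elsewhere in this PR (the `c.consumer`, `p.producer` and `p.topic` names are assumptions, imports elided):

```go
// Forward the raw Kafka message bytes instead of decoding into MetricData
// and re-encoding. m.Value is already the msgp-encoded payload.
for m := range c.consumer.Messages() {
	msg := &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.ByteEncoder(m.Key),
		Value: sarama.ByteEncoder(m.Value), // pass the bytes through untouched
	}
	if _, _, err := p.producer.SendMessage(msg); err != nil {
		log.Error(3, "failed to publish message to destination cluster: %s", err)
		continue // don't mark the offset, so the message is retried after a restart
	}
	c.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "")
}
```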
9cad750 to f8fa86e
config.Group.Return.Notifications = true
config.ChannelBufferSize = 1000
config.Consumer.Fetch.Min = int32(1)
config.Consumer.Fetch.Default = int32(32768)
I don't think you need to specify the type explicitly. the compiler should be able to deduce it from the type of the attribute you're assigning to.
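i.e. something like the following small sketch of the suggested simplification; the untyped constants are converted to each field's type automatically:

```go
config := cluster.NewConfig()
config.Group.Return.Notifications = true
config.ChannelBufferSize = 1000
config.Consumer.Fetch.Min = 1         // field is int32; the constant is converted implicitly
config.Consumer.Fetch.Default = 32768 // likewise, no int32() needed
```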
log.Debug("flushing metricData buffer to kafka.")
complete := false
for !complete {
if err = publisher.Send(buf); err != nil {
at our volume levels, it's probably not really a concern, so I'm not asking we make a change at this point, but if we decoupled publisher and consumer more they could work concurrently and faster.
This was a deliberate choice to reduce complexity. Performance can easily be scaled by just running multiple mt-replicators.
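For completeness, a rough sketch of what the decoupled variant being discussed could look like; the PR deliberately keeps consumer and publisher sequential, so this is only illustrative. Names mirror the snippets in this PR, the channel and batch sizes are made up, and offset marking (which would have to move to after a successful publish) is omitted:

```go
batches := make(chan []*schema.MetricData, 10)

// consumer goroutine: decode messages and hand off batches
go func() {
	buf := make([]*schema.MetricData, 0, 1000)
	for m := range c.consumer.Messages() {
		md := &schema.MetricData{}
		if _, err := md.UnmarshalMsg(m.Value); err != nil {
			log.Error(3, "skipping message that failed to decode: %s", err)
			continue
		}
		buf = append(buf, md)
		if len(buf) == cap(buf) {
			batches <- buf
			buf = make([]*schema.MetricData, 0, 1000)
		}
	}
	close(batches)
}()

// publisher goroutine: drain batches and publish, retrying on failure
go func() {
	for batch := range batches {
		for {
			if err := publisher.Send(batch); err == nil {
				break
			}
			time.Sleep(time.Second) // back off and retry, as the sequential version does
		}
	}
}()
```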
buf = buf[:0]
c.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "")
case <-ticker.C:
log.Info("%d metrics processed in last 10seconds.", counter)
I think it's very possible that the prior case will keep the select loop rather busy (especially when it hits a failure and sleeps a bit), in which case this ticker can silently drop ticks. This is not a big issue in this case, except we shouldn't claim it's "in last 10seconds". it might very well be in the last 20s or 30s. so i would just remove that part, we can just use the timestamps of the messages for guidance. (or actually measure how long it took since last time)
just tracking the time between ticker.C channel reads and using that instead of printing "10seconds" is the best option I think
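A small sketch of that suggestion (variable names are assumptions): remember when the previous tick was handled and log the measured interval, so dropped ticks simply show up as a longer duration:

```go
ticker := time.NewTicker(time.Second * 10)
lastTick := time.Now()
counter := 0
for {
	select {
	case <-c.consumer.Messages():
		counter++ // decoding, buffering and publishing elided; see the loop above
	case t := <-ticker.C:
		log.Info("%d metrics processed in last %s.", counter, t.Sub(lastTick))
		lastTick = t
		counter = 0
	}
}
```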
payload := make([]*sarama.ProducerMessage, len(metrics))

for i, metric := range metrics {
data, err := metric.MarshalMsg(data[:])
wouldn't this keep accumulating data, because every time it appends the msgp bytes for a metric, it does so after the msgp bytes of prior metrics?
yep, that should be data[:0]
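To spell out why, a tiny illustration (metricA and metricB stand in for two *schema.MetricData values, errors ignored for brevity): msgp's generated MarshalMsg appends to the slice it is given and returns the grown slice.

```go
buf := make([]byte, 0, 1024)
buf, _ = metricA.MarshalMsg(buf)     // buf now holds metricA's encoding
buf, _ = metricB.MarshalMsg(buf)     // appended after metricA's bytes, so buf keeps growing
buf, _ = metricB.MarshalMsg(buf[:0]) // buf[:0] reuses the capacity but writes from index 0 again
```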
@@ -19,4 +19,5 @@ cd $GOPATH/src/github.com/raintank/metrictank/cmd
 for tool in *; do
 	cd $tool
 	go build -ldflags "-X main.GitHash=$GITVERSION" -o $BUILDDIR/$tool
+	cd ..
FWIW you can just rebase this branch on top of master and then you don't need this commit anymore.
Key: sarama.ByteEncoder(key),
Topic: p.topic,
Value: sarama.ByteEncoder(data),
}
I think if we do this we'll hit the same problem as https://github.com/raintank/tsdb-gw/issues/3 because a ProducerMessage's Value will point to an array that will be overwritten by subsequent loop iterations. (sarama.ByteEncoder doesn't copy or encode anything when called. it's just an alias for []byte which it will "encode" (return) later)
yeah, i should just stop trying to be clever with this and just accept the allocations. We can use a bufferPool later if we need the performance boost.
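A sketch of what "just accept the allocations" could look like: pass nil to MarshalMsg so every metric gets its own freshly allocated slice and no ProducerMessage.Value aliases another message's backing array. keyFor is a hypothetical stand-in for the byOrg/bySeries key logic:

```go
func buildPayload(metrics []*schema.MetricData, topic string, keyFor func(*schema.MetricData) []byte) ([]*sarama.ProducerMessage, error) {
	payload := make([]*sarama.ProducerMessage, len(metrics))
	for i, metric := range metrics {
		data, err := metric.MarshalMsg(nil) // fresh allocation per metric, nothing shared between messages
		if err != nil {
			return nil, err
		}
		payload[i] = &sarama.ProducerMessage{
			Key:   sarama.ByteEncoder(keyFor(metric)),
			Topic: topic,
			Value: sarama.ByteEncoder(data),
		}
	}
	return payload, nil
}
```

A buffer pool could be layered in later for the performance boost mentioned above, but the marshalled bytes would still need to be copied into each ProducerMessage before the buffer goes back to the pool.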
This tool will read from one kafka cluster and write to another. The source and destination kafka topics can have different names and a different number of partitions.
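As a hedged sketch of what the -partition-by flag implies (the exact key encoding here is an assumption, not taken from this PR): the Kafka message key drives the destination partition, so byOrg keys on the org id, keeping each org's series on one partition, while bySeries keys on the metric id, spreading series across partitions.

```go
// hypothetical helper; assumes "encoding/binary" and the schema package are imported
func partitionKey(partitionBy string, m *schema.MetricData) []byte {
	switch partitionBy {
	case "bySeries":
		return []byte(m.Id)
	default: // "byOrg"
		key := make([]byte, 4)
		binary.LittleEndian.PutUint32(key, uint32(m.OrgId))
		return key
	}
}
```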
@Dieterbe anything preventing this from being merged?
yes #435 (comment)
that comment wasn't showing in github for me. must be some caching issue. fixed.
sweet, thanks @woodsaj!