-
Notifications
You must be signed in to change notification settings - Fork 104
Conversation
cmd/mt-replicator/main.go
Outdated
@@ -100,13 +100,13 @@ func main() { | |||
select { | |||
case <-consumer.Done: | |||
log.Info("metrics consumer ended.") | |||
publisher.Stop() | |||
return | |||
case <-sigChan: | |||
log.Info("metrics shutdown started.") | |||
consumer.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we need to also stop publisher here?
cmd/mt-replicator/main.go
Outdated
partitionScheme = flag.String("partition-scheme", "byOrg", "method used for partitioning metrics. (byOrg|bySeries)") | ||
compression = flag.String("compression", "none", "compression: none|gzip|snappy") | ||
replicateMetrics = flag.Bool("metrics", false, "replicate metrics") | ||
replicatePersist = flag.Bool("persist", false, "replicate persistMetrics") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's print an error if neither of these modes is enabled.
also:
|
I think this code would be quite a bit easier to understand if there was a bit more symmetry between the metrics relay and the persist relay, right now the code is structured differently (see PersistRelay type vs Consumer and Producer type), for the metrics we call Stop(), but for the persistrelay we call Close() which then calls Stop(), so why not use the Stop() terminology as well? |
cmd/mt-replicator/main.go
Outdated
for { | ||
select { | ||
case <-metricPersist.Done: | ||
log.Info("metricPersist ended.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we close the producer here? (or inside of the actual relay code)
If metric relay was more important or common than persist relaying, or the default one, then we could have it as it was. However since both are disabled by default, it makes more sense to be explicit
42d9dfb
to
874732a
Compare
- use similar code structure for replicating metrics and persist messages. - send persistMessages in batches to greatly improve performance
874732a
to
e764d96
Compare
for the persist relay, don't we have to make sure to route the persist messages to the correct partition? since notifierKafka has recently become partition aware. |
cmd/mt-replicator/metrics.go
Outdated
func (r *MetricsReplicator) Flush(metrics []*schema.MetricData) { | ||
if len(metrics) == 0 { | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since you already check the len before calling Flush, this can be removed. it would also be more akin to metricpersist which also doesn't have this check.
we do, by re-using the sarama.ProducerMessage.Key from the original message. The sarama lib will then correctly set the partitionId based on the number of partitions in use on the destination kafka cluster. |
that makes sense. So would it be correct to say that now mt-replicator can switch the partitioning method (byOrg vs bySeries) for metrics, but not for metricpersist messages? (which is fine by me) other then my small last comment about the check in Flush, this LGTM |
correct. the metricpersist messages can be replicated and re-allocated to new partitions, but the partitioning method cant be changed. |
No description provided.