process old metricPersist messages before consuming metrics. #485
Conversation
Force-pushed from 18a38a5 to 2e3d926
// caught up to these offsets.
bootTimeOffsets = make(map[int32]int64)
for _, part := range partitions {
offset, err := client.GetOffset(topic, part, -1)
the -1 could also be replaced with sarama.OffsetNewest
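For reference, a minimal sketch of that substitution (same loop as in the hunk above; the map assignment at the end is an assumption based on context, since the hunk is truncated):

```go
// sarama.OffsetNewest is the named constant for -1, so the intent is explicit.
for _, part := range partitions {
	offset, err := client.GetOffset(topic, part, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(4, "kafka-cluster: failed to get newest offset for %s:%d. %s", topic, part, err)
	}
	bootTimeOffsets[part] = offset
}
```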
for _, part := range partitions {
offset, err := client.GetOffset(topic, part, -1)
if err != nil {
log.Fatal(4, "kafka-cluster: failed to get newest offset for %s:%d. %s", topic, part, err)
log.Fatal basically kills metrictank, right? Or does that get caught by the recover? I assume it won't, because the log docs say Fatal is equivalent to Print() followed by a call to os.Exit(1). Do we really want to kill MT if for some reason Kafka is unreachable for a moment? I see that further up in the same file there are quite a few places where Fatal is called if there are issues talking to Kafka.
Yes, we want this to kill metrictank. This code is run when metrictank first starts up; if we can't talk to Kafka to get the current state of the topics/partitions then we can't start.
@@ -83,7 +84,10 @@ func (cl Notifier) Handle(data []byte) {
}

// get metric
if agg, ok := cl.Metrics.Get(ms.Key); ok {
if cl.CreateMissingMetrics {
agg := cl.Metrics.GetOrCreate(ms.Key) |
If GetOrCreate didn't directly acquire a write lock, and instead first checked whether the metric needs to be created while only holding a read lock, then in situations like this one we could just call cl.Metrics.GetOrCreate() without this whole "if the metric exists do this, otherwise do that".
I am not sure what you are saying here.
The reason for the if cl.CreateMissingMetrics is that there are two notifier handlers, kafka and NSQ. NSQ is not partition aware, so it could receive metricPersist messages for metrics this node does not handle. In that case we don't want to automatically create any missing AggMetrics and we don't want to call syncSaveState.
Oh I see, OK, that makes sense. I wasn't aware that with NSQ it will see messages about metrics this node doesn't handle.
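A rough sketch of the branching being described, using the names from the hunk above; syncSaveState here is a hypothetical stand-in for the actual save-state update mentioned in this exchange:

```go
// sketch of the per-metric handling in Notifier.Handle()
if cl.CreateMissingMetrics {
	// kafka notifier: partition-aware, so every metricPersist message it sees is
	// for a metric this node owns. Create the AggMetric if needed so its chunk
	// save state is known before metric consumption starts.
	agg := cl.Metrics.GetOrCreate(ms.Key)
	syncSaveState(agg, ms)
} else if agg, ok := cl.Metrics.Get(ms.Key); ok {
	// NSQ notifier: not partition-aware, so it may see metricPersist messages for
	// metrics this node does not handle. Only act on metrics that already exist.
	syncSaveState(agg, ms)
}
```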
@@ -109,6 +134,10 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, partitio
currentOffset := partitionOffset
messages := pc.Messages()
ticker := time.NewTicker(offsetCommitInterval)
startingUp := true
// the bootTimeOffset is the next available offset. There may not yet be a message with that
// offset yet, so we subtract 1 to get the highest offset that we can fetch.
the word "yet" appears twice here
for {
select {
case <-ticker.C:
log.Warn("kafka-cluster: Processing metricPersist backlog has taken too long, giving up lock.")
So if this reaches the timeout we only put a warning message in the log and then continue as usual? That means if the timeout gets reached and we're not done processing the backlog yet, it's fine to still just continue starting up as usual. That basically makes the whole timeout only informational for people who are looking at the logs; otherwise waiting for the timeout has no function.
Wouldn't it be more consistent to print a warning like that each minute and then keep waiting until the backlog is really processed? Or otherwise, if the waiting isn't really required, why do we wait at all?
Processing the backlog of metricPersist messages happens at boot time. Ideally, we should replay the full backlog of messages before we start consuming metrics, but it is not essential. As long as processing the metricPersist messages has a large enough head start on metric processing, things should just work.
The reason for the circuit breaker here is that we don't want to block for too long. So we probably should make this timeout value configurable to allow customization for specific use cases.
@@ -398,6 +398,8 @@ func main() {
handlers = append(handlers, notifierNsq.New(*instance, metrics))
}

// The notifierKafka handler will block here until it has processed the backlog of metricPersist messages.
// it will block for 60seconds at most.
.start() seems to get called in notifierKafka.New(), so actually it's New() which blocks and not the next line.
you are correct.
Force-pushed from 040f20d to 94c3dee
} else {
partitionLogSizeMetric.Set(int(offset))
partitionLagMetric.Set(int(offset - currentOffset))
if currentOffset >= 0 { |
this is true in all cases, except when the offset is sarama.OffsetNewest or sarama.OffsetOldest, right?
Seems to me we should report these 3 metrics also in those cases. IOW, why did you add this condition?
From the comment below: "we cant set the offsetMetrics until we know what offset we are at."
The stats package uses unsigned ints, so setting a value to a negative results in a really big number being assigned, which causes havoc in graphs.
Right, if it's < 0 it's not a real value and can't be used. Makes lots of sense now :)
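For illustration, a standalone Go snippet (not metrictank code) showing what happens when a negative offset is converted to an unsigned value:

```go
package main

import "fmt"

func main() {
	// before anything has been consumed, the offset can still be a sentinel
	// value such as sarama.OffsetNewest (-1); as an unsigned number it is huge.
	var currentOffset int64 = -1
	fmt.Println(uint64(currentOffset)) // prints 18446744073709551615
}
```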
@@ -251,6 +254,10 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Error(3, "kafka-mdm failed to commit offset for %s:%d, %s", topic, partition, err)
}
if currentOffset < 0 {
// we have not yet consumed any messages.
continue |
How about the scenario where new metrics get added to kafka but we didn't consume them yet? Would it make sense to call k.client.GetOffset(topic, partition, sarama.OffsetNewest) and update our metrics?
I suspect it's unlikely that we're not consuming messages, but such a call would probably still work fine (and return quickly).
I have no idea what you are trying to say here.
The primary purpose of these metrics is to identify when there are new metrics being added to kafka but not being consumed (or not being consumed fast enough).
OK, I understand what you are trying to say. If we never start consuming, we will never look up sarama.OffsetNewest.
That is true and, as you mentioned, unlikely. But it wouldn't hurt to always capture the logSize.
Yeah, to be more specific, what I meant was: ideally we update the 3 metrics consistently, so that in grafana we have accurate granular values for each point in time, instead of nulls.
If currentOffset < 0 it looks like we cannot report lag or current offset (correct?), but we can report logSize at least.
However, if currentOffset remains < 0 despite the logSize increasing, that seems like a rather big problem consuming from (and potentially talking to) kafka (?), which made me wonder whether it's safe/recommended to still call k.client.GetOffset in that case.
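A sketch of the always-report-logsize idea, in the style of the hunk further down (the metric names are the ones used in this PR; treat it as an illustration rather than the final code):

```go
// Always report the partition log size, since GetOffset only asks the broker
// and does not depend on our own consume progress. Our offset and lag are only
// meaningful once we have actually consumed something (currentOffset >= 0).
offset, err := k.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
	log.Error(3, "kafka-mdm failed to get log-size of partition %s:%d. %s", topic, partition, err)
} else {
	partitionLogSizeMetric.Set(int(offset))
	if currentOffset >= 0 {
		partitionOffsetMetric.Set(int(currentOffset))
		partitionLagMetric.Set(int(offset - currentOffset))
	}
}
```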
log.Warn("kafka-cluster: Processing metricPersist backlog has taken too long, giving up lock after %s.", backlogProcessTimeout.String())
return
case <-backlogProcessed:
log.Info("kafka-cluster: metricPersist backlog processed in %s.", time.Since(pre).String())
no need for .String() I think
backlogProcessed <- struct{}{}
}()
ticker := time.NewTicker(backlogProcessTimeout)
for {
no need for the for loop; just a single run of the select will do.
@@ -121,6 +150,10 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, partitio
if err := c.offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Error(3, "kafka-cluster failed to commit offset for %s:%d, %s", topic, partition, err)
}
if startingUp && currentOffset+1 >= bootTimeOffset { |
why currentOffset plus one? if bootTimeOffset is the highest offset that we can fetch, then we should compare it directly to currentOffset I think.
this git history is quite messy (introduces non-changes, has multiple changes together that are not covered by the message and should be separate commits, introduces bad things which are then later resolved, has certain commits with me as author which I never authored) but fear not, it is easy to fix. Here is how:
Force-pushed from f6a5bc6 to 1744412
My commit history got all messed up. So I just created a fresh branch from master and applied a patch of the changes made.
Force-pushed from 1744412 to 4ed7a38
@@ -251,12 +255,18 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Error(3, "kafka-mdm failed to commit offset for %s:%d, %s", topic, partition, err)
}
partitionOffsetMetric.Set(int(currentOffset))
offset, err := k.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
log.Error(3, "kafka-mdm failed to get log-size of partition %s:%d. %s", topic, partition, err)
} else {
partitionLogSizeMetric.Set(int(offset)) |
the metrics here are set at the pace of the offset commit interval (default 5s).
Ideally the metrics would be reported at the stats interval (default 1s), though hardcoded at 1s would also be OK.
Querying kafka once per second per partition seems a little too aggressive to me.
If you really want metrics that often, you can just set the offset commit interval to 1s.
fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
fs.DurationVar(&backlogProcessTimeout, "backlog-process-timeout", time.Minute, "Maximum time backlog processing can block during metrictank startup.")
This variable is not validated. If the user enters an invalid expression this will be 0, and the backlog processing will immediately be canceled. oops :)
This was intentional. The backlog processing won't be canceled, we just won't block while it runs.
Hm, I would expect setting the timeout to 0 to mean keep blocking until it's done.
People might be surprised if they run into data loss with a timeout of 0.
If this is the intended workflow, then we should at least document that setting the timeout to 0 will cause ingestion not to wait at all, which may result in data loss.
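For example, the flag description could spell that out; a possible wording, applied to the flag shown in the diff below (only a suggestion, not code from the PR):

```go
fs.DurationVar(&backlogProcessTimeout, "backlog-process-timeout", time.Minute,
	"Maximum time backlog processing can block during metrictank startup. 0 means do not wait at all, which may result in data loss.")
```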
ticker := time.NewTicker(backlogProcessTimeout)

select {
case <-ticker.C:
No need for a ticker; just do a channel read from time.After(backlogProcessTimeout).
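A minimal sketch of the suggested pattern, reusing the channel and variable names from the hunks above:

```go
// Block until either the backlog is processed or the timeout expires.
// time.After removes the need for a ticker (and for the enclosing for loop).
select {
case <-time.After(backlogProcessTimeout):
	log.Warn("kafka-cluster: Processing metricPersist backlog has taken too long, giving up lock after %s.", backlogProcessTimeout)
case <-backlogProcessed:
	log.Info("kafka-cluster: metricPersist backlog processed in %s.", time.Since(pre))
}
```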
// wait for our backlog to be processed before returning. This will block metrictank from consuming metrics until
// we have processed old metricPersist messages. The end result is that we won't overwrite chunks in cassandra that
// have already been previously written.
// We don't wait more than 60 seconds for the backlog to be processed.
not 60 seconds, backlog-process-timeout
fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
fs.DurationVar(&backlogProcessTimeout, "backlog-process-timeout", time.Minute, "Maximum time backlog processing can block during metrictank startup.")
should we use a more conservative default?
I think most people would probably want to wait a bit longer and be given a chance to resolve a problem, rather than just having MT abort the backlog processing and start overwriting full chunks with incomplete ones. I have 2 suggestions here:
- during processing, periodically print out messages, so the operator knows what MT is doing, both in the good case and in the it's-getting-dangerous case. Maybe up to 10 (e.g. every 6s if the timeout is 60s); see the sketch after this list. Something like:
INFO kafka-cluster: still catching up with metricpersist messages (6s of allowed 60s used)
INFO kafka-cluster: still catching up with metricpersist messages (12s of allowed 60s used)
INFO kafka-cluster: still catching up with metricpersist messages (18s of allowed 60s used)
INFO kafka-cluster: still catching up with metricpersist messages (24s of allowed 60s used)
WARN kafka-cluster: still catching up with metricpersist messages (30s of allowed 60s used) will abort in 30s, at which point you may incur some data loss. (you can still abort and increase backlog-process-timeout)
WARN kafka-cluster: still catching up with metricpersist messages (36s of allowed 60s used) will abort in 24s, at which point you may incur some data loss. (you can still abort and increase backlog-process-timeout)
WARN kafka-cluster: still catching up with metricpersist messages (42s of allowed 60s used) will abort in 18s, at which point you may incur some data loss. (you can still abort and increase backlog-process-timeout)
WARN kafka-cluster: still catching up with metricpersist messages (48s of allowed 60s used) will abort in 12s, at which point you may incur some data loss. (you can still abort and increase backlog-process-timeout)
WARN kafka-cluster: still catching up with metricpersist messages (54s of allowed 60s used) will abort in 6s, at which point you may incur some data loss. (you can still abort and increase backlog-process-timeout)
WARN kafka-cluster: Processing metricPersist backlog has taken too long, giving up lock after 60s. when chunks are being saved, they may be incomplete
bonus points if we can also mention, at each step, how many offsets we have to process in total, and how many we've done so far. That gives the operator a clue as to what kind of change they may want to make.
double bonus points if we can use that to project whether it will complete in under the timeout setting or not.
I would always warn if we've exceeded half of the timeout and are still working on it, but with this we could also warn even earlier based on the projection.
- 60s is:
- not a lot of time for an operator to figure out something is up and act accordingly
- not a lot of time to recover from a transient kafka issue.
- enough time maybe (?? any insights here) to process a large amount of metricpersist messages. can we slap a number on this? e.g. at 10k/s (made up number) for 60s that would mean 600k persist messages can be processed.
I would feel more comfortable with a default of 5min or so.
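A rough sketch of the periodic progress logging suggested above (an illustration only, not code from the PR; the done channel and the timeout value are assumed to be wired up by the caller):

```go
// waitForBacklog blocks until the backlog is processed or the timeout expires,
// logging progress every 6 seconds and escalating to warnings once more than
// half of the allowed time has been used.
func waitForBacklog(done chan struct{}, timeout time.Duration) {
	start := time.Now()
	deadline := time.After(timeout)
	progress := time.NewTicker(6 * time.Second)
	defer progress.Stop()
	for {
		select {
		case <-done:
			log.Info("kafka-cluster: metricPersist backlog processed in %s.", time.Since(start))
			return
		case <-deadline:
			log.Warn("kafka-cluster: Processing metricPersist backlog has taken too long, giving up lock after %s. when chunks are being saved, they may be incomplete", timeout)
			return
		case <-progress.C:
			used := time.Since(start)
			if used <= timeout/2 {
				log.Info("kafka-cluster: still catching up with metricpersist messages (%s of allowed %s used)", used, timeout)
			} else {
				log.Warn("kafka-cluster: still catching up with metricpersist messages (%s of allowed %s used) will abort in %s, at which point you may incur some data loss. (you can still abort and increase backlog-process-timeout)", used, timeout, timeout-used)
			}
		}
	}
}
```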
Processing the backlog of metricPersist messages happens at boot time. Ideally, we should replay the full backlog of messages before we start consuming metrics, but it is not essential. As long as processing the metricPersist messages has a large enough head start on metric processing, things should just work.
The reason for the circuit breaker here is that we don't want to block for too long, resulting in a long delay before the node starts consuming metrics and can become ready to handle queries.
enough time maybe (?? any insights here) to process a large amount of metricpersist messages. can we slap a number on this? e.g. at 10k/s (made up number) for 60s that would mean 600k persist messages can be processed.
60 seconds is definitely long enough for small-to-medium sized installations (the size of installation where users would be happy to just run with defaults without testing first). MT can consume from kafka at a few hundred thousand messages per second on 4 CPU cores. So assuming
- a single node with no sharding,
- raw data in 30-minute chunks, and 2 aggregates in 6-hour chunks
- a modest ingestion rate of 100k/s
60 seconds is enough time to process the metricPersist messages for 200k unique series (that is bigger than our own ops cluster) over the last 7 hours.
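A rough back-of-envelope check of that, assuming one metricPersist entry per chunk save per series:

- raw data, 30-minute chunks over 7 hours: 14 saves per series, so 14 × 200k = 2.8M entries
- 2 aggregates, 6-hour chunks over 7 hours: roughly 1 save each per series, so 2 × 200k = 0.4M entries
- total ≈ 3.2M entries; at 100-200k messages per second that is roughly 16-32 seconds, comfortably inside the 60-second default.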
during processing, periodically print out messages,
Long term, I want to record these bootup phases (metricPersist backlog processing, rebuilding the memory index from cassandra, etc.) as node states that can be queried through the API and are reported as metrics.
OK, that's all fine, but it still doesn't really help the operator know what's going on or give them the ability to react, for example in a scenario where they want to avoid data loss at all costs, have a timeout of 60s, but it looks like it would take about 120s. With something like the proposed messages it's trivial to ctrl-c metrictank, bump the timeout and retry.
This is extreme and unnecessary for this PR. I don't know of any operator who spends their days tailing logs waiting for a service to restart.
No, I meant in a manual restart scenario. When manually restarting it is common to tail a log and see how the process is doing starting up. In that case, it could be very helpful to have the proposed messages to 1) know what's going on, 2) be able to restart with a different timeout setting should it look like data loss can easily be avoided by bumping the timeout.
Force-pushed from 4ed7a38 to b41f87b
- During startup we wait for the kafkaNotifier to consume its backlog of metricPersist messages before we start consuming metrics.
- When using the kafkaNotifier we now also create AggMetrics for any missing key provided in a metricPersist message. This ensures that when we do start consuming metrics, the chunkSaveStart/chunkSaveFinish timestamps will be correctly set for the metric and we won't try to save chunks that have already been saved.
Force-pushed from b41f87b to e03d2fd
This needs #477 before it can be merged.