This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Keep track of last chunk save for each metric #452

Closed
woodsaj opened this issue Jan 6, 2017 · 10 comments · Fixed by #507

@woodsaj
Member

woodsaj commented Jan 6, 2017

A new MT instance that starts up has no knowledge of which chunks have been saved. Though we have metric_persist messages that are read from Kafka, they are consumed at a much faster rate than the data messages and are ignored if they are for a chunk that has not yet been started (i.e. the metric_persist is processed before the first metric for the chunk is seen, which is always the case during startup).

So if a new instance is started as primary, set to consume from kafka from some point in the past, it will re-save chunks to cassandra that have already been saved. The first chunk processed will almost always be a partial chunk and this partial chunk will be saved to cassandra overwriting the previously saved chunk that had all data.

We have worked around this by never starting a node as primary (except in single instance deployments) and always promoting an instance that has been online for a long time to the primary role when the current primary is restarted. However, this is not a reliable approach:

  • if all instances crash, data will be lost
  • we need to wait at least 6 hours between upgrading each node in a cluster
  • it makes it very difficult to build a self-healing cluster that can tolerate instance failure
  • data loss cannot be avoided for single instance deployments

To alleviate this, I would like to keep track of the last chunk save time for each metric. This index can be rebuilt at startup by consuming from the metric_persist kafka topic. This will allow us to run dedicated Write instances of MT that can be restarted without causing data loss.

The idea I am proposing is to:

  1. Make the metric_persist messages partitioned using the same partition key as the metrics. So a metric_persist message for a metric goes to the same partition number as the data for that metric. This will allow each MT instance to only consume metric_persist messages for the metrics that it is processing.

  2. Track LastSavedChunk as an attribute of mdata.AggMetric rather than using the current Chunk.Saved attribute

  3. At startup, if an instance is a primary, replay all metric_persist messages from the last N minutes, where N > the largest chunkspan used. These messages should be processed before we start consuming from the data topics.

  4. When processing metric_persist messages, initialize missing AggMetric structs when they don't exist rather than discarding the metric_persist message like we currently do.
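
Steps 2 and 4 of the list above can be illustrated with a minimal sketch. The types and names here (`persistMsg`, `aggMetric`, `store`, `handlePersist`, `needsSave`) are hypothetical stand-ins, not metrictank's actual API; the sketch only assumes that a persist message carries a metric key and the start timestamp (T0) of the saved chunk.

```go
package main

import "fmt"

// persistMsg is a hypothetical stand-in for a metric_persist message:
// the metric key plus the start timestamp (T0) of the chunk that was saved.
type persistMsg struct {
	Key string
	T0  uint32
}

// aggMetric is a hypothetical, stripped-down AggMetric that only carries
// the proposed per-metric save marker.
type aggMetric struct {
	Key string
	// LastSavedChunk is the T0 of the newest chunk known to be saved.
	// Zero means no save has been seen; real chunk T0s are assumed non-zero.
	LastSavedChunk uint32
}

// store maps metric keys to their aggMetric.
type store map[string]*aggMetric

// handlePersist records a save notification. If the metric has not been
// seen yet (the normal case during startup), it is created instead of the
// message being discarded.
func (s store) handlePersist(m persistMsg) {
	am, ok := s[m.Key]
	if !ok {
		am = &aggMetric{Key: m.Key}
		s[m.Key] = am
	}
	// Messages may be replayed out of order; only move the marker forward.
	if m.T0 > am.LastSavedChunk {
		am.LastSavedChunk = m.T0
	}
}

// needsSave reports whether a chunk starting at t0 still has to be written.
func (s store) needsSave(key string, t0 uint32) bool {
	am, ok := s[key]
	return !ok || t0 > am.LastSavedChunk
}

func main() {
	s := store{}
	s.handlePersist(persistMsg{Key: "a", T0: 21600})
	s.handlePersist(persistMsg{Key: "a", T0: 10800}) // older message, marker unchanged
	// Prints: false true true
	fmt.Println(s.needsSave("a", 21600), s.needsSave("a", 43200), s.needsSave("b", 21600))
}
```

Because the marker lives on the metric rather than on each chunk, a persist message that arrives before any data point for that metric still leaves a usable record behind.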

As well as providing a more robust and reliable startup sequence for nodes, this will also simplify the codebase in mdata.aggmetric.go and also fix:

#155
#357

@Dieterbe
Contributor

Dieterbe commented Jan 9, 2017

we need to wait at least 6hours between upgrading each node in a cluster

Clarifications (correct me if I'm wrong):

At startup if an instance is a primary, replay all metric_persist messages from the last N minutes, where N > the largest chunkspan used.

This should also work if people are sending sparse data or due to an issue on their end have not sent data for a while, right?
I was just trying to come up with a scenario but it seems even for those cases it works fine.

How would you say the "data seek back" (DSB) offset should correspond to the "metricpersist seek back" (MSB) offset? MSB should always > DSB I think, but I need to think about it more.
also should we seek back to specific points in time, e.g. 6:00 or 12:00 when using 6h chunks, irrespective of whether the instance starts at 12:00 or at 17:59? Or should we always just seek back 6 hours? Also we have to be mindful of the timestamps of metricdata not matching the timestamps of the actual kafka messages. Users may send old data (point ts < kafka ts) or they may send future data (point ts > kafka ts); the latter is an allowance that we haven't been very explicit about yet.

also fix #155

this new approach would still require that we update the code to add a reference to the aggmetric inside of the CWR, right? that is what would allow us to lock the aggmetric and update the property?
isn't that similar to what we would have to do otherwise to fix 155?

also fix #357

so the problem described there is that e.g. with 30-minutely datapoints, when you go into a new chunk interval and you want the chunks saved, you may have to wait up to 30 minutes for new data to come in which triggers the save on the older chunk. As that ticket states, you don't have this problem when you use kafka offset to seek back in time, because you can wait as long as needed for all chunks to save, and then use kafka offset seeking to replay all data that came in while you waited and didn't save yet. (when going this route, another problem may appear, which is that replaying all that data might take too long, rendering your instance unavailable for too long)

While the suggestion in OP does make seeking back in time more viable, it IMHO does not fix 357. But then again we're not sure yet if we need to fix 357 so that's OK.

Also,
keep in mind we currently support kafka and NSQ as metricpersist transport. The idea behind the latter is, make it easy for people who just use carbon / carbon-relays and don't want to run kafka (reasons could be they think kafka is too complex to run, too resource intensive, whatever), to run a MT cluster.
if we're going to say that a metricpersist transport needs to support the discussed semantics to do a proper cluster , then we're essentially making kafka a hard dependency.

I wonder if it makes sense to rely on kafka's log compaction for the metricpersist topic. basically it retains all the last messages of a topic and removes the earlier ones.
So we can consume the entire stream from the start and it will give us all the latest chunksave messages for all metrics (and possibly a few older ones too, which we would drop). See
http://spootnik.org/entries/2015/03/10_simple-materialized-views-in-kafka-and-clojure.html
for more details. But this would also retain (and deliver) chunksave messages for old metrics that haven't been seen in days, unless we send delete messages to kafka (after which the metric can be added again).
I think there's a lot to like about log compaction that we could leverage, but in this case, I think just seeking back into a regular topic may be better.

@woodsaj
Member Author

woodsaj commented Jan 9, 2017

we need the wait before we can promote a secondary to primary. not in between each node. for clusters with >2 nodes (like we use for worldping) this is an important distinction

Not really. Nodes really shouldn't be added back into rotation to receive queries until they have crossed the 6h (aggregation chunkspan) window. If a node is queried before this, the firstChunk in memory will be a partial chunk and so will be excluded from use (as we know it is missing data). https://github.com/raintank/metrictank/blob/master/mdata/aggmetric.go#L247-L250 . By excluding the chunk we force a lookup to cassandra; however, as the chunk is incomplete there will be no data in Cassandra either.

Additionally, the purpose of running 3 nodes is to provide N+2 redundancy to significantly reduce the possibility of data loss. If you have two instances running which are unable to be promoted to primary the cluster is vulnerable to data loss from a single MT crash or server reboot.

This should also work if people are sending sparse data or due to an issue on their end have not sent data for a while, right?

Not entirely.
Assuming
metricMaxStale=6h
offset=7h
aggMetrics=10m:6h:2:120d

If a user stops sending data at 05:30 and MT is restarted at 10:00, then the last chunkSave will have been at 00:00, but data will only be replayed from 03:00, resulting in the 00:00-03:00 data being lost.

MSB should always > DSB

yes

also should we seek back to specific points in time

No. That would only be possible if we knew the data was being sent in realtime, which isn't true as there are lots of scenarios resulting in data being sent with a lag.

Or should we always just seek back 6hours?

We should always seek max chunkSpan + some buffer.

also fix #357

#357 asks for forcible chunk saving to reduce data loss when restarting single node instances. However, that doesn't work. If you saved partial chunks on exit, then started up again with offset=latest, the saved chunks would be re-created without the previously seen data and then re-saved.
However, if the change suggested here was implemented and you restarted at 00:35, you would only need to replay data from 00:00 and you wouldn't lose any data.

keep in mind we currently support kafka and NSQ as metricpersist transport

Keeping support for NSQ is not a requirement. If keeping NSQ prevents us from delivering the features needed for our own use case, then NSQ needs to go. Making kafka a requirement for HA clusters seems completely reasonable to me.

I wonder if it makes sense to rely on kafka's log compaction for the metricpersist topic

This is an optimization we can look at later, but is not needed right now.

@Dieterbe Dieterbe added this to the hosted-metrics-alpha milestone Jan 9, 2017
@woodsaj woodsaj self-assigned this Jan 10, 2017
@woodsaj
Member Author

woodsaj commented Jan 10, 2017

This can be split into at least 3 PRs.

  1. make the metric_persist messages partitioned,
  2. refactor tracking of the last chunk save, storing the TS in aggmetric
  3. ensuring that all old metric_persist messages have been consumed before we start consuming the data topic.
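
As an illustration of the first item, here is a minimal sketch of deriving the partition for both topics from the same metric key. The FNV hash and the `partitionFor` helper are assumptions made for this example; the thread does not say which partitioner metrictank uses. The scheme only works if the data topic and the metric_persist topic have the same number of partitions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a metric key to a partition number.
// numPartitions must be greater than zero.
func partitionFor(key string, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() % numPartitions
}

func main() {
	const partitions = 8           // assumed to be identical for both topics
	key := "1.some.metric.id"      // hypothetical metric key
	dataPartition := partitionFor(key, partitions)
	persistPartition := partitionFor(key, partitions)
	// Same key and same partition count give the same partition, so an
	// instance consuming partition N of the data topic only needs
	// partition N of the persist topic.
	fmt.Println(dataPartition == persistPartition)
}
```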

woodsaj pushed a commit that referenced this issue Jan 16, 2017
Rather than tracking the state of each individual chunk, just keep
a record of the most recent saveStart(add to write queue)/saveFinish(write to cassandra)
as properties of the aggMetric.  When we save chunks we always save all unsaved
chunks, so we don't lose anything by tracking the save state for all chunks in
one variable.

issue #452
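
The commit message above describes two per-metric markers. A minimal sketch of how such markers could drive the save decision follows; the struct, field, and method names are hypothetical and chosen only to mirror the wording of the commit message, not copied from the actual change.

```go
package main

import "fmt"

// aggMetric is a hypothetical reduction of the real struct to the two
// markers the commit message describes. Both hold a chunk start time (T0).
type aggMetric struct {
	lastSaveStart  uint32 // newest chunk handed to the write queue
	lastSaveFinish uint32 // newest chunk confirmed written to the store
}

// toQueue returns the T0s of completed chunks that have not been queued
// yet, and advances lastSaveStart so they are not queued a second time.
func (a *aggMetric) toQueue(chunkT0s []uint32) []uint32 {
	start := a.lastSaveStart
	var out []uint32
	for _, t0 := range chunkT0s {
		if t0 > start {
			out = append(out, t0)
			if t0 > a.lastSaveStart {
				a.lastSaveStart = t0
			}
		}
	}
	return out
}

// saved is called once the write for chunk t0 has completed.
func (a *aggMetric) saved(t0 uint32) {
	if t0 > a.lastSaveFinish {
		a.lastSaveFinish = t0
	}
}

func main() {
	// Markers as they might look after replaying persist messages.
	a := &aggMetric{lastSaveStart: 21600, lastSaveFinish: 21600}
	fmt.Println(a.toQueue([]uint32{0, 21600, 43200, 64800})) // [43200 64800]
	fmt.Println(a.toQueue([]uint32{43200, 64800}))           // []
	a.saved(43200)
	fmt.Println(a.lastSaveFinish) // 43200
}
```

Since all unsaved chunks are always saved together, a single watermark per metric carries the same information as a saved flag on every chunk.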
@Dieterbe
Contributor

If a user stops sending data at 05:30 and MT is restarted at 10:00. Then the last chunkSave will have been at 00:00, but data will only be replayed from 03:00, resulting in the 00:00-03:00 data being lost.

I haven't put as much thinking into this problem as you have, do you see a way to solve this? E.g. to avoid data loss in case people send data sparsely or with gaps. maybe based on the lastupdate field in the index?

@woodsaj
Member Author

woodsaj commented Jan 18, 2017

do you see a way to solve this?

the only way to completely solve this problem is with a WAL.
#275

woodsaj pushed a commit that referenced this issue Jan 23, 2017
Rather than tracking the state of each individual chunk, just keep
a record of the most recent saveStart(add to write queue)/saveFinish(write to cassandra)
as properties of the aggMetric.  When we save chunks we always save all unsaved
chunks, so we don't lose anything by tracking the save state for all chunks in
one variable.

issue #452
@Dieterbe
Contributor

To alleviate this, I would like to keep track of the last chunk save time for each metric. This index can be rebuilt at startup by consuming from the metric_persist kafka topic. This will allow us to run dedicated Write instances of MT that can be restarted without causing data loss.

I thought a bit more about this and have a question:

does the delay before a message goes into the metricpersist topic, cause any trouble?
consider what happens when a primary consumes a data point from the mdm topic (one that crosses a boundary). the point needs to be processed, trigger a persist call, the chunk is sealed and put in the queue, it may spend 30~60 min queued up, then gets saved to cassandra, after which the persistmessage is generated and a bit later is published to kafka. it'll be very common for there to be a >30min delay between metric data in mdm topic and the corresponding persist message in kafka. if cassandra is having issues or the primary has some issues talking to kafka, this could be much more. (1h ~ 10h...)

For the node starting up and forcing itself to consume all persist messages before consuming metrics, the consequence is that when it starts consuming metrics, it may consume points that will trigger persist calls and it'll try to save chunks even though those chunks may be in the write queue on (or have just been saved by) another node (even if that node has been depromoted, it'll still drain its write queues. or do we require that an ex-primary for a shardset must be fully shutdown before starting a new one? -- relevant #198 )

Is the solution to simply seek back in time a lot to make sure this effect can't manifest?

@woodsaj
Member Author

woodsaj commented Jan 23, 2017

does the delay before a message goes into the metricpersist topic, cause any trouble?

No.

the consequence is that when it starts consuming metrics, it may consume points that will trigger persist calls

This doesn't make sense. This is the consequence of decoupling the saves from metric ingestion and so has always been the case. Nothing proposed here changes that.

it'll try to save chunks even though those chunks may be in the write queue on (or have just been saved by) another node

If you wanted to promote secondary nodes to be primaries, the only thing that has changed is that you can now promote secondary nodes that have not been online for long (as they will only save chunks that have not already been saved due to knowing what chunks were saved before the node started). If you demote a node that has lots of chunks in its write queue and immediately promote a secondary node, things will work exactly as they always have and you run the risk of both nodes saving the chunks.

But I think you are missing the point that having to manually demote/promote nodes is filled with problems and the main purpose of these changes is to:

This will allow us to run dedicated Write instances

woodsaj pushed a commit that referenced this issue Jan 24, 2017
Rather than tracking the state of each individual chunk, just keep
a record of the most recent saveStart(add to write queue)/saveFinish(write to cassandra)
as properties of the aggMetric.  When we save chunks we always save all unsaved
chunks, so we don't lose anything by tracking the save state for all chunks in
one variable.

issue #452
@Dieterbe
Contributor

I had a call with AJ to clear things up.
Here's some important things:

  1. self-healing cluster here means there is a static assignment, i.e. a config for a bunch of nodes, some of which are secondaries, some of which are primaries. when one fails, we can automatically restart it (possibly on a different host) using an orchestrator. but this is not about (automatically) changing primary status on any node
  2. if the primary that died/was stopped took a long time to save its chunks or was not able to save certain chunks at all, the new (or restarted) instance needs to be able to seek back to the start of (first point of) the oldest unsaved chunk, otherwise there is data loss. this is nothing new indeed. It's still up to the operator to ensure kafka retention has enough "spare buffer" to deal with this. In practice, we would set kafka retention to largest-chunkspan + a couple hours. that way if an issue arises, the operator has a few hours to resolve it, and they can also adjust the kafka retention on the fly as needed.
  3. to make the above work, all primaries are set up to seek to offsetOldest when they start up. (note in the future we could optimize this and have them not consume lots of old data if they don't save it)

@Dieterbe
Contributor

Dieterbe commented Jan 25, 2017

MSB should always > DSB

yes

thinking about it more, I don't see a reason (in theory) to seek back further for MSB, since MSB "lags behind" DSB. so consuming both from the same point should always be enough.
In theory, having the same retention for both the persist and mdm topic and consuming both with offset = oldest should be all that's needed, and probably the easiest way to set things up for our kubernetes deployment model. However, in practice, I think kafka may not expire segments right away, and there could be edge cases where it expires metricpersist segments before it expires mdm segments. So for that reason, in practice maybe we should set retention for the persist topic to a few hours more (and use offset oldest for both).

@woodsaj
Member Author

woodsaj commented Jan 27, 2017

MSB really only needs to be set to a value large enough to capture the last save for all chunks. So if the max chunkspan is 6 hours and writes can take up to 1 hour to complete, then we only need to seek back 7 hours.

This is because we only need the latest metricPersist message for each Metric.
