
store save state as a property of the aggmetric instead of chunk #477

Merged
merged 4 commits on Jan 20, 2017

Conversation

Member

@woodsaj woodsaj commented Jan 16, 2017

Rather than tracking the state of each individual chunk, just keep
a record of the most recent saveStart (added to write queue) / saveFinish (written to Cassandra)
as properties of the aggMetric. When we save chunks we always save all unsaved
chunks, so we don't lose anything by tracking the save state for all chunks in
one variable.

issue #452

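For illustration, a minimal sketch of the idea, with field names taken from the discussion below (the merged code may differ in detail):

```go
package mdata

import "sync"

// Sketch only: per this PR, a chunk no longer carries per-chunk Saved/Saving
// flags; instead the AggMetric records the t0 of the most recent chunk whose
// save was started and finished.
type Chunk struct {
	T0     uint32
	Closed bool
}

type AggMetric struct {
	sync.Mutex
	Key             string
	Chunks          []*Chunk
	CurrentChunkPos int
	lastSaveStart   uint32 // t0 of the most recent chunk added to the write queue (saveStart)
	lastSaveFinish  uint32 // t0 of the most recent chunk written to Cassandra (saveFinish)
}
```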
	totalPoints.Inc()
	return nil
}

func (c *Chunk) Clear() {
	totalPoints.DecUint64(uint64(c.NumPoints))
}

func (c *Chunk) Finish() {
	c.Closed = true
Contributor
@replay replay Jan 18, 2017

Is the reason why we don't need locks around these chunk properties because we rely on AggMetric to put locks around all operations on chunks?

Member Author

yes.

	if guess < 0 {
		guess += numChunks
	}
	if ts > a.lastSaveStart {
Contributor

As far as I can see, the only call site of SyncChunkSaveState() is in cassandraStore.processWriteQueue(). When a chunk is pushed into the write queue via AggMetric.persist(), AggMetric.lastSaveStart is already updated in persist. There also can't be a race, because AggMetric.SyncChunkSaveState() and AggMetric.persist() are both surrounded by locks. So is there really any possible case where this condition is true? It seems that's not possible unless there is another way than AggMetric.persist() to push chunks into the write queue, but I can't see one.

Contributor

Taking that a bit further: the CassandraStore guarantees that on a per-aggmetric level (actually per ChunkWriteRequest, but a CWR's key is simply the key of the aggmetric) all chunk writes are in order, so the ts will be in order.

Also, I believe that persist has an ordering guarantee: the chunks it persists are in ascending time order, both within one call and across multiple calls.

I like the SaveStart and SaveFinish terminology, and I think we should apply it more broadly. So instead of SyncChunkSaveState, how about this:

// saveStart lets us mark up to which chunk t0 we've triggered persistence operations. assumes a is locked
func (a *AggMetric) saveStart(ts uint32) {
	a.lastSaveStart = ts
}

// SaveFinish lets external store packages mark up to which chunk t0 has been saved
func (a *AggMetric) SaveFinish(ts uint32) {
	a.Lock()
	defer a.Unlock()
	a.lastSaveFinish = ts
}

The former would be called by persist, the latter by the CassandraStore.

Member Author

SyncChunkSaveState() is also called when metricPersist messages are received:
https://github.com/raintank/metrictank/blob/chunkSaveRefactor/mdata/notifier.go#L87

Contributor
@Dieterbe Dieterbe Jan 18, 2017

Ah no, SyncChunkSaveState is also called by the notifiers, consuming the persistence stream: in the case of Kafka it is ordered, for NSQ unordered. There could be multiple primaries though (not a great setup, but it can happen), saving chunks concurrently and potentially even causing out-of-order messages (with respect to different instances, e.g. an instance could be lagging behind). So the ts check would still be needed.
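For illustration, a minimal sketch of a SyncChunkSaveState with the ts guard discussed here, continuing the sketch above (an assumption, not necessarily the exact implementation):

```go
// Sketch only: persist notifications can arrive out of order (multiple
// primaries, a lagging notifier), so only ever move the save markers forward.
func (a *AggMetric) SyncChunkSaveState(ts uint32) {
	a.Lock()
	defer a.Unlock()
	if ts > a.lastSaveStart {
		a.lastSaveStart = ts
	}
	if ts > a.lastSaveFinish {
		a.lastSaveFinish = ts
	}
}
```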

}

func New(t0 uint32) *Chunk {
	// we must set LastWrite here as well to make sure a new Chunk doesn't get immediately
	// garbage collected right after creating it, before we can push to it
Contributor
@Dieterbe Dieterbe Jan 18, 2017

The equivalent of this comment should be added to AggMetric where we initialize its lastWrite field, because it's very important.

return true
}
} else if !currentChunk.Saving {
} else if a.lastSaveStart < currentChunk.T0 {
Contributor

This line is equivalent to just `else {`.


if chunk.Saved || chunk.Saving {
// this can happen if chunk was persisted by GC (stale) and then new data triggered another persist call
Contributor

Was this scenario incorrect, or no longer important enough to be mentioned in the new comment?

I suggest changing the comment to:

		// this can happen when:
		// * 2 primary nodes are saving chunks to Cassandra, A is ahead of B, B got a persist notification from A and then tries to save its chunk (would result in =)
		// * same, but B is lagging behind so much that A saved multiple chunks already (would result in > )
		// * same, but A crashed and B was promoted to primary
		// * chunk was persisted by GC (stale) and then new data triggered another persist call

Member Author
@woodsaj woodsaj Jan 18, 2017

Now that chunk saving has been decoupled from chunks being closed (chunk.Finish()), the old comment is incorrect and the new one is correct.

persist() is only called when a node is a primary, the chunk is not already closed, and
a) a new metric has been received for the next chunk, or
b) GC has determined the chunk is stale and it is not already saved.

Before persist is called, the chunk is closed, which will prevent any future persist() calls from being made.

So the only way persist can be called on a chunk that is already saved is if there are 2 primary MT nodes both saving chunks to Cassandra, or a primary failed and this node was promoted to be primary but its metric consumption is lagging, i.e. exactly what the new comment states.
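To make that concrete, a rough sketch of how persist() could guard against re-saving under the new scheme, continuing the sketch above (the body and field names are assumptions, not the PR's exact code):

```go
// Sketch only: skip chunks whose save has already been started, e.g. by
// another primary, by a node promoted after a crash, or by an earlier
// GC-triggered persist.
func (a *AggMetric) persist(pos int) {
	chunk := a.Chunks[pos]
	if a.lastSaveStart >= chunk.T0 {
		return
	}
	a.lastSaveStart = chunk.T0
	// ... from here, queue ChunkWriteRequests for this chunk and any older
	// unsaved chunks onto the store's write queue
}
```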

Contributor

> Before persist is called, the chunk is closed, which will prevent any future persist() calls from being made.

That prevention is only in GC() though, not in Add(). So the following scenario can still happen as far as I can tell, e.g.:

  • node is primary
  • metric hasn't been written to in a while, chunk is open -> GC closes it and calls persist
  • new data comes in for a new chunk -> Add won't close it again since it's already closed, but still calls persist

Member Author
@woodsaj woodsaj Jan 20, 2017

OK, I see what you mean.

points received for the most recent chunk when that chunk is already being "closed",
i.e. the end-of-stream marker has been written to the chunk.
This indicates that your GC is actively sealing chunks and saving them before you have the chance to send
your (infrequent) updates. Any points received for a chunk that has already been closed are discarded.
Contributor

So add_to_closed is the equivalent of add_to_saving.
However, since SyncChunkSaveState doesn't close any chunks or mark chunks as closed, we no longer have a way to notice when we're adding data to a chunk that has already been saved by a primary, even when we already received a persist notification for it. I.e. there is no equivalent of add_to_saved.

In other words: Add() will add points to chunks that are already saved by the primary, since it only looks at Closed (which it only sets itself if it's a primary); it does not look at lastSaveFinish. It'll be as if we're adding data regularly, but actually something weird is going on (adding data to chunks that potentially didn't make it into the chunk in Cassandra).

Member Author

The only time a chunk is closed is when we receive a point for the next chunk, or when it is stale and the GC task closes it.

A chunk being saved by another node should not have any impact on chunks on this node. Both nodes should be receiving the same data and both nodes are bound by the same conditions leading to a chunk being closed.

If nodeA is lagging behind, it should not be prevented from adding points to a chunk just because nodeB has saved the chunk.

Contributor

I'm not saying it should be prevented. I'm saying I liked the previous behavior (which was adding the points, but incrementing a metric that signals that this is going on).

Member Author

But why do I care? I don't see there being any value in having this metric. The metric would be incrementing under normal operation, especially as the ingest rate goes up and there is a lag of a few seconds (nodeA could be processing data from partition 0, then 1, 2, 3, 4, etc., while nodeB could be processing 7, then 6, 5, 4, etc.).

> (add data to chunks that potentially didn't make it into the chunk in cassandra)

This is not possible, as all nodes are bound by the same rules for closing a chunk.

Contributor
@Dieterbe Dieterbe Jan 19, 2017

It is possible. Consider this:

  • we're adding points t1, t2, t3, t4, t5 (all have the same t0 i.e. go into the same chunkspan)

  • there's a long time between t2 and t3, let's say it's a time just under chunk-max-stale.

  • A is primary, B is secondary.

  • A consumes t1, t2, and then experiences some lagging. t3 is in kafka but A doesn't consume it quite yet

  • A's GC notices that last write was before chunkMinTs, closes the chunk and persists it.

  • A then consumes the points t3, t4 and t5 but rejects them because the chunk is closed. This is worth instrumenting (it indicates that maybe you should check your GC parameters): it does increment add_to_closed_chunk, so you can see it on your dashboard when looking at the instance's stats.

  • B on the other hand, does not experience the lagging.

  • it consumes t1, t2 but does not experience the same lag as A, and then consumes t3.

  • B's GC sees a last write more recent than chunkMinTs, so doesn't close the chunk.

  • B receives notification from A and updates lastSaveStart and lastSaveFinish accordingly, but does not close any chunk.

  • B consumes t4 and t5, and can push them to the chunk, as it is not closed and we don't look at the lastSave* timestamps.

=> B now added t3, t4 and t5 to its chunk, but we know A did not (we don't know for t3 but we do know for t4 and t5)

There are two problems:

  • when loading your charts, datasource B will show more points than datasource A (when loading from memory)
  • datasource B will show fewer points once it needs to retrieve the chunk from Cassandra

As B could foresee these problems happening (e.g. if it checked lastSave* when adding new points, just like the old code checked currentChunk.Saved when adding new points), it could inform the operator by incrementing this stat.

Note that B can never detect all cases of "this point has not been added by the primary" (e.g. t3 in the above example), since it needs to wait for a notification and only sees the problem with points it receives after processing the notification. So while the stat can't account for all of these cases, it will be a good indicator for the cases where we can be sure it happened.
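For concreteness, the instrumentation being discussed (and ultimately not merged) would amount to roughly this hypothetical helper, continuing the sketch above; the name pointAlreadySaved and the idea of a separate add_to_saved_chunk counter are assumptions for illustration:

```go
// Hypothetical, not part of this PR: a check Add() could perform on a
// secondary, where t0 is the start of the chunk the incoming point falls into.
func (a *AggMetric) pointAlreadySaved(t0 uint32) bool {
	// if a primary already wrote this chunk to Cassandra, a point added now
	// may never make it into the stored chunk
	return t0 <= a.lastSaveFinish
}
```

Add() could then bump an add_to_saved_chunk style counter whenever this returns true; as the rest of the thread explains, that was deliberately left out.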

Member Author

This is such an obscure edge case that it would be crazy to complicate the code just to be able to observe it. Even then you wouldn't know when it was happening, as the add_to_saving metric would be incrementing for many other legitimate reasons that are not a problem.

The correct way to identify your unlikely scenario is to just compare the add_to_closed_chunk count on both nodes; they should always be the same.

Contributor

> as the add_to_saving metric would be incrementing for many other legitimate reasons that are not a problem.

like what?

Member Author

> like what?

#477 (comment)

Contributor

For the record:

dieter [9:06 AM]
I don't understand the scenario described in https://github.com/raintank/metrictank/pull/477#discussion_r96889476. Is A or B a primary?

awoods [9:07 AM]
It doesn't matter which is primary.

[9:08]
The point is 2 nodes won't necessarily process metrics in the same order.

dieter [9:09 AM]
But for any given metric key, they would be processed in order though? Except when a metric changes its partition assignment.

awoods [9:10 AM]
Correct.

dieter [9:10 AM]
So you're talking about ordering with respect to different metric keys? If so, that shouldn't matter here, since we're talking about per-metric-key things.

awoods [9:18 AM]
It does matter. Though the points for a key are ordered, the keys are not ordered. So nodeA could process all of the points for metricX then move on to processing all of the points for metricY, while nodeB processes all of the points for metricY then processes metricX.

[9:20]
If nodeA reads a batch of metrics from a partition, but then a kafka node crashes before nodeB can read the batch, nodeB will have to wait for kafka to handle the failure before it can read the metrics. This could easily take more than a few seconds.

awoods [9:25 AM]
I understand your desire to know when two nodes have different data in their chunks, but:
a) it is super unlikely and will self-heal once the chunks move out of memory, as there will only be one chunk, the one in Cassandra
b) an add_to_saved_chunk metric only tells you that nodes might be out of sync, which is pretty useless as it is not actionable
c) comparing the add_to_closed_chunk metrics of 2 nodes will tell you whether the nodes are out of sync, as the count should be the same on both nodes

dieter [9:34 AM]
I don't understand why different keys being processed in different orders matters here. Since the scenario we're talking about is with respect to specific chunks for specific metrics, it's a per-metric phenomenon.

awoods [9:35 AM]
It is a timing problem. There is no guarantee that nodeA will process metricX at the same time as nodeB. The nodes could be out of sync by a few seconds or more, and that is OK.

dieter [9:38 AM]
OK, so a timing offset problem between different instances with respect to a given metric. You said that would increase the proposed counter under normal operation (i.e. not in the situation I described). How so?

awoods [9:44 AM]
NodeA reads a batch of metrics from partition p1. The batch contains 2 points for metricX: one point for the chunk with T0=10, and one for T0=20. So chunk T0=10 is saved and the metricPersist message is sent.
NodeB is delayed from reading from the p1 partition for some reason (kafka node failure or a new node being added).
NodeB receives the metricPersist message for metricX T0=10.
NodeB reads the batch of metrics from partition p1 and processes the first point for metricX even though the chunk has already been saved by NodeA.

[9:49]
Typically a batch of metrics from Kafka would only contain 1 point for each metric, unless
1) the metric is being sent with 1-second resolution
2) there was a network issue that caused metrics to be buffered somewhere and they were then flushed all at once.

2) is really likely, where metrics could be buffered by the client (can't talk to carbon-relay), or buffered by carbon-relay (can't talk to tsdb-gw).

dieter [10:03 AM]
Aha, so another way to look at it is: let's say we did have SyncSaveState close the chunks that the primary closed. In the event B gets points for a chunk that is already closed, there's no way to tell whether those points *didn't* make it into the chunk saved by the primary (the problematic scenario I brought up), versus whether the points *did* make it into the chunk saved by the primary (the non-problematic scenario you brought up).

[10:04]
We could tell the difference if the persist messages also contained the ts of the last point that went into the chunk (I'm not saying we should do that, just a thought).

[10:06]
OK, so let's leave it how it is, without this particular metric. Sounds like the metric we used to have was flawed as well.

[10:08]
I think the key insight I gathered through these thinking exercises and discussions is that secondaries can't be a reliable indicator of these sorts of problems. Instead we should always just rely on the `add_to_closed` of the primary node, which will be an accurate indicator of points not making it into chunks being saved to Cassandra.

awoods [10:10 AM]
Great. So LGTM?

return true
}
} else if !currentChunk.Saving {
} else if a.lastSaveStart < currentChunk.T0 {
// chunk hasn't been written to in a while, and is not yet queued to persist. Let's persist it
log.Debug("Found stale Chunk, persisting it to Cassandra. key: %s T0: %d", a.Key, currentChunk.T0)
a.persist(a.CurrentChunkPos)
Member Author

The chunk needs to be "Finished" before calling persist, and persist should only be called if the node is a primary.

@woodsaj
Member Author

woodsaj commented Jan 19, 2017

@Dieterbe, any final comments before this gets merged?

// we are a primary
log.Debug("Found stale Chunk, adding end-of-stream bytes. key: %s T0: %d", a.Key, currentChunk.T0)
currentChunk.Finish()
if cluster.Manager.IsPrimary() {
Contributor

Note that none of this code is executed on secondaries, due to AggMetric.GC, see #269. However, we can just change that and run this GC function on secondaries as well.

Member Author

fixed in 80125f7

- improve comments and log messages
- GC task should "finish" stale chunks and persist them only if the
  node is a primary
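Putting these review comments and that commit together, the GC path would look roughly like the sketch below (the function name and structure are assumptions; Finish, log and cluster are as in the diffs above):

```go
// Sketch only: every node "finishes" a stale current chunk, but only a primary
// persists it, and only if it hasn't already been queued for saving.
func (a *AggMetric) gcStaleCurrentChunk(currentChunk *Chunk) {
	if !currentChunk.Closed {
		log.Debug("Found stale Chunk, adding end-of-stream bytes. key: %s T0: %d", a.Key, currentChunk.T0)
		currentChunk.Finish()
	}
	if cluster.Manager.IsPrimary() && a.lastSaveStart < currentChunk.T0 {
		log.Debug("Found stale Chunk, persisting it to Cassandra. key: %s T0: %d", a.Key, currentChunk.T0)
		a.persist(a.CurrentChunkPos)
	}
}
```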