store save state as a property of the aggmetric instead of chunk #477
Changes from 2 commits
@@ -30,6 +30,9 @@ type AggMetric struct {
	aggregators []*Aggregator
	firstChunkT0 uint32
	ttl uint32
	lastSaveStart uint32 // last chunk T0 that was added to the write Queue.
	lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra.
	lastWrite uint32
}

// NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long

@@ -42,6 +45,7 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint
	NumChunks: numChunks,
	Chunks: make([]*chunk.Chunk, 0, numChunks),
	ttl: ttl,
	lastWrite: uint32(time.Now().Unix()),
}
for _, as := range aggsetting {
	m.aggregators = append(m.aggregators, NewAggregator(store, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl))
@@ -54,99 +58,15 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint
func (a *AggMetric) SyncChunkSaveState(ts uint32) {
	a.Lock()
	defer a.Unlock()
	chunk := a.getChunkByT0(ts)
	if chunk != nil {
		if LogLevel < 2 {
			log.Debug("AM marking chunk %s:%d as saved.", a.Key, chunk.T0)
		}
		chunk.Saved = true
	}
}

/* Get a chunk by its T0. It is expected that the caller has acquired a.Lock()*/
func (a *AggMetric) getChunkByT0(ts uint32) *chunk.Chunk {
	// we have no chunks.
	if len(a.Chunks) == 0 {
		return nil
	}

	currentT0 := a.Chunks[a.CurrentChunkPos].T0

	if ts == currentT0 {
		//found our chunk.
		return a.Chunks[a.CurrentChunkPos]
	}

	// requested Chunk is not in our dataset.
	if ts > currentT0 {
		return nil
	}

	// requested Chunk is not in our dataset.
	if len(a.Chunks) == 1 {
		return nil
	}

	// calculate the number of chunks ago our requested T0 is,
	// assuming that chunks are sequential.
	chunksAgo := int((currentT0 - ts) / a.ChunkSpan)

	numChunks := len(a.Chunks)
	oldestPos := a.CurrentChunkPos + 1
	if oldestPos >= numChunks {
		oldestPos = 0
	if ts > a.lastSaveFinish {
		a.lastSaveFinish = ts
	}

	var guess int

	if chunksAgo >= (numChunks - 1) {
		// set guess to the oldest chunk.
		guess = oldestPos
	} else {
		guess = a.CurrentChunkPos - chunksAgo
		if guess < 0 {
			guess += numChunks
		}
	if ts > a.lastSaveStart {
		a.lastSaveStart = ts
	}

Comment: As far as I can see the only call site of …

Comment: taking that a bit further. the CassandraStore guarantees that on a per-aggmetric level (actually per ChunkWriteRequest, but a CWR's key is simply the key of the aggmetric), all chunk writes are in order, so the ts will be in order. also I believe that persist has an order guarantee: the chunks it persists are in ascending time order, both within 1 call and across multiple calls. I like the SaveStart and SaveFinish terminology, and i think we should apply it broader. so instead of … the former would be called by persist, the latter by cassandraStore.

Comment: SyncChunkSaveState() is also called when metricPersist messages are received.

Comment: ah no, SyncChunkSaveState is also called by the notifiers, consuming the persistence stream. in case of kafka, ordered. for nsq, unordered. there could be multiple primaries though (not a great setup, but can happen), saving chunks concurrently and potentially even causing out-of-order messages (with respect to different instances, e.g. an instance could be lagging behind). so the ts check would still be needed.
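To make the suggestion in the thread above concrete, here is a rough sketch of what splitting the save-state bookkeeping into a "start" and a "finish" call could look like. The method names SyncChunkSaveStart and SyncChunkSaveFinish are hypothetical (this PR only has SyncChunkSaveState); the forward-only ts check is kept because, as noted above, notifications can arrive out of order:

```go
// Hypothetical split of the save-state bookkeeping, as discussed in the review thread.
// SyncChunkSaveStart would be called by persist() and by the notifiers,
// SyncChunkSaveFinish by the CassandraStore once the write has succeeded.

// SyncChunkSaveStart records that a save of the chunk with the given T0 has been initiated.
func (a *AggMetric) SyncChunkSaveStart(ts uint32) {
	a.Lock()
	defer a.Unlock()
	// only move the marker forward: messages may arrive out of order when
	// multiple primaries are saving, or when an instance is lagging behind.
	if ts > a.lastSaveStart {
		a.lastSaveStart = ts
	}
}

// SyncChunkSaveFinish records that the chunk with the given T0 was written to Cassandra.
func (a *AggMetric) SyncChunkSaveFinish(ts uint32) {
	a.Lock()
	defer a.Unlock()
	if ts > a.lastSaveFinish {
		a.lastSaveFinish = ts
	}
}
```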
	// we now have a good guess at which chunk position our requested TO is in.
	c := a.Chunks[guess]

	if c.T0 == ts {
		// found our chunk.
		return c
	}

	if ts > c.T0 {
		// we need to check newer chunks
		for c.T0 < currentT0 {
			guess += 1
			if guess >= numChunks {
				guess = 0
			}
			c = a.Chunks[guess]
			if c.T0 == ts {
				//found our chunk
				return c
			}
		}
	} else {
		// we need to check older chunks
		oldestT0 := a.Chunks[oldestPos].T0
		for c.T0 >= oldestT0 && c.T0 < currentT0 {
			guess -= 1
			if guess < 0 {
				guess += numChunks
			}
			c = a.Chunks[guess]
			if c.T0 == ts {
				//found our chunk.
				return c
			}
		}
	if LogLevel < 2 {
		log.Debug("AM metric %s at chunk T0=%d has been saved.", a.Key, ts)
	}
	// chunk not found.
	return nil
}

func (a *AggMetric) getChunk(pos int) *chunk.Chunk {
@@ -332,18 +252,12 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) {

// write a chunk to persistent storage. This should only be called while holding a.Lock()
func (a *AggMetric) persist(pos int) {
	if !cluster.Manager.IsPrimary() {
		if LogLevel < 2 {
			log.Debug("AM persist(): node is not primary, not saving chunk.")
		}
		return
	}

	pre := time.Now()
	chunk := a.Chunks[pos]
	pre := time.Now()

	if chunk.Saved || chunk.Saving {
		// this can happen if chunk was persisted by GC (stale) and then new data triggered another persist call
	if a.lastSaveStart >= chunk.T0 {
		// this can happen if there are 2 primary MT nodes both saving chunks to Cassandra or
		// a primary failed and this node was promoted to be primary but metric consuming is lagging.
		log.Debug("AM persist(): duplicate persist call for chunk.")
		return
	}
@@ -352,11 +266,12 @@ func (a *AggMetric) persist(pos int) {
	pending := make([]*ChunkWriteRequest, 1)
	// add the current chunk to the list of chunks to send to the writeQueue
	pending[0] = &ChunkWriteRequest{
		metric: a,
		key: a.Key,
		chunk: chunk,
		span: a.ChunkSpan,
		ttl: a.ttl,
		chunk: chunk,
		timestamp: time.Now(),
		span: a.ChunkSpan,
	}

	// if we recently became the primary, there may be older chunks
@@ -367,16 +282,17 @@ func (a *AggMetric) persist(pos int) {
		previousPos += len(a.Chunks)
	}
	previousChunk := a.Chunks[previousPos]
	for (previousChunk.T0 < chunk.T0) && !previousChunk.Saved && !previousChunk.Saving {
	for (previousChunk.T0 < chunk.T0) && (a.lastSaveStart < previousChunk.T0) {
		if LogLevel < 2 {
			log.Debug("AM persist(): old chunk needs saving. Adding %s:%d to writeQueue", a.Key, previousChunk.T0)
		}
		pending = append(pending, &ChunkWriteRequest{
			metric: a,
			key: a.Key,
			chunk: previousChunk,
			span: a.ChunkSpan,
			ttl: a.ttl,
			chunk: previousChunk,
			timestamp: time.Now(),
			span: a.ChunkSpan,
		})
		previousPos--
		if previousPos < 0 {

@@ -385,6 +301,9 @@ func (a *AggMetric) persist(pos int) {
		previousChunk = a.Chunks[previousPos]
	}

	// Every chunk with a T0 <= this chunks' T0 is now either saved, or in the writeQueue.
	a.lastSaveStart = chunk.T0

	if LogLevel < 2 {
		log.Debug("AM persist(): sending %d chunks to write queue", len(pending))
	}
@@ -402,9 +321,8 @@ func (a *AggMetric) persist(pos int) {
		if LogLevel < 2 {
			log.Debug("AM persist(): sealing chunk %d/%d (%s:%d) and adding to write queue.", pendingChunk, len(pending), a.Key, chunk.T0)
		}
		pending[pendingChunk].chunk.Finish()
		a.store.Add(pending[pendingChunk])
		pending[pendingChunk].chunk.Saving = true

		pendingChunk--
	}
	persistDuration.Value(time.Now().Sub(pre))
@@ -432,6 +350,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
		}

		log.Debug("AM %s Add(): created first chunk with first point: %v", a.Key, a.Chunks[0])
		a.lastWrite = uint32(time.Now().Unix())
		a.addAggregators(ts, val)
		return
	}
@@ -440,34 +359,40 @@ func (a *AggMetric) Add(ts uint32, val float64) {

	if t0 == currentChunk.T0 {
		// last prior data was in same chunk as new point
		if currentChunk.Saving {
			// if we're already saving the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator
		if currentChunk.Closed {
			// if we've already 'finished' the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator
			// you should monitor this metric closely, it indicates that maybe your GC settings don't match how you actually send data (too late)
			addToSavingChunk.Inc()
			addToClosedChunk.Inc()
			return
		}

		if err := currentChunk.Push(ts, val); err == nil {
			if currentChunk.Saved {
				// if we're here, it means we marked it as Saved because it was saved by an other primary, not by us since Saving is false.
				// typically this happens when non-primaries receive metrics that the primary already saved (maybe cause their metrics consumer is laggy)
				// we allow adding data to such chunks in that case, though this open the possibility for data to be rejected by the primary, to be
				// visible on secondaries.
				addToSavedChunk.Inc()
			}
		} else {
		if err := currentChunk.Push(ts, val); err != nil {
			log.Debug("AM failed to add metric to chunk for %s. %s", a.Key, err)
			metricsTooOld.Inc()
			return
		}
		a.lastWrite = uint32(time.Now().Unix())
		log.Debug("AM %s Add(): pushed new value to last chunk: %v", a.Key, a.Chunks[0])
	} else if t0 < currentChunk.T0 {
		log.Debug("AM Point at %d has t0 %d, goes back into previous chunk. CurrentChunk t0: %d, LastTs: %d", ts, t0, currentChunk.T0, currentChunk.LastTs)
		metricsTooOld.Inc()
		return
	} else {
		// persist the chunk. If the writeQueue is full, then this will block.
		a.persist(a.CurrentChunkPos)
		// Data belongs in a new chunk.

		// If it isnt finished already, add the end-of-stream marker and flag the chunk as "closed"
		if !currentChunk.Closed {
			currentChunk.Finish()
		}

		// If we are a primary node, then add the chunk to the write queue to be saved to Cassandra
		if cluster.Manager.IsPrimary() {
			if LogLevel < 2 {
				log.Debug("AM persist(): node is primary, saving chunk.")
			}
			// persist the chunk. If the writeQueue is full, then this will block.
			a.persist(a.CurrentChunkPos)
		}

		a.CurrentChunkPos++
		if a.CurrentChunkPos >= int(a.NumChunks) {

@@ -490,6 +415,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
			}
			log.Debug("AM %s Add(): cleared chunk at %d of %d and replaced with new. and added the new point: %s", a.Key, a.CurrentChunkPos, len(a.Chunks), a.Chunks[a.CurrentChunkPos])
		}
		a.lastWrite = uint32(time.Now().Unix())

	}
	a.addAggregators(ts, val)
@@ -503,13 +429,13 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool {
		return false
	}

	if currentChunk.LastWrite < chunkMinTs {
		if currentChunk.Saved {
	if a.lastWrite < chunkMinTs {
		if a.lastSaveStart >= currentChunk.T0 {
			// already saved. lets check if we should just delete the metric from memory.
			if currentChunk.LastWrite < metricMinTs {
			if a.lastWrite < metricMinTs {
				return true
			}
		} else if !currentChunk.Saving {
		} else if a.lastSaveStart < currentChunk.T0 {
Comment: this line is equivalent to just …
			// chunk hasn't been written to in a while, and is not yet queued to persist. Let's persist it
			log.Debug("Found stale Chunk, persisting it to Cassandra. key: %s T0: %d", a.Key, currentChunk.T0)
			a.persist(a.CurrentChunkPos)
Comment: The chunk needs to be "Finished" before calling persist, and persist should only be called if the node is a primary.
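For illustration, a sketch of how the stale-chunk branch in GC could look with the two review points above applied, and with the redundant else-if condition collapsed to a plain else; this is only a suggestion built from the code in this diff, not the code in this commit:

```go
if a.lastWrite < chunkMinTs {
	if a.lastSaveStart >= currentChunk.T0 {
		// already saved; check whether the whole metric can be dropped from memory.
		if a.lastWrite < metricMinTs {
			return true
		}
	} else {
		// stale chunk that was never queued to persist: close it first so it gets
		// the end-of-stream marker, then only queue the write if we are a primary.
		log.Debug("Found stale Chunk, persisting it to Cassandra. key: %s T0: %d", a.Key, currentChunk.T0)
		if !currentChunk.Closed {
			currentChunk.Finish()
		}
		if cluster.Manager.IsPrimary() {
			a.persist(a.CurrentChunkPos)
		}
	}
}
```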
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment: so add_to_closed is the equivalent of add_to_saving. However, since SyncChunkSaveState doesn't close any chunks or mark chunks as closed, we no longer have a way to notice when we're adding data to a chunk that has already been saved by a primary, even when we already received a persist notification for it. I.e. there is no equivalent of add_to_saved. IOW: Add() will add points to chunks that are already saved by the primary, since it only looks at Closed (which it only sets itself if it's primary); it does not look at lastSaveFinish. It'll be as if we're adding data regularly, but actually something weird is going on (adding data to chunks that potentially didn't make it into the chunk in Cassandra).
Comment: The only time a chunk is closed is if we receive a point for the next chunk, or it is stale and the GC task closes it. A chunk being saved by another node should not have any impact on chunks on this node. Both nodes should be receiving the same data, and both nodes are bound by the same conditions leading to a chunk being closed. If nodeA is lagging behind, it should not be prevented from adding points to a chunk just because nodeB has saved the chunk.
Comment: i'm not saying it should be prevented. I'm saying I liked the previous behavior (which was adding the points, but incrementing a metric that signals that this is going on).
Comment: But why do i care? I don't see there being any value to having this metric. The metric would be incrementing under normal operation, especially as the ingest rate goes up and there is a lag of a few seconds (nodeA could be processing data from partitions 0 then 1,2,3,4 etc., while nodeB could be processing 7 then 6,5,4, etc.). This is not possible, as all nodes are bound by the same rules for closing a chunk.
Comment: It is possible. consider this:

we're adding points t1, t2, t3, t4, t5 (all have the same t0, i.e. they go into the same chunkspan). there's a long time between t2 and t3, let's say a time just under chunk-max-stale. A is primary, B is secondary.

A consumes t1, t2, and then experiences some lagging. t3 is in kafka but A doesn't consume it quite yet. A's GC notices that the last write was before chunkMinTs, closes the chunk and persists it. A then consumes the points t3, t4 and t5 but rejects them because the chunk is closed. This is worth instrumenting (it indicates that maybe you should check out your GC parameters): it does increment add_to_closed_chunk, so you can see it on your dashboard when looking at the instance's stats.

B, on the other hand, does not experience the lagging. it consumes t1, t2 but does not experience the same lag as A. it consumes t3. B's GC sees a last write more recent than chunkMinTs, so it doesn't close the chunk. B receives the notification from A and updates lastSaveStart and lastSaveFinish accordingly, but does not close any chunk. B consumes t4 and t5, and can push them to the chunk, as it is not closed and we don't look at the lastSave* timestamps.

=> B now added t3, t4 and t5 to its chunk, but we know A did not (we don't know for t3, but we do know for t4 and t5). there's two problems:

as B could foresee these problems happening (e.g. if it would check lastSave* when adding new points, just like the old code checked currentChunk.Saved when adding new points), it could inform the operator by incrementing this stat. Note that B can never detect all cases of "this point has not been added by primary" (e.g. t3 in the above example), since it needs to wait for a notification and only sees the problem with points it receives after processing the notification; so while the stat can't guarantee to account for all these cases, it will be a good indicator for cases where we can be sure it happened.
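As a concrete illustration of the check being proposed in this comment, here is a sketch of what Add() could do after a successful Push, reusing the old add_to_saved counter name (addToSavedChunk) from the code this PR removes; this is only a suggestion derived from the discussion, not part of this commit:

```go
if err := currentChunk.Push(ts, val); err != nil {
	log.Debug("AM failed to add metric to chunk for %s. %s", a.Key, err)
	metricsTooOld.Inc()
	return
}
a.lastWrite = uint32(time.Now().Unix())
if a.lastSaveStart >= currentChunk.T0 {
	// a primary already queued (or finished) saving this chunk, so this point may
	// not be in the copy written to Cassandra: count it, like the old add_to_saved
	// stat, so the operator can spot it on a dashboard.
	addToSavedChunk.Inc()
}
```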
Comment: This is such an obscure edge case that it would be crazy to complicate the code just to be able to observe it. Even then you wouldn't know when it was happening, as the add_to_saving metric would be incrementing for many other legitimate reasons that are not a problem. The correct way to identify your unlikely scenario is to just compare the add_to_closed_chunk count on both nodes; they should always be the same.
Comment: like what?
Comment: #477 (comment)
Comment: for the record: …