From c596193999d754a09e2baa99f9e50c583d66f4a2 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Mon, 16 Jan 2017 19:16:34 +0800 Subject: [PATCH 1/3] store save state as a property of the aggemtric instead of chunk Rather then tracking the state of each individual chunk, just keep a record of the most recent saveStart(add to write queue)/saveFinish(write to cassandra) as properties of the aggMetric. When we save chunks we always save all unsaved chunks, so we dont lose anything by tracking the save state for all chunks in one variable. issue #452 --- dashboard.json | 12 +-- docs/metrics.md | 14 ++-- mdata/aggmetric.go | 176 ++++++++++++--------------------------- mdata/chunk/chunk.go | 22 +++-- mdata/cwr.go | 1 + mdata/init.go | 13 +-- mdata/store_cassandra.go | 2 +- 7 files changed, 79 insertions(+), 161 deletions(-) diff --git a/dashboard.json b/dashboard.json index d29eec05f0..c67c9a0667 100644 --- a/dashboard.json +++ b/dashboard.json @@ -38,7 +38,7 @@ { "aliasColors": { "ES backpressure": "#052B51", - "add-to-saving": "#C15C17", + "add-to-closed": "#C15C17", "primary": "#2F575E", "store backpressure": "#962D82", "too old": "#890F02" @@ -124,22 +124,18 @@ }, { "refId": "C", - "target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_saved_chunk.counter32), 'add-to-saved')" + "target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32), 'add-to-closed')" }, { "refId": "D", - "target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_saving_chunk.counter32), 'add-to-saving')" - }, - { - "refId": "E", "target": "aliasSub(perSecond(metrictank.stats.$environment.$instance.input.*.metrics_decode_err.counter32), '.*\\.([^\\.]+)\\.metrics_decode_err.*', '\\1 decode err')" }, { - "refId": "F", + "refId": "E", "target": "aliasSub(perSecond(metrictank.stats.$environment.$instance.input.*.metric_invalid.counter32), '.*\\.([^\\.]+)\\.metric_invalid.*', '\\1 metric invalid')" }, { - "refId": "G", + "refId": "F", "target": "aliasByNode(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.*.counter32), 0.000000001), 6, 7)" } ], diff --git a/docs/metrics.md b/docs/metrics.md index 278b679779..70eab5ffb6 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -144,15 +144,11 @@ the duration of a put in the wait queue how many rows come per get response * `store.cassandra.to_iter`: the duration of converting chunks to iterators -* `tank.add_to_saved_chunk`: -points received - by a secondary node - for the most recent chunk when that chunk -has already been saved by a primary. A secondary can add this data to its chunks. -* `tank.add_to_saving_chunk`: -points received - by the primary node - for the most recent chunk -when that chunk is already being saved (or has been saved). -this indicates that your GC is actively sealing chunks and saving them before you have the chance to send -your (infrequent) updates. The primary won't add them to its in-memory chunks, but secondaries will -(because they are never in "saving" state for them), see below. +* `tank.add_to_closed_chunk`: +points received for the most recent chunk when that chunk is already being "closed", +ie the end-of-stream marker has been written to the chunk. +This indicates that your GC is actively sealing chunks and saving them before you have the chance to send +your (infrequent) updates. Any points revcieved for a chunk that has already been closed are discarded. * `tank.chunk_operations.clear`: a counter of how many chunks are cleared (replaced by new chunks) * `tank.chunk_operations.create`: diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index 46f7b75e73..9b412655b5 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -30,6 +30,9 @@ type AggMetric struct { aggregators []*Aggregator firstChunkT0 uint32 ttl uint32 + lastSaveStart uint32 // last chunk T0 that was added to the write Queue. + lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra. + lastWrite uint32 } // NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long @@ -42,6 +45,7 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint NumChunks: numChunks, Chunks: make([]*chunk.Chunk, 0, numChunks), ttl: ttl, + lastWrite: uint32(time.Now().Unix()), } for _, as := range aggsetting { m.aggregators = append(m.aggregators, NewAggregator(store, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl)) @@ -54,99 +58,15 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint func (a *AggMetric) SyncChunkSaveState(ts uint32) { a.Lock() defer a.Unlock() - chunk := a.getChunkByT0(ts) - if chunk != nil { - if LogLevel < 2 { - log.Debug("AM marking chunk %s:%d as saved.", a.Key, chunk.T0) - } - chunk.Saved = true - } -} - -/* Get a chunk by its T0. It is expected that the caller has acquired a.Lock()*/ -func (a *AggMetric) getChunkByT0(ts uint32) *chunk.Chunk { - // we have no chunks. - if len(a.Chunks) == 0 { - return nil - } - - currentT0 := a.Chunks[a.CurrentChunkPos].T0 - - if ts == currentT0 { - //found our chunk. - return a.Chunks[a.CurrentChunkPos] - } - - // requested Chunk is not in our dataset. - if ts > currentT0 { - return nil - } - - // requested Chunk is not in our dataset. - if len(a.Chunks) == 1 { - return nil - } - - // calculate the number of chunks ago our requested T0 is, - // assuming that chunks are sequential. - chunksAgo := int((currentT0 - ts) / a.ChunkSpan) - - numChunks := len(a.Chunks) - oldestPos := a.CurrentChunkPos + 1 - if oldestPos >= numChunks { - oldestPos = 0 + if ts > a.lastSaveFinish { + a.lastSaveFinish = ts } - - var guess int - - if chunksAgo >= (numChunks - 1) { - // set guess to the oldest chunk. - guess = oldestPos - } else { - guess = a.CurrentChunkPos - chunksAgo - if guess < 0 { - guess += numChunks - } + if ts > a.lastSaveStart { + a.lastSaveStart = ts } - - // we now have a good guess at which chunk position our requested TO is in. - c := a.Chunks[guess] - - if c.T0 == ts { - // found our chunk. - return c - } - - if ts > c.T0 { - // we need to check newer chunks - for c.T0 < currentT0 { - guess += 1 - if guess >= numChunks { - guess = 0 - } - c = a.Chunks[guess] - if c.T0 == ts { - //found our chunk - return c - } - } - } else { - // we need to check older chunks - oldestT0 := a.Chunks[oldestPos].T0 - for c.T0 >= oldestT0 && c.T0 < currentT0 { - guess -= 1 - if guess < 0 { - guess += numChunks - } - c = a.Chunks[guess] - if c.T0 == ts { - //found or chunk. - return c - } - } + if LogLevel < 2 { + log.Debug("AM metric %s at chunk T0=%d has been saved.", a.Key, ts) } - // chunk not found. - return nil } func (a *AggMetric) getChunk(pos int) *chunk.Chunk { @@ -332,18 +252,12 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) { // write a chunk to persistent storage. This should only be called while holding a.Lock() func (a *AggMetric) persist(pos int) { - if !cluster.ThisNode.IsPrimary() { - if LogLevel < 2 { - log.Debug("AM persist(): node is not primary, not saving chunk.") - } - return - } - - pre := time.Now() chunk := a.Chunks[pos] + pre := time.Now() - if chunk.Saved || chunk.Saving { - // this can happen if chunk was persisted by GC (stale) and then new data triggered another persist call + if a.lastSaveStart >= chunk.T0 { + // this can happen if there are 2 primary MT nodes both saving chunks to Cassandra or + // a primary failed and this node was promoted to be primary but metric consuming is lagging. log.Debug("AM persist(): duplicate persist call for chunk.") return } @@ -352,11 +266,12 @@ func (a *AggMetric) persist(pos int) { pending := make([]*ChunkWriteRequest, 1) // add the current chunk to the list of chunks to send to the writeQueue pending[0] = &ChunkWriteRequest{ + metric: a, key: a.Key, - chunk: chunk, + span: a.ChunkSpan, ttl: a.ttl, + chunk: chunk, timestamp: time.Now(), - span: a.ChunkSpan, } // if we recently became the primary, there may be older chunks @@ -367,16 +282,17 @@ func (a *AggMetric) persist(pos int) { previousPos += len(a.Chunks) } previousChunk := a.Chunks[previousPos] - for (previousChunk.T0 < chunk.T0) && !previousChunk.Saved && !previousChunk.Saving { + for (previousChunk.T0 < chunk.T0) && (a.lastSaveStart < previousChunk.T0) { if LogLevel < 2 { log.Debug("AM persist(): old chunk needs saving. Adding %s:%d to writeQueue", a.Key, previousChunk.T0) } pending = append(pending, &ChunkWriteRequest{ + metric: a, key: a.Key, - chunk: previousChunk, + span: a.ChunkSpan, ttl: a.ttl, + chunk: previousChunk, timestamp: time.Now(), - span: a.ChunkSpan, }) previousPos-- if previousPos < 0 { @@ -385,6 +301,9 @@ func (a *AggMetric) persist(pos int) { previousChunk = a.Chunks[previousPos] } + // Every chunk with a T0 <= this chunks' T0 is now either saved, or in the writeQueue. + a.lastSaveStart = chunk.T0 + if LogLevel < 2 { log.Debug("AM persist(): sending %d chunks to write queue", len(pending)) } @@ -402,9 +321,8 @@ func (a *AggMetric) persist(pos int) { if LogLevel < 2 { log.Debug("AM persist(): sealing chunk %d/%d (%s:%d) and adding to write queue.", pendingChunk, len(pending), a.Key, chunk.T0) } - pending[pendingChunk].chunk.Finish() a.store.Add(pending[pendingChunk]) - pending[pendingChunk].chunk.Saving = true + pendingChunk-- } persistDuration.Value(time.Now().Sub(pre)) @@ -432,6 +350,7 @@ func (a *AggMetric) Add(ts uint32, val float64) { } log.Debug("AM %s Add(): created first chunk with first point: %v", a.Key, a.Chunks[0]) + a.lastWrite = uint32(time.Now().Unix()) a.addAggregators(ts, val) return } @@ -440,34 +359,40 @@ func (a *AggMetric) Add(ts uint32, val float64) { if t0 == currentChunk.T0 { // last prior data was in same chunk as new point - if currentChunk.Saving { - // if we're already saving the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator + if currentChunk.Closed { + // if we've already 'finished' the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator // you should monitor this metric closely, it indicates that maybe your GC settings don't match how you actually send data (too late) - addToSavingChunk.Inc() + addToClosedChunk.Inc() return } - if err := currentChunk.Push(ts, val); err == nil { - if currentChunk.Saved { - // if we're here, it means we marked it as Saved because it was saved by an other primary, not by us since Saving is false. - // typically this happens when non-primaries receive metrics that the primary already saved (maybe cause their metrics consumer is laggy) - // we allow adding data to such chunks in that case, though this open the possibility for data to be rejected by the primary, to be - // visible on secondaries. - addToSavedChunk.Inc() - } - } else { + if err := currentChunk.Push(ts, val); err != nil { log.Debug("AM failed to add metric to chunk for %s. %s", a.Key, err) metricsTooOld.Inc() return } + a.lastWrite = uint32(time.Now().Unix()) log.Debug("AM %s Add(): pushed new value to last chunk: %v", a.Key, a.Chunks[0]) } else if t0 < currentChunk.T0 { log.Debug("AM Point at %d has t0 %d, goes back into previous chunk. CurrentChunk t0: %d, LastTs: %d", ts, t0, currentChunk.T0, currentChunk.LastTs) metricsTooOld.Inc() return } else { - // persist the chunk. If the writeQueue is full, then this will block. - a.persist(a.CurrentChunkPos) + // Data belongs in a new chunk. + + // If it isnt finished already, add the end-of-stream marker and flag the chunk as "closed" + if !currentChunk.Closed { + currentChunk.Finish() + } + + // If we are a primary node, then add the chunk to the write queue to be saved to Cassandra + if cluster.ThisNode.IsPrimary() { + if LogLevel < 2 { + log.Debug("AM persist(): node is primary, saving chunk.") + } + // persist the chunk. If the writeQueue is full, then this will block. + a.persist(a.CurrentChunkPos) + } a.CurrentChunkPos++ if a.CurrentChunkPos >= int(a.NumChunks) { @@ -490,6 +415,7 @@ func (a *AggMetric) Add(ts uint32, val float64) { } log.Debug("AM %s Add(): cleared chunk at %d of %d and replaced with new. and added the new point: %s", a.Key, a.CurrentChunkPos, len(a.Chunks), a.Chunks[a.CurrentChunkPos]) } + a.lastWrite = uint32(time.Now().Unix()) } a.addAggregators(ts, val) @@ -503,13 +429,13 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool { return false } - if currentChunk.LastWrite < chunkMinTs { - if currentChunk.Saved { + if a.lastWrite < chunkMinTs { + if a.lastSaveStart >= currentChunk.T0 { // already saved. lets check if we should just delete the metric from memory. - if currentChunk.LastWrite < metricMinTs { + if a.lastWrite < metricMinTs { return true } - } else if !currentChunk.Saving { + } else if a.lastSaveStart < currentChunk.T0 { // chunk hasn't been written to in a while, and is not yet queued to persist. Let's persist it log.Debug("Found stale Chunk, persisting it to Cassandra. key: %s T0: %d", a.Key, currentChunk.T0) a.persist(a.CurrentChunkPos) diff --git a/mdata/chunk/chunk.go b/mdata/chunk/chunk.go index 19130619f2..fd53f3801c 100644 --- a/mdata/chunk/chunk.go +++ b/mdata/chunk/chunk.go @@ -2,7 +2,6 @@ package chunk import ( "fmt" - "time" "github.com/dgryski/go-tsz" "github.com/raintank/metrictank/stats" @@ -16,19 +15,20 @@ type Chunk struct { tsz.Series LastTs uint32 // last TS seen, not computed or anything NumPoints uint32 - Saved bool - Saving bool - LastWrite uint32 + Closed bool } func New(t0 uint32) *Chunk { - // we must set LastWrite here as well to make sure a new Chunk doesn't get immediately - // garbage collected right after creating it, before we can push to it - return &Chunk{*tsz.New(t0), 0, 0, false, false, uint32(time.Now().Unix())} + return &Chunk{ + Series: *tsz.New(t0), + LastTs: 0, + NumPoints: 0, + Closed: false, + } } func (c *Chunk) String() string { - return fmt.Sprintf("", c.T0, c.LastTs, c.NumPoints, c.Saved) + return fmt.Sprintf("", c.T0, c.LastTs, c.NumPoints, c.Closed) } func (c *Chunk) Push(t uint32, v float64) error { @@ -38,7 +38,6 @@ func (c *Chunk) Push(t uint32, v float64) error { c.Series.Push(t, v) c.NumPoints += 1 c.LastTs = t - c.LastWrite = uint32(time.Now().Unix()) totalPoints.Inc() return nil } @@ -46,3 +45,8 @@ func (c *Chunk) Push(t uint32, v float64) error { func (c *Chunk) Clear() { totalPoints.DecUint64(uint64(c.NumPoints)) } + +func (c *Chunk) Finish() { + c.Closed = true + c.Series.Finish() +} diff --git a/mdata/cwr.go b/mdata/cwr.go index 27b2c98aec..a8d0843034 100644 --- a/mdata/cwr.go +++ b/mdata/cwr.go @@ -16,6 +16,7 @@ type ChunkReadRequest struct { } type ChunkWriteRequest struct { + metric *AggMetric key string chunk *chunk.Chunk ttl uint32 diff --git a/mdata/init.go b/mdata/init.go index b1354349f0..9746c3aded 100644 --- a/mdata/init.go +++ b/mdata/init.go @@ -19,16 +19,11 @@ var ( // that is not higher than the timestamp of the last written timestamp for that series. metricsTooOld = stats.NewCounter32("tank.metrics_too_old") - // metric tank.add_to_saving_chunk is points received - by the primary node - for the most recent chunk - // when that chunk is already being saved (or has been saved). + // metric tank.add_to_closed_chunk is points received for the most recent chunk + // when that chunk is already being "closed", ie the end-of-stream marker has been written to the chunk. // this indicates that your GC is actively sealing chunks and saving them before you have the chance to send - // your (infrequent) updates. The primary won't add them to its in-memory chunks, but secondaries will - // (because they are never in "saving" state for them), see below. - addToSavingChunk = stats.NewCounter32("tank.add_to_saving_chunk") - - // metric tank.add_to_saved_chunk is points received - by a secondary node - for the most recent chunk when that chunk - // has already been saved by a primary. A secondary can add this data to its chunks. - addToSavedChunk = stats.NewCounter32("tank.add_to_saved_chunk") + // your (infrequent) updates. Any points revcieved for a chunk that has already been closed are discarded. + addToClosedChunk = stats.NewCounter32("tank.add_to_closed_chunk") // metric mem.to_iter is how long it takes to transform in-memory chunks to iterators memToIterDuration = stats.NewLatencyHistogram15s32("mem.to_iter") diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go index d998dad352..1ecd707a7b 100644 --- a/mdata/store_cassandra.go +++ b/mdata/store_cassandra.go @@ -211,7 +211,7 @@ func (c *cassandraStore) processWriteQueue(queue chan *ChunkWriteRequest, meter err := c.insertChunk(cwr.key, cwr.chunk.T0, buf.Bytes(), int(cwr.ttl)) if err == nil { success = true - cwr.chunk.Saved = true + cwr.metric.SyncChunkSaveState(cwr.chunk.T0) SendPersistMessage(cwr.key, cwr.chunk.T0) log.Debug("CS: save complete. %s:%d %v", cwr.key, cwr.chunk.T0, cwr.chunk) chunkSaveOk.Inc() From 948679278abade2645538d9ceaa91239572151c7 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Fri, 20 Jan 2017 00:03:48 +0800 Subject: [PATCH 2/3] minor fixes - improve comments and log messages - GC task should "finish" stale chunks and persist them only if the node is a primary --- mdata/aggmetric.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index ced5d6d151..390a9e2cb3 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -45,6 +45,8 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint NumChunks: numChunks, Chunks: make([]*chunk.Chunk, 0, numChunks), ttl: ttl, + // we set LastWrite here to make sure a new Chunk doesn't get immediately + // garbage collected right after creating it, before we can push to it. lastWrite: uint32(time.Now().Unix()), } for _, as := range aggsetting { @@ -256,8 +258,10 @@ func (a *AggMetric) persist(pos int) { pre := time.Now() if a.lastSaveStart >= chunk.T0 { - // this can happen if there are 2 primary MT nodes both saving chunks to Cassandra or - // a primary failed and this node was promoted to be primary but metric consuming is lagging. + // this can happen if + // a) there are 2 primary MT nodes both saving chunks to Cassandra + // b) a primary failed and this node was promoted to be primary but metric consuming is lagging. + // c) chunk was persisted by GC (stale) and then new data triggered another persist call log.Debug("AM persist(): duplicate persist call for chunk.") return } @@ -388,7 +392,7 @@ func (a *AggMetric) Add(ts uint32, val float64) { // If we are a primary node, then add the chunk to the write queue to be saved to Cassandra if cluster.Manager.IsPrimary() { if LogLevel < 2 { - log.Debug("AM persist(): node is primary, saving chunk.") + log.Debug("AM persist(): node is primary, saving chunk. %s T0: %d", a.Key, currentChunk.T0) } // persist the chunk. If the writeQueue is full, then this will block. a.persist(a.CurrentChunkPos) @@ -430,15 +434,24 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool { } if a.lastWrite < chunkMinTs { - if a.lastSaveStart >= currentChunk.T0 { - // already saved. lets check if we should just delete the metric from memory. + if currentChunk.Closed { + // already closed and should be saved, though we cant guarantee that. + // Check if we should just delete the metric from memory. if a.lastWrite < metricMinTs { return true } - } else if a.lastSaveStart < currentChunk.T0 { - // chunk hasn't been written to in a while, and is not yet queued to persist. Let's persist it - log.Debug("Found stale Chunk, persisting it to Cassandra. key: %s T0: %d", a.Key, currentChunk.T0) - a.persist(a.CurrentChunkPos) + } else { + // chunk hasn't been written to in a while, and is not yet closed. Let's close it and persist it if + // we are a primary + log.Debug("Found stale Chunk, adding end-of-stream bytes. key: %s T0: %d", a.Key, currentChunk.T0) + currentChunk.Finish() + if cluster.Manager.IsPrimary() { + if LogLevel < 2 { + log.Debug("AM persist(): node is primary, saving chunk. %s T0: %d", a.Key, currentChunk.T0) + } + // persist the chunk. If the writeQueue is full, then this will block. + a.persist(a.CurrentChunkPos) + } } } return false From f8612f44bfb2606493b80601bb1fdcafdb486a44 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Fri, 20 Jan 2017 14:05:20 +0800 Subject: [PATCH 3/3] secondaries should also clean up stale metrics fixes #269 --- mdata/aggmetrics.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/mdata/aggmetrics.go b/mdata/aggmetrics.go index 0f1e4d4376..17372b9031 100644 --- a/mdata/aggmetrics.go +++ b/mdata/aggmetrics.go @@ -4,7 +4,6 @@ import ( "sync" "time" - "github.com/raintank/metrictank/cluster" "github.com/raintank/worldping-api/pkg/log" ) @@ -47,9 +46,6 @@ func (ms *AggMetrics) GC() { unix := time.Duration(time.Now().UnixNano()) diff := ms.gcInterval - (unix % ms.gcInterval) time.Sleep(diff + time.Minute) - if !cluster.Manager.IsPrimary() { - continue - } log.Info("checking for stale chunks that need persisting.") now := uint32(time.Now().Unix()) chunkMinTs := now - (now % ms.chunkSpan) - uint32(ms.chunkMaxStale)