diff --git a/dashboard.json b/dashboard.json
index d29eec05f0..c67c9a0667 100644
--- a/dashboard.json
+++ b/dashboard.json
@@ -38,7 +38,7 @@
     {
       "aliasColors": {
         "ES backpressure": "#052B51",
-        "add-to-saving": "#C15C17",
+        "add-to-closed": "#C15C17",
         "primary": "#2F575E",
         "store backpressure": "#962D82",
         "too old": "#890F02"
@@ -124,22 +124,18 @@
       },
       {
         "refId": "C",
-        "target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_saved_chunk.counter32), 'add-to-saved')"
+        "target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32), 'add-to-closed')"
       },
       {
         "refId": "D",
-        "target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_saving_chunk.counter32), 'add-to-saving')"
-      },
-      {
-        "refId": "E",
         "target": "aliasSub(perSecond(metrictank.stats.$environment.$instance.input.*.metrics_decode_err.counter32), '.*\\.([^\\.]+)\\.metrics_decode_err.*', '\\1 decode err')"
       },
       {
-        "refId": "F",
+        "refId": "E",
         "target": "aliasSub(perSecond(metrictank.stats.$environment.$instance.input.*.metric_invalid.counter32), '.*\\.([^\\.]+)\\.metric_invalid.*', '\\1 metric invalid')"
       },
       {
-        "refId": "G",
+        "refId": "F",
         "target": "aliasByNode(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.*.counter32), 0.000000001), 6, 7)"
       }
     ],
diff --git a/docs/metrics.md b/docs/metrics.md
index 278b679779..70eab5ffb6 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -144,15 +144,11 @@ the duration of a put in the wait queue
 how many rows come per get response
 * `store.cassandra.to_iter`:
 the duration of converting chunks to iterators
-* `tank.add_to_saved_chunk`:
-points received - by a secondary node - for the most recent chunk when that chunk
-has already been saved by a primary. A secondary can add this data to its chunks.
-* `tank.add_to_saving_chunk`:
-points received - by the primary node - for the most recent chunk
-when that chunk is already being saved (or has been saved).
-this indicates that your GC is actively sealing chunks and saving them before you have the chance to send
-your (infrequent) updates. The primary won't add them to its in-memory chunks, but secondaries will
-(because they are never in "saving" state for them), see below.
+* `tank.add_to_closed_chunk`:
+points received for the most recent chunk when that chunk is already being "closed",
+i.e. the end-of-stream marker has been written to the chunk.
+This indicates that your GC is actively sealing chunks and saving them before you have the chance to send
+your (infrequent) updates. Any points received for a chunk that has already been closed are discarded.
 * `tank.chunk_operations.clear`:
 a counter of how many chunks are cleared (replaced by new chunks)
 * `tank.chunk_operations.create`:
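As an aside to the doc change above, a minimal sketch of the behaviour `tank.add_to_closed_chunk` describes: once a chunk carries the end-of-stream marker, late points are counted and dropped instead of appended. The names `sketchChunk` and `addPoint` below are illustrative stand-ins, not part of this patch.

    package main

    import "fmt"

    // sketchChunk is a stand-in for mdata/chunk.Chunk: just enough state to show
    // the closed-chunk behaviour.
    type sketchChunk struct {
        T0     uint32
        LastTs uint32
        Closed bool
    }

    // addToClosedChunk stands in for the tank.add_to_closed_chunk counter.
    var addToClosedChunk int

    // addPoint mirrors the ingest path: points for a closed chunk are counted and dropped.
    func addPoint(c *sketchChunk, ts uint32) {
        if c.Closed {
            // the chunk already has the end-of-stream marker, so this point would
            // never be read back; count it and discard it.
            addToClosedChunk++
            return
        }
        c.LastTs = ts
    }

    func main() {
        c := &sketchChunk{T0: 1200}
        addPoint(c, 1210) // accepted
        c.Closed = true   // GC sealed the chunk
        addPoint(c, 1220) // discarded
        fmt.Println("points added to closed chunk:", addToClosedChunk) // prints 1
    }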
diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go
index 46f7b75e73..9b412655b5 100644
--- a/mdata/aggmetric.go
+++ b/mdata/aggmetric.go
@@ -30,6 +30,9 @@ type AggMetric struct {
 	aggregators  []*Aggregator
 	firstChunkT0 uint32
 	ttl          uint32
+	lastSaveStart  uint32 // last chunk T0 that was added to the write queue.
+	lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra.
+	lastWrite      uint32
 }
 
 // NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long
@@ -42,6 +45,7 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint
 		NumChunks: numChunks,
 		Chunks:    make([]*chunk.Chunk, 0, numChunks),
 		ttl:       ttl,
+		lastWrite: uint32(time.Now().Unix()),
 	}
 	for _, as := range aggsetting {
 		m.aggregators = append(m.aggregators, NewAggregator(store, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl))
@@ -54,99 +58,15 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint
 func (a *AggMetric) SyncChunkSaveState(ts uint32) {
 	a.Lock()
 	defer a.Unlock()
-	chunk := a.getChunkByT0(ts)
-	if chunk != nil {
-		if LogLevel < 2 {
-			log.Debug("AM marking chunk %s:%d as saved.", a.Key, chunk.T0)
-		}
-		chunk.Saved = true
-	}
-}
-
-/* Get a chunk by its T0. It is expected that the caller has acquired a.Lock()*/
-func (a *AggMetric) getChunkByT0(ts uint32) *chunk.Chunk {
-	// we have no chunks.
-	if len(a.Chunks) == 0 {
-		return nil
-	}
-
-	currentT0 := a.Chunks[a.CurrentChunkPos].T0
-
-	if ts == currentT0 {
-		//found our chunk.
-		return a.Chunks[a.CurrentChunkPos]
-	}
-
-	// requested Chunk is not in our dataset.
-	if ts > currentT0 {
-		return nil
-	}
-
-	// requested Chunk is not in our dataset.
-	if len(a.Chunks) == 1 {
-		return nil
-	}
-
-	// calculate the number of chunks ago our requested T0 is,
-	// assuming that chunks are sequential.
-	chunksAgo := int((currentT0 - ts) / a.ChunkSpan)
-
-	numChunks := len(a.Chunks)
-	oldestPos := a.CurrentChunkPos + 1
-	if oldestPos >= numChunks {
-		oldestPos = 0
+	if ts > a.lastSaveFinish {
+		a.lastSaveFinish = ts
 	}
-
-	var guess int
-
-	if chunksAgo >= (numChunks - 1) {
-		// set guess to the oldest chunk.
-		guess = oldestPos
-	} else {
-		guess = a.CurrentChunkPos - chunksAgo
-		if guess < 0 {
-			guess += numChunks
-		}
+	if ts > a.lastSaveStart {
+		a.lastSaveStart = ts
 	}
-
-	// we now have a good guess at which chunk position our requested TO is in.
-	c := a.Chunks[guess]
-
-	if c.T0 == ts {
-		// found our chunk.
-		return c
-	}
-
-	if ts > c.T0 {
-		// we need to check newer chunks
-		for c.T0 < currentT0 {
-			guess += 1
-			if guess >= numChunks {
-				guess = 0
-			}
-			c = a.Chunks[guess]
-			if c.T0 == ts {
-				//found our chunk
-				return c
-			}
-		}
-	} else {
-		// we need to check older chunks
-		oldestT0 := a.Chunks[oldestPos].T0
-		for c.T0 >= oldestT0 && c.T0 < currentT0 {
-			guess -= 1
-			if guess < 0 {
-				guess += numChunks
-			}
-			c = a.Chunks[guess]
-			if c.T0 == ts {
-				//found or chunk.
-				return c
-			}
-		}
+	if LogLevel < 2 {
+		log.Debug("AM metric %s at chunk T0=%d has been saved.", a.Key, ts)
 	}
-	// chunk not found.
-	return nil
 }
 
 func (a *AggMetric) getChunk(pos int) *chunk.Chunk {
@@ -332,18 +252,12 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) {
 
 // write a chunk to persistent storage. This should only be called while holding a.Lock()
 func (a *AggMetric) persist(pos int) {
-	if !cluster.ThisNode.IsPrimary() {
-		if LogLevel < 2 {
-			log.Debug("AM persist(): node is not primary, not saving chunk.")
-		}
-		return
-	}
-
-	pre := time.Now()
 	chunk := a.Chunks[pos]
+	pre := time.Now()
 
-	if chunk.Saved || chunk.Saving {
-		// this can happen if chunk was persisted by GC (stale) and then new data triggered another persist call
+	if a.lastSaveStart >= chunk.T0 {
+		// this can happen if there are 2 primary MT nodes both saving chunks to Cassandra or
+		// a primary failed and this node was promoted to be primary but metric consumption is lagging.
log.Debug("AM persist(): duplicate persist call for chunk.") return } @@ -352,11 +266,12 @@ func (a *AggMetric) persist(pos int) { pending := make([]*ChunkWriteRequest, 1) // add the current chunk to the list of chunks to send to the writeQueue pending[0] = &ChunkWriteRequest{ + metric: a, key: a.Key, - chunk: chunk, + span: a.ChunkSpan, ttl: a.ttl, + chunk: chunk, timestamp: time.Now(), - span: a.ChunkSpan, } // if we recently became the primary, there may be older chunks @@ -367,16 +282,17 @@ func (a *AggMetric) persist(pos int) { previousPos += len(a.Chunks) } previousChunk := a.Chunks[previousPos] - for (previousChunk.T0 < chunk.T0) && !previousChunk.Saved && !previousChunk.Saving { + for (previousChunk.T0 < chunk.T0) && (a.lastSaveStart < previousChunk.T0) { if LogLevel < 2 { log.Debug("AM persist(): old chunk needs saving. Adding %s:%d to writeQueue", a.Key, previousChunk.T0) } pending = append(pending, &ChunkWriteRequest{ + metric: a, key: a.Key, - chunk: previousChunk, + span: a.ChunkSpan, ttl: a.ttl, + chunk: previousChunk, timestamp: time.Now(), - span: a.ChunkSpan, }) previousPos-- if previousPos < 0 { @@ -385,6 +301,9 @@ func (a *AggMetric) persist(pos int) { previousChunk = a.Chunks[previousPos] } + // Every chunk with a T0 <= this chunks' T0 is now either saved, or in the writeQueue. + a.lastSaveStart = chunk.T0 + if LogLevel < 2 { log.Debug("AM persist(): sending %d chunks to write queue", len(pending)) } @@ -402,9 +321,8 @@ func (a *AggMetric) persist(pos int) { if LogLevel < 2 { log.Debug("AM persist(): sealing chunk %d/%d (%s:%d) and adding to write queue.", pendingChunk, len(pending), a.Key, chunk.T0) } - pending[pendingChunk].chunk.Finish() a.store.Add(pending[pendingChunk]) - pending[pendingChunk].chunk.Saving = true + pendingChunk-- } persistDuration.Value(time.Now().Sub(pre)) @@ -432,6 +350,7 @@ func (a *AggMetric) Add(ts uint32, val float64) { } log.Debug("AM %s Add(): created first chunk with first point: %v", a.Key, a.Chunks[0]) + a.lastWrite = uint32(time.Now().Unix()) a.addAggregators(ts, val) return } @@ -440,34 +359,40 @@ func (a *AggMetric) Add(ts uint32, val float64) { if t0 == currentChunk.T0 { // last prior data was in same chunk as new point - if currentChunk.Saving { - // if we're already saving the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator + if currentChunk.Closed { + // if we've already 'finished' the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator // you should monitor this metric closely, it indicates that maybe your GC settings don't match how you actually send data (too late) - addToSavingChunk.Inc() + addToClosedChunk.Inc() return } - if err := currentChunk.Push(ts, val); err == nil { - if currentChunk.Saved { - // if we're here, it means we marked it as Saved because it was saved by an other primary, not by us since Saving is false. - // typically this happens when non-primaries receive metrics that the primary already saved (maybe cause their metrics consumer is laggy) - // we allow adding data to such chunks in that case, though this open the possibility for data to be rejected by the primary, to be - // visible on secondaries. - addToSavedChunk.Inc() - } - } else { + if err := currentChunk.Push(ts, val); err != nil { log.Debug("AM failed to add metric to chunk for %s. 
%s", a.Key, err) metricsTooOld.Inc() return } + a.lastWrite = uint32(time.Now().Unix()) log.Debug("AM %s Add(): pushed new value to last chunk: %v", a.Key, a.Chunks[0]) } else if t0 < currentChunk.T0 { log.Debug("AM Point at %d has t0 %d, goes back into previous chunk. CurrentChunk t0: %d, LastTs: %d", ts, t0, currentChunk.T0, currentChunk.LastTs) metricsTooOld.Inc() return } else { - // persist the chunk. If the writeQueue is full, then this will block. - a.persist(a.CurrentChunkPos) + // Data belongs in a new chunk. + + // If it isnt finished already, add the end-of-stream marker and flag the chunk as "closed" + if !currentChunk.Closed { + currentChunk.Finish() + } + + // If we are a primary node, then add the chunk to the write queue to be saved to Cassandra + if cluster.ThisNode.IsPrimary() { + if LogLevel < 2 { + log.Debug("AM persist(): node is primary, saving chunk.") + } + // persist the chunk. If the writeQueue is full, then this will block. + a.persist(a.CurrentChunkPos) + } a.CurrentChunkPos++ if a.CurrentChunkPos >= int(a.NumChunks) { @@ -490,6 +415,7 @@ func (a *AggMetric) Add(ts uint32, val float64) { } log.Debug("AM %s Add(): cleared chunk at %d of %d and replaced with new. and added the new point: %s", a.Key, a.CurrentChunkPos, len(a.Chunks), a.Chunks[a.CurrentChunkPos]) } + a.lastWrite = uint32(time.Now().Unix()) } a.addAggregators(ts, val) @@ -503,13 +429,13 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool { return false } - if currentChunk.LastWrite < chunkMinTs { - if currentChunk.Saved { + if a.lastWrite < chunkMinTs { + if a.lastSaveStart >= currentChunk.T0 { // already saved. lets check if we should just delete the metric from memory. - if currentChunk.LastWrite < metricMinTs { + if a.lastWrite < metricMinTs { return true } - } else if !currentChunk.Saving { + } else if a.lastSaveStart < currentChunk.T0 { // chunk hasn't been written to in a while, and is not yet queued to persist. Let's persist it log.Debug("Found stale Chunk, persisting it to Cassandra. 
key: %s T0: %d", a.Key, currentChunk.T0) a.persist(a.CurrentChunkPos) diff --git a/mdata/chunk/chunk.go b/mdata/chunk/chunk.go index 19130619f2..fd53f3801c 100644 --- a/mdata/chunk/chunk.go +++ b/mdata/chunk/chunk.go @@ -2,7 +2,6 @@ package chunk import ( "fmt" - "time" "github.com/dgryski/go-tsz" "github.com/raintank/metrictank/stats" @@ -16,19 +15,20 @@ type Chunk struct { tsz.Series LastTs uint32 // last TS seen, not computed or anything NumPoints uint32 - Saved bool - Saving bool - LastWrite uint32 + Closed bool } func New(t0 uint32) *Chunk { - // we must set LastWrite here as well to make sure a new Chunk doesn't get immediately - // garbage collected right after creating it, before we can push to it - return &Chunk{*tsz.New(t0), 0, 0, false, false, uint32(time.Now().Unix())} + return &Chunk{ + Series: *tsz.New(t0), + LastTs: 0, + NumPoints: 0, + Closed: false, + } } func (c *Chunk) String() string { - return fmt.Sprintf("", c.T0, c.LastTs, c.NumPoints, c.Saved) + return fmt.Sprintf("", c.T0, c.LastTs, c.NumPoints, c.Closed) } func (c *Chunk) Push(t uint32, v float64) error { @@ -38,7 +38,6 @@ func (c *Chunk) Push(t uint32, v float64) error { c.Series.Push(t, v) c.NumPoints += 1 c.LastTs = t - c.LastWrite = uint32(time.Now().Unix()) totalPoints.Inc() return nil } @@ -46,3 +45,8 @@ func (c *Chunk) Push(t uint32, v float64) error { func (c *Chunk) Clear() { totalPoints.DecUint64(uint64(c.NumPoints)) } + +func (c *Chunk) Finish() { + c.Closed = true + c.Series.Finish() +} diff --git a/mdata/cwr.go b/mdata/cwr.go index 27b2c98aec..a8d0843034 100644 --- a/mdata/cwr.go +++ b/mdata/cwr.go @@ -16,6 +16,7 @@ type ChunkReadRequest struct { } type ChunkWriteRequest struct { + metric *AggMetric key string chunk *chunk.Chunk ttl uint32 diff --git a/mdata/init.go b/mdata/init.go index b1354349f0..9746c3aded 100644 --- a/mdata/init.go +++ b/mdata/init.go @@ -19,16 +19,11 @@ var ( // that is not higher than the timestamp of the last written timestamp for that series. metricsTooOld = stats.NewCounter32("tank.metrics_too_old") - // metric tank.add_to_saving_chunk is points received - by the primary node - for the most recent chunk - // when that chunk is already being saved (or has been saved). + // metric tank.add_to_closed_chunk is points received for the most recent chunk + // when that chunk is already being "closed", ie the end-of-stream marker has been written to the chunk. // this indicates that your GC is actively sealing chunks and saving them before you have the chance to send - // your (infrequent) updates. The primary won't add them to its in-memory chunks, but secondaries will - // (because they are never in "saving" state for them), see below. - addToSavingChunk = stats.NewCounter32("tank.add_to_saving_chunk") - - // metric tank.add_to_saved_chunk is points received - by a secondary node - for the most recent chunk when that chunk - // has already been saved by a primary. A secondary can add this data to its chunks. - addToSavedChunk = stats.NewCounter32("tank.add_to_saved_chunk") + // your (infrequent) updates. Any points revcieved for a chunk that has already been closed are discarded. 
+	addToClosedChunk = stats.NewCounter32("tank.add_to_closed_chunk")
 
 	// metric mem.to_iter is how long it takes to transform in-memory chunks to iterators
 	memToIterDuration = stats.NewLatencyHistogram15s32("mem.to_iter")
diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go
index d998dad352..1ecd707a7b 100644
--- a/mdata/store_cassandra.go
+++ b/mdata/store_cassandra.go
@@ -211,7 +211,7 @@ func (c *cassandraStore) processWriteQueue(queue chan *ChunkWriteRequest, meter
 			err := c.insertChunk(cwr.key, cwr.chunk.T0, buf.Bytes(), int(cwr.ttl))
 			if err == nil {
 				success = true
-				cwr.chunk.Saved = true
+				cwr.metric.SyncChunkSaveState(cwr.chunk.T0)
 				SendPersistMessage(cwr.key, cwr.chunk.T0)
 				log.Debug("CS: save complete. %s:%d %v", cwr.key, cwr.chunk.T0, cwr.chunk)
 				chunkSaveOk.Inc()
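For completeness, a sketch of the callback pattern the last hunk relies on: the ChunkWriteRequest now carries a back-reference to its AggMetric so the Cassandra writer can report a successful save via SyncChunkSaveState instead of flipping a flag on the chunk. The types and queue wiring below (`owner`, `writeRequest`, `processWriteQueue`) are simplified stand-ins, not the actual metrictank code.

    package main

    import "fmt"

    // owner is a stand-in for AggMetric: it only tracks the confirmed-save watermark.
    type owner struct {
        lastSaveFinish uint32
    }

    func (o *owner) SyncChunkSaveState(t0 uint32) {
        if t0 > o.lastSaveFinish {
            o.lastSaveFinish = t0
        }
    }

    // writeRequest is a stand-in for ChunkWriteRequest: the new metric field lets
    // the store notify the owner of a completed save.
    type writeRequest struct {
        metric *owner
        t0     uint32
    }

    // processWriteQueue drains requests and, on (pretend) success, notifies each owner.
    func processWriteQueue(queue chan writeRequest) {
        for cwr := range queue {
            // a real implementation would insert the chunk into Cassandra here
            cwr.metric.SyncChunkSaveState(cwr.t0)
        }
    }

    func main() {
        o := &owner{}
        queue := make(chan writeRequest, 1)
        queue <- writeRequest{metric: o, t0: 1200}
        close(queue)
        processWriteQueue(queue)
        fmt.Println(o.lastSaveFinish) // 1200
    }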