From 948679278abade2645538d9ceaa91239572151c7 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Fri, 20 Jan 2017 00:03:48 +0800 Subject: [PATCH] minor fixes - improve comments and log messages - GC task should "finish" stale chunks and persist them only if the node is a primary --- mdata/aggmetric.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index ced5d6d151..390a9e2cb3 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -45,6 +45,8 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint NumChunks: numChunks, Chunks: make([]*chunk.Chunk, 0, numChunks), ttl: ttl, + // we set LastWrite here to make sure a new Chunk doesn't get immediately + // garbage collected right after creating it, before we can push to it. lastWrite: uint32(time.Now().Unix()), } for _, as := range aggsetting { @@ -256,8 +258,10 @@ func (a *AggMetric) persist(pos int) { pre := time.Now() if a.lastSaveStart >= chunk.T0 { - // this can happen if there are 2 primary MT nodes both saving chunks to Cassandra or - // a primary failed and this node was promoted to be primary but metric consuming is lagging. + // this can happen if + // a) there are 2 primary MT nodes both saving chunks to Cassandra + // b) a primary failed and this node was promoted to be primary but metric consuming is lagging. + // c) chunk was persisted by GC (stale) and then new data triggered another persist call log.Debug("AM persist(): duplicate persist call for chunk.") return } @@ -388,7 +392,7 @@ func (a *AggMetric) Add(ts uint32, val float64) { // If we are a primary node, then add the chunk to the write queue to be saved to Cassandra if cluster.Manager.IsPrimary() { if LogLevel < 2 { - log.Debug("AM persist(): node is primary, saving chunk.") + log.Debug("AM persist(): node is primary, saving chunk. %s T0: %d", a.Key, currentChunk.T0) } // persist the chunk. If the writeQueue is full, then this will block. a.persist(a.CurrentChunkPos) @@ -430,15 +434,24 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool { } if a.lastWrite < chunkMinTs { - if a.lastSaveStart >= currentChunk.T0 { - // already saved. lets check if we should just delete the metric from memory. + if currentChunk.Closed { + // already closed and should be saved, though we cant guarantee that. + // Check if we should just delete the metric from memory. if a.lastWrite < metricMinTs { return true } - } else if a.lastSaveStart < currentChunk.T0 { - // chunk hasn't been written to in a while, and is not yet queued to persist. Let's persist it - log.Debug("Found stale Chunk, persisting it to Cassandra. key: %s T0: %d", a.Key, currentChunk.T0) - a.persist(a.CurrentChunkPos) + } else { + // chunk hasn't been written to in a while, and is not yet closed. Let's close it and persist it if + // we are a primary + log.Debug("Found stale Chunk, adding end-of-stream bytes. key: %s T0: %d", a.Key, currentChunk.T0) + currentChunk.Finish() + if cluster.Manager.IsPrimary() { + if LogLevel < 2 { + log.Debug("AM persist(): node is primary, saving chunk. %s T0: %d", a.Key, currentChunk.T0) + } + // persist the chunk. If the writeQueue is full, then this will block. + a.persist(a.CurrentChunkPos) + } } } return false