Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
- improve comments and log messages
- GC task should "finish" stale chunks and persist them only if the
  node is a primary
  • Loading branch information
woodsaj committed Jan 20, 2017
1 parent 788aab6 commit 9486792
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions mdata/aggmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint
NumChunks: numChunks,
Chunks: make([]*chunk.Chunk, 0, numChunks),
ttl: ttl,
// we set LastWrite here to make sure a new Chunk doesn't get immediately
// garbage collected right after creating it, before we can push to it.
lastWrite: uint32(time.Now().Unix()),
}
for _, as := range aggsetting {
Expand Down Expand Up @@ -256,8 +258,10 @@ func (a *AggMetric) persist(pos int) {
pre := time.Now()

if a.lastSaveStart >= chunk.T0 {
// this can happen if there are 2 primary MT nodes both saving chunks to Cassandra or
// a primary failed and this node was promoted to be primary but metric consuming is lagging.
// this can happen if
// a) there are 2 primary MT nodes both saving chunks to Cassandra
// b) a primary failed and this node was promoted to be primary but metric consuming is lagging.
// c) chunk was persisted by GC (stale) and then new data triggered another persist call
log.Debug("AM persist(): duplicate persist call for chunk.")
return
}
Expand Down Expand Up @@ -388,7 +392,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
// If we are a primary node, then add the chunk to the write queue to be saved to Cassandra
if cluster.Manager.IsPrimary() {
if LogLevel < 2 {
log.Debug("AM persist(): node is primary, saving chunk.")
log.Debug("AM persist(): node is primary, saving chunk. %s T0: %d", a.Key, currentChunk.T0)
}
// persist the chunk. If the writeQueue is full, then this will block.
a.persist(a.CurrentChunkPos)
Expand Down Expand Up @@ -430,15 +434,24 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool {
}

if a.lastWrite < chunkMinTs {
if a.lastSaveStart >= currentChunk.T0 {
// already saved. lets check if we should just delete the metric from memory.
if currentChunk.Closed {
// already closed and should be saved, though we cant guarantee that.
// Check if we should just delete the metric from memory.
if a.lastWrite < metricMinTs {
return true
}
} else if a.lastSaveStart < currentChunk.T0 {
// chunk hasn't been written to in a while, and is not yet queued to persist. Let's persist it
log.Debug("Found stale Chunk, persisting it to Cassandra. key: %s T0: %d", a.Key, currentChunk.T0)
a.persist(a.CurrentChunkPos)
} else {
// chunk hasn't been written to in a while, and is not yet closed. Let's close it and persist it if
// we are a primary
log.Debug("Found stale Chunk, adding end-of-stream bytes. key: %s T0: %d", a.Key, currentChunk.T0)
currentChunk.Finish()
if cluster.Manager.IsPrimary() {
if LogLevel < 2 {
log.Debug("AM persist(): node is primary, saving chunk. %s T0: %d", a.Key, currentChunk.T0)
}
// persist the chunk. If the writeQueue is full, then this will block.
a.persist(a.CurrentChunkPos)
}
}
}
return false
Expand Down

0 comments on commit 9486792

Please sign in to comment.