This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

store save state as a property of the aggmetric instead of chunk #477

Merged
merged 4 commits on Jan 20, 2017
12 changes: 4 additions & 8 deletions dashboard.json
@@ -38,7 +38,7 @@
{
"aliasColors": {
"ES backpressure": "#052B51",
"add-to-saving": "#C15C17",
"add-to-closed": "#C15C17",
"primary": "#2F575E",
"store backpressure": "#962D82",
"too old": "#890F02"
@@ -124,22 +124,18 @@
},
{
"refId": "C",
"target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_saved_chunk.counter32), 'add-to-saved')"
"target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32), 'add-to-closed')"
},
{
"refId": "D",
"target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_saving_chunk.counter32), 'add-to-saving')"
},
{
"refId": "E",
"target": "aliasSub(perSecond(metrictank.stats.$environment.$instance.input.*.metrics_decode_err.counter32), '.*\\.([^\\.]+)\\.metrics_decode_err.*', '\\1 decode err')"
},
{
"refId": "F",
"refId": "E",
"target": "aliasSub(perSecond(metrictank.stats.$environment.$instance.input.*.metric_invalid.counter32), '.*\\.([^\\.]+)\\.metric_invalid.*', '\\1 metric invalid')"
},
{
"refId": "G",
"refId": "F",
"target": "aliasByNode(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.*.counter32), 0.000000001), 6, 7)"
}
],
14 changes: 5 additions & 9 deletions docs/metrics.md
@@ -172,15 +172,11 @@ the duration of a put in the wait queue
how many rows come per get response
* `store.cassandra.to_iter`:
the duration of converting chunks to iterators
* `tank.add_to_saved_chunk`:
points received - by a secondary node - for the most recent chunk when that chunk
has already been saved by a primary. A secondary can add this data to its chunks.
* `tank.add_to_saving_chunk`:
points received - by the primary node - for the most recent chunk
when that chunk is already being saved (or has been saved).
this indicates that your GC is actively sealing chunks and saving them before you have the chance to send
your (infrequent) updates. The primary won't add them to its in-memory chunks, but secondaries will
(because they are never in "saving" state for them), see below.
* `tank.add_to_closed_chunk`:
points received for the most recent chunk when that chunk is already being "closed",
i.e. the end-of-stream marker has been written to the chunk.
This indicates that your GC is actively sealing chunks and saving them before you have the chance to send
your (infrequent) updates. Any points received for a chunk that has already been closed are discarded.
Contributor

so add_to_closed is the equivalent of add_to_saving.
However, since SyncChunkSaveState doesn't close any chunks or mark chunks as closed, we no longer have a way to notice when we're adding data to a chunk that has already been saved by a primary, even when we have already received a persist notification for it. I.e. there is no equivalent of add_to_saved.

IOW: Add() will add points to chunks that are already saved by the primary; since it only looks at Closed (which it only sets itself if it's primary), it does not look at lastSaveFinish. It'll be as if we're adding data regularly, but actually something weird is going on (adding data to chunks that potentially didn't make it into the chunk in cassandra).

Member Author

The only time a chunk is closed is when we receive a point for the next chunk, or when it is stale and the GC task closes it.

A chunk being saved by another node should not have any impact on chunks on this node. Both nodes should be receiving the same data and both nodes are bound by the same conditions leading to a chunk being closed.

If nodeA is lagging behind, it should not be prevented from adding points to a chunk just because nodeB has saved the chunk

Contributor

I'm not saying it should be prevented. I'm saying I liked the previous behavior (which was adding the points but incrementing a metric that signals that this is going on).

Member Author

But why do I care? I don't see there being any value in having this metric. The metric would be incrementing under normal operation, especially as the ingest rate goes up and there is a lag of a few seconds (nodeA could be processing data from partitions 0 then 1,2,3,4 etc., while nodeB could be processing 7 then 6,5,4, etc.).

(add data to chunks that potentially didn't make it into the chunk in cassandra)

This is not possible, as all nodes are bound by the same rules for closing a chunk.

Contributor
@Dieterbe Dieterbe Jan 19, 2017

It is possible. Consider this:

  • we're adding points t1, t2, t3, t4, t5 (all have the same t0 i.e. go into the same chunkspan)

  • there's a long time between t2 and t3, let's say it's a time just under chunk-max-stale.

  • A is primary, B is secondary.

  • A consumes t1, t2, and then experiences some lagging. t3 is in kafka but A doesn't consume it quite yet

  • A's GC notices that last write was before chunkMinTs, closes the chunk and persists it.

  • A then consumes the points t3, t4 and t5 but rejects them because the chunk is closed. This is worth instrumenting (it indicates that maybe you should check your GC parameters). It does increment add_to_closed_chunk, so you can see it on your dashboard when looking at the instance's stats.

  • B on the other hand, does not experience the lagging.

  • it consumes t1, t2 but does not experience the same lag as A. it consumes t3.

  • B's GC sees a last write more recent than chunkMinTs, so doesn't close the chunk.

  • B receives notification from A and updates lastSaveStart and lastSaveFinish accordingly, but does not close any chunk.

  • B consumes t4 and t5, and can push them to the chunk, as it is not closed and we don't look at the lastSave* timestamps.

=> B now added t3, t4 and t5 to its chunk, but we know A did not (we don't know for t3 but we do know for t4 and t5)

there are two problems:

  • when loading your charts, datasource B will show more points than datasource A (when loading from memory)
  • datasource B will show fewer points once it needs to retrieve the chunk from cassandra

As B could foresee these problems happening (e.g. if it checked lastSave* when adding new points, just like the old code checked currentChunk.Saved when adding new points), it could inform the operator by incrementing this stat.

Note that B can never detect all cases of "this point has not been added by the primary" (e.g. t3 in the above example), since it needs to wait for a notification and only sees the problem with points it receives after processing the notification. So while the stat can't account for all these cases, it will be a good indicator for the cases where we can be sure it happened.
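
For illustration only, a minimal sketch (not code from this PR) of the kind of check described above: a secondary that has processed the primary's save notifications could bump a counter when it pushes a point into a chunk whose T0 is already covered by lastSaveFinish. The method name and the addToSavedChunk stat are assumptions here; the PR removes add_to_saved_chunk and does not implement such a check.

// Hypothetical sketch, not part of this PR. It mirrors the removed
// currentChunk.Saved check, but based on the new lastSaveFinish field.
// Assumes the caller (Add) already holds a.Lock() and has verified that
// the point falls into currentChunk and that the chunk is not Closed.
func (a *AggMetric) noteAddToSavedChunk(currentChunk *chunk.Chunk) {
	if a.lastSaveFinish >= currentChunk.T0 {
		// a primary already reported this chunk as written to cassandra,
		// so a point added now is visible in this node's memory but may be
		// missing from the chunk that was persisted.
		addToSavedChunk.Inc() // hypothetical stat; add_to_saved_chunk is removed by this PR
	}
	// the point would still be pushed into the chunk; this is purely an
	// observability signal, as in the old behavior.
}

Since chunk writes happen in ascending T0 order, lastSaveFinish >= currentChunk.T0 would mean the current chunk itself has already been written.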

Member Author

This is such an obscure edge case that it would be crazy to complicate the code just to be able to observe it. Even then you wouldn't know when it was happening, as the add_to_saving metric would be incrementing for many other legitimate reasons that are not a problem.

The correct way to identify your unlikely scenario is to just compare the add_to_closed_chunk count on both nodes; they should always be the same.

Contributor

as the add_to_saving metric would be incrementing for many other legitimate reasons that are not a problem.

like what?

Member Author

like what?

#477 (comment)

Contributor

for the record:

dieter [9:06 AM] 
i don't understand the scenario described in https://github.com/raintank/metrictank/pull/477#discussion_r96889476  is A or B a primary?
store save state as a property of the aggmetric instead of chunk by woodsaj · Pull Request #477 · raintank/metrictank
Rather than tracking the state of each individual chunk, just keep a record of the most recent saveStart (add to write queue) / saveFinish (write to cassandra) as properties of the aggMetric. When we ...
 

awoods [9:07 AM] 
it doesn't matter which is primary.

[9:08]
The point is 2 nodes won't necessarily process metrics in the same order.

dieter [9:09 AM] 
but for any given metric key, they would be processed in-order though? except when a metric changes its partition assignment

awoods [9:10 AM] 
correct.

dieter [9:10 AM] 
so you're talking about ordering with respect to different metric keys? if so, that shouldn't matter here, since we're talking about per-metric-key things

awoods [9:18 AM] 
it does matter. Though the points for a key are ordered, the keys are not ordered. So nodeA could process all of the points for metricX then move on to processing all of the points for metricY, while nodeB processes all of the points for metricY then processes metricX.

[9:20]  
If nodeA reads a batch of metrics from a partition, but then a kafka node crashes before nodeB can read the batch, nodeB will have to wait for kafka to handle the failure before it can read the metrics. This could easily take more than a few seconds.

awoods [9:25 AM] 
I understand your desire to know when two nodes have different data in their chunks, but:
a) it is super unlikely and will self-heal once the chunks move out of memory, as there will only be one chunk, the one in cassandra
b) an add_to_saved_chunk metric only tells you that nodes might be out of sync, which is pretty useless as it is not actionable
c) comparing the add_to_closed_chunk metrics of 2 nodes will tell you if the nodes are out of sync, as the count should be the same on both nodes

dieter [9:34 AM] 
i don't understand why different keys processed in different orders matters here. since the scenario we're talking about is with respect to specific chunks for specific metrics, it's a per-metric-phenomenon

awoods [9:35 AM] 
It is a timing problem. There is no guarantee that nodeA will process metricX at the same time as nodeB. The nodes could be out of sync by a few seconds or more, and that is ok.

dieter [9:38 AM] 
ok so a timing offset problem between different instances with respect to a given metric.  you said that that would increase the proposed counter under normal operation (e.g. not in the situation i described). how so?

awoods [9:44 AM] 
NodeA reads a batch of metrics from partition p1. The batch contains 2 points for metricX: one point for the chunk with T0=10, and one for T0=20. So chunk T0=10 is saved and the metric persist message is sent.
NodeB is delayed from reading from the p1 partition for some reason (kafka node failure or new node being added).
NodeB receives the metricPersist message for metricX T0=10.
NodeB reads a batch of metrics from partition p1 and processes the first point for metricX even though the chunk has already been saved by NodeA.

[9:49]  
Typically a batch of metrics from Kafka would only contain 1 point for each metric, unless
1) the metrics are being sent with 1-second resolution
2) there was a network issue that caused metrics to be buffered somewhere and they were then flushed all at once.

2) is really likely: metrics could be buffered by the client (can't talk to carbon-relay), or buffered by carbon-relay (can't talk to tsdb-gw).

dieter [10:03 AM] 
aha, so another way to look at it is: let's say we do have SyncSaveState close the chunks that the primary closed. In the event B gets points for a chunk that is already closed, there's no way to tell if those points *didn't* make it into the chunk saved by the primary (the problematic scenario I brought up), versus if the points *did* make it into the chunk saved by the primary (the non-problematic scenario you brought up)

[10:04]  
we could tell the difference if the persist messages would also contain the ts of the last point that went into the chunk (i'm not saying we should do that, but just a thought)

[10:06]  
ok so let's leave it how it is without this particular metric. sounds like the metric we used to have was flawed as well

[10:08]  
I think the key insight i gathered through these thinking exercises and discussions is that secondaries can't be a reliable indicator of these sorts of problems. instead we should always just rely on the `add_to_closed` of the primary node, which will be an accurate indicator of points not making it into chunks being saved to cassandra

awoods [10:10 AM] 
great. So LGTM?

* `tank.chunk_operations.clear`:
a counter of how many chunks are cleared (replaced by new chunks)
* `tank.chunk_operations.create`:
197 changes: 68 additions & 129 deletions mdata/aggmetric.go
@@ -30,6 +30,9 @@ type AggMetric struct {
aggregators []*Aggregator
firstChunkT0 uint32
ttl uint32
lastSaveStart uint32 // last chunk T0 that was added to the write Queue.
lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra.
lastWrite uint32
}

// NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long
@@ -42,6 +45,9 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint
NumChunks: numChunks,
Chunks: make([]*chunk.Chunk, 0, numChunks),
ttl: ttl,
// we set LastWrite here to make sure a new Chunk doesn't get immediately
// garbage collected right after creating it, before we can push to it.
lastWrite: uint32(time.Now().Unix()),
}
for _, as := range aggsetting {
m.aggregators = append(m.aggregators, NewAggregator(store, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl))
@@ -54,99 +60,15 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint
func (a *AggMetric) SyncChunkSaveState(ts uint32) {
a.Lock()
defer a.Unlock()
chunk := a.getChunkByT0(ts)
if chunk != nil {
if LogLevel < 2 {
log.Debug("AM marking chunk %s:%d as saved.", a.Key, chunk.T0)
}
chunk.Saved = true
}
}

/* Get a chunk by its T0. It is expected that the caller has acquired a.Lock()*/
func (a *AggMetric) getChunkByT0(ts uint32) *chunk.Chunk {
// we have no chunks.
if len(a.Chunks) == 0 {
return nil
}

currentT0 := a.Chunks[a.CurrentChunkPos].T0

if ts == currentT0 {
//found our chunk.
return a.Chunks[a.CurrentChunkPos]
}

// requested Chunk is not in our dataset.
if ts > currentT0 {
return nil
if ts > a.lastSaveFinish {
a.lastSaveFinish = ts
}

// requested Chunk is not in our dataset.
if len(a.Chunks) == 1 {
return nil
if ts > a.lastSaveStart {
Contributor

As far as I can see, the only call site of SyncChunkSaveState() is in cassandraStore.processWriteQueue(). When a chunk is pushed into the write queue via AggMetric.persist(), AggMetric.lastSaveStart is already updated in persist. There also can't be a race, because AggMetric.SyncChunkSaveState() and AggMetric.persist() are both surrounded by locks. So is there really any possible case where this condition is true? It seems like that's not possible unless there is another way than AggMetric.persist() to push chunks into the write queue, but I can't see one.

Contributor

Taking that a bit further: the CassandraStore guarantees that at a per-aggmetric level (actually per ChunkWriteRequest, but a CWR's key is simply the key of the aggmetric), all chunk writes are in order, so the ts will be in order.

Also, I believe that persist has an ordering guarantee: the chunks it persists are in ascending time order, both within one call and across multiple calls.

I like the SaveStart and SaveFinish terminology, and I think we should apply it more broadly. So instead of SyncChunkSaveState, how about this:

// saveStart lets us mark up to which chunk t0 we've triggered persistence operations. assumes a is locked
func (a *AggMetric) saveStart(ts uint32) {
	a.lastSaveStart = ts
}

// SaveFinish lets external store packages mark up to which chunk t0 has been saved
func (a *AggMetric) SaveFinish(ts uint32) {
	a.Lock()
	defer a.Unlock()
	a.lastSaveFinish = ts
}

the former would be called by persist, the latter by cassandraStore.

Member Author

SyncChunkSaveState() is also called when metricPersist messages are received.
https://github.com/raintank/metrictank/blob/chunkSaveRefactor/mdata/notifier.go#L87
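
For context, a rough sketch of that second call path, with the message type and lookup simplified and assumed rather than taken from notifier.go: on receiving a persist notification for a key and chunk T0, the node records the save state on the AggMetric; no in-memory chunk is closed or modified.

// Illustrative sketch of the notifier call path; SavedChunk and the map lookup
// are assumptions, only SyncChunkSaveState matches this PR.
type SavedChunk struct {
	Key string // metric key
	T0  uint32 // T0 of the chunk that a primary saved
}

func handlePersistNotification(metrics map[string]*AggMetric, msg SavedChunk) {
	if m, ok := metrics[msg.Key]; ok {
		// only advances lastSaveStart/lastSaveFinish on the AggMetric;
		// chunks stay open, so a lagging consumer can still add points to them.
		m.SyncChunkSaveState(msg.T0)
	}
}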

Contributor
@Dieterbe Dieterbe Jan 18, 2017

ah no, SyncChunkSaveState is also called by the notifiers consuming the persistence stream: in the case of kafka, ordered; for nsq, unordered. There could be multiple primaries though (not a great setup, but it can happen), saving chunks concurrently and potentially even causing out-of-order messages (with respect to different instances, e.g. an instance could be lagging behind). So the ts check would still be needed.

a.lastSaveStart = ts
}

// calculate the number of chunks ago our requested T0 is,
// assuming that chunks are sequential.
chunksAgo := int((currentT0 - ts) / a.ChunkSpan)

numChunks := len(a.Chunks)
oldestPos := a.CurrentChunkPos + 1
if oldestPos >= numChunks {
oldestPos = 0
}

var guess int

if chunksAgo >= (numChunks - 1) {
// set guess to the oldest chunk.
guess = oldestPos
} else {
guess = a.CurrentChunkPos - chunksAgo
if guess < 0 {
guess += numChunks
}
}

// we now have a good guess at which chunk position our requested TO is in.
c := a.Chunks[guess]

if c.T0 == ts {
// found our chunk.
return c
}

if ts > c.T0 {
// we need to check newer chunks
for c.T0 < currentT0 {
guess += 1
if guess >= numChunks {
guess = 0
}
c = a.Chunks[guess]
if c.T0 == ts {
//found our chunk
return c
}
}
} else {
// we need to check older chunks
oldestT0 := a.Chunks[oldestPos].T0
for c.T0 >= oldestT0 && c.T0 < currentT0 {
guess -= 1
if guess < 0 {
guess += numChunks
}
c = a.Chunks[guess]
if c.T0 == ts {
//found or chunk.
return c
}
}
if LogLevel < 2 {
log.Debug("AM metric %s at chunk T0=%d has been saved.", a.Key, ts)
}
// chunk not found.
return nil
}

func (a *AggMetric) getChunk(pos int) *chunk.Chunk {
@@ -332,18 +254,14 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) {

// write a chunk to persistent storage. This should only be called while holding a.Lock()
func (a *AggMetric) persist(pos int) {
if !cluster.Manager.IsPrimary() {
if LogLevel < 2 {
log.Debug("AM persist(): node is not primary, not saving chunk.")
}
return
}

pre := time.Now()
chunk := a.Chunks[pos]
pre := time.Now()

if chunk.Saved || chunk.Saving {
// this can happen if chunk was persisted by GC (stale) and then new data triggered another persist call
if a.lastSaveStart >= chunk.T0 {
// this can happen if
// a) there are 2 primary MT nodes both saving chunks to Cassandra
// b) a primary failed and this node was promoted to be primary but metric consuming is lagging.
// c) chunk was persisted by GC (stale) and then new data triggered another persist call
log.Debug("AM persist(): duplicate persist call for chunk.")
return
}
@@ -352,11 +270,12 @@
pending := make([]*ChunkWriteRequest, 1)
// add the current chunk to the list of chunks to send to the writeQueue
pending[0] = &ChunkWriteRequest{
metric: a,
key: a.Key,
chunk: chunk,
span: a.ChunkSpan,
ttl: a.ttl,
chunk: chunk,
timestamp: time.Now(),
span: a.ChunkSpan,
}

// if we recently became the primary, there may be older chunks
@@ -367,16 +286,17 @@
previousPos += len(a.Chunks)
}
previousChunk := a.Chunks[previousPos]
for (previousChunk.T0 < chunk.T0) && !previousChunk.Saved && !previousChunk.Saving {
for (previousChunk.T0 < chunk.T0) && (a.lastSaveStart < previousChunk.T0) {
if LogLevel < 2 {
log.Debug("AM persist(): old chunk needs saving. Adding %s:%d to writeQueue", a.Key, previousChunk.T0)
}
pending = append(pending, &ChunkWriteRequest{
metric: a,
key: a.Key,
chunk: previousChunk,
span: a.ChunkSpan,
ttl: a.ttl,
chunk: previousChunk,
timestamp: time.Now(),
span: a.ChunkSpan,
})
previousPos--
if previousPos < 0 {
@@ -385,6 +305,9 @@
previousChunk = a.Chunks[previousPos]
}

// Every chunk with a T0 <= this chunks' T0 is now either saved, or in the writeQueue.
a.lastSaveStart = chunk.T0

if LogLevel < 2 {
log.Debug("AM persist(): sending %d chunks to write queue", len(pending))
}
@@ -402,9 +325,8 @@
if LogLevel < 2 {
log.Debug("AM persist(): sealing chunk %d/%d (%s:%d) and adding to write queue.", pendingChunk, len(pending), a.Key, chunk.T0)
}
pending[pendingChunk].chunk.Finish()
a.store.Add(pending[pendingChunk])
pending[pendingChunk].chunk.Saving = true

pendingChunk--
}
persistDuration.Value(time.Now().Sub(pre))
@@ -432,6 +354,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
}

log.Debug("AM %s Add(): created first chunk with first point: %v", a.Key, a.Chunks[0])
a.lastWrite = uint32(time.Now().Unix())
a.addAggregators(ts, val)
return
}
@@ -440,34 +363,40 @@ func (a *AggMetric) Add(ts uint32, val float64) {

if t0 == currentChunk.T0 {
// last prior data was in same chunk as new point
if currentChunk.Saving {
// if we're already saving the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator
if currentChunk.Closed {
// if we've already 'finished' the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator
// you should monitor this metric closely, it indicates that maybe your GC settings don't match how you actually send data (too late)
addToSavingChunk.Inc()
addToClosedChunk.Inc()
return
}

if err := currentChunk.Push(ts, val); err == nil {
if currentChunk.Saved {
// if we're here, it means we marked it as Saved because it was saved by an other primary, not by us since Saving is false.
// typically this happens when non-primaries receive metrics that the primary already saved (maybe cause their metrics consumer is laggy)
// we allow adding data to such chunks in that case, though this open the possibility for data to be rejected by the primary, to be
// visible on secondaries.
addToSavedChunk.Inc()
}
} else {
if err := currentChunk.Push(ts, val); err != nil {
log.Debug("AM failed to add metric to chunk for %s. %s", a.Key, err)
metricsTooOld.Inc()
return
}
a.lastWrite = uint32(time.Now().Unix())
log.Debug("AM %s Add(): pushed new value to last chunk: %v", a.Key, a.Chunks[0])
} else if t0 < currentChunk.T0 {
log.Debug("AM Point at %d has t0 %d, goes back into previous chunk. CurrentChunk t0: %d, LastTs: %d", ts, t0, currentChunk.T0, currentChunk.LastTs)
metricsTooOld.Inc()
return
} else {
// persist the chunk. If the writeQueue is full, then this will block.
a.persist(a.CurrentChunkPos)
// Data belongs in a new chunk.

// If it isnt finished already, add the end-of-stream marker and flag the chunk as "closed"
if !currentChunk.Closed {
currentChunk.Finish()
}

// If we are a primary node, then add the chunk to the write queue to be saved to Cassandra
if cluster.Manager.IsPrimary() {
if LogLevel < 2 {
log.Debug("AM persist(): node is primary, saving chunk. %s T0: %d", a.Key, currentChunk.T0)
}
// persist the chunk. If the writeQueue is full, then this will block.
a.persist(a.CurrentChunkPos)
}

a.CurrentChunkPos++
if a.CurrentChunkPos >= int(a.NumChunks) {
@@ -490,6 +419,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
}
log.Debug("AM %s Add(): cleared chunk at %d of %d and replaced with new. and added the new point: %s", a.Key, a.CurrentChunkPos, len(a.Chunks), a.Chunks[a.CurrentChunkPos])
}
a.lastWrite = uint32(time.Now().Unix())

}
a.addAggregators(ts, val)
@@ -503,16 +433,25 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool {
return false
}

if currentChunk.LastWrite < chunkMinTs {
if currentChunk.Saved {
// already saved. lets check if we should just delete the metric from memory.
if currentChunk.LastWrite < metricMinTs {
if a.lastWrite < chunkMinTs {
if currentChunk.Closed {
// already closed and should be saved, though we cant guarantee that.
// Check if we should just delete the metric from memory.
if a.lastWrite < metricMinTs {
return true
}
} else if !currentChunk.Saving {
// chunk hasn't been written to in a while, and is not yet queued to persist. Let's persist it
log.Debug("Found stale Chunk, persisting it to Cassandra. key: %s T0: %d", a.Key, currentChunk.T0)
a.persist(a.CurrentChunkPos)
} else {
// chunk hasn't been written to in a while, and is not yet closed. Let's close it and persist it if
// we are a primary
log.Debug("Found stale Chunk, adding end-of-stream bytes. key: %s T0: %d", a.Key, currentChunk.T0)
currentChunk.Finish()
if cluster.Manager.IsPrimary() {
Contributor

Note that none of this code is being executed on secondaries, due to AggMetric.GC (see #269). However, we can just change that and run this GC function on secondaries as well.

Member Author

fixed in 80125f7

if LogLevel < 2 {
log.Debug("AM persist(): node is primary, saving chunk. %s T0: %d", a.Key, currentChunk.T0)
}
// persist the chunk. If the writeQueue is full, then this will block.
a.persist(a.CurrentChunkPos)
}
}
}
return false
4 changes: 0 additions & 4 deletions mdata/aggmetrics.go
@@ -4,7 +4,6 @@ import (
"sync"
"time"

"github.com/raintank/metrictank/cluster"
"github.com/raintank/worldping-api/pkg/log"
)

@@ -47,9 +46,6 @@ func (ms *AggMetrics) GC() {
unix := time.Duration(time.Now().UnixNano())
diff := ms.gcInterval - (unix % ms.gcInterval)
time.Sleep(diff + time.Minute)
if !cluster.Manager.IsPrimary() {
continue
}
log.Info("checking for stale chunks that need persisting.")
now := uint32(time.Now().Unix())
chunkMinTs := now - (now % ms.chunkSpan) - uint32(ms.chunkMaxStale)