
introduce gc delay if rob enabled #781

Merged: 2 commits merged into master from gcDelay, Dec 14, 2017

Conversation

@replay (Contributor) commented Dec 11, 2017

This should fix #776.

I still need to reproduce the problem locally in order to test the fix.

@shanson7 (Collaborator) commented Dec 11, 2017

This won't completely fix the issue: on the first GC there may be no chunks with data, because the rob could be holding all the datapoints. This line will likely need to change to account for data living in the rob.

@replay force-pushed the gcDelay branch 4 times, most recently from fad1f1c to e111fd2, December 12, 2017 07:18
@replay (Contributor, Author) commented Dec 12, 2017

Good point. You're right that changing the line you pointed out would fix it.
But now I'm thinking the cleaner solution is to make sure .lastWrite gets set to the correct value on Add(), so we don't need to keep track of this gcDelay at all. Otherwise, if we want to reuse the .lastWrite property for other things, we'll have to keep adding these "exceptions" for the rob case.
So I think this is better because it avoids exceptions, and it makes sense to update .lastWrite as soon as a datapoint has been accepted into the rob, since from that moment it is readable/queryable by the user: e111fd2
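For context, a minimal sketch of what "update .lastWrite on Add()" could look like. The names a.rob, a.add, and a.lastWrite match the GC() snippet later in this thread, but the overall shape, the return value of rob.Add(), and deriving lastWrite from wall-clock time (import "time") are assumptions, not the actual diff:

	// Add ingests one point. With the reorder buffer enabled, lastWrite is
	// bumped as soon as the rob accepts the point, since from that moment
	// the point is readable/queryable; points that age out of the rob are
	// written into chunks via the internal add().
	func (a *AggMetric) Add(ts uint32, val float64) {
		a.Lock()
		defer a.Unlock()
		if a.rob != nil {
			aged := a.rob.Add(ts, val) // hypothetical: returns points pushed out of the reorder window
			a.lastWrite = uint32(time.Now().Unix())
			for _, p := range aged {
				a.add(p.Ts, p.Val)
			}
			return
		}
		a.add(ts, val)
		a.lastWrite = uint32(time.Now().Unix())
	}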

@woodsaj (Member) commented Dec 12, 2017

@replay The AggMetric.GC() needs to be updated to handle the ReorderBuffer. The GC() call is designed to force chunks to be persisted when users stop sending data (chunks are normally only persisted when a point in the next chunk is received). Currently, there will be reorderWindow points that are not yet in a chunk. When GC() closes the current chunk and persists it, those points will be lost.

I think adding this to GC() would work

func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool {
	a.Lock()
	defer a.Unlock()

	// if the reorderBuffer is enabled and we have not received a datapoint
	// in a while, then flush the reorder buffer.
	if a.rob != nil && a.lastWrite < chunkMinTs {
		tmpLastWrite := a.lastWrite
		pts := a.rob.Flush()
		for _, p := range pts {
			a.add(p.Ts, p.Val)
		}
		// adding points will cause our lastWrite to be updated,
		// but we want to keep the old value
		a.lastWrite = tmpLastWrite
	}
	// ... rest of GC() as before

@shanson7 (Collaborator) commented

FWIW, @woodsaj's solution above is almost exactly what I put in my local branch. I added a Flush method and called it during GC.

It's worth noting that there could be data in the rob that isn't ready to flush, so len(chunks) can still be 0.

@replay (Contributor, Author) commented Dec 12, 2017

@woodsaj thanks, I'll use that. Just to be clear: the change I already made, updating .lastWrite once a datapoint gets accepted into the rob, would still be necessary to avoid unnecessary GCs, right?

Review thread on the new ReorderBuffer.Flush() in the diff:

func (rob *ReorderBuffer) Flush() []schema.Point {
	res := rob.Get()
@shanson7 (Collaborator):

This will remove datapoints that might still need to stay in the rob for reordering. In my mind, Flush should only remove the datapoints that have aged out of the rob.

@woodsaj (Member):

Because we are now updating lastWrite when adding to the rob, flush() will always want to get all points, as they will all be older than chunkMaxStale. It is important that metrics are not delayed from being persisted for more than chunkMaxStale: if the metrics don't get written and the MT instance restarts, the metrics could be lost, depending on the kafka retention.
To prevent loss, the kafka retention needs to be at least (chunk-max-stale + gc-interval).
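For example, with a hypothetical chunk-max-stale of 6h and gc-interval of 1h, a point could sit unpersisted for up to roughly 7h after being received (it first has to go stale, then wait for the next GC run), so kafka would need at least 7h of retention for it to still be replayable after a restart.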

@shanson7 (Collaborator):

"Because we are now updating lastWrite when adding to the rob, flush() will always want to get all points as they will all be older than chunkMaxStale"

I don't think this is true. gc-interval and chunk-max-stale are completely independent, and Flush() is going to be called unconditionally on every GC cycle. That means you could write data to the rob just 1 second before a GC cycle kicks off; that data is not yet ready to Flush() and belongs to the currentChunk.

If the rob were only Flush()ed AFTER checking the lastWriteTime, then I believe you would be correct.
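(With hypothetical numbers, assuming chunkMinTs = now - chunk-max-stale: if GC runs at t=1000 with chunk-max-stale=600, then chunkMinTs=400. A point written at t=999 makes lastWrite=999, which is not < 400, so the guarded flush in the GC() snippet above is skipped and the point stays in the rob for reordering.)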

@shanson7 (Collaborator), Dec 12, 2017:

Ah, I see. I missed the check of the lastWriteTime.

@shanson7 (Collaborator) commented

An AggMetric GC has 2 functions:

  1. Remove AggMetrics that haven't been written to in metric-max-stale time, to free up memory
  2. Persist chunks that haven't been persisted in chunk-max-stale

There is a sneaky extra way for an AggMetric to get GC'd (like function 1): when it contains no chunks at all. In my particular case, the gc-interval setting lined up with the occasional publishing of the timeseries, so the AggMetric was getting GC'd before the rob received the second datapoint that would have flushed it and created chunk 1. That caused the AggMetric to be recreated on the next point with a brand-new rob and still 0 chunks, and it was then GC'd again. Rinse and repeat.

Flushing the rob in the GC call seems like the right way to go, but Flush should only extract the data that needs to age out of the rob, not ALL of it (flushing everything would actually break the reordering ability in this edge case). Additionally, the len(chunks) == 0 check should become len(chunks) == 0 && !rob.HasData(). This protects against a case like:

  1. New data comes in, adding a few datapoints to the rob
  2. GC kicks off, but the data in the rob shouldn't be flushed, since it is within the rob window (and therefore still eligible for reordering)
  3. There are STILL no chunks, but we have data in the rob, so this AggMetric is safe.

@woodsaj (Member) commented Dec 12, 2017

  1. New data comes in, adding a few datapoints to the rob
  2. GC kicks off, but the data in the rob shouldn't be flushed, since it is within the rob window (and therefore still eligible for reordering)
  3. There are STILL no chunks, but we have data in the rob, so this AggMetric is safe.

With the changes already added to update lastWrite when points are added to the rob, this scenario is not possible.

All that will happen is:

  1. new data comes in and gets added to the rob; lastWrite is updated every time a point is received.
  2. GC runs; only if lastWrite is older than chunk-max-stale will the rob be flushed. This will create 1 or more chunks.
  3. if lastWrite is newer than chunk-max-stale, then nothing further is done.
  4. if lastWrite is older than chunk-max-stale (the rob was flushed), the unfinished chunk will be closed off and persisted.

This is the desired behaviour. If the last point received (lastWrite) is older than chunk-max-stale, then every point in the rob is also older than chunk-max-stale (no buffered point can have been received after the last write), so they all need to be persisted.

@shanson7 (Collaborator) commented

Actually, what will happen with the current changes and the scenario I described is:

  1. new data comes in and gets added to the rob; lastWrite is updated every time a point is received.
  2. GC runs, and the rob isn't flushed since the data was just recently written and a.lastWrite is newer than the chunk-max-stale cutoff (no chunks are created).
  3. len(chunks) == 0, so this AggMetric is marked as eligible for GC.

@woodsaj (Member) commented Dec 12, 2017

@shanson7 you are right, I didn't notice the return true if len(a.chunks) == 0.

So we should just change that to if len(a.chunks) == 0 && !a.rob.HasData()
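A hedged sketch of both pieces; the slice representation of the rob's storage and the nil guard are assumptions layered on the snippets in this thread, not the actual diff:

	// HasData reports whether any point is currently buffered in the rob.
	// Sketch only: assumes the buffer is a slice of schema.Point where an
	// unused slot has Ts == 0.
	func (rob *ReorderBuffer) HasData() bool {
		for _, p := range rob.buf {
			if p.Ts != 0 {
				return true
			}
		}
		return false
	}

	// in AggMetric.GC(), replacing the bare len(a.chunks) == 0 check
	// (nil guard added for when the rob is disabled):
	if len(a.chunks) == 0 && (a.rob == nil || !a.rob.HasData()) {
		return true // nothing in chunks, nothing buffered: safe to drop
	}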

@shanson7 (Collaborator) commented

Haha, this is why having multiple reviewers is great. You caught the case I missed, and I caught the other one. I think that should do the trick.

@replay force-pushed the gcDelay branch 2 times, most recently from 2ff3e3b to c082a4c, December 13, 2017 07:08
@replay (Contributor, Author) commented Dec 13, 2017

Great discussion @woodsaj and @shanson7 :)
I updated the commit: 4c026eb

@replay force-pushed the gcDelay branch 2 times, most recently from e50e76b to 4c026eb, December 13, 2017 07:12
@Dieterbe (Contributor) commented

Can anyone paraphrase all these back-and-forths into a final conclusion of what exactly the problems and the solutions are? Thanks!

@shanson7 (Collaborator) commented

There are 2 basic issues:

  1. If a given series stops getting data for a long time, the chunk can be persisted/GC'd in AggMetric::GC() while data is still sitting in the rob. This causes data loss.
  2. If a given AggMetric has data in the rob but no created chunks (either the reorder window hasn't elapsed, or data stopped flowing as in issue 1), it will be GC'd. This also causes data loss.

The solution to issue 1:

  1. Update lastWriteTime when adding datapoints to the rob (previously this happened only when adding to the chunk)
  2. In the GC, if the metric is stale, take the data from the rob, write it to the chunk, and reset the rob (see the sketch below)

The solution to issue 2:

  1. Change the len(chunks) == 0 check to len(chunks) == 0 && !rob.HasData() when determining whether the AggMetric is eligible to be GC'd.
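To make "take the data from the rob and reset it" concrete, a sketch of what Flush() could look like. The diff only shows that it wraps Get(), so the reset step and the buf representation here are assumptions:

	// Flush drains all buffered points (so the caller can write them into
	// chunks) and resets the buffer. It should only be called once lastWrite
	// is older than the chunk-max-stale cutoff; by then every buffered
	// point has aged past that cutoff as well.
	func (rob *ReorderBuffer) Flush() []schema.Point {
		res := rob.Get() // Get returns the buffered points in order
		// assumed representation: zeroing the slice empties the buffer
		rob.buf = make([]schema.Point, len(rob.buf))
		return res
	}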

The diff continues:

	return res
}

func (rob *ReorderBuffer) HasData() bool {
@Dieterbe (Contributor), Dec 13, 2017:

Super minor comment, but "empty" seems a bit more common than "hasdata" in Go source code; plus it allows us to write positive checks instead of negations, which I think are slightly easier to read.

@replay (Contributor, Author):

k, I'll update & then merge

buf := NewReorderBuffer(10, 1)

-	if buf.HasData() != false {
-		t.Fatalf("Expected HasData() to be false")
+	if buf.IsEmpty() != true {
@Dieterbe (Contributor):

just evaluate buf.IsEmpty directly in all these cases instead of doing comparisons to booleans

@replay (Contributor, Author):

k, updated again

-	if buf.HasData() != false {
-		t.Fatalf("Expected HasData() to be false")
+	if !buf.IsEmpty() {
+		t.Fatalf("Expected IsEmpty() to be false")
@Dieterbe (Contributor):

expected it to be true :p the other messages need updating too :)

@replay (Contributor, Author):

updated

@replay merged commit cdc737f into master on Dec 14, 2017
@Dieterbe deleted the gcDelay branch on December 15, 2017 19:48
@Dieterbe mentioned this pull request on Mar 15, 2018
Successfully merging this pull request may close: reorderBuffer question

4 participants