diff --git a/compactor/compactor.go b/compactor/compactor.go index 322a0987011c..d589dd39204c 100644 --- a/compactor/compactor.go +++ b/compactor/compactor.go @@ -30,7 +30,8 @@ var ( ) const ( - checkCompactionInterval = 5 * time.Minute + checkCompactionInterval = 5 * time.Minute + executeCompactionInterval = time.Hour ) type Compactable interface { @@ -41,6 +42,8 @@ type RevGetter interface { Rev() int64 } +// Periodic compacts the log by purging revisions older than +// the configured retention time. Compaction happenes every hour. type Periodic struct { clock clockwork.Clock periodInHour int @@ -85,11 +88,12 @@ func (t *Periodic) Run() { continue } } - if clock.Now().Sub(last) < time.Duration(t.periodInHour)*time.Hour { + + if clock.Now().Sub(last) < executeCompactionInterval { continue } - rev := t.getRev(t.periodInHour) + rev, remaining := t.getRev(t.periodInHour) if rev < 0 { continue } @@ -97,7 +101,7 @@ func (t *Periodic) Run() { plog.Noticef("Starting auto-compaction at revision %d", rev) _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { - t.revs = make([]int64, 0) + t.revs = remaining last = clock.Now() plog.Noticef("Finished auto-compaction at revision %d", rev) } else { @@ -124,10 +128,13 @@ func (t *Periodic) Resume() { t.paused = false } -func (t *Periodic) getRev(h int) int64 { +func (t *Periodic) getRev(h int) (int64, []int64) { i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval) if i < 0 { - return -1 + return -1, t.revs + } + if i >= len(t.revs) { + i = len(t.revs) - 1 } - return t.revs[i] + return t.revs[i], t.revs[i+1:] } diff --git a/compactor/compactor_test.go b/compactor/compactor_test.go index c4f3ab3b5463..9e20f38ba859 100644 --- a/compactor/compactor_test.go +++ b/compactor/compactor_test.go @@ -26,12 +26,14 @@ import ( ) func TestPeriodic(t *testing.T) { + retentionHours := 2 + fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} tb := &Periodic{ clock: fc, - periodInHour: 1, + periodInHour: retentionHours, rg: rg, c: compactable, } @@ -40,24 +42,24 @@ func TestPeriodic(t *testing.T) { defer tb.Stop() n := int(time.Hour / checkCompactionInterval) - // collect 3 hours of revisions - for i := 0; i < 3; i++ { + // collect 5 hours of revisions + for i := 0; i < 5; i++ { // advance one hour, one revision for each interval for j := 0; j < n; j++ { - fc.Advance(checkCompactionInterval) rg.Wait(1) + fc.Advance(checkCompactionInterval) } - // ready to acknowledge hour "i" - // block until compactor calls clock.After() - fc.BlockUntil(1) - // unblock the After() - fc.Advance(checkCompactionInterval) - a, err := compactable.Wait(1) - if err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1}) { - t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1}) + + // compaction doesn't happen til 2 hours elapses + if i+1 >= retentionHours { + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + expectedRevision := int64(1 + (i+1)*n - retentionHours*n) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } } } } @@ -79,8 +81,8 @@ func TestPeriodicPause(t *testing.T) { // tb will collect 3 hours of revisions but not compact since paused n := int(time.Hour / checkCompactionInterval) for i := 0; i < 3*n; i++ { - fc.Advance(checkCompactionInterval) rg.Wait(1) + fc.Advance(checkCompactionInterval) } // tb ends up waiting for the clock @@ -93,14 +95,15 @@ func TestPeriodicPause(t *testing.T) { // tb resumes to being blocked on the clock tb.Resume() - // unblock clock, will kick off a compaction at hour 3 + // unblock clock, will kick off a compaction at hour 3:05 + rg.Wait(1) fc.Advance(checkCompactionInterval) a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - // compact the revision from hour 2 - wreq := &pb.CompactionRequest{Revision: int64(2*n + 1)} + // compact the revision from hour 2:05 + wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} if !reflect.DeepEqual(a[0].Params[0], wreq) { t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) }