Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compactor: adjust interval for period <1-hour #9474

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 82 additions & 39 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,50 +61,101 @@ func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compact
return t
}

// periodDivisor divides Periodic.period into checkCompactInterval durations
const periodDivisor = 10

// Run runs periodic compactor.
func (t *Periodic) Run() {
interval := t.period / time.Duration(periodDivisor)
go func() {
initialWait := t.clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-t.ctx.Done():
return
case <-t.clock.After(interval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
}
go t.run()
}

// wait up to initial given period
if t.clock.Now().Sub(initialWait) < t.period {
continue
}
// periodically fetches revisions and ensures that
// first element is always up-to-date for retention window
func (t *Periodic) run() {
initialWait := t.clock.Now()
fetchInterval := t.getInterval()
retryInterval, retries := t.getRetryInterval()

// e.g. period 9h with compaction period 1h, then retain up-to 9 revs
// e.g. period 12h with compaction period 1h, then retain up-to 12 revs
// e.g. period 20m with compaction period 20m, then retain up-to 1 rev
retentions := int(t.period / fetchInterval)
for {
t.revs = append(t.revs, t.rg.Rev())
if len(t.revs) > retentions {
t.revs = t.revs[1:]
}

rev, remaining := t.getRev()
if rev < 0 {
select {
case <-t.ctx.Done():
return
case <-t.clock.After(fetchInterval):
t.mu.RLock()
p := t.paused
t.mu.RUnlock()
if p {
continue
}
}

// no compaction until initial wait period
if t.clock.Now().Sub(initialWait) < t.period {
continue
}
rev := t.revs[0]

for i := 0; i < retries; i++ {
plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
// move to next sliding window
t.revs = remaining
// compactor succeeds at revs[0], move sliding window
t.revs = t.revs[1:]
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", interval)
break
}

// compactor fails at revs[0]:
// 1. retry revs[0], so long as revs[0] is up-to-date
// 2. retry revs[1], when revs[0] becomes stale
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", retryInterval)
paused := false
select {
case <-t.ctx.Done():
return
case <-t.clock.After(retryInterval):
t.mu.RLock()
paused = t.paused
t.mu.RUnlock()
}
if paused {
break
}
}
}()
}
}

// getInterval returns the compaction interval derived from the configured
// retention period x:
//
//   - x <= 1-hour: the interval is x itself
//     (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m'
//     compacts every 10-minute, with a 10-minute retention window).
//   - x > 1-hour: the interval is capped at 1-hour
//     (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='72h'
//     compacts every 1-hour, with a 72-hour retention window).
func (t *Periodic) getInterval() time.Duration {
	if t.period <= time.Hour {
		return t.period
	}
	return time.Hour
}

// retryDivisor is, for periods longer than one hour, both the number of retry
// attempts made after a failed compaction and the factor by which the
// one-hour compaction interval is divided to obtain the retry wait.
const retryDivisor = 10

// getRetryInterval returns how long to wait before retrying a failed
// compaction (itv) and how many attempts to make (retries).
//
// Period of one hour or less: wait one full period and retry once
// (e.g. period 10-min retries once after 10-min, rather than 10 times
// over 100-min).
//
// Period longer than one hour: the compaction interval is capped at one hour,
// so retry every 1h/retryDivisor (6-min), up to retryDivisor times; all
// attempts then fit inside a single compaction interval. The wait is derived
// from the capped interval rather than from the period itself, because
// period/retryDivisor exceeds the one-hour compaction interval for any period
// above 10 hours (e.g. 72h/10 = 7.2h between retries), which defeats the
// purpose of retrying faster.
func (t *Periodic) getRetryInterval() (itv time.Duration, retries int) {
	if t.period <= time.Hour {
		return t.period, 1
	}
	return time.Hour / retryDivisor, retryDivisor
}

// Stop stops periodic compactor.
Expand All @@ -125,11 +176,3 @@ func (t *Periodic) Resume() {
defer t.mu.Unlock()
t.paused = false
}

// getRev selects the revision to compact from the recorded revision history.
// The candidate is the entry periodDivisor positions from the end of t.revs.
// When fewer than periodDivisor revisions have been recorded there is no
// candidate yet: -1 is returned together with the unchanged history.
// Otherwise it returns the candidate revision and the revisions recorded
// after it (a sub-slice of t.revs, not a copy).
func (t *Periodic) getRev() (int64, []int64) {
	idx := len(t.revs) - periodDivisor
	if idx >= 0 {
		return t.revs[idx], t.revs[idx+1:]
	}
	return -1, t.revs
}
114 changes: 67 additions & 47 deletions compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,83 +25,103 @@ import (
"github.com/jonboulle/clockwork"
)

func TestPeriodic(t *testing.T) {
retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour
// TestPeriodicHourly drives a periodic compactor configured with a 12-hour
// period through 24 fake-clock intervals. Each iteration waits for one
// revision fetch and then advances the clock by the compactor's interval.
// No compaction is expected during the first 11 iterations; from then on
// exactly one compaction request is expected per iteration, at the revision
// that trails the current iteration by the 12-hour retention window.
func TestPeriodicHourly(t *testing.T) {
	clock := clockwork.NewFakeClock()
	revGetter := &fakeRevGetter{testutil.NewRecorderStream(), 0}
	target := &fakeCompactable{testutil.NewRecorderStream()}
	periodic := newPeriodic(clock, 12*time.Hour, revGetter, target)

	periodic.Run()
	defer periodic.Stop()

	for hour := 0; hour < 24; hour++ {
		// every iteration starts with a revision fetch
		if _, err := revGetter.Wait(1); err != nil {
			t.Fatal(err)
		}
		clock.Advance(periodic.getInterval())

		// the initial 12-hour window only collects revisions
		if hour < 11 {
			continue
		}

		// past the initial window, one compaction is issued per interval
		actions, err := target.Wait(1)
		if err != nil {
			t.Fatal(err)
		}
		want := &pb.CompactionRequest{Revision: int64(hour + 2 - int(periodic.period/time.Hour))}
		if got := actions[0].Params[0]; !reflect.DeepEqual(got, want) {
			t.Fatalf("compact request = %v, want %v", got, want)
		}
	}
}

func TestPeriodicEveryMinute(t *testing.T) {
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newPeriodic(fc, retentionDuration, rg, compactable)
tb := newPeriodic(fc, time.Minute, rg, compactable)

tb.Run()
defer tb.Stop()
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
// simulate 5 hours worth of intervals.
for i := 0; i < n/retentionHours*5; i++ {
rg.Wait(1)
fc.Advance(checkCompactInterval)
// compaction doesn't happen until 2 hours elapse.
if i < n {
continue

// expect compact every minute
for i := 0; i < 10; i++ {
if _, err := rg.Wait(1); err != nil {
t.Fatal(err)
}
// after 2 hours, compaction happens at every checkCompactInterval.
a, err := compactable.Wait(1)
fc.Advance(time.Minute)

ca, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(i + 1 - n)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
expectedRevision := int64(i + 1)
if !reflect.DeepEqual(ca[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", ca[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}

// unblock the rev getter, so we can stop the compactor routine.
_, err := rg.Wait(1)
if err != nil {
t.Fatal(err)
}
}

func TestPeriodicPause(t *testing.T) {
func TestPeriodicPauseHourly(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shows what would happen when compactor fails for 30-hours.

fc := clockwork.NewFakeClock()
retentionDuration := time.Hour
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newPeriodic(fc, retentionDuration, rg, compactable)
tb := newPeriodic(fc, 15*time.Hour, rg, compactable)

tb.Run()
defer tb.Stop()

tb.Pause()

// tb will collect 3 hours of revisions but not compact since paused
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
for i := 0; i < 3*n; i++ {
rg.Wait(1)
fc.Advance(checkCompactInterval)
// collect 15*2 hours of revisions with no compaction
for i := 0; i < 15*2; i++ {
if _, err := rg.Wait(1); err != nil {
t.Fatal(err)
}
fc.Advance(tb.getInterval())
}
// tb ends up waiting for the clock

select {
case a := <-compactable.Chan():
t.Fatalf("unexpected action %v", a)
case <-time.After(10 * time.Millisecond):
}

// tb resumes to being blocked on the clock
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:06
rg.Wait(1)
fc.Advance(checkCompactInterval)
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
// compact the revision from hour 2:06
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
for i := 0; i < 20; i++ {
if _, err := rg.Wait(1); err != nil {
t.Fatal(err)
}
fc.Advance(tb.getInterval())

ca, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(i + 17)
if !reflect.DeepEqual(ca[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", ca[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}
}