Skip to content

Commit

Permalink
compactor: support finner compaction retention in compactor.go
Browse files Browse the repository at this point in the history
  • Loading branch information
fanminshi committed Sep 26, 2017
1 parent 39e68ef commit 0d3bea6
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 81 deletions.
5 changes: 2 additions & 3 deletions compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ var (
)

const (
checkCompactionInterval = 5 * time.Minute
executeCompactionInterval = time.Hour
checkCompactionInterval = 5 * time.Minute

ModePeriodic = "periodic"
ModeRevision = "revision"
Expand All @@ -57,7 +56,7 @@ type RevGetter interface {
Rev() int64
}

func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) {
func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) {
switch mode {
case ModePeriodic:
return NewPeriodic(retention, rg, c), nil
Expand Down
89 changes: 41 additions & 48 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,72 +26,73 @@ import (
)

// Periodic compacts the log by purging revisions older than
// the configured retention time. Compaction happens hourly.
// the configured retention time.
type Periodic struct {
clock clockwork.Clock
periodInHour int
clock clockwork.Clock
period time.Duration

rg RevGetter
c Compactable

revs []int64
ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
// mu protects paused
mu sync.RWMutex
paused bool
}

// NewPeriodic creates a new instance of Periodic compactor that purges
// the log older than h hours.
func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
// the log older than h Duration.
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
return &Periodic{
clock: clockwork.NewRealClock(),
periodInHour: h,
rg: rg,
c: c,
clock: clockwork.NewRealClock(),
period: h,
rg: rg,
c: c,
}
}

func (t *Periodic) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.revs = make([]int64, 0)
clock := t.clock

wait := t.period
retryTimeout := time.Duration(5) * time.Minute
if wait < retryTimeout {
retryTimeout = wait
}
go func() {
last := clock.Now()
revTimer := clock.After(t.period)
lastRev := t.rg.Rev()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-revTimer:
revTimer = clock.After(t.period)
lastRev = t.rg.Rev()
default:
}
timer := clock.After(wait)
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
case <-timer:
t.mu.RLock()
paused := t.paused
t.mu.RUnlock()
if paused {
wait = retryTimeout
continue
}
}

if clock.Now().Sub(last) < executeCompactionInterval {
continue
}

rev, remaining := t.getRev(t.periodInHour)
if rev < 0 {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
t.revs = remaining
last = clock.Now()
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
plog.Noticef("Retry after %v", checkCompactionInterval)
plog.Noticef("Starting auto-compaction at revision %d (retention: %v )", lastRev, wait)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: lastRev})
if err == nil || err == mvcc.ErrCompacted {
wait = t.period
plog.Noticef("Finished auto-compaction at revision %d", lastRev)
} else {
wait = retryTimeout
plog.Noticef("Failed auto-compaction at revision %d (%v)", lastRev, err)
plog.Noticef("Retry after %v", wait)
}
}
}
}()
Expand All @@ -112,11 +113,3 @@ func (t *Periodic) Resume() {
defer t.mu.Unlock()
t.paused = false
}

func (t *Periodic) getRev(h int) (int64, []int64) {
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
if i < 0 {
return -1, t.revs
}
return t.revs[i], t.revs[i+1:]
}
51 changes: 21 additions & 30 deletions compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,32 @@ import (
)

func TestPeriodic(t *testing.T) {
retentionHours := 2
retentionHours := time.Hour

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Periodic{
clock: fc,
periodInHour: retentionHours,
rg: rg,
c: compactable,
clock: fc,
period: retentionHours,
rg: rg,
c: compactable,
}

tb.Run()
defer tb.Stop()

n := int(time.Hour / checkCompactionInterval)
// collect 5 hours of revisions
// simulates 5 hours time elapse
for i := 0; i < 5; i++ {
// advance one hour, one revision for each interval
for j := 0; j < n; j++ {
rg.Wait(1)
fc.Advance(checkCompactionInterval)
}

// compaction doesn't happen til 2 hours elapses
if i+1 < retentionHours {
continue
}
rg.Wait(1)
fc.Advance(time.Hour)

a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
expectedRevision := int64(i + 1)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
Expand All @@ -76,23 +68,22 @@ func TestPeriodicPause(t *testing.T) {
compactable := &fakeCompactable{testutil.NewRecorderStream()}
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
tb := &Periodic{
clock: fc,
periodInHour: 1,
rg: rg,
c: compactable,
clock: fc,
period: time.Hour,
rg: rg,
c: compactable,
}

tb.Run()
tb.Pause()

// tb will collect 3 hours of revisions but not compact since paused
n := int(time.Hour / checkCompactionInterval)
for i := 0; i < 3*n; i++ {
// simulate 3 hours time elapse
for i := 0; i < 3; i++ {
rg.Wait(1)
fc.Advance(checkCompactionInterval)
fc.Advance(time.Hour)
}
// tb ends up waiting for the clock

// Since tb is paused, expect no compaction event.
select {
case a := <-compactable.Chan():
t.Fatalf("unexpected action %v", a)
Expand All @@ -102,15 +93,15 @@ func TestPeriodicPause(t *testing.T) {
// tb resumes to being blocked on the clock
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:05
// unblock clock, will kick off a compaction at hour 4:00
rg.Wait(1)
fc.Advance(checkCompactionInterval)
fc.Advance(time.Hour)
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
// compact the revision from hour 2:05
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
// compact the revision from hour 3:00
wreq := &pb.CompactionRequest{Revision: int64(4)}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
}
Expand Down

0 comments on commit 0d3bea6

Please sign in to comment.