Skip to content

Commit

Permalink
compactor: support finer retention period in compactor.go
Browse files Browse the repository at this point in the history
  • Loading branch information
fanminshi committed Sep 28, 2017
1 parent 275ffa6 commit c38b923
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 53 deletions.
5 changes: 2 additions & 3 deletions compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ var (
)

const (
checkCompactionInterval = 5 * time.Minute
executeCompactionInterval = time.Hour
checkCompactionInterval = 5 * time.Minute

ModePeriodic = "periodic"
ModeRevision = "revision"
Expand All @@ -57,7 +56,7 @@ type RevGetter interface {
Rev() int64
}

func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) {
func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) {
switch mode {
case ModePeriodic:
return NewPeriodic(retention, rg, c), nil
Expand Down
46 changes: 23 additions & 23 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
)

// Periodic compacts the log by purging revisions older than
// the configured retention time. Compaction happens hourly.
// the configured retention time.
type Periodic struct {
clock clockwork.Clock
periodInHour int
clock clockwork.Clock
period time.Duration

rg RevGetter
c Compactable
Expand All @@ -38,60 +38,60 @@ type Periodic struct {
ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
// mu protects paused
mu sync.RWMutex
paused bool
}

// NewPeriodic creates a new instance of Periodic compactor that purges
// the log older than h hours.
func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
// the log older than h Duration.
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
return &Periodic{
clock: clockwork.NewRealClock(),
periodInHour: h,
rg: rg,
c: c,
clock: clockwork.NewRealClock(),
period: h,
rg: rg,
c: c,
}
}

// periodDivisor divides Periodic.period in into checkCompactInterval duration
const periodDivisor = 10

func (t *Periodic) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.revs = make([]int64, 0)
clock := t.clock

checkCompactInterval := t.period / time.Duration(periodDivisor)
go func() {
last := clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
case <-clock.After(checkCompactInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
}

if clock.Now().Sub(last) < executeCompactionInterval {
if clock.Now().Sub(last) < t.period {
continue
}

rev, remaining := t.getRev(t.periodInHour)
rev, remaining := t.getRev()
if rev < 0 {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour)
plog.Noticef("Starting auto-compaction at revision %d (retention: %v )", rev, t.period)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
t.revs = remaining
last = clock.Now()
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
plog.Noticef("Retry after %v", checkCompactionInterval)
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", checkCompactInterval)
}
}
}()
Expand All @@ -113,8 +113,8 @@ func (t *Periodic) Resume() {
t.paused = false
}

func (t *Periodic) getRev(h int) (int64, []int64) {
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
func (t *Periodic) getRev() (int64, []int64) {
i := len(t.revs) - periodDivisor
if i < 0 {
return -1, t.revs
}
Expand Down
53 changes: 26 additions & 27 deletions compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,36 @@ import (

func TestPeriodic(t *testing.T) {
retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Periodic{
clock: fc,
periodInHour: retentionHours,
rg: rg,
c: compactable,
clock: fc,
period: retentionDuration,
rg: rg,
c: compactable,
}

tb.Run()
defer tb.Stop()

n := int(time.Hour / checkCompactionInterval)
// collect 5 hours of revisions
for i := 0; i < 5; i++ {
// advance one hour, one revision for each interval
for j := 0; j < n; j++ {
rg.Wait(1)
fc.Advance(checkCompactionInterval)
}

// compaction doesn't happen til 2 hours elapses
if i+1 < retentionHours {
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
// simulate 5 hours worth of intervals.
for i := 0; i < n/retentionHours*5; i++ {
rg.Wait(1)
fc.Advance(checkCompactInterval)
// compaction doesn't happen til 2 hours elapses.
if i < n {
continue
}

// after 2 hours, compaction happens at every checkCompactInterval.
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
expectedRevision := int64(i + 1 - n)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
Expand All @@ -75,21 +72,23 @@ func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
compactable := &fakeCompactable{testutil.NewRecorderStream()}
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
retentionDuration := time.Hour
tb := &Periodic{
clock: fc,
periodInHour: 1,
rg: rg,
c: compactable,
clock: fc,
period: retentionDuration,
rg: rg,
c: compactable,
}

tb.Run()
tb.Pause()

// tb will collect 3 hours of revisions but not compact since paused
n := int(time.Hour / checkCompactionInterval)
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
for i := 0; i < 3*n; i++ {
rg.Wait(1)
fc.Advance(checkCompactionInterval)
fc.Advance(checkCompactInterval)
}
// tb ends up waiting for the clock

Expand All @@ -102,14 +101,14 @@ func TestPeriodicPause(t *testing.T) {
// tb resumes to being blocked on the clock
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:05
// unblock clock, will kick off a compaction at hour 3:06
rg.Wait(1)
fc.Advance(checkCompactionInterval)
fc.Advance(checkCompactInterval)
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
// compact the revision from hour 2:05
// compact the revision from hour 2:06
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
Expand Down

0 comments on commit c38b923

Please sign in to comment.