Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compactor: Add Revisional compactor #8123

Merged
merged 1 commit into from
Jun 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 27 additions & 95 deletions compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
package compactor

import (
"sync"
"fmt"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/pkg/capnslog"
"github.com/jonboulle/clockwork"
"golang.org/x/net/context"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

var (
Expand All @@ -32,8 +31,24 @@ var (
const (
checkCompactionInterval = 5 * time.Minute
executeCompactionInterval = time.Hour

ModePeriodic = "periodic"
ModeRevision = "revision"
)

// Compactor purges old log from the storage periodically.
type Compactor interface {
// Run starts the main loop of the compactor in background.
// Use Stop() to halt the loop and release the resource.
Run()
// Stop halts the main loop of the compactor.
Stop()
// Pause temporally suspend the compactor not to run compaction. Resume() to unpose.
Pause()
// Resume restarts the compactor suspended by Pause().
Resume()
}

type Compactable interface {
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
Expand All @@ -42,96 +57,13 @@ type RevGetter interface {
Rev() int64
}

// Periodic compacts the log by purging revisions older than
// the configured retention time. Compaction happens hourly.
type Periodic struct {
clock clockwork.Clock
periodInHour int

rg RevGetter
c Compactable

revs []int64
ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
paused bool
}

func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
return &Periodic{
clock: clockwork.NewRealClock(),
periodInHour: h,
rg: rg,
c: c,
}
}

func (t *Periodic) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.revs = make([]int64, 0)
clock := t.clock

go func() {
last := clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
}

if clock.Now().Sub(last) < executeCompactionInterval {
continue
}

rev, remaining := t.getRev(t.periodInHour)
if rev < 0 {
continue
}

plog.Noticef("Starting auto-compaction at revision %d", rev)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
t.revs = remaining
last = clock.Now()
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
plog.Noticef("Retry after %v", checkCompactionInterval)
}
}
}()
}

func (t *Periodic) Stop() {
t.cancel()
}

func (t *Periodic) Pause() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = true
}

func (t *Periodic) Resume() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = false
}

func (t *Periodic) getRev(h int) (int64, []int64) {
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
if i < 0 {
return -1, t.revs
func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) {
switch mode {
case ModePeriodic:
return NewPeriodic(retention, rg, c), nil
case ModeRevision:
return NewRevision(int64(retention), rg, c), nil
default:
return nil, fmt.Errorf("unsupported compaction mode %s", mode)
}
return t.revs[i], t.revs[i+1:]
}
97 changes: 0 additions & 97 deletions compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,108 +15,11 @@
package compactor

import (
"reflect"
"testing"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"
"github.com/jonboulle/clockwork"
"golang.org/x/net/context"
)

func TestPeriodic(t *testing.T) {
retentionHours := 2

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Periodic{
clock: fc,
periodInHour: retentionHours,
rg: rg,
c: compactable,
}

tb.Run()
defer tb.Stop()

n := int(time.Hour / checkCompactionInterval)
// collect 5 hours of revisions
for i := 0; i < 5; i++ {
// advance one hour, one revision for each interval
for j := 0; j < n; j++ {
rg.Wait(1)
fc.Advance(checkCompactionInterval)
}

// compaction doesn't happen til 2 hours elapses
if i+1 < retentionHours {
continue
}

a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}

// unblock the rev getter, so we can stop the compactor routine.
_, err := rg.Wait(1)
if err != nil {
t.Fatal(err)
}
}

func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
compactable := &fakeCompactable{testutil.NewRecorderStream()}
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
tb := &Periodic{
clock: fc,
periodInHour: 1,
rg: rg,
c: compactable,
}

tb.Run()
tb.Pause()

// tb will collect 3 hours of revisions but not compact since paused
n := int(time.Hour / checkCompactionInterval)
for i := 0; i < 3*n; i++ {
rg.Wait(1)
fc.Advance(checkCompactionInterval)
}
// tb ends up waiting for the clock

select {
case a := <-compactable.Chan():
t.Fatalf("unexpected action %v", a)
case <-time.After(10 * time.Millisecond):
}

// tb resumes to being blocked on the clock
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:05
rg.Wait(1)
fc.Advance(checkCompactionInterval)
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
// compact the revision from hour 2:05
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
}
}

type fakeCompactable struct {
testutil.Recorder
}
Expand Down
122 changes: 122 additions & 0 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compactor

import (
"sync"
"time"

"github.com/jonboulle/clockwork"
"golang.org/x/net/context"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
)

// Periodic compacts the log by purging revisions older than
// the configured retention time. Compaction happens hourly.
type Periodic struct {
clock clockwork.Clock
periodInHour int

rg RevGetter
c Compactable

revs []int64
ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
paused bool
}

// NewPeriodic creates a new instance of Periodic compactor that purges
// the log older than h hours.
func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
return &Periodic{
clock: clockwork.NewRealClock(),
periodInHour: h,
rg: rg,
c: c,
}
}

func (t *Periodic) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.revs = make([]int64, 0)
clock := t.clock

go func() {
last := clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
}

if clock.Now().Sub(last) < executeCompactionInterval {
continue
}

rev, remaining := t.getRev(t.periodInHour)
if rev < 0 {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
t.revs = remaining
last = clock.Now()
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
plog.Noticef("Retry after %v", checkCompactionInterval)
}
}
}()
}

func (t *Periodic) Stop() {
t.cancel()
}

func (t *Periodic) Pause() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = true
}

func (t *Periodic) Resume() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = false
}

func (t *Periodic) getRev(h int) (int64, []int64) {
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
if i < 0 {
return -1, t.revs
}
return t.revs[i], t.revs[i+1:]
}
Loading