Skip to content

Commit

Permalink
*: support auto-compaction with finer granularity
Browse files Browse the repository at this point in the history
  • Loading branch information
fanminshi committed Sep 14, 2017
1 parent 4afb99f commit ce4d657
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 61 deletions.
5 changes: 2 additions & 3 deletions compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ var (
)

const (
checkCompactionInterval = 5 * time.Minute
executeCompactionInterval = time.Hour
checkCompactionInterval = 5 * time.Minute

ModePeriodic = "periodic"
ModeRevision = "revision"
Expand All @@ -57,7 +56,7 @@ type RevGetter interface {
Rev() int64
}

func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) {
func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) {
switch mode {
case ModePeriodic:
return NewPeriodic(retention, rg, c), nil
Expand Down
74 changes: 27 additions & 47 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,72 +26,60 @@ import (
)

// Periodic compacts the log by purging revisions older than
// the configured retention time. Compaction happens hourly.
// the configured retention time.
type Periodic struct {
clock clockwork.Clock
periodInHour int
clock clockwork.Clock
period time.Duration

rg RevGetter
c Compactable

revs []int64
ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
// mu protects paused
mu sync.RWMutex
paused bool
}

// NewPeriodic creates a new instance of Periodic compactor that purges
// the log older than h hours.
func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
// the log older than h Duration.
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
return &Periodic{
clock: clockwork.NewRealClock(),
periodInHour: h,
rg: rg,
c: c,
clock: clockwork.NewRealClock(),
period: h,
rg: rg,
c: c,
}
}

func (t *Periodic) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.revs = make([]int64, 0)
clock := t.clock
timer := clock.After(t.period)

go func() {
last := clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
case <-timer:
timer = clock.After(t.period)
t.mu.RLock()
if t.paused {
t.mu.RUnlock()
continue
}
}

if clock.Now().Sub(last) < executeCompactionInterval {
continue
}

rev, remaining := t.getRev(t.periodInHour)
if rev < 0 {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
t.revs = remaining
last = clock.Now()
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
plog.Noticef("Retry after %v", checkCompactionInterval)
t.mu.RUnlock()
rev := t.rg.Rev()
plog.Noticef("Starting auto-compaction at revision %d (retention: %v )", rev, t.period)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", t.period)
}
}
}
}()
Expand All @@ -112,11 +100,3 @@ func (t *Periodic) Resume() {
defer t.mu.Unlock()
t.paused = false
}

func (t *Periodic) getRev(h int) (int64, []int64) {
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
if i < 0 {
return -1, t.revs
}
return t.revs[i], t.revs[i+1:]
}
10 changes: 5 additions & 5 deletions compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import (
)

func TestPeriodic(t *testing.T) {
retentionHours := 2
retentionHours := time.Duration(2 * time.Hour)

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Periodic{
clock: fc,
periodInHour: retentionHours,
rg: rg,
c: compactable,
clock: fc,
period: retentionHours,
rg: rg,
c: compactable,
}

tb.Run()
Expand Down
2 changes: 1 addition & 1 deletion embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Config struct {
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapCount uint64 `json:"snapshot-count"`
AutoCompactionRetention int `json:"auto-compaction-retention"`
AutoCompactionRetention string `json:"auto-compaction-retention"`
AutoCompactionMode string `json:"auto-compaction-mode"`

// TickMs is the number of milliseconds between heartbeat ticks.
Expand Down
17 changes: 16 additions & 1 deletion embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net"
"net/http"
"net/url"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -127,6 +128,20 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
}
}

var (
autoCompactionRetention time.Duration
h int
)
h, err = strconv.Atoi(cfg.AutoCompactionRetention)
if err == nil {
autoCompactionRetention = time.Duration(int64(h)) * time.Hour
} else {
autoCompactionRetention, err = time.ParseDuration(cfg.AutoCompactionRetention)
if err != nil {
return nil, fmt.Errorf("error parsing AutoCompactionRetention: %v", err)
}
}

srvcfg := etcdserver.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.ACUrls,
Expand All @@ -145,7 +160,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
AutoCompactionRetention: cfg.AutoCompactionRetention,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
MaxTxnOps: cfg.MaxTxnOps,
Expand Down
4 changes: 2 additions & 2 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ func newConfig() *config {
// version
fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")

fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "Interpret 'auto-compaction-retention' as hours when 'periodic', as revision numbers when 'revision'.")
fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "'periodic' means hours if an integer or a duration string otherwise, 'revision' means revision numbers to retain by auto compaction")

// pprof profiler via HTTP
fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
Expand Down
2 changes: 1 addition & 1 deletion etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ clustering flags:
--auto-compaction-retention '0'
auto compaction retention length. 0 means disable auto compaction.
--auto-compaction-mode 'periodic'
'periodic' means hours, 'revision' means revision numbers to retain by auto compaction
'periodic' means hours if an integer or a duration string otherwise, 'revision' means revision numbers to retain by auto compaction.
--enable-v2
Accept etcd V2 client requests.
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type ServerConfig struct {
ElectionTicks int
BootstrapTimeout time.Duration

AutoCompactionRetention int
AutoCompactionRetention time.Duration
AutoCompactionMode string
QuotaBackendBytes int64
MaxTxnOps uint
Expand Down

0 comments on commit ce4d657

Please sign in to comment.