add debug logging on 3.4.18 for understanding etcd-io#14338
chaochn47 committed Aug 15, 2022
1 parent 72d3e38 commit 477bdf0
Showing 5 changed files with 157 additions and 34 deletions.
2 changes: 1 addition & 1 deletion etcdserver/corrupt.go
@@ -214,7 +214,7 @@ func (s *EtcdServer) checkHashKV() error {
}
s.goAttach(func() {
s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
})
}, "checkHashKV")
}

if h2 != h && rev2 == rev && crev == crev2 {
104 changes: 85 additions & 19 deletions etcdserver/server.go
@@ -44,6 +44,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/v2store"
"go.etcd.io/etcd/etcdserver/api/v3alarm"
"go.etcd.io/etcd/etcdserver/api/v3compactor"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/lease/leasehttp"
@@ -738,13 +739,13 @@ func (s *EtcdServer) adjustTicks() {
// should be implemented in goroutines.
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.adjustTicks() })
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
s.goAttach(s.monitorVersions)
s.goAttach(s.linearizableReadLoop)
s.goAttach(s.monitorKVHash)
s.goAttach(func() { s.adjustTicks() }, "adjustTicks")
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) }, "publish")
s.goAttach(s.purgeFile, "purgeFile")
s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) }, "monitorFileDescriptor")
s.goAttach(s.monitorVersions, "monitorVersions")
s.goAttach(s.linearizableReadLoop, "linearizableReadLoop")
s.goAttach(s.monitorKVHash, "monitorKVHash")
}

// start prepares and starts server in a new goroutine. It is no longer safe to
@@ -941,7 +942,7 @@ func (s *EtcdServer) run() {
}

// asynchronously accept apply packets, dispatch progress in-order
sched := schedule.NewFIFOScheduler()
sched := schedule.NewFIFOScheduler(lg)

var (
smu sync.RWMutex
@@ -1013,20 +1014,26 @@ func (s *EtcdServer) run() {

defer func() {
s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
lg.Info("close s.stopping channel")
close(s.stopping)
s.wgMu.Unlock()
s.cancel()

lg.Info("stopping fifo scheduler")
sched.Stop()

lg.Info("wait for goroutines before closing raft so wal stays open")
// wait for goroutines before closing raft so wal stays open
s.wg.Wait()
lg.Info("all goroutines already exited after closing s.stopping")

s.SyncTicker.Stop()

// must stop raft after scheduler-- etcdserver can leak rafthttp pipelines
// by adding a peer after raft stops the transport
lg.Info("start stopping raft")
s.r.stop()
lg.Info("raft is stopped")

// kv, lessor and backend can be nil if running without v3 enabled
// or running unit tests.
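
The ordering inside this defer block carries the reasoning the new logs are meant to expose: goAttach is blocked while s.stopping closes, the scheduler drains before raft stops, and the waitgroup is waited on while the WAL is still open. Below is a stripped-down, runnable sketch of the same sequence; it is an editor's illustration with simplified stand-in variables, not etcd code.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		wg       sync.WaitGroup
		wgMu     sync.RWMutex
		stopping = make(chan struct{})
	)

	// A tracked worker in the style of goAttach: registered under a read
	// lock so registration cannot race with close(stopping) below.
	wgMu.RLock()
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-stopping // exits once stopping is closed
	}()
	wgMu.RUnlock()

	// The shutdown sequence mirrored from the defer block above:
	wgMu.Lock() // 1. block concurrent goAttach while stopping closes
	close(stopping)
	wgMu.Unlock()

	// 2. sched.Stop() would drain pending apply jobs here.

	wg.Wait() // 3. wait for goroutines before stopping raft so the WAL stays open

	// 4. raft would stop last, after the scheduler, to avoid leaking
	// rafthttp pipelines from a late-added peer.
	fmt.Println("all goroutines exited; safe to stop raft")
}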
@@ -1056,7 +1063,7 @@ func (s *EtcdServer) run() {
for {
select {
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
case leases := <-expiredLeaseC:
s.goAttach(func() {
@@ -1087,9 +1094,9 @@ }
}

<-c
})
}, "revoking lease")
}
})
}, "handling expiredLeases")
case err := <-s.errorc:
if lg != nil {
lg.Warn("server error", zap.Error(err))
@@ -1119,7 +1126,55 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
// wait for the raft routine to finish the disk writes before triggering a
// snapshot. Otherwise the applied index might be greater than the last index
// in raft storage, since the raft routine might be slower than the apply routine.

s.lg.Info("blocking at apply.notifyc")
if len(apply.entries) > 0 {
for i, entry := range apply.entries {
var r raftpb.ConfChange
if err := r.Unmarshal(entry.Data); err == nil {
s.lg.Info("unmarshalled apply entry", zap.Stringer("method", r.Type),
zap.Uint64("id", r.NodeID),
zap.Int("entry-num", i),
)
continue
}
var rr etcdserverpb.InternalRaftRequest
if err := rr.Unmarshal(entry.Data); err == nil {
s.lg.Info("unmarshalled apply entry", zap.String("method", "InternalRaftRequest"),
zap.Uint64("term", entry.Term),
zap.Uint64("index", entry.Index),
zap.String("data", rr.String()),
zap.Int("entry-num", i),
)
continue
}

var rrr etcdserverpb.Request
if err := rrr.Unmarshal(entry.Data); err == nil {
zapFields := []zap.Field{
zap.Uint64("term", entry.Term),
zap.Uint64("index", entry.Index),
zap.Int("entry-num", i),
}
switch rrr.Method {
case "":
s.lg.Info("noop")
case "SYNC":
zapFields = append(zapFields, zap.String("method", "sync"), zap.Time("time", time.Unix(0, rrr.Time)))
s.lg.Info("unmarshalled apply entry", zapFields...)
case "QGET", "DELETE":
zapFields = append(zapFields, zap.String("method", rrr.Method), zap.String("path", excerpt(rrr.Path, 64, 64)))
s.lg.Info("unmarshalled apply entry", zapFields...)
default:
zapFields = append(zapFields, zap.String("method", rrr.Method), zap.String("path", excerpt(rrr.Path, 64, 64)), zap.String("val", excerpt(rrr.Val, 128, 0)))
s.lg.Info("unmarshalled apply entry", zapFields...)
}
continue
}
s.lg.Info("failed to unmarshal apply entry")
}
}
<-apply.notifyc
s.lg.Info("apply.notifyc returned")

s.triggerSnapshot(ep)
select {
@@ -1131,6 +1186,13 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
}
}

func excerpt(str string, pre, suf int) string {
if pre+suf > len(str) {
return fmt.Sprintf("%q", str)
}
return fmt.Sprintf("%q...%q", str[:pre], str[len(str)-suf:])
}
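
For a quick sense of what this truncation helper produces, here is a self-contained check; the sample path is invented for illustration, and the expected outputs in the comments follow directly from the function above.

package main

import "fmt"

// excerpt, copied from the commit above: quote the first pre and the last
// suf bytes of str joined by "...", or quote the whole string when short.
func excerpt(str string, pre, suf int) string {
	if pre+suf > len(str) {
		return fmt.Sprintf("%q", str)
	}
	return fmt.Sprintf("%q...%q", str[:pre], str[len(str)-suf:])
}

func main() {
	fmt.Println(excerpt("/registry/pods/default/nginx-0", 8, 6)) // "/registr"..."ginx-0"
	fmt.Println(excerpt("short", 64, 64))                        // "short"
}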

func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
if raft.IsEmptySnap(apply.snapshot) {
return
@@ -1996,7 +2058,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
s.goAttach(func() {
s.r.Propose(ctx, data)
cancel()
})
}, "sync")
}

// publish registers server information into the cluster. The information
@@ -2121,7 +2183,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
}
return
}
})
}, "sendMergedSnap")
}

// apply takes entries received from Raft (after it has been committed) and
@@ -2252,7 +2314,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
}
s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
s.w.Trigger(id, ar)
})
}, "activating no space alarm")
}

// applyConfChange applies a ConfChange to the server. It is only
@@ -2444,7 +2506,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
} else {
plog.Infof("compacted raft log at %d", compacti)
}
})
}, "snapshot")
}

// CutPeer drops messages to the specified peer.
@@ -2508,14 +2570,14 @@ func (s *EtcdServer) monitorVersions() {
if v != nil {
verStr = v.String()
}
s.goAttach(func() { s.updateClusterVersion(verStr) })
s.goAttach(func() { s.updateClusterVersion(verStr) }, "update cluster version Line 2511")
continue
}

// update cluster version only if the decided version is greater than
// the current cluster version
if v != nil && s.cluster.Version().LessThan(*v) {
s.goAttach(func() { s.updateClusterVersion(v.String()) })
s.goAttach(func() { s.updateClusterVersion(v.String()) }, "update cluster version Line 2518")
}
}
}
@@ -2638,7 +2700,7 @@ func (s *EtcdServer) restoreAlarms() error {

// goAttach creates a goroutine on a given function and tracks it using
// the etcdserver waitgroup.
func (s *EtcdServer) goAttach(f func()) {
func (s *EtcdServer) goAttach(f func(), name string) {
s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
defer s.wgMu.RUnlock()
select {
@@ -2655,8 +2717,12 @@ func (s *EtcdServer) goAttach(f func()) {
// now safe to add since waitgroup wait has not started yet
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer func() {
s.lg.Info("exit", zap.String("func name", name))
s.wg.Done()
}()
s.lg.Info("running", zap.String("func name", name))
f()
}()
}
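
Assembled, the patched helper reads roughly as below. This is a runnable sketch with a simplified server type; only the waitgroup bookkeeping and the name logging are taken from the diff, everything else is a stand-in.

package main

import (
	"sync"

	"go.uber.org/zap"
)

type server struct {
	lg       *zap.Logger
	wg       sync.WaitGroup
	wgMu     sync.RWMutex
	stopping chan struct{}
}

// goAttach tracks the goroutine in the waitgroup and logs its name when it
// starts and when it exits, mirroring the patched etcdserver helper.
func (s *server) goAttach(f func(), name string) {
	s.wgMu.RLock() // blocks against a concurrent close(s.stopping)
	defer s.wgMu.RUnlock()
	select {
	case <-s.stopping:
		return // server is stopping; do not start new goroutines
	default:
	}
	s.wg.Add(1)
	go func() {
		defer func() {
			s.lg.Info("exit", zap.String("func name", name))
			s.wg.Done()
		}()
		s.lg.Info("running", zap.String("func name", name))
		f()
	}()
}

func main() {
	s := &server{lg: zap.NewExample(), stopping: make(chan struct{})}
	s.goAttach(func() {}, "demo")
	s.wg.Wait() // "exit" is logged before Done, so it prints before we return
}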

12 changes: 6 additions & 6 deletions mvcc/kvstore.go
@@ -132,7 +132,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI
compactMainRev: -1,

bytesBuf8: make([]byte, 8),
fifoSched: schedule.NewFIFOScheduler(),
fifoSched: schedule.NewFIFOScheduler(lg),

stopc: make(chan struct{}),

@@ -171,7 +171,7 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
// snapshot call, compaction and apply snapshot requests are serialized by
// raft, and do not happen at the same time.
s.mu.Lock()
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
f := schedule.NewJob("kvstore_compactBarrier", func(ctx context.Context) { s.compactBarrier(ctx, ch) })
s.fifoSched.Schedule(f)
s.mu.Unlock()
}
@@ -247,7 +247,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
s.revMu.Lock()
if rev <= s.compactMainRev {
ch := make(chan struct{})
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
f := schedule.NewJob("kvstore_updateCompactRev_compactBarrier", func(ctx context.Context) { s.compactBarrier(ctx, ch) })
s.fifoSched.Schedule(f)
s.revMu.Unlock()
return ch, ErrCompacted
@@ -276,7 +276,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {

func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
ch := make(chan struct{})
var j = func(ctx context.Context) {
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil {
s.compactBarrier(ctx, ch)
return
@@ -289,7 +289,7 @@ func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
return
}
close(ch)
}
})

s.fifoSched.Schedule(j)
trace.Step("schedule compaction")
@@ -353,7 +353,7 @@ func (s *store) Restore(b backend.Backend) error {
s.kvindex = newTreeIndex(s.lg)
s.currentRev = 1
s.compactMainRev = -1
s.fifoSched = schedule.NewFIFOScheduler()
s.fifoSched = schedule.NewFIFOScheduler(s.lg)
s.stopc = make(chan struct{})

return s.restore()
56 changes: 51 additions & 5 deletions pkg/schedule/schedule.go
@@ -17,9 +17,34 @@ package schedule
import (
"context"
"sync"

"go.uber.org/zap"
)

type Job func(context.Context)
type Job interface {
Name() string
Do(context.Context)
}

type job struct {
name string
do func(context.Context)
}

func (j job) Name() string {
return j.name
}

func (j job) Do(ctx context.Context) {
j.do(ctx)
}

func NewJob(name string, do func(ctx context.Context)) Job {
return job{
name: name,
do: do,
}
}

// Scheduler can schedule jobs.
type Scheduler interface {
@@ -56,14 +81,16 @@ type fifo struct {

finishCond *sync.Cond
donec chan struct{}
lg *zap.Logger
}

// NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO
// order sequentially
func NewFIFOScheduler() Scheduler {
func NewFIFOScheduler(lg *zap.Logger) Scheduler {
f := &fifo{
resume: make(chan struct{}, 1),
donec: make(chan struct{}, 1),
lg: lg,
}
f.finishCond = sync.NewCond(&f.mu)
f.ctx, f.cancel = context.WithCancel(context.Background())
@@ -120,8 +147,11 @@ func (f *fifo) Stop() {
f.mu.Lock()
f.cancel()
f.cancel = nil
f.lg.Info("fifo cancelled")
f.mu.Unlock()
f.lg.Info("waiting for fifo scheduler finished all jobs")
<-f.donec
f.lg.Info("fifo scheduler finished all jobs")
}

func (f *fifo) run() {
@@ -149,17 +179,33 @@ func (f *fifo) run() {
f.mu.Unlock()
// clean up pending jobs
for _, todo := range pendings {
todo(f.ctx)
f.lg.Info("executing job; update finish stats", zap.String("job", todo.Name()))
f.executeJob(todo, true)
f.lg.Info("executed job; update finish stats", zap.String("job", todo.Name()))
}
return
}
} else {
todo(f.ctx)
f.lg.Info("executing job; does not update finish stats", zap.String("job", todo.Name()))
f.executeJob(todo, false)
f.lg.Info("executed job; does not update finish stats", zap.String("job", todo.Name()))
}
}
}

func (f *fifo) executeJob(todo Job, updatedFinishedStats bool) {
defer func() {
if !updatedFinishedStats {
f.finishCond.L.Lock()
f.finished++
f.pendings = f.pendings[1:]
f.finishCond.Broadcast()
f.finishCond.L.Unlock()
}
if err := recover(); err != nil {
f.lg.Panic("execute job failed", zap.String("job", todo.Name()), zap.Any("panic", err))
}
}()

todo.Do(f.ctx)
}
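
Tying the schedule.go changes together, here is a short usage sketch against the patched API. It assumes the 3.4 import path go.etcd.io/etcd/pkg/schedule from the diff header; the job name and body are invented for illustration.

package main

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/pkg/schedule"
	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample()

	// NewFIFOScheduler now takes a logger so the scheduler can report
	// job names in its debug output.
	sched := schedule.NewFIFOScheduler(lg)

	// Jobs now carry a name, printed by run() before and after execution.
	sched.Schedule(schedule.NewJob("demo_compact", func(ctx context.Context) {
		fmt.Println("compacting...")
	}))

	sched.WaitFinish(1) // block until the job has finished
	sched.Stop()        // emits the new fifo shutdown log lines
}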
(1 more changed file not shown)
