From 3be380b8abb7b1e0a6d2683873968c5d1c3b4a3c Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 26 Apr 2019 07:32:40 +0530 Subject: [PATCH] Truncate Raft logs even when no txn commits are happening If there are no txn commits but many txns which allocate new read timestamps, the Raft log can still grow a lot because of MaxAssigned updates. This PR would truncate those logs as well to create new snapshot. --- worker/draft.go | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/worker/draft.go b/worker/draft.go index aaf38925075..3b6ed969397 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -546,9 +546,13 @@ func (n *node) retrieveSnapshot(snap pb.Snapshot) error { func (n *node) proposeSnapshot(discardN int) error { snap, err := n.calculateSnapshot(discardN) - if err != nil || snap == nil { + if err != nil { + glog.Warningf("Got error while calculating snapshot: %v", err) return err } + if snap == nil { + return nil + } proposal := &pb.Proposal{ Snapshot: snap, } @@ -959,7 +963,7 @@ func (n *node) abortOldTransactions() { // At i7, min pending start ts = S3, therefore snapshotIdx = i5 - 1 = i4. // At i7, max commit ts = C1, therefore readTs = C1. func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) { - _, span := otrace.StartSpan(n.ctx, "Propose.Snapshot") + _, span := otrace.StartSpan(n.ctx, "Calculate.Snapshot") defer span.End() if atomic.LoadInt32(&n.streaming) > 0 { @@ -974,6 +978,18 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) { } span.Annotatef(nil, "First index: %d", first) + rsnap, err := n.Store.Snapshot() + if err != nil { + return nil, err + } + var snap pb.Snapshot + if len(rsnap.Data) > 0 { + if err := snap.Unmarshal(rsnap.Data); err != nil { + return nil, err + } + } + span.Annotatef(nil, "Last snapshot: %+v", snap) + last := n.Applied.DoneUntil() if int(last-first) < discardN { span.Annotate(nil, "Skipping due to insufficient entries") @@ -999,7 +1015,8 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) { // snapshotIdx. In any case, we continue picking up txn updates, to generate // a maxCommitTs, which would become the readTs for the snapshot. minPendingStart := posting.Oracle().MinPendingStartTs() - var maxCommitTs, snapshotIdx, maxCommitIdx uint64 + maxCommitTs := snap.ReadTs + var snapshotIdx uint64 for _, entry := range entries { if entry.Type != raftpb.EntryNormal { continue @@ -1019,7 +1036,6 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) { for _, txn := range proposal.Delta.GetTxns() { maxCommitTs = x.Max(maxCommitTs, txn.CommitTs) } - maxCommitIdx = entry.Index } } if maxCommitTs == 0 { @@ -1029,8 +1045,10 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) { if snapshotIdx <= 0 { // It is possible that there are no pending transactions. In that case, // snapshotIdx would be zero. - span.Annotatef(nil, "Using maxCommitIdx as snapshotIdx: %d", maxCommitIdx) - snapshotIdx = maxCommitIdx + if len(entries) > 0 { + snapshotIdx = entries[len(entries)-1].Index + } + span.Annotatef(nil, "snapshotIdx is zero. Using last entry's index: %d", snapshotIdx) } numDiscarding := snapshotIdx - first + 1 @@ -1045,13 +1063,13 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) { return nil, nil } - snap := &pb.Snapshot{ + result := &pb.Snapshot{ Context: n.RaftContext, Index: snapshotIdx, ReadTs: maxCommitTs, } - span.Annotatef(nil, "Got snapshot: %+v", snap) - return snap, nil + span.Annotatef(nil, "Got snapshot: %+v", result) + return result, nil } func (n *node) joinPeers() error {