Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: truncate raft log less aggressively when replica is missing #38484

Merged
merged 2 commits into from
Jul 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ var (
"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)

// defaultRaftLogTruncationThreshold specifies the upper bound that a single
// Range's Raft log can grow to before log truncations are triggered, even
// if that means a snapshot will be required for a straggling follower.
// Range's Raft log can grow to before log truncations are triggered while at
// least one follower is missing. If all followers are active, the quota pool
// is responsible for ensuring the raft log doesn't grow without bound by
// making sure the leader doesn't get too far ahead.
defaultRaftLogTruncationThreshold = envutil.EnvOrDefaultInt64(
"COCKROACH_RAFT_LOG_TRUNCATION_THRESHOLD", 4<<20 /* 4 MB */)

Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func registerTests(r *registry) {
registerQueue(r)
registerRebalanceLoad(r)
registerReplicaGC(r)
registerRestart(r)
registerRestore(r)
registerRoachmart(r)
registerScaleData(r)
Expand Down
95 changes: 95 additions & 0 deletions pkg/cmd/roachtest/restart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// runRestart checks that a node which was down for downDuration can serve a
// query again shortly after it is restarted.
//
// The test starts cockroach on every node of the cluster, imports a TPC-C
// fixture, waits for the raft log queue to truncate the import's entries,
// stops node 3, runs the TPC-C workload against node 1 for downDuration and
// then restarts node 3. It fails if a COUNT(*) issued through node 3 returns
// an error or takes longer than downDuration.
func runRestart(ctx context.Context, t *test, c *cluster, downDuration time.Duration) {
	crdbNodes := c.Range(1, c.nodes)
	workloadNode := c.Node(1)
	const restartNode = 3

	t.Status("installing cockroach")
	c.Put(ctx, cockroach, "./cockroach", crdbNodes)
	c.Start(ctx, t, crdbNodes, startArgs(`--args=--vmodule=raft_log_queue=3`))

	// We don't really need tpcc, we just need a good amount of traffic and a good
	// amount of data.
	t.Status("importing tpcc fixture")
	c.Run(ctx, workloadNode,
		"./cockroach workload fixtures import tpcc --warehouses=100 --fks=false --checks=false")

	// Wait a full scanner cycle (10m) for the raft log queue to truncate the
	// sstable entries from the import. They're huge and are not representative of
	// normal traffic.
	//
	// NB: less would probably do a good enough job, but let's play it safe.
	//
	// TODO(dan/tbg): It's awkward that this is necessary. We should be able to
	// do a better job here, for example by truncating only a smaller prefix of
	// the log instead of all of it (right now there's no notion of per-entry
	// size when we do truncate). Also having quiescing ranges truncate to
	// lastIndex will be helpful because that drives the log size down eagerly
	// when things are healthy.
	t.Status("waiting for addsstable truncations")
	time.Sleep(11 * time.Minute)

	// Stop a node.
	c.Stop(ctx, c.Node(restartNode))

	// Wait for between 10s and `server.time_until_store_dead` while sending
	// traffic to one of the nodes that are not down. This used to cause lots of
	// raft log truncation, which caused node 3 to need lots of snapshots when it
	// came back up.
	c.Run(ctx, workloadNode, "./cockroach workload run tpcc --warehouses=100 "+
		fmt.Sprintf("--tolerate-errors --wait=false --duration=%s", downDuration))

	// Bring it back up and make sure it can serve a query within a reasonable
	// time limit. For now, less time than it was down for.
	c.Start(ctx, t, c.Node(restartNode))

	// Dialing the formerly down node may still be prevented by the circuit breaker
	// for a short moment (seconds) after n3 restarts. If it happens, the COUNT(*)
	// can fail with a "no inbound stream connection" error. This is not what we
	// want to catch in this test, so work around it.
	time.Sleep(15 * time.Second)

	// The clock starts before the connection is opened so that the measured
	// time covers both connecting to the restarted node and running the query.
	start := timeutil.Now()
	restartNodeDB := c.Conn(ctx, restartNode)
	// Release the connection pool when the test finishes. The error from Close
	// is deliberately ignored: the test outcome is decided by the query below.
	defer restartNodeDB.Close()
	if _, err := restartNodeDB.Exec(`SELECT count(*) FROM tpcc.order_line`); err != nil {
		t.Fatal(err)
	}
	if took := timeutil.Since(start); took > downDuration {
		t.Fatalf(`expected to recover within %s took %s`, downDuration, took)
	} else {
		c.l.Printf(`connecting and query finished in %s`, took)
	}
}

// registerRestart adds the "restart/down-for-2m" test to the registry. The
// test runs runRestart on a three-node cluster with a node down time of two
// minutes.
func registerRestart(r *registry) {
	r.Add(testSpec{
		// The name is a constant, so no formatting call is needed.
		Name:    "restart/down-for-2m",
		Cluster: makeClusterSpec(3),
		// "cockroach workload is only in 19.1+"
		MinVersion: "v19.1.0",
		Run: func(ctx context.Context, t *test, c *cluster) {
			runRestart(ctx, t, c, 2*time.Minute)
		},
	})
}
116 changes: 63 additions & 53 deletions pkg/storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,19 @@ func (td *truncateDecision) ShouldTruncate() bool {
(n > 0 && td.Input.LogSize >= RaftLogQueueStaleSize)
}

// ProtectIndex makes the decision less aggressive so that the log position
// at index is not truncated away. When the currently proposed truncation
// point (the first index the log would have after truncating) lies past
// index, it is lowered to index and ChosenVia is set to chosenVia; otherwise
// the decision is left untouched. The protection is not guaranteed when index
// falls outside of the existing [FirstIndex,LastIndex] bounds.
func (td *truncateDecision) ProtectIndex(index uint64, chosenVia string) {
	if td.NewFirstIndex <= index {
		// The proposed truncation already stops at or before index.
		return
	}
	td.NewFirstIndex, td.ChosenVia = index, chosenVia
}

// computeTruncateDecision returns the oldest index that cannot be
// truncated. If there is a behind node, we want to keep old raft logs so it
// can catch up without having to send a full snapshot. However, if a node down
Expand All @@ -376,70 +389,67 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
decision := truncateDecision{Input: input}
decision.QuorumIndex = getQuorumIndex(&input.RaftStatus)

decision.NewFirstIndex = decision.QuorumIndex
decision.ChosenVia = truncatableIndexChosenViaQuorumIndex
// The last index is the most aggressive possible truncation that we could do.
// Everything else in this method makes the truncation less aggressive.
decision.NewFirstIndex = decision.Input.LastIndex
decision.ChosenVia = truncatableIndexChosenViaLastIndex

// Start by trying to truncate at the quorum index. Naively, you would expect
// lastIndex to never be smaller than quorumIndex, but
// RaftStatus.Progress.Match is updated on the leader when a command is
// proposed and in a single replica Raft group this also means that
// RaftStatus.Commit is updated at propose time.
decision.ProtectIndex(decision.QuorumIndex, truncatableIndexChosenViaQuorumIndex)

for _, progress := range input.RaftStatus.Progress {
if !progress.RecentActive {
// If a follower isn't recently active, don't lower the truncation
// index for it as the follower is likely not online at all and would
// block log truncation forever.
continue
}
// Snapshots are expensive, so we try our best to avoid truncating past
// where a follower is.

// Generally we truncate to the quorum commit index when the log becomes
// too large, but we make an exception for live followers which are
// being probed (i.e. the leader doesn't know how far they've caught
// up). In that case the Match index is too large, and so the quorum
// index can be, too. We don't want these followers to require a
// snapshot since they are most likely going to be caught up very soon
// (they respond with the "right index" to the first probe or don't
// respond, in which case they should end up as not recently active).
// But we also don't know their index, so we can't possible make a
// truncation decision that avoids that at this point and make the
// truncation a no-op.
// First, we never truncate off a recently active follower, no matter how
// large the log gets. "Recently active" uses the same (currently 10s)
// constant as the quota pool, so the quota pool should put a bound on how
// much the raft log can grow due to this.
//
// The scenario in which this is most relevant is during restores, where
// we split off new ranges that rapidly receive very large log entries
// while the Raft group is still in a state of discovery (a new leader
// starts probing followers at its own last index). Additionally, these
// ranges will be split many times over, resulting in a flurry of
// snapshots with overlapping bounds that put significant stress on the
// Raft snapshot queue.
if progress.State == raft.ProgressStateProbe {
if decision.NewFirstIndex > decision.Input.FirstIndex {
decision.NewFirstIndex = decision.Input.FirstIndex
decision.ChosenVia = truncatableIndexChosenViaProbingFollower
// For live followers which are being probed (i.e. the leader doesn't know
// how far they've caught up), the Match index is too large, and so the
// quorum index can be, too. We don't want these followers to require a
// snapshot since they are most likely going to be caught up very soon (they
// respond with the "right index" to the first probe or don't respond, in
// which case they should end up as not recently active). But we also don't
// know their index, so we can't possibly make a truncation decision that
// avoids that at this point, and instead make the truncation a no-op.
//
// The scenario in which this is most relevant is during restores, where we
// split off new ranges that rapidly receive very large log entries while
// the Raft group is still in a state of discovery (a new leader starts
// probing followers at its own last index). Additionally, these ranges will
// be split many times over, resulting in a flurry of snapshots with
// overlapping bounds that put significant stress on the Raft snapshot
// queue.
if progress.RecentActive {
if progress.State == raft.ProgressStateProbe {
decision.ProtectIndex(decision.Input.FirstIndex, truncatableIndexChosenViaProbingFollower)
} else {
decision.ProtectIndex(progress.Match, truncatableIndexChosenViaFollowers)
}
} else if !input.LogTooLarge() && decision.NewFirstIndex > progress.Match {
decision.NewFirstIndex = progress.Match
decision.ChosenVia = truncatableIndexChosenViaFollowers
continue
}

// Second, if the follower has not been recently active, we don't
// truncate it off as long as the raft log is not too large.
if !input.LogTooLarge() {
decision.ProtectIndex(progress.Match, truncatableIndexChosenViaFollowers)
}

// Otherwise, we let it truncate to the quorum index.
}

// The pending snapshot index acts as a placeholder for a replica that is
// about to be added to the range (or is in Raft recovery). We don't want to
// truncate the log in a way that will require that new replica to be caught
// up via yet another Raft snapshot.
if input.PendingPreemptiveSnapshotIndex > 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex {
decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex
decision.ChosenVia = truncatableIndexChosenViaPendingSnap
}

// Advance to the first index, but never truncate past the quorum commit
// index.
if decision.NewFirstIndex < input.FirstIndex && input.FirstIndex <= decision.QuorumIndex {
decision.NewFirstIndex = input.FirstIndex
decision.ChosenVia = truncatableIndexChosenViaFirstIndex
}
// Never truncate past the last index. Naively, you would expect lastIndex to
// never be smaller than quorumIndex, but RaftStatus.Progress.Match is
// updated on the leader when a command is proposed and in a single replica
// Raft group this also means that RaftStatus.Commit is updated at propose
// time.
if decision.NewFirstIndex > input.LastIndex {
decision.NewFirstIndex = input.LastIndex
decision.ChosenVia = truncatableIndexChosenViaLastIndex
if input.PendingPreemptiveSnapshotIndex > 0 {
decision.ProtectIndex(input.PendingPreemptiveSnapshotIndex, truncatableIndexChosenViaPendingSnap)
}

// If new first index dropped below first index, make them equal (resulting
Expand Down Expand Up @@ -493,7 +503,7 @@ func (rlq *raftLogQueue) shouldQueue(
}

// shouldQueueImpl returns whether the given truncate decision should lead to
// a log truncation. This is either the case if the decision says so or of
// a log truncation. This is either the case if the decision says so or if
// we want to recompute the log size (in which case `recomputeRaftLogSize` and
// `shouldQ` are both true and a reasonable priority is returned).
func (rlq *raftLogQueue) shouldQueueImpl(
Expand Down
22 changes: 11 additions & 11 deletions pkg/storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ func TestComputeTruncateDecision(t *testing.T) {
{
// Nothing to truncate.
[]uint64{1, 2}, 100, 1, 1, 0,
"should truncate: false [truncate 0 entries to first index 1 (chosen via: quorum)]"},
"should truncate: false [truncate 0 entries to first index 1 (chosen via: last index)]"},
{
// Nothing to truncate on this replica, though a quorum elsewhere has more progress.
// NB this couldn't happen if we're truly the Raft leader, unless we appended to our
// own log asynchronously.
[]uint64{1, 5, 5}, 100, 1, 1, 0,
"should truncate: false [truncate 0 entries to first index 1 (chosen via: followers)]",
"should truncate: false [truncate 0 entries to first index 1 (chosen via: last index)]",
},
{
// We're not truncating anything, but one follower is already cut off. There's no pending
Expand All @@ -145,7 +145,7 @@ func TestComputeTruncateDecision(t *testing.T) {
{
// The happy case.
[]uint64{5, 5, 5}, 100, 2, 5, 0,
"should truncate: false [truncate 3 entries to first index 5 (chosen via: quorum)]",
"should truncate: false [truncate 3 entries to first index 5 (chosen via: last index)]",
},
{
// No truncation, but the outstanding snapshot is made obsolete by the truncation. However
Expand All @@ -170,10 +170,10 @@ func TestComputeTruncateDecision(t *testing.T) {
[]uint64{1, 2, 3, 4}, 100, 2, 2, 0,
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]",
},
// If over targetSize, should truncate to quorum committed index. Minority will need snapshots.
// Don't truncate off active followers, even if over targetSize.
{
[]uint64{1, 3, 3, 4}, 2000, 1, 3, 0,
"should truncate: false [truncate 2 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot]",
"should truncate: false [truncate 0 entries to first index 1 (chosen via: followers); log too large (2.0 KiB > 1000 B)]",
},
// Don't truncate away pending snapshot, even when log too large.
{
Expand All @@ -182,11 +182,11 @@ func TestComputeTruncateDecision(t *testing.T) {
},
{
[]uint64{1, 3, 3, 4}, 2000, 2, 3, 0,
"should truncate: false [truncate 1 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B)]",
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index); log too large (2.0 KiB > 1000 B)]",
},
{
[]uint64{1, 3, 3, 4}, 2000, 3, 3, 0,
"should truncate: false [truncate 0 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B)]",
"should truncate: false [truncate 0 entries to first index 3 (chosen via: first index); log too large (2.0 KiB > 1000 B)]",
},
// The pending snapshot index affects the quorum commit index.
{
Expand All @@ -209,7 +209,7 @@ func TestComputeTruncateDecision(t *testing.T) {
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]",
}}
for i, c := range testCases {
t.Run(fmt.Sprintf("%+v", c), func(t *testing.T) {
t.Run("", func(t *testing.T) {
status := raft.Status{
Progress: make(map[uint64]raft.Progress),
}
Expand All @@ -227,7 +227,7 @@ func TestComputeTruncateDecision(t *testing.T) {
}
decision := computeTruncateDecision(input)
if act, exp := decision.String(), c.exp; act != exp {
t.Errorf("%d: got:\n%s\nwanted:\n%s", i, act, exp)
t.Errorf("%d:\ngot:\n%s\nwanted:\n%s", i, act, exp)
}

// Verify the triggers that queue a range for recomputation. In
Expand Down Expand Up @@ -266,11 +266,11 @@ func TestComputeTruncateDecisionProgressStatusProbe(t *testing.T) {
exp := map[bool]map[bool]string{ // (tooLarge, active)
false: {
true: "should truncate: false [truncate 0 entries to first index 10 (chosen via: probing follower)]",
false: "should truncate: true [truncate 190 entries to first index 200 (chosen via: followers)]",
false: "should truncate: false [truncate 0 entries to first index 10 (chosen via: first index)]",
},
true: {
true: "should truncate: false [truncate 0 entries to first index 10 (chosen via: probing follower); log too large (2.0 KiB > 1.0 KiB)]",
false: "should truncate: true [truncate 290 entries to first index 300 (chosen via: quorum); log too large (2.0 KiB > 1.0 KiB); implies 1 Raft snapshot]",
false: "should truncate: true [truncate 190 entries to first index 200 (chosen via: followers); log too large (2.0 KiB > 1.0 KiB)]",
},
}

Expand Down