95963: kvserver: use narrow checkpoints in consistency checker r=tbg,erikgrinaker a=pavelkalinnikov

This commit modifies the consistency checker to save partial checkpoints
instead of full ones. A partial checkpoint contains the data for the
inconsistent range and its immediate left and right neighbours in the Store.
Only the inconsistent range's Replica is locked while the store is snapshotted
for the checkpoint, so the other ranges can be out of sync. They are
checkpointed anyway, to give more context when, e.g., debugging non-trivial
split/merge scenarios.
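
For illustration, a minimal self-contained sketch (toy types, not the actual `Store.checkpointSpans` implementation, and covering only the user keyspace) of how the checkpointed span widens from the inconsistent range to its immediate neighbours:

```go
package main

import "fmt"

// rangeDesc is a stand-in for roachpb.RangeDescriptor: a range ID plus the
// user-key span it owns.
type rangeDesc struct {
	id         int
	start, end string
}

// partialCheckpointSpan returns the contiguous user-key span that a partial
// checkpoint for ranges[i] would cover: the range itself, widened to include
// its immediate left and right neighbours when they exist.
func partialCheckpointSpan(ranges []rangeDesc, i int) (start, end string) {
	start, end = ranges[i].start, ranges[i].end
	if i > 0 {
		start = ranges[i-1].start
	}
	if i+1 < len(ranges) {
		end = ranges[i+1].end
	}
	return start, end
}

func main() {
	// Ranges sorted by start key, as in a store's replicas-by-key index.
	ranges := []rangeDesc{
		{id: 1, start: "a", end: "b"},
		{id: 4, start: "b", end: "c"},
		{id: 2, start: "e", end: "f"},
	}
	// If r4 ([b, c)) is inconsistent, the partial checkpoint covers [a, f):
	// r1, r4, and r2, but nothing else in the store.
	start, end := partialCheckpointSpan(ranges, 1)
	fmt.Printf("checkpoint span: [%s, %s)\n", start, end)
}
```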

Release note (ops change): In the rare event of Range inconsistency, the
consistency checker saves a storage checkpoint on each store that holds a
replica of the Range. Previously, this was a full checkpoint, so its cost could
quickly escalate on the nodes that kept running. This change makes the
checkpoints partial, i.e. they now contain only the relevant range and its
neighbours. This eliminates the time pressure on the cluster operator to remove
the checkpoints.

Resolves #90543
Epic: none

96243: schemachanger: Implement `ADD CONSTRAINT NOT VALID` r=Xiang-Gu a=Xiang-Gu

This PR implements the following three statements in the declarative schema changer:

 - `ALTER TABLE ... ADD CHECK ... NOT VALID`
 - `ALTER TABLE ... ADD UNIQUE WITHOUT INDEX ... NOT VALID`
 - `ALTER TABLE ... ADD FOREIGN KEY ... NOT VALID`

The idea is to introduce a new element for each statement. That element behaves
like a simple dependent and transitions directly between ABSENT and PUBLIC.
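
A toy Go sketch of that idea (illustrative names only, not the actual scpb elements or scplan rules): the NOT VALID constraint element has just two statuses, so adding it is a single ABSENT-to-PUBLIC transition with no intermediate validation phase.

```go
package main

import "fmt"

// status models an element status in a declarative schema change.
type status int

const (
	absent status = iota
	public
)

func (s status) String() string {
	if s == public {
		return "PUBLIC"
	}
	return "ABSENT"
}

// notValidConstraint is a stand-in for a CHECK / UNIQUE WITHOUT INDEX / FK
// constraint added with NOT VALID: like a simple dependent element, it moves
// between ABSENT and PUBLIC directly, skipping any validation stage.
type notValidConstraint struct {
	name string
	cur  status
}

func (c *notValidConstraint) add()  { c.cur = public }
func (c *notValidConstraint) drop() { c.cur = absent }

func main() {
	c := &notValidConstraint{name: "check_positive"}
	c.add()
	fmt.Println(c.name, "is now", c.cur) // check_positive is now PUBLIC
}
```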

Epic: None

96598: sql: use `descs.Txn` for `schemachanger.txn` r=ajwerner a=ZhouXing19

This commit contains only mechanical changes to make `schemachanger.txn()` use the new internal executor interface introduced in #93218.
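
As a rough sketch of the shape of this change (toy types, not the real `isql`/`descs` packages), callbacks now receive one handle that bundles the transaction with its descriptor collection, instead of obtaining the collection separately:

```go
package main

import "fmt"

type txn struct{}        // stand-in for isql.Txn
type collection struct{} // stand-in for descs.Collection

// descsTxn bundles a transaction with its descriptor collection, in the
// spirit of descs.Txn.
type descsTxn struct {
	txn
	descriptors collection
}

// runTxn is the old-style helper: the callback gets only the transaction and
// must look up the descriptor collection on its own.
func runTxn(fn func(t txn, c collection) error) error {
	return fn(txn{}, collection{})
}

// runDescsTxn is the new-style helper: one bundled handle per callback.
func runDescsTxn(fn func(t descsTxn) error) error {
	return fn(descsTxn{})
}

func main() {
	_ = runDescsTxn(func(t descsTxn) error {
		fmt.Println("txn and descriptor collection come from a single handle")
		return nil
	})
}
```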

Informs #91004
Release note: None

Co-authored-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
Co-authored-by: Xiang Gu <xiang@cockroachlabs.com>
Co-authored-by: Jane Xing <zhouxing@uchicago.edu>
4 people committed Feb 6, 2023
4 parents 4ab6b68 + c87ca2d + b9a0471 + 8e149bf commit b031933
Showing 72 changed files with 1,662 additions and 472 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
@@ -1917,8 +1917,8 @@ func revalidateIndexes(
// We don't actually need the 'historical' read the way the schema change does
// since our table is offline.
runner := descs.NewHistoricalInternalExecTxnRunner(hlc.Timestamp{}, func(ctx context.Context, fn descs.InternalExecFn) error {
return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return fn(ctx, txn, descs.FromTxn(txn))
return execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
return fn(ctx, txn)
})
})

5 changes: 5 additions & 0 deletions pkg/ccl/schemachangerccl/backup_base_generated_test.go

(generated file; diff not rendered)

77 changes: 39 additions & 38 deletions pkg/cmd/roachtest/tests/inconsistency.go
@@ -32,11 +32,9 @@ func registerInconsistency(r registry.Registry) {
}

func runInconsistency(ctx context.Context, t test.Test, c cluster.Cluster) {
startOps := option.DefaultStartOpts()

nodes := c.Range(1, 3)
c.Put(ctx, t.Cockroach(), "./cockroach", nodes)
c.Start(ctx, t.L(), startOps, install.MakeClusterSettings(), nodes)
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), nodes)

{
db := c.Conn(ctx, t.L(), 1)
@@ -45,19 +43,18 @@ func runInconsistency(ctx context.Context, t test.Test, c cluster.Cluster) {
// to expect it.
_, err := db.ExecContext(ctx, `SET CLUSTER SETTING server.consistency_check.interval = '0'`)
require.NoError(t, err)
err = WaitFor3XReplication(ctx, t, db)
require.NoError(t, err)
_, db = db.Close(), nil
require.NoError(t, WaitFor3XReplication(ctx, t, db))
require.NoError(t, db.Close())
}

// Stop the cluster "gracefully" by letting each node initiate a "hard
// shutdown" This will prevent any potential problems in which data isn't
// shutdown". This will prevent any potential problems in which data isn't
// synced. It seems (remotely) possible (see #64602) that without this, we
// sometimes let the inconsistency win by ending up replicating it to all
// nodes. This has not been conclusively proven, though.
//
// First SIGINT initiates graceful shutdown, second one initiates a
// "hard" (i.e. don't shed leases, etc) shutdown.
// First SIGINT initiates graceful shutdown, second one initiates a "hard"
// (i.e. don't shed leases, etc) shutdown.
stopOpts := option.DefaultStopOpts()
stopOpts.RoachprodOpts.Wait = false
stopOpts.RoachprodOpts.Sig = 2
@@ -95,7 +92,8 @@ func runInconsistency(ctx context.Context, t test.Test, c cluster.Cluster) {
// If the consistency check "fails to fail", the verbose logging will help
// determine why.
startOpts := option.DefaultStartOpts()
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, "--vmodule=consistency_queue=5,replica_consistency=5,queue=5")
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=consistency_queue=5,replica_consistency=5,queue=5")
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), nodes)
m.Go(func(ctx context.Context) error {
select {
@@ -107,54 +105,57 @@ func runInconsistency(ctx context.Context, t test.Test, c cluster.Cluster)

time.Sleep(10 * time.Second) // wait for n1-n3 to all be known as live to each other

// set an aggressive consistency check interval, but only now (that we're
// Set an aggressive consistency check interval, but only now (that we're
// reasonably sure all nodes are live, etc). This makes sure that the consistency
// check runs against all three nodes. If it targeted only two nodes, a random
// one would fatal - not what we want.
{
db := c.Conn(ctx, t.L(), 2)
_, err := db.ExecContext(ctx, `SET CLUSTER SETTING server.consistency_check.interval = '10ms'`)
if err != nil {
t.Fatal(err)
}
_ = db.Close()
}

if err := m.WaitE(); err == nil {
t.Fatal("expected a node to crash")
require.NoError(t, err)
require.NoError(t, db.Close())
}

require.Error(t, m.WaitE(), "expected a node to crash")
time.Sleep(20 * time.Second) // wait for liveness to time out for dead nodes

db := c.Conn(ctx, t.L(), 2)
rows, err := db.Query(`SELECT node_id FROM crdb_internal.gossip_nodes WHERE is_live = false;`)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
var ids []int
for rows.Next() {
var id int
if err := rows.Scan(&id); err != nil {
t.Fatal(err)
}
require.NoError(t, rows.Scan(&id))
ids = append(ids, id)
}
if err := rows.Err(); err != nil {
t.Fatal(err)
}
if len(ids) != 1 {
t.Fatalf("expected one dead NodeID, got %v", ids)
}
const expr = "This.node.is.terminating.because.a.replica.inconsistency.was.detected"
c.Run(ctx, c.Node(1), "grep "+
expr+" "+"{log-dir}/cockroach.log")
require.NoError(t, rows.Err())
require.Len(t, ids, 1, "expected one dead NodeID")

if err := c.StartE(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.Node(1)); err == nil {
// NB: we can't easily verify the error because there's a lot of output
// which isn't fully included in the error returned from StartE.
t.Fatalf("node restart should have failed")
const expr = "This.node.is.terminating.because.a.replica.inconsistency.was.detected"
c.Run(ctx, c.Node(1), "grep "+expr+" {log-dir}/cockroach.log")

// Make sure that every node creates a checkpoint.
for n := 1; n <= 3; n++ {
// Notes it in the log.
const expr = "creating.checkpoint.*with.spans"
c.Run(ctx, c.Node(n), "grep "+expr+" {log-dir}/cockroach.log")
// Creates at least one checkpoint directory (in rare cases it can be
// multiple if multiple consistency checks fail in close succession), and
// puts spans information into the checkpoint.txt file in it.
c.Run(ctx, c.Node(n), "find {store-dir}/auxiliary/checkpoints -name checkpoint.txt")
// The checkpoint can be inspected by the tooling.
c.Run(ctx, c.Node(n), "./cockroach debug range-descriptors "+
"$(find {store-dir}/auxiliary/checkpoints/* -type d -depth 0 | head -n1)")
c.Run(ctx, c.Node(n), "./cockroach debug range-data --limit 10 "+
"$(find {store-dir}/auxiliary/checkpoints/* -type d -depth 0 | head -n1) 1")
}

// NB: we can't easily verify the error because there's a lot of output which
// isn't fully included in the error returned from StartE.
require.Error(t, c.StartE(
ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.Node(1),
), "node restart should have failed")

// roachtest checks that no nodes are down when the test finishes, but in this
// case we have a down node that we can't restart. Remove the data dir, which
// tells roachtest to ignore this node.
4 changes: 2 additions & 2 deletions pkg/jobs/registry.go
@@ -111,8 +111,8 @@ type Registry struct {
adoptionCh chan adoptionNotice
sqlInstance sqlliveness.Instance

// sessionBoundInternalExecutorFactory provides a way for jobs to create
// internal executors. This is rarely needed, and usually job resumers should
// internalDB provides a way for jobs to create internal executors.
// This is rarely needed, and usually job resumers should
// use the internal executor from the JobExecCtx. The intended user of this
// interface is the schema change job resumer, which needs to set the
// tableCollectionModifier on the internal executor to different values in
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/consistency_queue_test.go
@@ -258,9 +258,9 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
stickyEngineRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyEngineRegistry.CloseAllStickyInMemEngines()

// The cluster has 3 node, with 1 store per node. The test writes a few KVs to
// a range, which gets replicated to all 3 stores. Then it manually replaces
// an entry in s2. The consistency check must detect this and terminate n2/s2.
// The cluster has 3 nodes, one store per node. The test writes a few KVs to a
// range, which gets replicated to all 3 stores. Then it manually replaces an
// entry in s2. The consistency check must detect this and terminate n2/s2.
const numStores = 3
testKnobs := kvserver.StoreTestingKnobs{DisableConsistencyQueue: true}
var tc *testcluster.TestCluster
31 changes: 14 additions & 17 deletions pkg/kv/kvserver/replica_consistency.go
@@ -665,8 +665,10 @@ func (r *Replica) computeChecksumPostApply(
}
// NB: the names here will match on all nodes, which is nice for debugging.
tag := fmt.Sprintf("r%d_at_%d", r.RangeID, as.RaftAppliedIndex)
if dir, err := r.store.checkpoint(ctx, tag); err != nil {
log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err)
spans := r.store.checkpointSpans(&desc)
log.Warningf(ctx, "creating checkpoint %s with spans %+v", tag, spans)
if dir, err := r.store.checkpoint(tag, spans); err != nil {
log.Warningf(ctx, "unable to create checkpoint %s: %+v", tag, err)
} else {
log.Warningf(ctx, "created checkpoint %s", dir)
}
@@ -721,7 +723,6 @@ func (r *Replica) computeChecksumPostApply(
}
r.computeChecksumDone(c, result)
}

var shouldFatal bool
for _, rDesc := range cc.Terminate {
if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID {
@@ -757,23 +758,19 @@ A file preventing this node from restarting was placed at:
Checkpoints are created on each node/store hosting this range, to help
investigate the cause. Only nodes that are more likely to have incorrect data
are terminated, and usually a majority of replicas continue running.
The storage checkpoint directory MUST be deleted or moved away timely, on the
nodes that continue operating. Over time the storage engine gets updated and
compacted, which leads to checkpoints becoming a full copy of a past state. Even
with no writes to the database, on these stores disk consumption may double in a
matter of hours/days, depending on compaction schedule.
are terminated, and usually a majority of replicas continue running. Checkpoints
are partial, i.e. contain only the data from to the inconsistent range, and
possibly its neighbouring ranges.
Checkpoints are very helpful in debugging this issue, so before deleting them,
please consider alternative actions:
The storage checkpoint directories can/should be deleted when no longer needed.
They are very helpful in debugging this issue, so before deleting them, please
consider alternative actions:
- If the store has enough capacity, hold off deleting the checkpoint until CRDB
staff has diagnosed the issue.
- Consider backing up the checkpoints before removing them, e.g. by snapshotting
the disk.
- If the store has enough capacity, hold off the deletion until CRDB staff has
diagnosed the issue.
- Back up the checkpoints for later investigation.
- If the stores are nearly full, but the cluster has enough capacity, consider
gradually decomissioning the affected nodes, to retain the checkpoints.
gradually decommissioning the affected nodes, to retain the checkpoints.
To inspect the checkpoints, one can use the cockroach debug range-data tool, and
command line tools like diff. For example:
86 changes: 86 additions & 0 deletions pkg/kv/kvserver/replica_consistency_test.go
@@ -79,6 +79,92 @@ func TestReplicaChecksumVersion(t *testing.T) {
})
}

func TestStoreCheckpointSpans(t *testing.T) {
defer leaktest.AfterTest(t)()

s := Store{}
s.mu.replicasByKey = newStoreReplicaBTree()
s.mu.replicaPlaceholders = map[roachpb.RangeID]*ReplicaPlaceholder{}

makeDesc := func(rangeID roachpb.RangeID, start, end string) roachpb.RangeDescriptor {
desc := roachpb.RangeDescriptor{RangeID: rangeID}
if start != "" {
desc.StartKey = roachpb.RKey(start)
desc.EndKey = roachpb.RKey(end)
}
return desc
}
var descs []roachpb.RangeDescriptor
addReplica := func(rangeID roachpb.RangeID, start, end string) {
desc := makeDesc(rangeID, start, end)
r := &Replica{RangeID: rangeID, startKey: desc.StartKey}
r.mu.state.Desc = &desc
r.isInitialized.Set(desc.IsInitialized())
require.NoError(t, s.addToReplicasByRangeIDLocked(r))
if r.IsInitialized() {
require.NoError(t, s.addToReplicasByKeyLocked(r))
descs = append(descs, desc)
}
}
addPlaceholder := func(rangeID roachpb.RangeID, start, end string) {
require.NoError(t, s.addPlaceholderLocked(
&ReplicaPlaceholder{rangeDesc: makeDesc(rangeID, start, end)},
))
}

addReplica(1, "a", "b")
addReplica(4, "b", "c")
addPlaceholder(5, "c", "d")
addReplica(2, "e", "f")
addReplica(3, "", "") // uninitialized

want := [][]string{{
// r1 with keys [a, b). The checkpoint includes range-ID replicated and
// unreplicated keyspace for ranges 1-2 and 4. Range 2 is included because
// it's a neighbour of r1 by range ID. The checkpoint also includes
// replicated user keyspace {a-c} owned by ranges 1 and 4.
"/Local/RangeID/{1\"\"-3\"\"}",
"/Local/RangeID/{4\"\"-5\"\"}",
"/Local/Range\"{a\"-c\"}",
"/Local/Lock/Intent/Local/Range\"{a\"-c\"}",
"/Local/Lock/Intent\"{a\"-c\"}",
"{a-c}",
}, {
// r4 with keys [b, c). The checkpoint includes range-ID replicated and
// unreplicated keyspace for ranges 3-4, 1 and 2. Range 3 is included
// because it's a neighbour of r4 by range ID. The checkpoint also includes
// replicated user keyspace {a-f} owned by ranges 1, 4, and 2.
"/Local/RangeID/{3\"\"-5\"\"}",
"/Local/RangeID/{1\"\"-2\"\"}",
"/Local/RangeID/{2\"\"-3\"\"}",
"/Local/Range\"{a\"-f\"}",
"/Local/Lock/Intent/Local/Range\"{a\"-f\"}",
"/Local/Lock/Intent\"{a\"-f\"}",
"{a-f}",
}, {
// r2 with keys [e, f). The checkpoint includes range-ID replicated and
// unreplicated keyspace for ranges 1-3 and 4. Ranges 1 and 3 are included
// because they are neighbours of r2 by range ID. The checkpoint also
// includes replicated user keyspace {b-f} owned by ranges 4 and 2.
"/Local/RangeID/{1\"\"-4\"\"}",
"/Local/RangeID/{4\"\"-5\"\"}",
"/Local/Range\"{b\"-f\"}",
"/Local/Lock/Intent/Local/Range\"{b\"-f\"}",
"/Local/Lock/Intent\"{b\"-f\"}",
"{b-f}",
}}

require.Len(t, want, len(descs))
for i, desc := range descs {
spans := s.checkpointSpans(&desc)
got := make([]string, 0, len(spans))
for _, s := range spans {
got = append(got, s.String())
}
require.Equal(t, want[i], got, i)
}
}

func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
defer leaktest.AfterTest(t)()

(remaining changed files not shown)
