Skip to content

Commit

Permalink
cli: productionize debug check-store
Browse files Browse the repository at this point in the history
In light of the stats inconsistency seen in [clearrange], we want to be
stricter about verifying the stats in nightly testing. This commit makes
sure `./cockroach debug check-store` is fast enough to do so:

On a ~71GB fully compacted store directory it reliably takes well below
two minutes (on GCE local SSD).

[clearrange]: cockroachdb#38720 (comment)

Release note (performance improvement): The `./cockroach debug check-store` command is now faster.
  • Loading branch information
tbg committed Oct 25, 2019
1 parent c36ee1a commit f858044
Show file tree
Hide file tree
Showing 2 changed files with 271 additions and 63 deletions.
228 changes: 165 additions & 63 deletions pkg/cli/debug_check_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"runtime"
"strings"

"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -25,12 +26,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"go.etcd.io/etcd/raft/raftpb"
"golang.org/x/sync/errgroup"
)

var debugCheckStoreCmd = &cobra.Command{
Expand All @@ -47,6 +48,36 @@ Capable of detecting the following errors:
RunE: MaybeDecorateGRPCError(runDebugCheckStoreCmd),
}

var errCheckFoundProblem = errors.New("check-store found problems")

func runDebugCheckStoreCmd(cmd *cobra.Command, args []string) error {
ctx := context.Background()
dir := args[0]
foundProblem := false
// At time of writing, this takes around ~110s for 71GB (1k warehouse TPCC
// fully compacted) on local SSD. This is quite fast, well north of 600MB/s.
err := checkStoreRangeStats(ctx, dir, func(args ...interface{}) {
fmt.Println(args...)
})
foundProblem = foundProblem || err != nil
if err != nil && !errors.Is(err, errCheckFoundProblem) {
_, _ = fmt.Println(err)
}
// This is not optimized at all, but for the same data set as above, it
// returns instantly, so we won't need to optimize it for quite some time.
err = checkStoreRaftState(ctx, dir, func(format string, args ...interface{}) {
_, _ = fmt.Printf(format, args...)
})
foundProblem = foundProblem || err != nil
if err != nil && !errors.Is(err, errCheckFoundProblem) {
fmt.Println(err)
}
if foundProblem {
return errCheckFoundProblem
}
return nil
}

type replicaCheckInfo struct {
truncatedIndex uint64
appliedIndex uint64
Expand All @@ -55,68 +86,147 @@ type replicaCheckInfo struct {
committedIndex uint64
}

func runDebugCheckStoreCmd(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
type checkInput struct {
eng engine.Engine
desc *roachpb.RangeDescriptor
sl stateloader.StateLoader
}

ctx := context.Background()
type checkResult struct {
desc *roachpb.RangeDescriptor
err error
claimMS, actMS enginepb.MVCCStats
}

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */)
if err != nil {
return err
func (cr *checkResult) Error() error {
var errs []string
if cr.err != nil {
errs = append(errs, cr.err.Error())
}
var hasError bool
if err := runDebugCheckStoreRaft(ctx, db); err != nil {
hasError = true
log.Warning(ctx, err)
if !cr.actMS.Equal(enginepb.MVCCStats{}) && !cr.actMS.Equal(cr.claimMS) && !cr.claimMS.ContainsEstimates {
err := fmt.Sprintf("stats inconsistency:\n- stored:\n%+v\n- recomputed:\n%+v\n- diff:\n%s",
cr.claimMS, cr.actMS, strings.Join(pretty.Diff(cr.claimMS, cr.actMS), ","),
)
errs = append(errs, err)
}
if err := runDebugCheckStoreDescriptors(ctx, db); err != nil {
hasError = true
log.Warning(ctx, err)
}
if hasError {
return errors.New("errors detected")
if len(errs) > 0 {
if cr.desc != nil {
prefix := cr.desc.String() + ": "
for i := range errs {
errs[i] = prefix + errs[i]
}
}
return errors.New(strings.Join(errs, "\n"))
}
return nil
}

func runDebugCheckStoreDescriptors(ctx context.Context, db *engine.RocksDB) error {
fmt.Println("checking MVCC stats")
defer fmt.Println()
func worker(ctx context.Context, in checkInput) checkResult {
desc, eng := in.desc, in.eng

var failed bool
if err := storage.IterateRangeDescriptors(ctx, db,
func(desc roachpb.RangeDescriptor) (bool, error) {
claimedMS, err := stateloader.Make(desc.RangeID).LoadMVCCStats(ctx, db)
if err != nil {
return false, err
}
ms, err := rditer.ComputeStatsForRange(&desc, db, claimedMS.LastUpdateNanos)
if err != nil {
return false, err
res := checkResult{desc: desc}
claimedMS, err := in.sl.LoadMVCCStats(ctx, eng)
if err != nil {
res.err = err
return res
}
ms, err := rditer.ComputeStatsForRange(desc, eng, claimedMS.LastUpdateNanos)
if err != nil {
res.err = err
return res
}
res.claimMS = claimedMS
res.actMS = ms
return res
}

func checkStoreRangeStats(
ctx context.Context,
dir string, // the store directory
println func(...interface{}), // fmt.Println outside of tests
) error {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

eng, err := OpenExistingStore(dir, stopper, true /* readOnly */)
if err != nil {
return err
}

inCh := make(chan checkInput)
outCh := make(chan checkResult, 1000)

n := runtime.NumCPU()
var g errgroup.Group
for i := 0; i < n; i++ {
g.Go(func() error {
for in := range inCh {
outCh <- worker(ctx, in)
}
return nil
})
}

if !ms.Equal(claimedMS) {
var prefix string
if !claimedMS.ContainsEstimates {
failed = true
} else {
ms.ContainsEstimates = true
prefix = "(ignored) "
}
fmt.Printf("\n%s%+v: diff(actual, claimed): %s\n", prefix, desc, strings.Join(pretty.Diff(ms, claimedMS), "\n"))
go func() {
if err := storage.IterateRangeDescriptors(ctx, eng,
func(desc roachpb.RangeDescriptor) (bool, error) {
inCh <- checkInput{eng: eng, desc: &desc, sl: stateloader.Make(desc.RangeID)}
return false, nil
}); err != nil {
outCh <- checkResult{err: err}
}
close(inCh) // we were the only writer
if err := g.Wait(); err != nil {
outCh <- checkResult{err: err}
}
close(outCh) // all writers done due to Wait()
}()

foundProblem := false
var total enginepb.MVCCStats
var cR, cE int
for res := range outCh {
cR++
if err := res.Error(); err != nil {
foundProblem = true
errS := err.Error()
println(errS)
} else {
if res.claimMS.ContainsEstimates {
cE++
}
return false, nil
}); err != nil {
return err
total.Add(res.actMS)
}
}
if failed {
return errors.New("check failed")

println(fmt.Sprintf("scanned %d ranges (%d with estimates), total stats %s", cR, cE, &total))

if foundProblem {
// The details were already emitted.
return errCheckFoundProblem
}
return nil
}

func runDebugCheckStoreRaft(ctx context.Context, db *engine.RocksDB) error {
func checkStoreRaftState(
ctx context.Context,
dir string, // the store directory
printf func(string, ...interface{}), // fmt.Printf outside of tests
) error {
foundProblem := false
goldenPrintf := printf
printf = func(format string, args ...interface{}) {
foundProblem = true
goldenPrintf(format, args...)
}
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(dir, stopper, true /* readOnly */)
if err != nil {
return err
}

// Iterate over the entire range-id-local space.
start := roachpb.Key(keys.LocalRangeIDPrefix)
end := start.PrefixEnd()
Expand All @@ -130,8 +240,6 @@ func runDebugCheckStoreRaft(ctx context.Context, db *engine.RocksDB) error {
return replicaInfo[rangeID]
}

var hasError bool

if _, err := engine.MVCCIterate(ctx, db, start, end, hlc.MaxTimestamp,
engine.MVCCScanOptions{Inconsistent: true}, func(kv roachpb.KeyValue) (bool, error) {
rangeID, _, suffix, detail, err := keys.DecodeRangeIDKey(kv.Key)
Expand Down Expand Up @@ -175,9 +283,8 @@ func runDebugCheckStoreRaft(ctx context.Context, db *engine.RocksDB) error {
ri.lastIndex = index
} else {
if index != ri.lastIndex+1 {
fmt.Printf("range %s: log index anomaly: %v followed by %v\n",
printf("range %s: log index anomaly: %v followed by %v\n",
rangeID, ri.lastIndex, index)
hasError = true
}
ri.lastIndex = index
}
Expand All @@ -190,33 +297,28 @@ func runDebugCheckStoreRaft(ctx context.Context, db *engine.RocksDB) error {

for rangeID, info := range replicaInfo {
if info.truncatedIndex != 0 && info.truncatedIndex != info.firstIndex-1 {
hasError = true
fmt.Printf("range %s: truncated index %v should equal first index %v - 1\n",
printf("range %s: truncated index %v should equal first index %v - 1\n",
rangeID, info.truncatedIndex, info.firstIndex)
}
if info.firstIndex > info.lastIndex {
hasError = true
fmt.Printf("range %s: [first index, last index] is [%d, %d]\n",
printf("range %s: [first index, last index] is [%d, %d]\n",
rangeID, info.firstIndex, info.lastIndex)
}
if info.appliedIndex < info.firstIndex || info.appliedIndex > info.lastIndex {
hasError = true
fmt.Printf("range %s: applied index %v should be between first index %v and last index %v\n",
printf("range %s: applied index %v should be between first index %v and last index %v\n",
rangeID, info.appliedIndex, info.firstIndex, info.lastIndex)
}
if info.appliedIndex > info.committedIndex {
hasError = true
fmt.Printf("range %s: committed index %d must not trail applied index %d\n",
printf("range %s: committed index %d must not trail applied index %d\n",
rangeID, info.committedIndex, info.appliedIndex)
}
if info.committedIndex > info.lastIndex {
hasError = true
fmt.Printf("range %s: committed index %d ahead of last index %d\n",
printf("range %s: committed index %d ahead of last index %d\n",
rangeID, info.committedIndex, info.lastIndex)
}
}
if hasError {
return errors.New("anomalies detected in Raft state")
if foundProblem {
return errCheckFoundProblem
}

return nil
Expand Down
Loading

0 comments on commit f858044

Please sign in to comment.