Skip to content

Commit

Permalink
sql: create builtin generator crdb_internal.probe_range
Browse files Browse the repository at this point in the history
Previously, it was difficult to diagnose the health of the KV layer
when an incident occurred. This patch introduces the new builtin
generator crdb_internal.probe_range, which uses the kvprober to probe
each range and report whether the range can be reached.

resolves #61695

Release note: None
  • Loading branch information
Santamaura committed Apr 13, 2022
1 parent 4940bc1 commit f8ef75e
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 6 deletions.
9 changes: 9 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3137,6 +3137,15 @@ table. Returns an error if validation fails.</p>
</span></td></tr></tbody>
</table>

### TUPLE{INT AS RANGE_ID, STRING AS ERROR, INT AS END_TO_END_LATENCY_MS, STRING AS VERBOSE_TRACE} functions

<table>
<thead><tr><th>Function &rarr; Returns</th><th>Description</th></tr></thead>
<tbody>
<tr><td><a name="crdb_internal.probe_range"></a><code>crdb_internal.probe_range(timeout: <a href="interval.html">interval</a>, write: <a href="bool.html">bool</a>) &rarr; tuple{int AS range_id, string AS error, int AS end_to_end_latency_ms, string AS verbose_trace}</code></td><td><span class="funcdesc"><p>Returns rows of range data that have been received from the prober</p>
</span></td></tr></tbody>
</table>

### UUID functions

<table>
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvprober/kvprober.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ type proberTxn interface {
TxnRootKV(context.Context, func(context.Context, *kv.Txn) error) error
}

// proberOpsImpl is used to probe the kv layer.
type proberOpsImpl struct {
// ProberOpsImpl is used to probe the kv layer.
type ProberOpsImpl struct {
}

// We attempt to commit a txn that reads some data at the key.
func (p *proberOpsImpl) Read(key interface{}) func(context.Context, *kv.Txn) error {
func (p *ProberOpsImpl) Read(key interface{}) func(context.Context, *kv.Txn) error {
return func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.Get(ctx, key)
return err
Expand All @@ -176,7 +176,7 @@ func (p *proberOpsImpl) Read(key interface{}) func(context.Context, *kv.Txn) err
// there is no need to clean up data at the key post range split / merge.
// Note that MVCC tombstones may be left by the probe, but this is okay, as
// GC will clean it up.
func (p *proberOpsImpl) Write(key interface{}) func(context.Context, *kv.Txn) error {
func (p *ProberOpsImpl) Write(key interface{}) func(context.Context, *kv.Txn) error {
return func(ctx context.Context, txn *kv.Txn) error {
if err := txn.Put(ctx, key, putValue); err != nil {
return err
Expand Down Expand Up @@ -272,7 +272,7 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {

// Doesn't return an error. Instead increments error type specific metrics.
func (p *Prober) readProbe(ctx context.Context, db *kv.DB, pl planner) {
p.readProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
p.readProbeImpl(ctx, &ProberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
}

func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner) {
Expand Down Expand Up @@ -330,7 +330,7 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTx

// Doesn't return an error. Instead increments error type specific metrics.
func (p *Prober) writeProbe(ctx context.Context, db *kv.DB, pl planner) {
p.writeProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
p.writeProbeImpl(ctx, &ProberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
}

func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/kv/kvprober",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/security",
Expand Down Expand Up @@ -76,6 +77,7 @@ go_library(
"//pkg/util",
"//pkg/util/arith",
"//pkg/util/bitarray",
"//pkg/util/contextutil",
"//pkg/util/duration",
"//pkg/util/encoding",
"//pkg/util/errorutil",
Expand Down
150 changes: 150 additions & 0 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/kv/kvprober"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/lexbase"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand All @@ -28,10 +29,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/arith"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -466,6 +469,21 @@ The output can be used to recreate a database.'
tree.VolatilityVolatile,
),
),
"crdb_internal.probe_range": makeBuiltin(
tree.FunctionProperties{
Class: tree.GeneratorClass,
},
makeGeneratorOverload(
tree.ArgTypes{
{Name: "timeout", Typ: types.Interval},
{Name: "write", Typ: types.Bool},
},
probeRangeGeneratorType,
makeProbeRangeGenerator,
`Returns rows of range data that have been received from the prober`,
tree.VolatilityVolatile,
),
),
}

var decodePlanGistGeneratorType = types.String
Expand Down Expand Up @@ -2354,3 +2372,135 @@ func makeShowCreateAllTypesGenerator(
acc: ctx.Mon.MakeBoundAccount(),
}, nil
}

// probeRangeGenerator supports the execution of
// crdb_internal.probe_range(timeout, write).

var (
	// probeRangeGeneratorLabels holds the column names of the rows produced by
	// crdb_internal.probe_range, in output order.
	probeRangeGeneratorLabels = []string{
		"range_id",
		"error",
		"end_to_end_latency_ms",
		"verbose_trace",
	}

	// probeRangeGeneratorType is the labeled tuple type of one output row; its
	// element types line up one-to-one with probeRangeGeneratorLabels.
	probeRangeGeneratorType = types.MakeLabeledTuple(
		[]*types.T{
			types.Int,    // range_id
			types.String, // error
			types.Int,    // end_to_end_latency_ms
			types.String, // verbose_trace
		},
		probeRangeGeneratorLabels,
	)
)

// probeRangeRow is the result of probing a single range. It is filled in by
// probeRangeGenerator.Next and rendered as one output row by
// probeRangeGenerator.Values.
type probeRangeRow struct {
	// rangeID is the ID taken from the decoded range descriptor.
	rangeID int
	// error is the message of the probe error, or "" when the probe returned
	// no error.
	error string
	// latency is the wall-clock time measured around the probe; Values reports
	// it in milliseconds.
	latency time.Duration
	// verboseTrace is the string form of the verbose recording collected from
	// the probe's tracing span.
	verboseTrace string
}

// probeRangeGenerator is the value generator behind
// crdb_internal.probe_range. It walks the entries of ranges one at a time and
// emits one row per entry.
type probeRangeGenerator struct {
	// db is taken from the eval context when the generator is built.
	db *kv.DB
	// timeout bounds each individual probe.
	timeout time.Duration
	// isWrite selects a write probe when true and a read probe otherwise.
	isWrite bool
	// tracer creates the per-probe span whose verbose recording is returned in
	// the verbose_trace column.
	tracer *tracing.Tracer
	// ranges holds the key/value pairs returned by the meta scan done at
	// construction time; each value decodes into a roachpb.RangeDescriptor.
	ranges []kv.KeyValue

	// The following variables are updated during
	// calls to Next() and change throughout the lifecycle of
	// probeRangeGenerator.
	curr probeRangeRow
	// rangeIdx is the index into ranges of the next entry to probe.
	rangeIdx int
}

// makeProbeRangeGenerator builds the generator backing
// crdb_internal.probe_range(timeout, write).
//
// It rejects callers without the admin role, decodes the two arguments
// (args[0] is the per-probe timeout interval, args[1] selects write probes),
// and scans the meta records for the whole keyspace up front using the
// statement's transaction. Any error from the role check or the scan is
// returned unchanged.
func makeProbeRangeGenerator(ctx *tree.EvalContext, args tree.Datums) (tree.ValueGenerator, error) {
	// Probing is restricted to admin users.
	switch isAdmin, err := ctx.SessionAccessor.HasAdminRole(ctx.Context); {
	case err != nil:
		return nil, err
	case !isAdmin:
		return nil, pgerror.Newf(
			pgcode.InsufficientPrivilege,
			"only users with the admin role are allowed to use crdb_internal.probe_range",
		)
	}

	// Decode the arguments straight into the generator; rangeIdx and curr keep
	// their zero values until the first call to Next.
	gen := &probeRangeGenerator{
		db:      ctx.DB,
		timeout: time.Duration(tree.MustBeDInterval(args[0]).Duration.Nanos()),
		isWrite: bool(tree.MustBeDBool(args[1])),
		tracer:  ctx.Tracer,
	}

	// Collect the meta records for every range in [MinKey, MaxKey).
	everything := roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}
	metaKVs, err := kvclient.ScanMetaKVs(ctx.Context, ctx.Txn, everything)
	if err != nil {
		return nil, err
	}
	gen.ranges = metaKVs
	return gen, nil
}

// ResolvedType implements the tree.ValueGenerator interface. Every row has the
// labeled tuple type probeRangeGeneratorType.
func (*probeRangeGenerator) ResolvedType() *types.T { return probeRangeGeneratorType }

// Start implements the tree.ValueGenerator interface. It is a no-op: the list
// of ranges is already gathered when the generator is constructed, so both
// arguments are ignored.
func (*probeRangeGenerator) Start(context.Context, *kv.Txn) error { return nil }

// Next implements the tree.ValueGenerator interface.
//
// Each call probes one entry of p.ranges and records the outcome in p.curr,
// which Values then renders as a row. It returns (false, nil) once every entry
// has been consumed. An error from the probe itself is not returned to the
// caller: its message is stored in the row's error column so that the
// remaining ranges still get probed. Only a range descriptor that fails to
// decode aborts the generator.
func (p *probeRangeGenerator) Next(ctx context.Context) (bool, error) {
	if p.rangeIdx >= len(p.ranges) {
		return false, nil
	}

	opName := "read probe"
	if p.isWrite {
		opName = "write probe"
	}

	// Force a real, verbose span so the recording can be returned to the user
	// regardless of whether the surrounding statement is being traced.
	probeCtx, sp := tracing.EnsureChildSpan(
		ctx, p.tracer, opName,
		tracing.WithForceRealSpan(),
	)
	sp.SetRecordingType(tracing.RecordingVerbose)
	defer sp.Finish()

	var desc roachpb.RangeDescriptor
	if err := p.ranges[p.rangeIdx].ValueProto(&desc); err != nil {
		return false, err
	}

	// Probe a key that belongs to the range being examined. The key of the
	// scanned record (p.ranges[i].Key) is the key of the meta entry, not a key
	// in the range itself, and the write probe puts and then deletes the key it
	// is given, so it must never be pointed at that record.
	// NOTE(review): keys.RangeProbeKey is expected to yield the range-local key
	// reserved for probing - confirm it matches what kvprober's planner uses.
	key := keys.RangeProbeKey(desc.StartKey)

	ops := &kvprober.ProberOpsImpl{}
	op := ops.Read
	if p.isWrite {
		op = ops.Write
	}

	tBegin := timeutil.Now()
	err := contextutil.RunWithTimeout(probeCtx, opName, p.timeout, func(ctx context.Context) error {
		// ops.Read/ops.Write only build the transaction closure; it has to be
		// run for any request to be sent. Use the context handed to this
		// callback so the timeout applies, and a fresh txn per probe so probes
		// do not affect each other.
		return p.db.Txn(ctx, op(key))
	})
	// Measure right after the probe so bookkeeping below is not counted.
	p.curr.latency = timeutil.Since(tBegin)

	p.curr.error = ""
	if err != nil {
		p.curr.error = err.Error()
	}
	p.curr.rangeID = int(desc.RangeID)
	p.curr.verboseTrace = sp.GetRecording(tracing.RecordingVerbose).String()

	// Advance even when the probe failed; the failure is reported in the row.
	p.rangeIdx++
	return true, nil
}

// Values implements the tree.ValueGenerator interface. It converts the row
// captured by the latest call to Next into datums, in the column order
// range_id, error, end_to_end_latency_ms, verbose_trace. The latency is
// reported in whole milliseconds.
func (p *probeRangeGenerator) Values() (tree.Datums, error) {
	row := &p.curr

	rangeID := tree.NewDInt(tree.DInt(row.rangeID))
	errMsg := tree.NewDString(row.error)
	latencyMS := tree.NewDInt(tree.DInt(row.latency.Milliseconds()))
	trace := tree.NewDString(row.verboseTrace)

	return tree.Datums{rangeID, errMsg, latencyMS, trace}, nil
}

// Close implements the tree.ValueGenerator interface. It does nothing.
func (*probeRangeGenerator) Close(context.Context) {}

0 comments on commit f8ef75e

Please sign in to comment.