Skip to content

Commit

Permalink
sql: create builtin generator crdb_internal.probe_range
Browse files Browse the repository at this point in the history
Previously, it was difficult to diagnose the health of the KV layer
while an incident was in progress. This patch introduces the new builtin
generator crdb_internal.probe_range, which uses the kvprober to probe
each range and report whether the range can be reached.

resolves #61695

Release note: None
  • Loading branch information
Santamaura committed Apr 7, 2022
1 parent 4940bc1 commit 4da1cbc
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,8 @@ available replica will error.</p>
</span></td></tr>
<tr><td><a name="crc32ieee"></a><code>crc32ieee(<a href="string.html">string</a>...) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>Calculates the CRC-32 hash using the IEEE polynomial.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.probe_range"></a><code>crdb_internal.probe_range(timeout: <a href="int.html">int</a>) &rarr; tuple{int AS range_id, string AS error, int AS end_to_end_latency_ms, string AS verbose_trace}</code></td><td><span class="funcdesc"><p>Returns rows of range data that have been received from the prober</p>
</span></td></tr>
<tr><td><a name="fnv32"></a><code>fnv32(<a href="bytes.html">bytes</a>...) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>Calculates the 32-bit FNV-1 hash value of a set of values.</p>
</span></td></tr>
<tr><td><a name="fnv32"></a><code>fnv32(<a href="string.html">string</a>...) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>Calculates the 32-bit FNV-1 hash value of a set of values.</p>
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvprober/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"kvprober.go",
"planner.go",
"settings.go",
"helpers.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvprober",
visibility = ["//visibility:public"],
Expand All @@ -31,7 +32,7 @@ go_library(
go_test(
name = "kvprober_test",
srcs = [
"helpers_test.go",
"helpers.go",
"kvprober_integration_test.go",
"kvprober_test.go",
"main_test.go",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// Below are exported to enable testing from kvprober_test.
Expand Down Expand Up @@ -58,6 +59,10 @@ func (p *Prober) ReadPlannerNext(ctx context.Context) (Step, error) {
return p.readPlanner.next(ctx)
}

// WritePlannerNext asks the prober's write planner for its next Step and
// returns that Step together with any error the planner reported.
func (p *Prober) WritePlannerNext(ctx context.Context) (Step, error) {
	step, err := p.writePlanner.next(ctx)
	return step, err
}

func (p *Prober) SetPlanningRateLimits(d time.Duration) {
p.readPlanner.(*meta2Planner).getRateLimit = func(_ time.Duration, _ *cluster.Settings) time.Duration {
return d
Expand All @@ -66,3 +71,7 @@ func (p *Prober) SetPlanningRateLimits(d time.Duration) {
return d
}
}

// GetProbeTracer returns the tracer held by the prober.
func (p *Prober) GetProbeTracer() *tracing.Tracer {
	tr := p.tracer
	return tr
}
2 changes: 2 additions & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/kv/kvprober",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/security",
Expand Down Expand Up @@ -76,6 +77,7 @@ go_library(
"//pkg/util",
"//pkg/util/arith",
"//pkg/util/bitarray",
"//pkg/util/contextutil",
"//pkg/util/duration",
"//pkg/util/encoding",
"//pkg/util/errorutil",
Expand Down
193 changes: 193 additions & 0 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/kv/kvprober"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/lexbase"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand All @@ -28,10 +29,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/arith"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -466,6 +469,20 @@ The output can be used to recreate a database.'
tree.VolatilityVolatile,
),
),
"crdb_internal.probe_range": makeBuiltin(
tree.FunctionProperties{
Class: tree.GeneratorClass,
},
makeGeneratorOverload(
tree.ArgTypes{
{Name: "timeout", Typ: types.Int},
},
probeRangeGeneratorType,
makeProbeRangeGenerator,
`Returns rows of range data that have been received from the prober`,
tree.VolatilityVolatile,
),
),
}

var decodePlanGistGeneratorType = types.String
Expand Down Expand Up @@ -1729,6 +1746,7 @@ var rangeKeyIteratorType = types.MakeLabeledTuple(

// rangeKeyIterator is a ValueGenerator that iterates over all
// SQL keys in a target range.
// REFER TO HERE
type rangeKeyIterator struct {
// rangeID is the ID of the range to iterate over. rangeID is set
// by the constructor of the rangeKeyIterator.
Expand Down Expand Up @@ -2354,3 +2372,178 @@ func makeShowCreateAllTypesGenerator(
acc: ctx.Mon.MakeBoundAccount(),
}, nil
}

// The declarations below (probeRangeGeneratorLabels, probeRangeGeneratorType,
// probeRangeRow and probeRangeGenerator) support the execution of
// crdb_internal.probe_range(timeout).

var (
	// probeRangeGeneratorLabels are the column labels of the tuple returned
	// by crdb_internal.probe_range, in output order.
	probeRangeGeneratorLabels = []string{
		"range_id",
		"error",
		"end_to_end_latency_ms",
		"verbose_trace",
	}

	// probeRangeGeneratorType is the labeled tuple type of each generated
	// row; the element types line up positionally with the labels above.
	probeRangeGeneratorType = types.MakeLabeledTuple(
		[]*types.T{
			types.Int,
			types.String,
			types.Int,
			types.String,
		},
		probeRangeGeneratorLabels,
	)
)

// probeRangeRow holds the data for a single output row of
// crdb_internal.probe_range.
type probeRangeRow struct {
	// rangeID is emitted as the range_id column.
	rangeID int64
	// error is emitted as the error column; it is the empty string when no
	// error was recorded for the row.
	error string
	// latency is emitted, converted to milliseconds, as the
	// end_to_end_latency_ms column.
	latency time.Duration
	// verboseTrace is emitted as the verbose_trace column.
	verboseTrace string
}

// probeRangeGenerator is the tree.ValueGenerator behind
// crdb_internal.probe_range. It emits one row per call to Next until
// remainingRanges reaches zero.
type probeRangeGenerator struct {
	// db is the KV handle passed to the prober when executing write probes.
	db *kv.DB
	// timeout is the per-probe timeout derived from the builtin's argument.
	timeout time.Duration
	// prober plans and executes the probes.
	// NOTE(review): this is stored by value although it is populated by
	// dereferencing a *kvprober.Prober; confirm that copying a Prober is
	// intended.
	prober kvprober.Prober

	// The fields below are mutated by Next() and therefore change
	// throughout the lifecycle of the generator.

	// ranges counts, per range ID, how many rows have been emitted for that
	// range; a non-zero entry marks the range as already seen.
	ranges map[int64]int
	// curr is the row that Values() returns.
	curr probeRangeRow
	// remainingRanges is the number of rows still to be produced.
	remainingRanges int64
}

// makeProbeRangeGenerator constructs the generator that backs
// crdb_internal.probe_range(timeout).
//
// The caller must have the admin role; otherwise an InsufficientPrivilege
// error is returned. args[0], when present, is the per-probe timeout in
// milliseconds; when no argument is supplied the timeout defaults to one
// second. The number of rows the generator will emit is fixed up front to
// the row count of crdb_internal.ranges, read in the caller's transaction.
func makeProbeRangeGenerator(
	ctx *tree.EvalContext, args tree.Datums,
) (tree.ValueGenerator, error) {
	// The user must be an admin to use this builtin.
	isAdmin, err := ctx.SessionAccessor.HasAdminRole(ctx.Context)
	if err != nil {
		return nil, err
	}
	if !isAdmin {
		return nil, pgerror.Newf(
			pgcode.InsufficientPrivilege,
			"only users with the admin role are allowed to use crdb_internal.probe_range",
		)
	}

	// The timeout argument is expressed in milliseconds.
	timeout := time.Second
	if len(args) > 0 {
		timeout = time.Duration(tree.MustBeDInt(args[0])) * time.Millisecond
	}

	row, err := ctx.Planner.QueryRowEx(
		ctx.Context,
		"range-count",
		ctx.Txn,
		sessiondata.InternalExecutorOverride{User: ctx.SessionData().User()},
		"SELECT count(*) FROM crdb_internal.ranges",
	)
	if err != nil {
		// Keep the underlying cause attached instead of replacing it with a
		// generic message, so the failure can actually be diagnosed.
		return nil, pgerror.Wrapf(
			err,
			pgcode.InFailedSQLTransaction,
			"failed to read crdb_internal.ranges",
		)
	}
	// Checking the length covers both a nil and an empty row, either of which
	// would make the row[0] access below panic.
	if len(row) == 0 {
		return nil, pgerror.Newf(
			pgcode.NoDataFound,
			"crdb_internal.ranges is empty",
		)
	}
	numRanges, ok := tree.AsDInt(row[0])
	if !ok {
		// Report the type of the datum that failed the assertion rather than
		// the type of the enclosing row.
		return nil, pgerror.Newf(
			pgcode.DatatypeMismatch,
			"unexpected type %T for range count", row[0],
		)
	}

	kvProber := kvprober.NewProber(kvprober.Opts{
		DB:                      ctx.DB,
		Settings:                ctx.Settings,
		Tracer:                  ctx.Tracer,
		HistogramWindowInterval: time.Minute,
	})
	return &probeRangeGenerator{
		db:              ctx.DB,
		timeout:         timeout,
		prober:          *kvProber,
		ranges:          make(map[int64]int),
		remainingRanges: int64(numRanges),
	}, nil
}

// ResolvedType implements the tree.ValueGenerator interface. Every row
// produced by this generator has the labeled tuple type
// probeRangeGeneratorType.
func (p *probeRangeGenerator) ResolvedType() *types.T {
	rowType := probeRangeGeneratorType
	return rowType
}

// Start implements the tree.ValueGenerator interface. It performs no work
// and always reports success; neither argument is used.
func (p *probeRangeGenerator) Start(context.Context, *kv.Txn) error {
	return nil
}

// Next implements the tree.ValueGenerator interface.
//
// Each call asks the prober's write planner for a range, skipping (while
// still probing) any range for which a row has already been emitted, then
// runs a write probe and stores the resulting row in p.curr. It returns
// false once remainingRanges rows have been produced, and returns an error
// if the planner fails or ctx is canceled while skipping seen ranges.
//
// NOTE(review): the row's range ID comes from WritePlannerNext, while the
// probe itself is issued by prober.WriteProbe; confirm that the two refer to
// the same range.
func (p *probeRangeGenerator) Next(ctx context.Context) (bool, error) {
	if p.remainingRanges <= 0 {
		return false, nil
	}
	probeCtx, sp := tracing.EnsureChildSpan(
		ctx, p.prober.GetProbeTracer(), "write probe",
		tracing.WithForceRealSpan(),
	)
	// Finish the span on every return path, including the error returns
	// below, so that it is never leaked.
	defer sp.Finish()
	sp.SetRecordingType(tracing.RecordingVerbose)

	step, err := p.prober.WritePlannerNext(probeCtx)
	if err != nil {
		return false, err
	}
	nextRangeID := int64(step.RangeID)

	// Assume a timeout until the final probe completes; only the probe for
	// the not-yet-seen range is allowed to clear this.
	p.curr.error = "write probe execution timed out"
	tBegin := timeutil.Now()

	// Execute probes until the planner hands back a range not seen before.
	// The timeout error is deliberately not cleared while skipping.
	for p.ranges[nextRangeID] != 0 {
		// Bail out promptly if the query was canceled, rather than spinning
		// while the planner keeps returning already-seen ranges.
		if err := ctx.Err(); err != nil {
			return false, err
		}
		p.WriteProbeWithTimeout(probeCtx, false)
		if step, err = p.prober.WritePlannerNext(probeCtx); err != nil {
			return false, err
		}
		nextRangeID = int64(step.RangeID)
	}
	p.WriteProbeWithTimeout(probeCtx, true)

	p.ranges[nextRangeID]++
	p.remainingRanges--

	p.curr.rangeID = nextRangeID
	p.curr.latency = timeutil.Since(tBegin)
	// Collect the recording before the deferred Finish runs; a span must not
	// be used once it has been finished.
	p.curr.verboseTrace = sp.GetRecording(tracing.RecordingVerbose).String()

	return true, nil
}

// Values implements the tree.ValueGenerator interface. It converts the
// current row into datums in the column order range_id, error,
// end_to_end_latency_ms, verbose_trace; the latency is reported in whole
// milliseconds.
func (p *probeRangeGenerator) Values() (tree.Datums, error) {
	rangeID := tree.NewDInt(tree.DInt(p.curr.rangeID))
	probeErr := tree.NewDString(p.curr.error)
	latencyMS := tree.NewDInt(tree.DInt(p.curr.latency.Milliseconds()))
	trace := tree.NewDString(p.curr.verboseTrace)
	return tree.Datums{rangeID, probeErr, latencyMS, trace}, nil
}

// Close implements the tree.ValueGenerator interface. It is a no-op.
func (p *probeRangeGenerator) Close(context.Context) {
}

// WriteProbeWithTimeout executes one write probe bounded by p.timeout.
//
// probeCtx is the parent context for the probe; the probe itself runs under
// a child context that carries the timeout. When clearTimeoutErr is true and
// the probe returned before that context expired or was canceled,
// p.curr.error is reset to the empty string. In every other case
// p.curr.error is left untouched, so a value set by the caller beforehand
// (such as a timeout message) survives.
func (p *probeRangeGenerator) WriteProbeWithTimeout(probeCtx context.Context, clearTimeoutErr bool) {
	err := contextutil.RunWithTimeout(probeCtx, "write probe", p.timeout,
		func(ctx context.Context) error {
			// Probe with the timeout-bound ctx handed to this closure, not
			// the parent probeCtx; otherwise the timeout never applies.
			p.prober.WriteProbe(ctx, p.db)
			// WriteProbe has no return value, so the context is the only
			// signal for whether the deadline elapsed (or the parent was
			// canceled) while the probe was running.
			return ctx.Err()
		})
	if err == nil && clearTimeoutErr {
		p.curr.error = ""
	}
}

0 comments on commit 4da1cbc

Please sign in to comment.