// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Package kvprober sends queries to KV in a loop, with configurable sleep
// times, in order to generate data about the healthiness or unhealthiness of
// kvclient & below.
//
// Prober increments metrics that SRE & other operators can use as alerting
// signals. It also writes to logs to help narrow down the problem (e.g. which
// range(s) are acting up).
package kvprober

import (
"context"
"math/rand"
"time"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/logtags"
)

const putValue = "thekvproberwrotethis"

// Prober sends queries to KV in a loop. See package docstring for more.
type Prober struct {
db *kv.DB
settings *cluster.Settings
// planner is an interface for selecting a range to probe. There are
// separate planners for the read & write probe loops, so as to achieve
// a balanced probing of the keyspace, regardless of differences in the rate
// at which Prober sends different probes. Also note that planner is
// NOT thread-safe.
readPlanner planner
writePlanner planner
// metrics wraps up the set of Prometheus metrics that the prober sets; the
// goal of the prober IS to populate these metrics.
metrics Metrics
tracer *tracing.Tracer
}

// Opts provides knobs to control kvprober.Prober.
type Opts struct {
DB *kv.DB
Settings *cluster.Settings
Tracer *tracing.Tracer
// The windowed portion of the latency histogram retains values for
// approximately histogramWindow. See metrics library for more.
HistogramWindowInterval time.Duration
}

var (
metaReadProbeAttempts = metric.Metadata{
Name: "kv.prober.read.attempts",
Help: "Number of attempts made to read probe KV, regardless of outcome",
Measurement: "Queries",
Unit: metric.Unit_COUNT,
}
metaReadProbeFailures = metric.Metadata{
Name: "kv.prober.read.failures",
Help: "Number of attempts made to read probe KV that failed, " +
"whether due to error or timeout",
Measurement: "Queries",
Unit: metric.Unit_COUNT,
}
metaReadProbeLatency = metric.Metadata{
Name: "kv.prober.read.latency",
Help: "Latency of successful KV read probes",
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
metaWriteProbeAttempts = metric.Metadata{
Name: "kv.prober.write.attempts",
Help: "Number of attempts made to write probe KV, regardless of outcome",
Measurement: "Queries",
Unit: metric.Unit_COUNT,
}
metaWriteProbeFailures = metric.Metadata{
Name: "kv.prober.write.failures",
Help: "Number of attempts made to write probe KV that failed, " +
"whether due to error or timeout",
Measurement: "Queries",
Unit: metric.Unit_COUNT,
}
metaWriteProbeLatency = metric.Metadata{
Name: "kv.prober.write.latency",
Help: "Latency of successful KV write probes",
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
metaProbePlanAttempts = metric.Metadata{
Name: "kv.prober.planning_attempts",
Help: "Number of attempts at planning out probes made; " +
"in order to probe KV we need to plan out which ranges to probe",
Measurement: "Runs",
Unit: metric.Unit_COUNT,
}
metaProbePlanFailures = metric.Metadata{
Name: "kv.prober.planning_failures",
Help: "Number of attempts at planning out probes that failed; " +
"in order to probe KV we need to plan out which ranges to probe; " +
"if planning fails, then kvprober is not able to send probes to " +
"all ranges; consider alerting on this metric as a result",
Measurement: "Runs",
Unit: metric.Unit_COUNT,
}
// TODO(josh): Add a histogram that captures where in the "rangespace" errors
// are occurring. This will allow operators to see at a glance what percentage
// of ranges are affected.
)

// Metrics groups together the metrics that kvprober exports.
type Metrics struct {
ReadProbeAttempts *metric.Counter
ReadProbeFailures *metric.Counter
ReadProbeLatency *metric.Histogram
WriteProbeAttempts *metric.Counter
WriteProbeFailures *metric.Counter
WriteProbeLatency *metric.Histogram
ProbePlanAttempts *metric.Counter
ProbePlanFailures *metric.Counter
}

// proberOps is an interface that the prober will use to run ops against some
// system. This interface exists so that ops can be mocked for tests.
type proberOps interface {
Read(key interface{}) func(context.Context, *kv.Txn) error
Write(key interface{}) func(context.Context, *kv.Txn) error
}

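// As a hedged illustration of the test seam described above (not part of the
// upstream code; the type and its fields are assumptions for the example), a
// minimal test double might look like:
//
//	type fakeProberOps struct {
//		readErr, writeErr error
//	}
//
//	func (f *fakeProberOps) Read(key interface{}) func(context.Context, *kv.Txn) error {
//		return func(ctx context.Context, txn *kv.Txn) error { return f.readErr }
//	}
//
//	func (f *fakeProberOps) Write(key interface{}) func(context.Context, *kv.Txn) error {
//		return func(ctx context.Context, txn *kv.Txn) error { return f.writeErr }
//	}
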
// proberTxn is an interface that the prober will use to run txns. This
// interface exists so that txn can be mocked for tests.
type proberTxn interface {
// Txn runs the given function with a transaction having the admission
// source in the header set to OTHER. Transaction work submitted from this
// source currently bypasses admission control.
Txn(context.Context, func(context.Context, *kv.Txn) error) error
// TxnRootKV runs the given function with a transaction having the admission
// source in the header set to ROOT KV. Transaction work submitted from this
// source should not bypass admission control.
TxnRootKV(context.Context, func(context.Context, *kv.Txn) error) error
}

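// A matching hedged double for this interface (again an assumption for
// illustration only) can simply invoke the closure, e.g. with a nil txn that
// the fake ops sketched above never dereference:
//
//	type fakeProberTxn struct{}
//
//	func (fakeProberTxn) Txn(ctx context.Context, f func(context.Context, *kv.Txn) error) error {
//		return f(ctx, nil /* txn */)
//	}
//
//	func (fakeProberTxn) TxnRootKV(ctx context.Context, f func(context.Context, *kv.Txn) error) error {
//		return f(ctx, nil /* txn */)
//	}
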
// proberOpsImpl is used to probe the kv layer.
type proberOpsImpl struct{}

// We attempt to commit a txn that reads some data at the key.
func (p *proberOpsImpl) Read(key interface{}) func(context.Context, *kv.Txn) error {
return func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.Get(ctx, key)
return err
}
}

// We attempt to commit a txn that puts some data at the key and then deletes
// it. This exercises the write code paths well: the probe generates a Raft
// command that goes through consensus and is written to the Pebble log.
// Importantly, no *live* data is left at the key, which simplifies kvprober,
// since there is then no need to clean up data at the key after a range
// split or merge. Note that MVCC tombstones may be left by the probe, but
// this is okay, as GC will clean them up.
func (p *proberOpsImpl) Write(key interface{}) func(context.Context, *kv.Txn) error {
return func(ctx context.Context, txn *kv.Txn) error {
if err := txn.Put(ctx, key, putValue); err != nil {
return err
}
return txn.Del(ctx, key)
}
}

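// For orientation, a hedged usage sketch (db and probeKey are stand-ins for
// the example; real probes use planner-chosen range-local keys): the closures
// returned by Read and Write are handed to a transaction runner, e.g.:
//
//	ops := &proberOpsImpl{}
//	err := db.Txn(ctx, ops.Write(probeKey))
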
// proberTxnImpl is used to run transactions.
type proberTxnImpl struct {
db *kv.DB
}

func (p *proberTxnImpl) Txn(ctx context.Context, f func(context.Context, *kv.Txn) error) error {
return p.db.Txn(ctx, f)
}

func (p *proberTxnImpl) TxnRootKV(
ctx context.Context, f func(context.Context, *kv.Txn) error,
) error {
return p.db.TxnRootKV(ctx, f)
}

// NewProber creates a Prober from Opts.
func NewProber(opts Opts) *Prober {
return &Prober{
db: opts.DB,
settings: opts.Settings,
readPlanner: newMeta2Planner(opts.DB, opts.Settings, func() time.Duration { return readInterval.Get(&opts.Settings.SV) }),
writePlanner: newMeta2Planner(opts.DB, opts.Settings, func() time.Duration { return writeInterval.Get(&opts.Settings.SV) }),
metrics: Metrics{
ReadProbeAttempts: metric.NewCounter(metaReadProbeAttempts),
ReadProbeFailures: metric.NewCounter(metaReadProbeFailures),
ReadProbeLatency: metric.NewLatency(metaReadProbeLatency, opts.HistogramWindowInterval),
WriteProbeAttempts: metric.NewCounter(metaWriteProbeAttempts),
WriteProbeFailures: metric.NewCounter(metaWriteProbeFailures),
WriteProbeLatency: metric.NewLatency(metaWriteProbeLatency, opts.HistogramWindowInterval),
ProbePlanAttempts: metric.NewCounter(metaProbePlanAttempts),
ProbePlanFailures: metric.NewCounter(metaProbePlanFailures),
},
tracer: opts.Tracer,
}
}

// Metrics returns a struct which contains the kvprober metrics.
func (p *Prober) Metrics() Metrics {
return p.metrics
}

// Start causes kvprober to start probing KV. Start returns immediately. Start
// returns an error only if stopper.RunAsyncTask returns an error.
func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
ctx = logtags.AddTag(ctx, "kvprober", nil /* value */)
startLoop := func(ctx context.Context, opName string, probe func(context.Context, *kv.DB, planner), pl planner, interval *settings.DurationSetting) error {
return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: opName, SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
defer logcrash.RecoverAndReportNonfatalPanic(ctx, &p.settings.SV)
rnd, _ /* seed */ := randutil.NewPseudoRand()
d := func() time.Duration {
return withJitter(interval.Get(&p.settings.SV), rnd)
}
t := timeutil.NewTimer()
defer t.Stop()
t.Reset(d())
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()
for {
select {
case <-t.C:
t.Read = true
// Jitter added to de-synchronize different nodes' probe loops.
t.Reset(d())
case <-stopper.ShouldQuiesce():
return
}
probeCtx, sp := tracing.EnsureChildSpan(ctx, p.tracer, opName+" - probe")
probe(probeCtx, p.db, pl)
sp.Finish()
}
})
}
if err := startLoop(ctx, "read probe loop", p.readProbe, p.readPlanner, readInterval); err != nil {
return err
}
return startLoop(ctx, "write probe loop", p.writeProbe, p.writePlanner, writeInterval)
}

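// A hedged end-to-end sketch of wiring the prober into a server (stopper and
// registry are assumptions for the example, not taken from this file):
//
//	p := kvprober.NewProber(kvprober.Opts{
//		DB:                      db,
//		Settings:                st,
//		Tracer:                  tracer,
//		HistogramWindowInterval: 60 * time.Second,
//	})
//	registry.AddMetricStruct(p.Metrics())
//	if err := p.Start(ctx, stopper); err != nil {
//		return err
//	}
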
// Doesn't return an error. Instead increments error-type-specific metrics.
func (p *Prober) readProbe(ctx context.Context, db *kv.DB, pl planner) {
p.readProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
}

func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner) {
if !readEnabled.Get(&p.settings.SV) {
return
}
p.metrics.ProbePlanAttempts.Inc(1)
step, err := pl.next(ctx)
if err != nil {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
return
}
// If an error occurs above the KV read (e.g. during planning), this counter
// is not incremented. This means that ReadProbeFailures / ReadProbeAttempts
// captures the KV error rate only, as desired. It also means that operators
// can alert on an unexpectedly low rate of ReadProbeAttempts, or else on a
// high rate of ProbePlanFailures. This would probably be a ticket-level
// alert, as the impact is reduced visibility into possible failures rather
// than a high-impact production issue.
p.metrics.ReadProbeAttempts.Inc(1)
start := timeutil.Now()
// Sufficiently slow response times are no different from errors, from the
// perspective of the user.
timeout := readTimeout.Get(&p.settings.SV)
err = contextutil.RunWithTimeout(ctx, "read probe", timeout, func(ctx context.Context) error {
// We read a "range-local" key dedicated to probing. See pkg/keys for more.
// There is no data at the key, but that is okay: the prober still executes
// a read operation on the range regardless.
// TODO(josh): Trace the probes.
f := ops.Read(step.Key)
if bypassAdmissionControl.Get(&p.settings.SV) {
return txns.Txn(ctx, f)
}
return txns.TxnRootKV(ctx, f)
})
if err != nil {
// TODO(josh): Write structured events with log.Structured.
log.Health.Errorf(ctx, "kv.Get(%s), r=%v failed with: %v", step.Key, step.RangeID, err)
p.metrics.ReadProbeFailures.Inc(1)
return
}
d := timeutil.Since(start)
log.Health.Infof(ctx, "kv.Get(%s), r=%v returned success in %v", step.Key, step.RangeID, d)
// Latency of failures is not recorded. They are counted as failures though.
p.metrics.ReadProbeLatency.RecordValue(d.Nanoseconds())
}

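// Tying the hedged test doubles sketched above together (fakePlanner is a
// further assumption: a planner whose next() returns a fixed Step), a unit
// test could exercise this method without a real KV layer, assuming the
// read-probe cluster setting is enabled:
//
//	p := NewProber(Opts{Settings: cluster.MakeTestingClusterSettings()})
//	p.readProbeImpl(ctx, &fakeProberOps{readErr: errors.New("boom")},
//		fakeProberTxn{}, &fakePlanner{})
//	// p.Metrics().ReadProbeFailures should now read 1.
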
// Doesn't return an error. Instead increments error-type-specific metrics.
func (p *Prober) writeProbe(ctx context.Context, db *kv.DB, pl planner) {
p.writeProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
}

func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner) {
if !writeEnabled.Get(&p.settings.SV) {
return
}
p.metrics.ProbePlanAttempts.Inc(1)
step, err := pl.next(ctx)
if err != nil {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
return
}
p.metrics.WriteProbeAttempts.Inc(1)
start := timeutil.Now()
// Sufficiently slow response times are no different from errors, from the
// perspective of the user.
timeout := writeTimeout.Get(&p.settings.SV)
err = contextutil.RunWithTimeout(ctx, "write probe", timeout, func(ctx context.Context) error {
f := ops.Write(step.Key)
if bypassAdmissionControl.Get(&p.settings.SV) {
return txns.Txn(ctx, f)
}
return txns.TxnRootKV(ctx, f)
})
if err != nil {
log.Health.Errorf(ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v", step.Key, step.RangeID, err)
p.metrics.WriteProbeFailures.Inc(1)
return
}
d := timeutil.Since(start)
log.Health.Infof(ctx, "kv.Txn(Put(%s); Del(-)), r=%v returned success in %v", step.Key, step.RangeID, d)
// Latency of failures is not recorded. They are counted as failures though.
p.metrics.WriteProbeLatency.RecordValue(d.Nanoseconds())
}

// Returns a random duration pulled from the uniform distribution given below:
// [d - 0.25*d, d + 0.25*d).
func withJitter(d time.Duration, rnd *rand.Rand) time.Duration {
amplitudeNanos := d.Nanoseconds() / 4
return d + time.Duration(randutil.RandInt63InRange(rnd, -amplitudeNanos, amplitudeNanos))
}
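
// A hedged worked example (not part of the upstream file): for d = 10s the
// amplitude is 2.5s, so withJitter returns a duration in [7.5s, 12.5s):
//
//	rnd, _ := randutil.NewPseudoRand()
//	interval := withJitter(10*time.Second, rnd) // somewhere in [7.5s, 12.5s)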