Skip to content

Commit

Permalink
changefeedccl: Add admit and commit latency metrics.
Browse files Browse the repository at this point in the history
Add admit and commit latency histograms to changefeed metrics.
Admit latency keeps track of the difference between the MVCC timestamp
at which a record was updated and the time it was admitted for changefeed
processing.

The commit latency histogram keeps track of the difference between the
MVCC timestamp and the time the event was acknowledged by the downstream sink.

Release Notes: Add admit and commit latency metrics to changefeeds.
  • Loading branch information
Yevgeniy Miretskiy committed Nov 19, 2021
1 parent f690f4c commit 35e3995
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 45 deletions.
33 changes: 28 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
// runs. They're all stored as the `metric.Struct` interface because of
// dependency cycles.
ca.metrics = ca.flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics)
sm := &sinkMetrics{SinkMetrics: ca.metrics.SinkMetrics}
ca.sink, err = getSink(ctx, ca.flowCtx.Cfg, ca.spec.Feed, timestampOracle,
ca.spec.User(), ca.spec.JobID, ca.metrics.SinkMetrics)
ca.spec.User(), ca.spec.JobID, sm)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
Expand All @@ -264,7 +265,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {

ca.sink = &errorWrapperSink{wrapped: ca.sink}

ca.eventProducer, err = ca.startKVFeed(ctx, spans, initialHighWater, needsInitialScan)
ca.eventProducer, err = ca.startKVFeed(ctx, spans, initialHighWater, needsInitialScan, sm)
if err != nil {
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
Expand All @@ -282,7 +283,11 @@ func (ca *changeAggregator) Start(ctx context.Context) {
}

func (ca *changeAggregator) startKVFeed(
ctx context.Context, spans []roachpb.Span, initialHighWater hlc.Timestamp, needsInitialScan bool,
ctx context.Context,
spans []roachpb.Span,
initialHighWater hlc.Timestamp,
needsInitialScan bool,
sm *sinkMetrics,
) (kvevent.Reader, error) {
cfg := ca.flowCtx.Cfg
buf := kvevent.NewThrottlingBuffer(
Expand All @@ -291,7 +296,7 @@ func (ca *changeAggregator) startKVFeed(

// KVFeed takes ownership of the kvevent.Writer portion of the buffer, while
// we return the kvevent.Reader part to the caller.
kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan)
kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan, sm)

// Give errCh enough buffer for both possible errors from supporting goroutines,
// but only the first one is ever used.
Expand Down Expand Up @@ -322,6 +327,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
buf kvevent.Writer,
initialHighWater hlc.Timestamp,
needsInitialScan bool,
sm *sinkMetrics,
) kvfeed.Config {
schemaChangeEvents := changefeedbase.SchemaChangeEventClass(
ca.spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents])
Expand All @@ -338,6 +344,16 @@ func (ca *changeAggregator) makeKVFeedCfg(
initialHighWater, &ca.metrics.SchemaFeedMetrics)
}

// onBackfillCb keeps the backfill bookkeeping in sync with the kvfeed.
// A non-empty backfillTS marks this changefeed's sink metrics as
// backfilling and increments the BackfillCount gauge; an empty backfillTS
// clears the flag and decrements the gauge.
onBackfillCb := func(backfillTS hlc.Timestamp) {
if backfillTS.IsEmpty() {
// Empty timestamp: leave the backfilling state.
sm.backfilling.Set(false)
ca.metrics.BackfillCount.Dec(1)
} else {
// Non-empty timestamp: enter the backfilling state.
sm.backfilling.Set(true)
ca.metrics.BackfillCount.Inc(1)
}
}

return kvfeed.Config{
Writer: buf,
Settings: cfg.Settings,
Expand All @@ -349,6 +365,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
BackfillCheckpoint: ca.spec.Checkpoint.Spans,
Targets: ca.spec.Feed.Targets,
Metrics: &ca.metrics.KVFeedMetrics,
OnBackfillCallback: onBackfillCb,
MM: ca.kvFeedMemMon,
InitialHighWater: initialHighWater,
WithDiff: withDiff,
Expand Down Expand Up @@ -501,6 +518,11 @@ func (ca *changeAggregator) tick() error {
queuedNanos := timeutil.Since(event.BufferAddTimestamp()).Nanoseconds()
ca.metrics.QueueTimeNanos.Inc(queuedNanos)

// Keep track of SLI latency for non-backfill/rangefeed KV events.
if event.Type() == kvevent.TypeKV && event.BackfillTimestamp().IsEmpty() {
ca.metrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
}

switch event.Type() {
case kvevent.TypeKV:
return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
Expand Down Expand Up @@ -1086,8 +1108,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
// but the oracle is only used when emitting row updates.
var nilOracle timestampLowerBoundOracle
var err error
sm := &sinkMetrics{SinkMetrics: cf.metrics.SinkMetrics}
cf.sink, err = getSink(ctx, cf.flowCtx.Cfg, cf.spec.Feed, nilOracle,
cf.spec.User(), cf.spec.JobID, cf.metrics.SinkMetrics)
cf.spec.User(), cf.spec.JobID, sm)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,12 @@ func changefeedPlanHook(
// which will be immediately closed, only to check for errors.
{
var nilOracle timestampLowerBoundOracle
sm := &sinkMetrics{
SinkMetrics: p.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).SinkMetrics,
}
canarySink, err := getSink(
ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details, nilOracle, p.User(), jobspb.InvalidJobID,
p.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).SinkMetrics,
ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details, nilOracle,
p.User(), jobspb.InvalidJobID, sm,
)
if err != nil {
return changefeedbase.MaybeStripRetryableErrorMarker(err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
Targets jobspb.ChangefeedTargets
Writer kvevent.Writer
Metrics *kvevent.Metrics
OnBackfillCallback func(timestamp hlc.Timestamp)
MM *mon.BytesMonitor
WithDiff bool
SchemaChangeEvents changefeedbase.SchemaChangeEventClass
Expand Down Expand Up @@ -98,6 +99,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.Codec,
cfg.SchemaFeed,
sc, pff, bf, cfg.Knobs)
f.onBackfillCallback = cfg.OnBackfillCallback

g := ctxgroup.WithContext(ctx)
g.GoCtx(cfg.SchemaFeed.Run)
Expand Down Expand Up @@ -155,6 +157,7 @@ type kvFeed struct {
writer kvevent.Writer
codec keys.SQLCodec

onBackfillCallback func(timestamp hlc.Timestamp)
schemaChangeEvents changefeedbase.SchemaChangeEventClass
schemaChangePolicy changefeedbase.SchemaChangePolicy

Expand Down Expand Up @@ -324,6 +327,11 @@ func (f *kvFeed) scanIfShould(
return nil
}

if f.onBackfillCallback != nil {
f.onBackfillCallback(scanTime)
defer f.onBackfillCallback(hlc.Timestamp{})
}

if err := f.scanner.Scan(ctx, f.writer, physicalConfig{
Spans: spansToBackfill,
Timestamp: scanTime,
Expand Down
62 changes: 54 additions & 8 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,38 @@ type SinkMetrics struct {
BatchHistNanos *metric.Histogram
Flushes *metric.Counter
FlushHistNanos *metric.Histogram
CommitLatency *metric.Histogram
}

// MetricStruct implements metric.Struct interface.
func (m *SinkMetrics) MetricStruct() {}

// sinkMetrics annotates global SinkMetrics with per-changefeed information,
// such as the state of backfill.
type sinkMetrics struct {
// SinkMetrics is embedded, so its counters and histograms are accessible
// directly on sinkMetrics.
*SinkMetrics
// backfilling is set through the kvfeed backfill callback; while it is
// true, recordEmittedBatch does not record CommitLatency.
backfilling syncutil.AtomicBool
}

// sinkDoesNotCompress is a sentinel value indicating that the sink
// does not compress the data it emits.
const sinkDoesNotCompress = -1

type recordEmittedMessagesCallback func(numMessages int, bytes int, compressedBytes int)
type recordEmittedMessagesCallback func(numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int)

func (m *SinkMetrics) recordEmittedMessages() recordEmittedMessagesCallback {
func (m *sinkMetrics) recordEmittedMessages() recordEmittedMessagesCallback {
if m == nil {
return func(numMessages int, bytes int, compressedBytes int) {}
return func(numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int) {}
}

start := timeutil.Now()
return func(numMessages int, bytes int, compressedBytes int) {
m.recordEmittedBatch(start, numMessages, bytes, compressedBytes)
return func(numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int) {
m.recordEmittedBatch(start, numMessages, mvcc, bytes, compressedBytes)
}
}

func (m *SinkMetrics) recordEmittedBatch(
startTime time.Time, numMessages int, bytes int, compressedBytes int,
func (m *sinkMetrics) recordEmittedBatch(
startTime time.Time, numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int,
) {
if m == nil {
return
Expand All @@ -65,9 +73,12 @@ func (m *SinkMetrics) recordEmittedBatch(
}
m.FlushedBytes.Inc(int64(compressedBytes))
m.BatchHistNanos.RecordValue(emitNanos)
if !m.backfilling.Get() {
m.CommitLatency.RecordValue(timeutil.Since(mvcc.GoTime()).Nanoseconds())
}
}

func (m *SinkMetrics) recordResolvedCallback() func() {
func (m *sinkMetrics) recordResolvedCallback() func() {
if m == nil {
return func() {}
}
Expand Down Expand Up @@ -97,6 +108,8 @@ const (
changefeedCheckpointHistMaxLatency = 30 * time.Second
changefeedEmitHistMaxLatency = 30 * time.Second
changefeedFlushHistMaxLatency = 1 * time.Minute
admitLatencyMaxValue = 60 * time.Second
commitLatencyMaxValue = 10 * 60 * time.Second
)

var (
Expand Down Expand Up @@ -149,6 +162,25 @@ var (
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
// metaAdmitLatency is the metadata for the AdmitLatency histogram, which
// is reported in nanoseconds.
metaAdmitLatency = metric.Metadata{
Name: "changefeed.admit_latency",
Help: "Event admission latency: a difference between event MVCC timestamp " +
"and the time it was admitted into changefeed pipeline; " +
"Note: this metric includes the time spent waiting until event can be processed due " +
"to backpressure or time spent resolving schema descriptors. " +
"Also note, this metric excludes latency during backfill",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
// metaCommitLatency is the metadata for the CommitLatency histogram, which
// is reported in nanoseconds.
metaCommitLatency = metric.Metadata{
Name: "changefeed.commit_latency",
// Each fragment ends with a single trailing space so the concatenated
// help text has no doubled spaces.
Help: "Event commit latency: a difference between event MVCC timestamp " +
"and the time it was acknowledged by the downstream sink. If the sink batches events, " +
"then the difference between the oldest event in the batch and acknowledgement is recorded; " +
"Excludes latency during backfill",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaChangefeedRunning = metric.Metadata{
Name: "changefeed.running",
Help: "Number of currently running changefeeds, including sinkless",
Expand Down Expand Up @@ -194,6 +226,12 @@ var (
Measurement: "Updates",
Unit: metric.Unit_COUNT,
}
// metaChangefeedBackfillCount is the metadata for the BackfillCount gauge,
// which is incremented when a backfill starts and decremented when it ends.
metaChangefeedBackfillCount = metric.Metadata{
Name: "changefeed.backfill_count",
Help: "Number of changefeeds currently executing backfill",
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
)

// Metrics are for production monitoring of changefeeds.
Expand All @@ -205,8 +243,10 @@ type Metrics struct {
ErrorRetries *metric.Counter
Failures *metric.Counter
ResolvedMessages *metric.Counter
BackfillCount *metric.Gauge

QueueTimeNanos *metric.Counter
AdmitLatency *metric.Histogram
CheckpointHistNanos *metric.Histogram
Running *metric.Gauge

Expand Down Expand Up @@ -237,6 +277,9 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
changefeedEmitHistMaxLatency.Nanoseconds(), 1),
FlushHistNanos: metric.NewHistogram(metaChangefeedFlushHistNanos, histogramWindow,
changefeedFlushHistMaxLatency.Nanoseconds(), 1),

CommitLatency: metric.NewHistogram(metaCommitLatency, histogramWindow,
commitLatencyMaxValue.Nanoseconds(), 1),
},

KVFeedMetrics: kvevent.MakeMetrics(histogramWindow),
Expand All @@ -245,10 +288,13 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
ResolvedMessages: metric.NewCounter(metaChangefeedForwardedResolvedMessages),
Failures: metric.NewCounter(metaChangefeedFailures),
QueueTimeNanos: metric.NewCounter(metaEventQueueTime),
AdmitLatency: metric.NewHistogram(metaAdmitLatency, histogramWindow,
admitLatencyMaxValue.Nanoseconds(), 1),

CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow,
changefeedCheckpointHistMaxLatency.Nanoseconds(), 2),

BackfillCount: metric.NewGauge(metaChangefeedBackfillCount),
Running: metric.NewGauge(metaChangefeedRunning),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func getSink(
timestampOracle timestampLowerBoundOracle,
user security.SQLUsername,
jobID jobspb.JobID,
m *SinkMetrics,
m *sinkMetrics,
) (Sink, error) {
u, err := url.Parse(feedCfg.SinkURI)
if err != nil {
Expand Down Expand Up @@ -298,7 +298,7 @@ type bufferSink struct {
alloc rowenc.DatumAlloc
scratch bufalloc.ByteAllocator
closed bool
metrics *SinkMetrics
metrics *sinkMetrics
}

// EmitRow implements the Sink interface.
Expand All @@ -310,7 +310,7 @@ func (s *bufferSink) EmitRow(
r kvevent.Alloc,
) error {
defer r.Release(ctx)
defer s.metrics.recordEmittedMessages()(1, len(key)+len(value), sinkDoesNotCompress)
defer s.metrics.recordEmittedMessages()(1, mvcc, len(key)+len(value), sinkDoesNotCompress)

if s.closed {
return errors.New(`cannot EmitRow on a closed sink`)
Expand Down
20 changes: 14 additions & 6 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type cloudStorageSinkFile struct {
numMessages int
buf bytes.Buffer
alloc kvevent.Alloc
oldestMVCC hlc.Timestamp
recordMetrics recordEmittedMessagesCallback
}

Expand Down Expand Up @@ -299,7 +300,7 @@ type cloudStorageSink struct {
dataFileTs string
dataFilePartition string
prevFilename string
metrics *SinkMetrics
metrics *sinkMetrics
}

const sinkCompressionGzip = "gzip"
Expand All @@ -326,7 +327,7 @@ func makeCloudStorageSink(
timestampOracle timestampLowerBoundOracle,
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
user security.SQLUsername,
m *SinkMetrics,
m *sinkMetrics,
) (Sink, error) {
var targetMaxFileSize int64 = 16 << 20 // 16MB
if fileSizeParam := u.consumeParam(changefeedbase.SinkParamFileSize); fileSizeParam != `` {
Expand Down Expand Up @@ -404,14 +405,21 @@ func makeCloudStorageSink(
return s, nil
}

func (s *cloudStorageSink) getOrCreateFile(topic TopicDescriptor) *cloudStorageSinkFile {
func (s *cloudStorageSink) getOrCreateFile(
topic TopicDescriptor, eventMVCC hlc.Timestamp,
) *cloudStorageSinkFile {
key := cloudStorageSinkKey{topic.GetName(), int64(topic.GetVersion())}
if item := s.files.Get(key); item != nil {
return item.(*cloudStorageSinkFile)
f := item.(*cloudStorageSinkFile)
if eventMVCC.Less(f.oldestMVCC) {
f.oldestMVCC = eventMVCC
}
return f
}
f := &cloudStorageSinkFile{
cloudStorageSinkKey: key,
recordMetrics: s.metrics.recordEmittedMessages(),
oldestMVCC: eventMVCC,
}
switch s.compression {
case sinkCompressionGzip:
Expand All @@ -433,7 +441,7 @@ func (s *cloudStorageSink) EmitRow(
return errors.New(`cannot EmitRow on a closed sink`)
}

file := s.getOrCreateFile(topic)
file := s.getOrCreateFile(topic, mvcc)
file.alloc.Merge(&alloc)

if _, err := file.Write(value); err != nil {
Expand Down Expand Up @@ -573,7 +581,7 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink
if err := cloud.WriteFile(ctx, s.es, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil {
return err
}
file.recordMetrics(file.numMessages, file.rawSize, compressedBytes)
file.recordMetrics(file.numMessages, file.oldestMVCC, file.rawSize, compressedBytes)

return nil
}
Expand Down
Loading

0 comments on commit 35e3995

Please sign in to comment.