Skip to content

Commit

Permalink
streamingccl/logical: cleanup metrics in-code names
Browse files Browse the repository at this point in the history
This cleans up the in-code names of the metrics in the logical package, to
use StreamBatch or ApplyBatch and standardize on Hist suffix appearing last,
and to have the meta names match the field names.

A follow-up change may update the persisted names and the dashboard accordingly;
this change is only changing the in-code variable names.

Release note: none.
Epic: none.
  • Loading branch information
dt committed Jun 19, 2024
1 parent becf617 commit 8285451
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (lrw *logicalReplicationWriterProcessor) flushBuffer(
batchTime := timeutil.Since(preBatchTime)

lrw.debug.RecordBatchApplied(batchTime, int64(batchEnd-batchStart))
lrw.metrics.BatchHistNanos.RecordValue(batchTime.Nanoseconds())
lrw.metrics.ApplyBatchNanosHist.RecordValue(batchTime.Nanoseconds())
flushByteSize.Add(int64(batchStats.byteSize))
}
return nil
Expand All @@ -506,14 +506,13 @@ func (lrw *logicalReplicationWriterProcessor) flushBuffer(
keyCount, byteCount := int64(len(kvs)), flushByteSize.Load()
lrw.debug.RecordFlushComplete(flushTime, keyCount, byteCount)

lrw.metrics.FlushHistNanos.RecordValue(flushTime)
lrw.metrics.FlushRowCountHist.RecordValue(keyCount)
lrw.metrics.FlushBytesHist.RecordValue(byteCount)
lrw.metrics.IngestedLogicalBytes.Inc(byteCount)

lrw.metrics.CommitLatency.RecordValue(timeutil.Since(firstKeyTS).Nanoseconds())
lrw.metrics.IngestedEvents.Inc(int64(len(kvs)))
lrw.metrics.AppliedRowUpdates.Inc(int64(len(kvs)))
lrw.metrics.AppliedLogicalBytes.Inc(byteCount)
lrw.metrics.CommitToCommitLatency.RecordValue(timeutil.Since(firstKeyTS).Nanoseconds())

lrw.metrics.StreamBatchNanosHist.RecordValue(flushTime)
lrw.metrics.StreamBatchRowsHist.RecordValue(keyCount)
lrw.metrics.StreamBatchBytesHist.RecordValue(byteCount)
return nil
}

Expand Down
105 changes: 60 additions & 45 deletions pkg/ccl/crosscluster/logical/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The Cockroach Authors.
// Copyright 2024 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
Expand All @@ -15,31 +15,20 @@ import (
)

var (
metaReplicationEventsIngested = metric.Metadata{
// Top-line metrics.
metaAppliedRowUpdates = metric.Metadata{
Name: "logical_replication.events_ingested",
Help: "Events ingested by all replication jobs",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaReplicationCheckpointEventsIngested = metric.Metadata{
Name: "logical_replication.checkpoint_events_ingested",
Help: "Checkpoint events ingested by all replication jobs",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaReplicationIngestedBytes = metric.Metadata{
metaAppliedLogicalBytes = metric.Metadata{
Name: "logical_replication.logical_bytes",
Help: "Logical bytes (sum of keys + values) ingested by all replication jobs",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaReplicationFlushHistNanos = metric.Metadata{
Name: "logical_replication.flush_hist_nanos",
Help: "Time spent flushing messages across all replication streams",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaReplicationCommitLatency = metric.Metadata{
metaCommitToCommitLatency = metric.Metadata{
Name: "logical_replication.commit_latency",
Help: "Event commit latency: a difference between event MVCC timestamp " +
"and the time it was flushed into disk. If we batch events, then the difference " +
Expand All @@ -53,37 +42,63 @@ var (
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
}
metaReplicationFlushRowCountHist = metric.Metadata{

// User-visible health and ops metrics.
metaApplyBatchNanosHist = metric.Metadata{
Name: "logical_replication.batch_hist_nanos",
Help: "Time spent flushing a batch",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

// Internal metrics.
metaCheckpointEvents = metric.Metadata{
Name: "logical_replication.checkpoint_events_ingested",
Help: "Checkpoint events ingested by all replication jobs",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaStreamBatchRowsHist = metric.Metadata{
Name: "logical_replication.flush_row_count",
Help: "Number of rows in a given flush",
Measurement: "Rows",
Unit: metric.Unit_COUNT,
}
metaReplicationFlushBytesHist = metric.Metadata{
metaStreamBatchBytesHist = metric.Metadata{
Name: "logical_replication.flush_bytes",
Help: "Number of bytes in a given flush",
Measurement: "Logical bytes",
Unit: metric.Unit_BYTES,
}
metaReplicationBatchHistNanos = metric.Metadata{
Name: "logical_replication.batch_hist_nanos",
Help: "Time spent flushing a batch",
metaStreamBatchNanosHist = metric.Metadata{
Name: "logical_replication.flush_hist_nanos",
Help: "Time spent flushing messages across all replication streams",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
)

// Metrics are for production monitoring of logical replication jobs.
type Metrics struct {
IngestedEvents *metric.Counter
IngestedLogicalBytes *metric.Counter
CheckpointEvents *metric.Counter
FlushRowCountHist metric.IHistogram
FlushBytesHist metric.IHistogram
FlushHistNanos metric.IHistogram
BatchHistNanos metric.IHistogram
CommitLatency metric.IHistogram
// Top-line user-facing numbers that how many events and how much data are
// bring moved and applied/rejected/etc.
AppliedRowUpdates *metric.Counter
AppliedLogicalBytes *metric.Counter
CommitToCommitLatency metric.IHistogram
ReplicatedTimeSeconds *metric.Gauge

// User-surfaced information about the health/operation of the stream; this
// should be a narrow subset of numbers that are actually relevant to a user
// such as the latency of application as that could be their supplied UDF.
ApplyBatchNanosHist metric.IHistogram

// Internal numbers that are useful for determining why a stream is behaving
// a specific way.
CheckpointEvents *metric.Counter
// TODO(dt): are these stream batch size or latency numbers useful?
StreamBatchRowsHist metric.IHistogram
StreamBatchBytesHist metric.IHistogram
StreamBatchNanosHist metric.IHistogram
}

// MetricStruct implements the metric.Struct interface.
Expand All @@ -92,39 +107,39 @@ func (*Metrics) MetricStruct() {}
// MakeMetrics makes the metrics for logical replication job monitoring.
func MakeMetrics(histogramWindow time.Duration) metric.Struct {
return &Metrics{
IngestedEvents: metric.NewCounter(metaReplicationEventsIngested),
IngestedLogicalBytes: metric.NewCounter(metaReplicationIngestedBytes),
CheckpointEvents: metric.NewCounter(metaReplicationCheckpointEventsIngested),
FlushHistNanos: metric.NewHistogram(metric.HistogramOptions{
AppliedRowUpdates: metric.NewCounter(metaAppliedRowUpdates),
AppliedLogicalBytes: metric.NewCounter(metaAppliedLogicalBytes),
CommitToCommitLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationFlushHistNanos,
Metadata: metaCommitToCommitLatency,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
BucketConfig: metric.LongRunning60mLatencyBuckets,
}),
CommitLatency: metric.NewHistogram(metric.HistogramOptions{
ReplicatedTimeSeconds: metric.NewGauge(metaReplicatedTimeSeconds),
ApplyBatchNanosHist: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationCommitLatency,
Metadata: metaApplyBatchNanosHist,
Duration: histogramWindow,
BucketConfig: metric.LongRunning60mLatencyBuckets,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
FlushRowCountHist: metric.NewHistogram(metric.HistogramOptions{
CheckpointEvents: metric.NewCounter(metaCheckpointEvents),
StreamBatchRowsHist: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationFlushRowCountHist,
Metadata: metaStreamBatchRowsHist,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
FlushBytesHist: metric.NewHistogram(metric.HistogramOptions{
StreamBatchBytesHist: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationFlushBytesHist,
Metadata: metaStreamBatchBytesHist,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
BatchHistNanos: metric.NewHistogram(metric.HistogramOptions{
StreamBatchNanosHist: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationBatchHistNanos,
Metadata: metaStreamBatchNanosHist,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
ReplicatedTimeSeconds: metric.NewGauge(metaReplicatedTimeSeconds),
}
}

0 comments on commit 8285451

Please sign in to comment.