Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
125901: streamingccl/logical: remove extra/disused metrics, cleanup var names r=dt a=dt

See commits.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
  • Loading branch information
craig[bot] and dt committed Jun 19, 2024
2 parents 13c4659 + 8285451 commit 8259239
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 128 deletions.
6 changes: 0 additions & 6 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -1332,19 +1332,13 @@
<tr><td>APPLICATION</td><td>kv.protectedts.reconciliation.num_runs</td><td>number of successful reconciliation runs on this node</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>kv.protectedts.reconciliation.records_processed</td><td>number of records processed without error during reconciliation on this node</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>kv.protectedts.reconciliation.records_removed</td><td>number of records removed during reconciliation runs on this node</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.admit_latency</td><td>Event admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processor</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.batch_bytes</td><td>Number of bytes in a given batch</td><td>Bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.batch_hist_nanos</td><td>Time spent flushing a batch</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.checkpoint_events_ingested</td><td>Checkpoint events ingested by all replication jobs</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.commit_latency</td><td>Event commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. If we batch events, then the difference between the oldest event in the batch and flush is recorded</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.distsql_replan_count</td><td>Total number of dist sql replanning events</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.events_ingested</td><td>Events ingested by all replication jobs</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_bytes</td><td>Number of bytes in a given flush</td><td>Logical bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_hist_nanos</td><td>Time spent flushing messages across all replication streams</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_row_count</td><td>Number of rows in a given flush</td><td>Rows</td><td>HISTOGRAM</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_wait_nanos</td><td>Time spenting waiting for an in-progress flush</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flushes</td><td>Total flushes across all replication jobs</td><td>Flushes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.job_progress_updates</td><td>Total number of updates to the ingestion job progress</td><td>Job Updates</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.logical_bytes</td><td>Logical bytes (sum of keys + values) ingested by all replication jobs</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.replicated_time_seconds</td><td>The replicated time of the logical replication stream in seconds since the unix epoch.</td><td>Seconds</td><td>GAUGE</td><td>SECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.admit_latency</td><td>Event admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processor</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,6 @@ func (lrw *logicalReplicationWriterProcessor) consumeEvents(ctx context.Context)
func (lrw *logicalReplicationWriterProcessor) handleEvent(
ctx context.Context, event crosscluster.Event,
) error {
switch event.Type() {
case crosscluster.KVEvent:
ts := event.GetKVs()[0].KeyValue.Value.Timestamp.GoTime()
lrw.metrics.AdmitLatency.RecordValue(
timeutil.Since(ts).Nanoseconds())
}

if streamingKnobs, ok := lrw.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.RunAfterReceivingEvent != nil {
if err := streamingKnobs.RunAfterReceivingEvent(lrw.Ctx()); err != nil {
Expand Down Expand Up @@ -494,8 +487,7 @@ func (lrw *logicalReplicationWriterProcessor) flushBuffer(
batchTime := timeutil.Since(preBatchTime)

lrw.debug.RecordBatchApplied(batchTime, int64(batchEnd-batchStart))
lrw.metrics.BatchBytesHist.RecordValue(int64(batchStats.byteSize))
lrw.metrics.BatchHistNanos.RecordValue(batchTime.Nanoseconds())
lrw.metrics.ApplyBatchNanosHist.RecordValue(batchTime.Nanoseconds())
flushByteSize.Add(int64(batchStats.byteSize))
}
return nil
Expand All @@ -514,15 +506,13 @@ func (lrw *logicalReplicationWriterProcessor) flushBuffer(
keyCount, byteCount := int64(len(kvs)), flushByteSize.Load()
lrw.debug.RecordFlushComplete(flushTime, keyCount, byteCount)

lrw.metrics.Flushes.Inc(1)
lrw.metrics.FlushHistNanos.RecordValue(flushTime)
lrw.metrics.FlushRowCountHist.RecordValue(keyCount)
lrw.metrics.FlushBytesHist.RecordValue(byteCount)
lrw.metrics.IngestedLogicalBytes.Inc(byteCount)

lrw.metrics.CommitLatency.RecordValue(timeutil.Since(firstKeyTS).Nanoseconds())
lrw.metrics.IngestedEvents.Inc(int64(len(kvs)))
lrw.metrics.AppliedRowUpdates.Inc(int64(len(kvs)))
lrw.metrics.AppliedLogicalBytes.Inc(byteCount)
lrw.metrics.CommitToCommitLatency.RecordValue(timeutil.Since(firstKeyTS).Nanoseconds())

lrw.metrics.StreamBatchNanosHist.RecordValue(flushTime)
lrw.metrics.StreamBatchRowsHist.RecordValue(keyCount)
lrw.metrics.StreamBatchBytesHist.RecordValue(byteCount)
return nil
}

Expand Down
161 changes: 56 additions & 105 deletions pkg/ccl/crosscluster/logical/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The Cockroach Authors.
// Copyright 2024 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
Expand All @@ -15,118 +15,90 @@ import (
)

var (
metaReplicationEventsIngested = metric.Metadata{
// Top-line metrics.
metaAppliedRowUpdates = metric.Metadata{
Name: "logical_replication.events_ingested",
Help: "Events ingested by all replication jobs",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaReplicationCheckpointEventsIngested = metric.Metadata{
Name: "logical_replication.checkpoint_events_ingested",
Help: "Checkpoint events ingested by all replication jobs",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaReplicationIngestedBytes = metric.Metadata{
metaAppliedLogicalBytes = metric.Metadata{
Name: "logical_replication.logical_bytes",
Help: "Logical bytes (sum of keys + values) ingested by all replication jobs",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaReplicationFlushes = metric.Metadata{
Name: "logical_replication.flushes",
Help: "Total flushes across all replication jobs",
Measurement: "Flushes",
Unit: metric.Unit_COUNT,
}
metaReplicationFlushHistNanos = metric.Metadata{
Name: "logical_replication.flush_hist_nanos",
Help: "Time spent flushing messages across all replication streams",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaReplicationCommitLatency = metric.Metadata{
metaCommitToCommitLatency = metric.Metadata{
Name: "logical_replication.commit_latency",
Help: "Event commit latency: a difference between event MVCC timestamp " +
"and the time it was flushed into disk. If we batch events, then the difference " +
"between the oldest event in the batch and flush is recorded",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaReplicationAdmitLatency = metric.Metadata{
Name: "logical_replication.admit_latency",
Help: "Event admission latency: a difference between event MVCC timestamp " +
"and the time it was admitted into ingestion processor",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaReplicatedTimeSeconds = metric.Metadata{
Name: "logical_replication.replicated_time_seconds",
Help: "The replicated time of the logical replication stream in seconds since the unix epoch.",
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
}
metaJobProgressUpdates = metric.Metadata{
Name: "logical_replication.job_progress_updates",
Help: "Total number of updates to the ingestion job progress",
Measurement: "Job Updates",

// User-visible health and ops metrics.
metaApplyBatchNanosHist = metric.Metadata{
Name: "logical_replication.batch_hist_nanos",
Help: "Time spent flushing a batch",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

// Internal metrics.
metaCheckpointEvents = metric.Metadata{
Name: "logical_replication.checkpoint_events_ingested",
Help: "Checkpoint events ingested by all replication jobs",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaReplicationFlushRowCountHist = metric.Metadata{
metaStreamBatchRowsHist = metric.Metadata{
Name: "logical_replication.flush_row_count",
Help: "Number of rows in a given flush",
Measurement: "Rows",
Unit: metric.Unit_COUNT,
}
metaReplicationFlushBytesHist = metric.Metadata{
metaStreamBatchBytesHist = metric.Metadata{
Name: "logical_replication.flush_bytes",
Help: "Number of bytes in a given flush",
Measurement: "Logical bytes",
Unit: metric.Unit_BYTES,
}
metaReplicationFlushWaitHistNanos = metric.Metadata{
Name: "logical_replication.flush_wait_nanos",
Help: "Time spenting waiting for an in-progress flush",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaReplicationBatchBytes = metric.Metadata{
Name: "logical_replication.batch_bytes",
Help: "Number of bytes in a given batch",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaReplicationBatchHistNanos = metric.Metadata{
Name: "logical_replication.batch_hist_nanos",
Help: "Time spent flushing a batch",
metaStreamBatchNanosHist = metric.Metadata{
Name: "logical_replication.flush_hist_nanos",
Help: "Time spent flushing messages across all replication streams",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaDistSQLReplanCount = metric.Metadata{
Name: "logical_replication.distsql_replan_count",
Help: "Total number of dist sql replanning events",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
)

// Metrics are for production monitoring of logical replication jobs.
type Metrics struct {
IngestedEvents *metric.Counter
IngestedLogicalBytes *metric.Counter
Flushes *metric.Counter
JobProgressUpdates *metric.Counter
CheckpointEvents *metric.Counter
ReplanCount *metric.Counter
FlushRowCountHist metric.IHistogram
FlushBytesHist metric.IHistogram
FlushHistNanos metric.IHistogram
FlushWaitHistNanos metric.IHistogram
BatchBytesHist metric.IHistogram
BatchHistNanos metric.IHistogram
CommitLatency metric.IHistogram
AdmitLatency metric.IHistogram
// Top-line user-facing numbers that show how many events and how much data are
// being moved and applied/rejected/etc.
AppliedRowUpdates *metric.Counter
AppliedLogicalBytes *metric.Counter
CommitToCommitLatency metric.IHistogram
ReplicatedTimeSeconds *metric.Gauge

// User-surfaced information about the health/operation of the stream; this
// should be a narrow subset of numbers that are actually relevant to a user
// such as the latency of application as that could be their supplied UDF.
ApplyBatchNanosHist metric.IHistogram

// Internal numbers that are useful for determining why a stream is behaving
// a specific way.
CheckpointEvents *metric.Counter
// TODO(dt): are these stream batch size or latency numbers useful?
StreamBatchRowsHist metric.IHistogram
StreamBatchBytesHist metric.IHistogram
StreamBatchNanosHist metric.IHistogram
}

// MetricStruct implements the metric.Struct interface.
Expand All @@ -135,60 +107,39 @@ func (*Metrics) MetricStruct() {}
// MakeMetrics makes the metrics for logical replication job monitoring.
func MakeMetrics(histogramWindow time.Duration) metric.Struct {
return &Metrics{
IngestedEvents: metric.NewCounter(metaReplicationEventsIngested),
IngestedLogicalBytes: metric.NewCounter(metaReplicationIngestedBytes),
Flushes: metric.NewCounter(metaReplicationFlushes),
CheckpointEvents: metric.NewCounter(metaReplicationCheckpointEventsIngested),
JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
ReplanCount: metric.NewCounter(metaDistSQLReplanCount),
FlushHistNanos: metric.NewHistogram(metric.HistogramOptions{
AppliedRowUpdates: metric.NewCounter(metaAppliedRowUpdates),
AppliedLogicalBytes: metric.NewCounter(metaAppliedLogicalBytes),
CommitToCommitLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationFlushHistNanos,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
CommitLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationCommitLatency,
Metadata: metaCommitToCommitLatency,
Duration: histogramWindow,
BucketConfig: metric.LongRunning60mLatencyBuckets,
}),
AdmitLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationAdmitLatency,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
FlushRowCountHist: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationFlushRowCountHist,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
FlushBytesHist: metric.NewHistogram(metric.HistogramOptions{
ReplicatedTimeSeconds: metric.NewGauge(metaReplicatedTimeSeconds),
ApplyBatchNanosHist: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationFlushBytesHist,
Metadata: metaApplyBatchNanosHist,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
FlushWaitHistNanos: metric.NewHistogram(metric.HistogramOptions{
CheckpointEvents: metric.NewCounter(metaCheckpointEvents),
StreamBatchRowsHist: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationFlushWaitHistNanos,
Metadata: metaStreamBatchRowsHist,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
BatchBytesHist: metric.NewHistogram(metric.HistogramOptions{
StreamBatchBytesHist: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationBatchBytes,
Metadata: metaStreamBatchBytesHist,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
BatchHistNanos: metric.NewHistogram(metric.HistogramOptions{
StreamBatchNanosHist: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaReplicationBatchHistNanos,
Metadata: metaStreamBatchNanosHist,
Duration: histogramWindow,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
ReplicatedTimeSeconds: metric.NewGauge(metaReplicatedTimeSeconds),
}
}

0 comments on commit 8259239

Please sign in to comment.