Skip to content

Commit

Permalink
Merge #88370
Browse files Browse the repository at this point in the history
88370: changefeedccl: add metric for size based flushes r=jayshrivastava a=jayshrivastava

Add metric to track when the cloud storage sink has to flush
data due to the file size limit being reached.

Fixes: #84435
Release note: None



Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
  • Loading branch information
craig[bot] and jayshrivastava committed Sep 21, 2022
2 parents 31754ac + dcba53b commit 6450a9d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 6 deletions.
37 changes: 31 additions & 6 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type AggMetrics struct {
BatchHistNanos *aggmetric.AggHistogram
Flushes *aggmetric.AggCounter
FlushHistNanos *aggmetric.AggHistogram
SizeBasedFlushes *aggmetric.AggCounter
CommitLatency *aggmetric.AggHistogram
BackfillCount *aggmetric.AggGauge
BackfillPendingRanges *aggmetric.AggGauge
Expand Down Expand Up @@ -85,6 +86,7 @@ type metricsRecorder interface {
recordFlushRequestCallback() func()
getBackfillCallback() func() func()
getBackfillRangeCallback() func(int64) (func(), func())
recordSizeBasedFlush()
}

var _ metricsRecorder = (*sliMetrics)(nil)
Expand All @@ -102,6 +104,7 @@ type sliMetrics struct {
BatchHistNanos *aggmetric.Histogram
Flushes *aggmetric.Counter
FlushHistNanos *aggmetric.Histogram
SizeBasedFlushes *aggmetric.Counter
CommitLatency *aggmetric.Histogram
ErrorRetries *aggmetric.Counter
AdmitLatency *aggmetric.Histogram
Expand Down Expand Up @@ -224,6 +227,15 @@ func (m *sliMetrics) getBackfillRangeCallback() func(int64) (func(), func()) {
}
}

// recordSizeBasedFlush increments the SizeBasedFlushes counter by one.
// Calling it on a nil *sliMetrics receiver is a no-op.
func (m *sliMetrics) recordSizeBasedFlush() {
	if m != nil {
		m.SizeBasedFlushes.Inc(1)
	}
}

type wrappingCostController struct {
ctx context.Context
inner metricsRecorder
Expand Down Expand Up @@ -284,6 +296,11 @@ func (w *wrappingCostController) getBackfillRangeCallback() func(int64) (func(),
return w.inner.getBackfillRangeCallback()
}

// recordSizeBasedFlush implements metricsRecorder by forwarding the call
// unchanged to the wrapped inner recorder.
func (w *wrappingCostController) recordSizeBasedFlush() {
w.inner.recordSizeBasedFlush()
}

var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
Expand Down Expand Up @@ -369,6 +386,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Flushes",
Unit: metric.Unit_COUNT,
}
metaSizeBasedFlushes := metric.Metadata{
Name: "changefeed.size_based_flushes",
Help: "Total size based flushes across all feeds",
Measurement: "Flushes",
Unit: metric.Unit_COUNT,
}
metaChangefeedBatchHistNanos := metric.Metadata{
Name: "changefeed.sink_batch_hist_nanos",
Help: "Time spent batched in the sink buffer before being flushed and acknowledged",
Expand Down Expand Up @@ -440,12 +463,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
a := &AggMetrics{
ErrorRetries: b.Counter(metaChangefeedErrorRetries),
EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
MessageSize: b.Histogram(metaMessageSize, histogramWindow, metric.DataSize16MBBuckets),
EmittedBytes: b.Counter(metaChangefeedEmittedBytes),
FlushedBytes: b.Counter(metaChangefeedFlushedBytes),
Flushes: b.Counter(metaChangefeedFlushes),
ErrorRetries: b.Counter(metaChangefeedErrorRetries),
EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
MessageSize: b.Histogram(metaMessageSize, histogramWindow, metric.DataSize16MBBuckets),
EmittedBytes: b.Counter(metaChangefeedEmittedBytes),
FlushedBytes: b.Counter(metaChangefeedFlushedBytes),
Flushes: b.Counter(metaChangefeedFlushes),
SizeBasedFlushes: b.Counter(metaSizeBasedFlushes),

BatchHistNanos: b.Histogram(metaChangefeedBatchHistNanos, histogramWindow, metric.BatchProcessLatencyBuckets),
FlushHistNanos: b.Histogram(metaChangefeedFlushHistNanos, histogramWindow, metric.BatchProcessLatencyBuckets),
Expand Down Expand Up @@ -507,6 +531,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
BatchHistNanos: a.BatchHistNanos.AddChild(scope),
Flushes: a.Flushes.AddChild(scope),
FlushHistNanos: a.FlushHistNanos.AddChild(scope),
SizeBasedFlushes: a.SizeBasedFlushes.AddChild(scope),
CommitLatency: a.CommitLatency.AddChild(scope),
ErrorRetries: a.ErrorRetries.AddChild(scope),
AdmitLatency: a.AdmitLatency.AddChild(scope),
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ func (s *cloudStorageSink) EmitRow(
}

if int64(file.buf.Len()) > s.targetMaxFileSize {
s.metrics.recordSizeBasedFlush()
if err := s.flushTopicVersions(ctx, file.topic, file.schemaID); err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,12 @@ var charts = []sectionDescription{
"changefeed.flushes",
},
},
{
Title: "Size Based Flushes",
Metrics: []string{
"changefeed.size_based_flushes",
},
},
{
Title: "Max Behind Nanos",
Metrics: []string{
Expand Down

0 comments on commit 6450a9d

Please sign in to comment.