63923: changefeed,server: changefeed histograms and configurable slow logging r=stevendanna a=stevendanna

See the individual commits for details.

Overall, the goal of these changes is to get more information about the ongoing test failures
we are seeing in cdc/ledger and cdc/cloud-sink-gcs roachtests.

A number of log messages we might expect to see when the changeFrontier is behind
aren't emitted, because we auto-calculate a slow logging threshold based on the
closed_timestamp.target_duration setting, which we are increasing in these tests.
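
For illustration, a hedged sketch of how the new explicit override added here can be
used (the setting name comes from the settings.go diff below; the duration value is
arbitrary):

    -- Pin the slow-span logging threshold directly instead of relying on the
    -- value auto-calculated from closed_timestamp.target_duration. Leaving it
    -- at 0 (the default) keeps the auto-calculated behavior.
    SET CLUSTER SETTING changefeed.slow_span_log_threshold = '30s';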

Further, the current metrics around emitting and flushing are plain counters. That is fine
for many purposes, since the timeseries database lets us look at the values over time, but
when trying to debug a test issue after the fact the timeseries database is gone, and it
would be nice to have the histogram data.
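
As a quick way to eyeball these values after the fact, the node's current metrics can be
read from SQL; a minimal sketch, assuming crdb_internal.node_metrics exposes the
changefeed counters and histogram summaries by name:

    -- List changefeed metrics on the local node, including the new
    -- checkpoint/emit/flush histograms added in this change.
    SELECT name, value
    FROM crdb_internal.node_metrics
    WHERE name LIKE 'changefeed.%'
    ORDER BY name;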

63941: sql: More nuanced blocking of zone config DISCARD (as well as assorted testing) r=otan a=ajstorm

See individual commits for details.

63956: opt: support correlated CTEs r=RaduBerinde a=RaduBerinde

This change enables building correlated CTEs. Single-use CTEs that can
be inlined will result in a query that can be decorrelated by the
normal rules. If the CTE cannot be inlined, the With operator remains
in place in the tree and prevents decorrelation (an apply join will need
to be used during execution).
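
For illustration, a minimal sketch of a correlated CTE that this change allows (the
table and column names are hypothetical):

    -- The CTE body references d.id from the outer query, which makes it
    -- correlated. A single-use CTE like this can be inlined and decorrelated;
    -- otherwise the With operator stays in the plan and an apply join is used.
    SELECT
        d.name,
        (
            WITH dept_orders AS (
                SELECT o.amount FROM orders o WHERE o.dept_id = d.id
            )
            SELECT count(*) FROM dept_orders
        ) AS order_count
    FROM departments d;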

Fixes #42540.

Release note (sql change): correlated CTEs can now be used.

Co-authored-by: Steven Danna <danna@cockroachlabs.com>
Co-authored-by: Adam Storm <storm@cockroachlabs.com>
Co-authored-by: Radu Berinde <radu@cockroachlabs.com>
4 people committed Apr 22, 2021
4 parents: ee8b38e + 5489f0b + 92d1b7c + 53d4fd6; commit 98ba7f3
Showing 17 changed files with 1,070 additions and 99 deletions.
pkg/ccl/changefeedccl/changefeed_processors.go: 40 additions & 21 deletions
@@ -858,8 +858,9 @@ type changeFrontier struct {
freqEmitResolved time.Duration
// lastEmitResolved is the last time a resolved timestamp was emitted.
lastEmitResolved time.Time
// lastSlowSpanLog is the last time a slow span from `sf` was logged.
lastSlowSpanLog time.Time

// slowLogEveryN rate-limits the logging of slow spans
slowLogEveryN log.EveryN

// schemaChangeBoundary represents an hlc timestamp at which a schema change
// event occurred to a target watched by this frontier. If the changefeed is
@@ -902,7 +903,10 @@ type changeFrontier struct {
metricsID int
}

const runStatusUpdateFrequency time.Duration = time.Minute
const (
runStatusUpdateFrequency time.Duration = time.Minute
slowSpanMaxFrequency = 10 * time.Second
)

type jobState struct {
job *jobs.Job
@@ -923,11 +927,12 @@ func newChangeFrontierProcessor(
ctx := flowCtx.EvalCtx.Ctx()
memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "changefntr-mem")
cf := &changeFrontier{
flowCtx: flowCtx,
spec: spec,
memAcc: memMonitor.MakeBoundAccount(),
input: input,
sf: span.MakeFrontier(spec.TrackedSpans...),
flowCtx: flowCtx,
spec: spec,
memAcc: memMonitor.MakeBoundAccount(),
input: input,
sf: span.MakeFrontier(spec.TrackedSpans...),
slowLogEveryN: log.Every(slowSpanMaxFrequency),
}
if err := cf.Init(
cf,
@@ -1241,9 +1246,13 @@ func (cf *changeFrontier) handleFrontierChanged(isBehind bool) error {
cf.metrics.mu.resolved[cf.metricsID] = newResolved
}
cf.metrics.mu.Unlock()

checkpointStart := timeutil.Now()
if err := cf.checkpointResolvedTimestamp(newResolved, isBehind); err != nil {
return err
}
cf.metrics.CheckpointHistNanos.RecordValue(timeutil.Since(checkpointStart).Nanoseconds())

if err := cf.maybeEmitResolved(newResolved); err != nil {
return err
}
@@ -1398,38 +1407,48 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
// returned boolean will be true if the resolved timestamp lags far behind the
// present as defined by the current configuration.
func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) (isBehind bool) {
// These two cluster setting values represent the target responsiveness of
// poller and range feed. The cluster setting for switching between poller and
// rangefeed is only checked at changefeed start/resume, so instead of
// switching on it here, just add them. Also add 1 second in case both these
// settings are set really low (as they are in unit tests).
pollInterval := changefeedbase.TableDescriptorPollInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
closedtsInterval := closedts.TargetDuration.Get(&cf.flowCtx.Cfg.Settings.SV)
slownessThreshold := time.Second + 10*(pollInterval+closedtsInterval)
frontier := cf.sf.Frontier()
now := timeutil.Now()
resolvedBehind := now.Sub(frontier.GoTime())
if resolvedBehind <= slownessThreshold {
if resolvedBehind <= cf.slownessThreshold() {
return false
}

description := `sinkless feed`
description := "sinkless feed"
if !cf.isSinkless() {
description = fmt.Sprintf("job %d", cf.spec.JobID)
}
if frontierChanged {
log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s",
description, frontier, resolvedBehind)
}
const slowSpanMaxFrequency = 10 * time.Second
if now.Sub(cf.lastSlowSpanLog) > slowSpanMaxFrequency {
cf.lastSlowSpanLog = now

if cf.slowLogEveryN.ShouldProcess(now) {
s := cf.sf.PeekFrontierSpan()
log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
}
return true
}

func (cf *changeFrontier) slownessThreshold() time.Duration {
clusterThreshold := changefeedbase.SlowSpanLogThreshold.Get(&cf.flowCtx.Cfg.Settings.SV)
if clusterThreshold > 0 {
return clusterThreshold
}

// These two cluster setting values represent the target
// responsiveness of schemafeed and rangefeed.
//
// We add 1 second in case both these settings are set really
// low (as they are in unit tests).
//
// TODO(ssd): We should probably take into account the flush
// frequency here.
pollInterval := changefeedbase.TableDescriptorPollInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
closedtsInterval := closedts.TargetDuration.Get(&cf.flowCtx.Cfg.Settings.SV)
return time.Second + 10*(pollInterval+closedtsInterval)
}

// ConsumerClosed is part of the RowSource interface.
func (cf *changeFrontier) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
pkg/ccl/changefeedccl/changefeed_stmt.go: 13 additions & 4 deletions
@@ -434,6 +434,18 @@ func changefeedJobDescription(
return tree.AsStringWithFQNames(c, ann), nil
}

// validateNonNegativeDuration returns a nil error if optValue can be
// parsed as a duration and is non-negative; otherwise, an error is
// returned.
func validateNonNegativeDuration(optName string, optValue string) error {
if d, err := time.ParseDuration(optValue); err != nil {
return err
} else if d < 0 {
return errors.Errorf("negative durations are not accepted: %s='%s'", optName, optValue)
}
return nil
}

func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails, error) {
if details.Opts == nil {
// The proto MarshalTo method omits the Opts field if the map is empty.
@@ -444,11 +456,8 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
{
const opt = changefeedbase.OptResolvedTimestamps
if o, ok := details.Opts[opt]; ok && o != `` {
if d, err := time.ParseDuration(o); err != nil {
if err := validateNonNegativeDuration(opt, o); err != nil {
return jobspb.ChangefeedDetails{}, err
} else if d < 0 {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`negative durations are not accepted: %s='%s'`, opt, o)
}
}
}
pkg/ccl/changefeedccl/changefeed_test.go: 6 additions & 0 deletions
@@ -2059,10 +2059,16 @@ func TestChangefeedErrors(t *testing.T) {
t, `unknown envelope: nope`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH envelope=nope`,
)

sqlDB.ExpectErr(
t, `time: invalid duration "bar"`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH resolved='bar'`,
)
sqlDB.ExpectErr(
t, `negative durations are not accepted: resolved='-1s'`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH resolved='-1s'`,
)

sqlDB.ExpectErr(
t, `cannot specify timestamp in the future`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH cursor=$1`, timeutil.Now().Add(time.Hour),
pkg/ccl/changefeedccl/changefeedbase/settings.go: 8 additions & 0 deletions
@@ -45,3 +45,11 @@ var PerChangefeedMemLimit = settings.RegisterByteSizeSetting(
"controls amount of data that can be buffered per changefeed",
1<<30,
)

// SlowSpanLogThreshold controls when we will log slow spans.
var SlowSpanLogThreshold = settings.RegisterDurationSetting(
"changefeed.slow_span_log_threshold",
"a changefeed will log spans with resolved timestamps this far behind the current wall-clock time; if 0, a default value is calculated based on other cluster settings",
0,
settings.NonNegativeDuration,
)
pkg/ccl/changefeedccl/metrics.go: 55 additions & 4 deletions
@@ -40,9 +40,11 @@ func (s *metricsSink) EmitRow(
start := timeutil.Now()
err := s.wrapped.EmitRow(ctx, topic, key, value, updated)
if err == nil {
emitNanos := timeutil.Since(start).Nanoseconds()
s.metrics.EmittedMessages.Inc(1)
s.metrics.EmittedBytes.Inc(int64(len(key) + len(value)))
s.metrics.EmitNanos.Inc(timeutil.Since(start).Nanoseconds())
s.metrics.EmitNanos.Inc(emitNanos)
s.metrics.EmitHistNanos.RecordValue(emitNanos)
}
return err
}
@@ -54,11 +56,13 @@ func (s *metricsSink) EmitResolvedTimestamp(
start := timeutil.Now()
err := s.wrapped.EmitResolvedTimestamp(ctx, encoder, resolved)
if err == nil {
emitNanos := timeutil.Since(start).Nanoseconds()
s.metrics.EmittedMessages.Inc(1)
// TODO(dan): This wasn't correct. The wrapped sink may emit the payload
// any number of times.
// s.metrics.EmittedBytes.Inc(int64(len(payload)))
s.metrics.EmitNanos.Inc(timeutil.Since(start).Nanoseconds())
s.metrics.EmitNanos.Inc(emitNanos)
s.metrics.EmitHistNanos.RecordValue(emitNanos)
}
return err
}
@@ -68,9 +72,12 @@ func (s *metricsSink) Flush(ctx context.Context) error {
start := timeutil.Now()
err := s.wrapped.Flush(ctx)
if err == nil {
flushNanos := timeutil.Since(start).Nanoseconds()
s.metrics.Flushes.Inc(1)
s.metrics.FlushNanos.Inc(timeutil.Since(start).Nanoseconds())
s.metrics.FlushNanos.Inc(flushNanos)
s.metrics.FlushHistNanos.RecordValue(flushNanos)
}

return err
}

@@ -84,6 +91,12 @@ func (s *metricsSink) Dial() error {
return s.wrapped.Dial()
}

const (
changefeedCheckpointHistMaxLatency = 30 * time.Second
changefeedEmitHistMaxLatency = 30 * time.Second
changefeedFlushHistMaxLatency = 1 * time.Minute
)

var (
metaChangefeedEmittedMessages = metric.Metadata{
Name: "changefeed.emitted_messages",
@@ -147,6 +160,32 @@ var (
Unit: metric.Unit_COUNT,
}

metaChangefeedCheckpointHistNanos = metric.Metadata{
Name: "changefeed.checkpoint_hist_nanos",
Help: "Time spent checkpointing changefeed progress",
Measurement: "Changefeeds",
Unit: metric.Unit_NANOSECONDS,
}

// emit_hist_nanos and flush_hist_nanos duplicate information
// in emit_nanos, emitted_messages, flush_nanos, and
// flushes. While all of those could be reconstructed from
// information in the histograms, we've kept the older metrics
// to avoid breaking historical timeseries data.
metaChangefeedEmitHistNanos = metric.Metadata{
Name: "changefeed.emit_hist_nanos",
Help: "Time spent emitting messages across all changefeeds",
Measurement: "Changefeeds",
Unit: metric.Unit_NANOSECONDS,
}

metaChangefeedFlushHistNanos = metric.Metadata{
Name: "changefeed.flush_hist_nanos",
Help: "Time spent flushing messages across all changefeeds",
Measurement: "Changefeeds",
Unit: metric.Unit_NANOSECONDS,
}

// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
Expand All @@ -173,6 +212,10 @@ type Metrics struct {
EmitNanos *metric.Counter
FlushNanos *metric.Counter

CheckpointHistNanos *metric.Histogram
EmitHistNanos *metric.Histogram
FlushHistNanos *metric.Histogram

Running *metric.Gauge

mu struct {
@@ -200,7 +243,15 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
TableMetadataNanos: metric.NewCounter(metaChangefeedTableMetadataNanos),
EmitNanos: metric.NewCounter(metaChangefeedEmitNanos),
FlushNanos: metric.NewCounter(metaChangefeedFlushNanos),
Running: metric.NewGauge(metaChangefeedRunning),

CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow,
changefeedCheckpointHistMaxLatency.Nanoseconds(), 2),
EmitHistNanos: metric.NewHistogram(metaChangefeedEmitHistNanos, histogramWindow,
changefeedEmitHistMaxLatency.Nanoseconds(), 2),
FlushHistNanos: metric.NewHistogram(metaChangefeedFlushHistNanos, histogramWindow,
changefeedFlushHistMaxLatency.Nanoseconds(), 2),

Running: metric.NewGauge(metaChangefeedRunning),
}
m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization