Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…68660 #68661

67090: sql: periodically flush sqlstats r=Azhng a=Azhng

Previous PR: #67866

This commit introduces a new persisted sql stats subsystem
that wraps the existing node-local sql stats subsystem.
This new subsystem is responsible for flushing the in-meory
statistics into the system table periodically, or when it
detects memory pressure.
This replaces sql.Server's in-memory sqlStats provider.

Release note (sql change): SQL stats now can be persisted into
system.statement_statistics and system.transaction_statistics
tables by enabling the sql.stats.flush.enable cluster setting.
The interval of persistence is determined by the new
sql.stats.flush.interval cluster setting which defaults to 1 hour.

68195: streamingccl: allow stream ingestion processors to keep running on `GenerationEvent` r=annezhu98 a=annezhu98

Previously, a stream ingestion processor would shut down if it ever loses connection with its stream client. With generation support, the processor should not immediately move to draining state, instead, it should be in `StateRunning` to poll for cutover signal sent by the coordinator. Generation support will be implemented by the following PR: #67189

The first commit adds `GenerationEvent` as a valid event type that can be emitted over a cluster stream.
The second commit implements the mechanism that keeps processors running when losing connection with the client.

68288: changefeedccl: Propagate pushback throughout changefeed pipeline. r=ajwerner a=miretskiy

Propagate pushback information throughout changefeed pipeline.

The pushback propagation is accomplished by associating a `Resource`
with each event that is processed by changefeed system.  The resource is
propagated throughout changefeed, and is released when the event
has been written to the sink.

This change also updates and simplifies event memory accounting.
Prior to this PR, memory accounting was incomplete and was error prone.
This PR simplifies memory accounting by allocating resources once when
the event arrives (blocking buffer), and releasing resources
when event is written to the sink.  Dynamic modifications to the amount
of allocated memory are, in general, not safe (without additional
complexity).  To accommodate the fact that during event processing we
use more memory (e.g. to parse and convert this event), we over-allocate
resources to the the event.

Release Notes: Enterprise change; changefeed will slow down correctly
whenever there is a slow down in the system (i.e. downstream sink is
slow).

68444: multiregionccl: add zone config waiting for partitioned tables r=arulajmani a=pawalt

Previously, wait-for-zone-configs could only wait on tables, meaning we
couldn't wait for zone config changes to apply to REGIONAL BY ROW
partitions. This PR adds a `partition-name` flag to allow users to wait
on a specific partition.

Release note: None

A future PR will extend the tracing to allow us to trace REGIONAL BY ROW queries, but those changes combined with these will likely be too large for a single PR.

68596: authors: add pseudomuto to authors r=pseudomuto a=pseudomuto

Release note: None

68611: ci: move bazel build scripts to `build/teamcity` directory r=rail a=rickystewart

This gives a little more structure to what was previously a completely
flat `build` directory that contained one or more scripts for every
single build configuration.

When this PR is merged I'll move the affected build configurations to
final destinations in the `Cockroach / CI` subproject in TC.

Release note: None

68614: server: reduce flakiness of TestEnsureSQLStatsAreFlushedForTelemetry r=Azhng a=Azhng

Previously, this unit tests generates statements with the fingerprint
'INSERT INTO _ VALUES (_)'. However, this is a very common fingerprint
and can collide with actual statements issued by other subsystems, which
in turn cause test failure.
This commit change the test cases to use uncommon statements that should
not cause fingerprint collision

Resolves #66826

Release note: None

68637: kvserver/closedts: fix nil dereference in HTML generation r=aliher1911 a=erikgrinaker

The `/debug/closedts-{sender,receiver}` endpoints could panic due to a
nil dereference if the last update in the sidetransport buffer was
removed by the time it was rendered. This patch adds a `nil` check to
avoid that panic.

Release note (bug fix): Fixed a crash in the
`/debug/closedts-{sender,receiver}` advanced debug pages if the last
message of the closed timestamp side transport buffer was removed before
rendering.

68642: kvserver: deflake TestNonVoterCatchesUpViaRaftSnapshotQueue r=aayushshah15 a=aayushshah15

Fixes #68142

Release note: None

68660: authors: Add shiranka to authors r=samiskin a=samiskin



68661: add ding@ to AUTHORS r=davidwding a=davidwding



Co-authored-by: Azhng <archer.xn@gmail.com>
Co-authored-by: Anne Zhu <anne.zhu@cockroachlabs.com>
Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Co-authored-by: Peyton Walters <peyton.walters@cockroachlabs.com>
Co-authored-by: David Muto (pseudomuto) <david.muto@gmail.com>
Co-authored-by: Ricky Stewart <ricky@cockroachlabs.com>
Co-authored-by: Erik Grinaker <grinaker@cockroachlabs.com>
Co-authored-by: Aayush Shah <aayush.shah15@gmail.com>
Co-authored-by: Shiranka Miskin <shiranka.miskin@gmail.com>
Co-authored-by: David Ding <ding@cockroachlabs.com>
  • Loading branch information
11 people committed Aug 10, 2021
12 parents f673e97 + 9104fd1 + f5244f4 + 1b20162 + 0d1d14a + 4780edc + 4e329e2 + f2bf9b3 + b8e6927 + 630c2df + 4a66f3b + 7c3f07c commit f28f98f
Show file tree
Hide file tree
Showing 62 changed files with 1,330 additions and 716 deletions.
3 changes: 3 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ Daniel Harrison <dan@cockroachlabs.com> <daniel.harrison@gmail.com>
Daniel Theophanes <kardianos@gmail.com>
Daniel Upton <daniel@floppy.co>
Darin Peshev <darinp@gmail.com> Darin <darinp@gmail.com> <@cockroachlabs.com>
David Ding <davidwding@gmail.com> <ding@cockroachlabs.com>
David Eisenstat <eisen@cockroachlabs.com>
David Hartunian <@cockroachlabs.com>
David López <not4rent@gmail.com>
David Lukens <lukens@cockroachlabs.com>
David Muto <david.muto@gmail.com> <muto@cockroachlabs.com>
David Taylor <tinystatemachine@gmail.com> <@cockroachlabs.com>
dchenk <dcherchenko@gmail.com>
Dexter Valkyrie <skynode@users.noreply.github.com>
Expand Down Expand Up @@ -338,6 +340,7 @@ Seth Bunce <seth.bunce@gmail.com>
shakeelrao <shakeelrao79@gmail.com>
Shaun McVey <shaun@cockroachlabs.com>
Shawn Morel <shawn@squareup.com> <shawn@strangemonad.com> <strangemonad@squareup.com>
Shiranka Miskin <shiranka@cockroachlabs.com> <shiranka.miskin@gmail.com>
silvano <silvano@cockroachlabs.com>
Solon Gordon <solon@cockroachlabs.com> solongordon <solongordon@gmail.com>
Song Hao <songhao9021@gmail.com> songhao <songhao9021@gmail.com>
Expand Down
12 changes: 0 additions & 12 deletions build/teamcity-bazel-arm-build.sh

This file was deleted.

12 changes: 0 additions & 12 deletions build/teamcity-bazel-macos-build.sh

This file was deleted.

12 changes: 0 additions & 12 deletions build/teamcity-bazel-test.sh

This file was deleted.

12 changes: 0 additions & 12 deletions build/teamcity-bazel-windows-build.sh

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set -xeuo pipefail

if [ -z "$1" ]
then
echo 'Usage: bazelbuild.sh CONFIG'
echo 'Usage: build_impl.sh CONFIG'
exit 1
fi

Expand Down
14 changes: 14 additions & 0 deletions build/teamcity/cockroach/ci/builds/build_linux_arm64.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

set -euo pipefail

dir="$(dirname $(dirname $(dirname $(dirname $(dirname "${0}")))))"

source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

tc_prepare

tc_start_block "Run Bazel build"
run_bazel build/teamcity/cockroach/ci/builds/build_impl.sh crosslinuxarm
tc_end_block "Run Bazel build"
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

set -euo pipefail

source "$(dirname "${0}")/teamcity-support.sh" # For $root
source "$(dirname "${0}")/teamcity-bazel-support.sh" # For run_bazel
dir="$(dirname $(dirname $(dirname $(dirname $(dirname "${0}")))))"

source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

tc_prepare

tc_start_block "Run Bazel build"
run_bazel build/bazelutil/bazelbuild.sh crosslinux
run_bazel build/teamcity/cockroach/ci/builds/build_impl.sh crosslinux
tc_end_block "Run Bazel build"

set +e
Expand Down
14 changes: 14 additions & 0 deletions build/teamcity/cockroach/ci/builds/build_macos_x86_64.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

set -euo pipefail

dir="$(dirname $(dirname $(dirname $(dirname $(dirname "${0}")))))"

source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

tc_prepare

tc_start_block "Run Bazel build"
run_bazel build/teamcity/cockroach/ci/builds/build_impl.sh crossmacos
tc_end_block "Run Bazel build"
14 changes: 14 additions & 0 deletions build/teamcity/cockroach/ci/builds/build_windows_x86_64.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

set -euo pipefail

dir="$(dirname $(dirname $(dirname $(dirname $(dirname "${0}")))))"

source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

tc_prepare

tc_start_block "Run Bazel build"
run_bazel build/teamcity/cockroach/ci/builds/build_impl.sh crosswindows
tc_end_block "Run Bazel build"
4 changes: 2 additions & 2 deletions build/teamcity/cockroach/ci/tests/lint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ source "$dir/teamcity-bazel-support.sh" # For run_bazel

tc_prepare

tc_start_block "Run Bazel test"
tc_start_block "Run lints"
run_bazel build/teamcity/cockroach/ci/tests/lint_impl.sh
tc_end_block "Run Bazel test"
tc_end_block "Run lints"
14 changes: 14 additions & 0 deletions build/teamcity/cockroach/ci/tests/unit_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

set -euo pipefail

dir="$(dirname $(dirname $(dirname $(dirname $(dirname "${0}")))))"

source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

tc_prepare

tc_start_block "Run unit tests"
run_bazel build/teamcity/cockroach/ci/tests/unit_tests_impl.sh
tc_end_block "Run unit tests"
File renamed without changes.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enable
sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode
sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh
sql.stats.automatic_collection.min_stale_rows integer 500 target minimum number of stale rows per table that will trigger a statistics refresh
sql.stats.flush.enabled boolean true if set, SQL execution statistics are periodically flushed to disk
sql.stats.flush.interval duration 1h0m0s the interval at which SQL execution statistics are flushed to disk
sql.stats.histogram_collection.enabled boolean true histogram collection mode
sql.stats.multi_column_collection.enabled boolean true multi-column statistics collection mode
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
<tr><td><code>sql.stats.automatic_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>automatic statistics collection mode</td></tr>
<tr><td><code>sql.stats.automatic_collection.fraction_stale_rows</code></td><td>float</td><td><code>0.2</code></td><td>target fraction of stale rows per table that will trigger a statistics refresh</td></tr>
<tr><td><code>sql.stats.automatic_collection.min_stale_rows</code></td><td>integer</td><td><code>500</code></td><td>target minimum number of stale rows per table that will trigger a statistics refresh</td></tr>
<tr><td><code>sql.stats.flush.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, SQL execution statistics are periodically flushed to disk</td></tr>
<tr><td><code>sql.stats.flush.interval</code></td><td>duration</td><td><code>1h0m0s</code></td><td>the interval at which SQL execution statistics are flushed to disk</td></tr>
<tr><td><code>sql.stats.histogram_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>histogram collection mode</td></tr>
<tr><td><code>sql.stats.multi_column_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>multi-column statistics collection mode</td></tr>
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,13 @@ func makeBenchSink() *benchSink {
}

func (s *benchSink) EmitRow(
ctx context.Context, topicDescr TopicDescriptor, key, value []byte, updated hlc.Timestamp,
ctx context.Context,
topicDescr TopicDescriptor,
key, value []byte,
updated hlc.Timestamp,
alloc kvevent.Alloc,
) error {
defer alloc.Release(ctx)
return s.emit(int64(len(key) + len(value)))
}
func (s *benchSink) EmitResolvedTimestamp(ctx context.Context, e Encoder, ts hlc.Timestamp) error {
Expand Down
24 changes: 6 additions & 18 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,17 +232,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
kvFeedMemMon.Start(ctx, pool, mon.BoundAccount{})
ca.kvFeedMemMon = kvFeedMemMon

// NB: sink uses pool bound account, and not kvFeedMemMon.
// This is because if we use shared kvFeedMemMon budget, it is possible that that budget
// will be exhausted by kvfeed (e.g. because of a down or slow sink); Then, when sink
// is no longer unavailable, we will proceed with the message, but once it gets to the sink,
// we won't be able to allocate additional memory because more events could have been added
// to KVFeed buffer. Basically, the problem is that the ingress rate of messages into kvfeed
// buffer is different from the eggress rate from the sink.
// TODO(yevgeniy): The real solution is to have the sink pushback.
ca.sink, err = getSink(
ctx, ca.flowCtx.Cfg, ca.spec.Feed, timestampOracle,
ca.spec.User(), pool.MakeBoundAccount(), ca.spec.JobID)
ca.spec.User(), ca.spec.JobID)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
Expand Down Expand Up @@ -471,10 +463,6 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// kvFeed, sends off this event to the event consumer, and flushes the sink
// if necessary.
func (ca *changeAggregator) tick() error {
// TODO(yevgeniy): Getting an event from producer decreases the amount of
// memory tracked by kvFeedMonitor. We should "transfer" that memory
// to the consumer below, and keep track of changes to the memory usage when
// we convert feed event to datums; and then when we encode those datums.
event, err := ca.eventProducer.GetEvent(ca.Ctx)
if err != nil {
return err
Expand All @@ -495,6 +483,7 @@ func (ca *changeAggregator) tick() error {
case kvevent.TypeKV:
return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
case kvevent.TypeResolved:
event.DetachAlloc().Release(ca.Ctx)
resolved := event.Resolved()
if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) {
return ca.noteResolvedSpan(resolved)
Expand Down Expand Up @@ -689,7 +678,8 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
}
}
if err := c.sink.EmitRow(
ctx, tableDescriptorTopic{r.tableDesc}, keyCopy, valueCopy, r.updated,
ctx, tableDescriptorTopic{r.tableDesc},
keyCopy, valueCopy, r.updated, ev.DetachAlloc(),
); err != nil {
return err
}
Expand Down Expand Up @@ -852,7 +842,7 @@ func (c *nativeKVConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Event) e
return err
}

return c.sink.EmitRow(ctx, &noTopic{}, keyBytes, valBytes, val.Timestamp)
return c.sink.EmitRow(ctx, &noTopic{}, keyBytes, valBytes, val.Timestamp, ev.DetachAlloc())
}

const (
Expand Down Expand Up @@ -1079,10 +1069,8 @@ func (cf *changeFrontier) Start(ctx context.Context) {
// but the oracle is only used when emitting row updates.
var nilOracle timestampLowerBoundOracle
var err error
// TODO(yevgeniy): Evaluate if we should introduce changefeed specific monitor.
mm := cf.flowCtx.Cfg.BackfillerMonitor
cf.sink, err = getSink(ctx, cf.flowCtx.Cfg, cf.spec.Feed, nilOracle,
cf.spec.User(), mm.MakeBoundAccount(), cf.spec.JobID)
cf.spec.User(), cf.spec.JobID)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
Expand Down
4 changes: 1 addition & 3 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -332,8 +331,7 @@ func changefeedPlanHook(
{
var nilOracle timestampLowerBoundOracle
canarySink, err := getSink(
ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details, nilOracle, p.User(), mon.BoundAccount{},
jobspb.InvalidJobID,
ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details, nilOracle, p.User(), jobspb.InvalidJobID,
)
if err != nil {
return changefeedbase.MaybeStripRetryableErrorMarker(err)
Expand Down
19 changes: 19 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/errors"
)

// TableDescriptorPollInterval controls how fast table descriptors are polled. A
Expand Down Expand Up @@ -139,3 +140,21 @@ var MinHighWaterMarkCheckpointAdvance = settings.RegisterDurationSetting(
0,
settings.NonNegativeDuration,
)

// EventMemoryMultiplier is the multiplier for the amount of memory needed to process an event.
//
// Memory accounting is hard. Furthermore, during the lifetime of the event, the
// amount of resources used to process such event varies. So, instead of coming up
// with complex schemes to accurately measure and adjust current memory usage,
// we'll request the amount of memory multiplied by this fudge factor.
var EventMemoryMultiplier = settings.RegisterFloatSetting(
"changefeed.event_memory_multiplier",
"the amount of memory required to process an event is multiplied by this factor",
3,
func(v float64) error {
if v < 1 {
return errors.New("changefeed.event_memory_multiplier must be at least 1")
}
return nil
},
)
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/kvevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "kvevent",
srcs = [
"alloc.go",
"blocking_buffer.go",
"chan_buffer.go",
"err_buffer.go",
Expand All @@ -17,13 +18,16 @@ go_library(
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/jobs/jobspb",
"//pkg/roachpb:with-mocks",
"//pkg/settings",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand Down
Loading

0 comments on commit f28f98f

Please sign in to comment.