99663: sql: update connExecutor logic for pausable portals r=ZhouXing19 a=ZhouXing19

This PR replaces #96358 and is part of the initial implementation of multiple active portals.

----

This PR adds limited support for multiple active portals. Portals satisfying all of the following restrictions can now be paused and resumed (i.e., other queries can interleave with them):

1. Not an internal query;
2. Read-only query;
3. No sub-queries or post-queries.

Such a portal will only have its statement executed with a _non-distributed_ plan.

This feature is gated by the session variable `multiple_active_portals_enabled`. When it is set to `true`, all portals that satisfy the restrictions above automatically become "pausable" when they are created via the pgwire `Bind` message.
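
For illustration, a client could opt in per session with something like the following sketch; the connection string and driver choice are placeholders, and only the session variable name comes from the PR description above.

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any pgwire-compatible driver works; this one is an assumption
)

func main() {
	// Placeholder connection string for a local single-node cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Opt in for this session. Portals created afterwards via the pgwire
	// Bind message become pausable if they satisfy the restrictions above.
	if _, err := db.Exec(`SET multiple_active_portals_enabled = true`); err != nil {
		log.Fatal(err)
	}
}
```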

The core idea of this implementation is:
1. Add a `switchToAnotherPortal` status to the result-consumption state machine. When we receive an `ExecPortal` message for a different portal, we simply return control to the connExecutor. (#99052)
2. Persist the `flow`, `queryID`, `span`, and `instrumentationHelper` for the portal, and reuse them when we re-execute it. This ensures we _continue_ the fetching rather than starting all over. (#99173)
3. To enable 2, we need to delay resource cleanup until we close the portal. For this we introduced stacks of cleanup functions (a minimal sketch of the idea follows this list). (This PR)
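
To make the cleanup-stack idea concrete, here is a minimal hypothetical sketch; the names below are illustrative and do not correspond to the actual connExecutor types.

```go
package main

import "fmt"

// cleanupStack collects cleanup functions accumulated while executing a
// pausable portal. Instead of running them when execution pauses, they are
// kept and only unwound when the portal is closed.
type cleanupStack []func()

func (s *cleanupStack) push(f func()) { *s = append(*s, f) }

// unwind runs the cleanups in LIFO order, mirroring deferred cleanup.
func (s *cleanupStack) unwind() {
	for i := len(*s) - 1; i >= 0; i-- {
		(*s)[i]()
	}
	*s = nil
}

// portal is a stand-in for a pausable portal holding resources (flow, span,
// instrumentation helper) whose teardown is deferred until close.
type portal struct {
	cleanups cleanupStack
}

func main() {
	p := &portal{}
	p.cleanups.push(func() { fmt.Println("finish instrumentation") })
	p.cleanups.push(func() { fmt.Println("clean up flow") })
	// ... the portal may be paused and resumed many times here ...
	p.cleanups.unwind() // only on ClosePortal / connection close
}
```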

Note that we kept the implementation of the original "un-pausable" portal, as we'd like to limit this new functionality to a small set of statements. Eventually, parts of the old implementation (e.g. the limitedCommandResult's lifecycle) should be replaced with the new code.

Also, we don't support distributed plans yet, as they involve much more complicated changes. See the `Start with an entirely local plan` section in the [design doc](https://docs.google.com/document/d/1SpKTrTqc4AlGWBqBNgmyXfTweUUsrlqIaSkmaXpznA8/edit). Support for this will come as a follow-up.

Epic: CRDB-17622

Release note (sql change): Initial support for multiple active portals. With the session variable `multiple_active_portals_enabled` set to `true`, portals satisfying all of the following restrictions can be executed in an interleaving manner: 1. Not an internal query; 2. Read-only query; 3. No sub-queries or post-queries. Such a portal will only have its statement executed with an entirely local plan.





99947: ui: small fixes to DB Console charts shown for secondary tenants r=dhartunian a=abarganier

#97995 updated the DB Console to filter out KV-specific charts from the metrics page when viewing the DB Console as a secondary application tenant.

That PR missed a couple of small details. This patch cleans those up as follows:

- Removes KV latency charts for app tenants
- Adds a single storage graph for app tenants showing livebytes
- Removes the "Capacity" chart on the Overview dashboard for app
  tenants

Release note: none

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-12100

NB: Please only review the final commit. The first commit is being reviewed separately in #99860.

100188: changefeedccl: pubsub sink refactor to batching sink r=rickystewart a=samiskin

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237

This change is a follow-up to #99086, moving the Pubsub sink to the batching sink framework.

The changes involve:
1. Moving the Pubsub code to match the `SinkClient` interface, using the lower-level v1 pubsub API that lets us publish batches manually (see the sketch after this list)
2. Removing the extra call to json.Marshal
3. Moving to the `pstest` package for validating results in unit tests
4. Adding topic handling to the batching sink, where batches are created per topic
5. Adding a `pubsub_sink_config` option, since the sink can now handle Retry and Flush config settings
6. Adding metrics to the old pubsub sink as well, for the purpose of comparing the two versions
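
As a rough illustration of how a sink-specific buffer plugs into the per-topic `MakeBatchBuffer(topic)` / `Append` / `ShouldFlush` shape shown in the batching_sink.go diff further down, a buffer could look roughly like this; the type name, fields, and flush thresholds are assumptions, not the actual pubsub client code.

```go
package main

// pubsubBuffer is a hypothetical per-topic buffer. Because the batching sink
// now creates one buffer per topic, Append no longer needs a topic argument.
type pubsubBuffer struct {
	topic    string
	messages [][]byte
	numBytes int
}

func (b *pubsubBuffer) Append(key []byte, value []byte) {
	// The pubsub payload carries the value; the key is only counted towards
	// the batch size here (an illustrative simplification).
	b.messages = append(b.messages, value)
	b.numBytes += len(key) + len(value)
}

func (b *pubsubBuffer) ShouldFlush() bool {
	// Assumed limits: flush once a batch holds 100 messages or ~1 MiB.
	return len(b.messages) >= 100 || b.numBytes >= 1<<20
}

func main() {
	b := &pubsubBuffer{topic: "orders"}
	b.Append([]byte("k1"), []byte(`{"after": {"a": 1}}`))
	_ = b.ShouldFlush()
}
```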

At default settings, this resulted in a peak of 90k messages per second on a single node at 27.6% CPU usage, putting it at a similar level to Kafka.

Running pubsub v2 across all of TPCC (nodes ran out of ranges at different speeds):
<img width="637" alt="Screenshot 2023-03-30 at 3 38 25 PM" src="https://user-images.githubusercontent.com/6236424/229863386-edaee27d-9762-4806-bab6-e18b8a6169d6.png">

Running pubsub v1 (barely visible, 2k messages per second) followed by v2 on tpcc.order_line (in v2 only 2 nodes ended up having ranges assigned to them):
<img width="642" alt="Screenshot 2023-04-04 at 12 53 45 PM" src="https://user-images.githubusercontent.com/6236424/229863507-1883ea45-d8ce-437b-9b9c-550afec68752.png">

In the following graphs from the cloud console, where v1 was run followed by v2, you can see that the main reason v1 was slow was that it wasn't able to batch different keys together.
<img width="574" alt="Screenshot 2023-04-04 at 12 59 51 PM" src="https://user-images.githubusercontent.com/6236424/229864083-758c0814-d53c-447e-84c3-471cf5d56c44.png">

Publish requests remained the same despite far more messages in v2:
<img width="1150" alt="Screenshot 2023-04-04 at 1 46 51 PM" src="https://user-images.githubusercontent.com/6236424/229875314-6e07177e-62c4-4c15-b13f-f75e8143e011.png">



Release note (performance improvement): Pubsub sink changefeeds can now support higher throughput by enabling the `changefeed.new_pubsub_sink_enabled` cluster setting.
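
A minimal sketch of opting in and attaching the new `pubsub_sink_config` option; the sink URI, table name, and JSON field names are placeholders for illustration, and only the cluster setting and option names come from this change.

```go
package changefeedexample

import "database/sql"

// enableNewPubsubSink opts the cluster into the batching pubsub sink and
// creates a changefeed against it. The sink URI and the JSON knobs below are
// placeholders; the exact pubsub_sink_config schema is defined by the sink code.
func enableNewPubsubSink(db *sql.DB) error {
	if _, err := db.Exec(
		`SET CLUSTER SETTING changefeed.new_pubsub_sink_enabled = true`,
	); err != nil {
		return err
	}
	_, err := db.Exec(`
		CREATE CHANGEFEED FOR TABLE orders
		INTO 'gcpubsub://<project>?topic_name=<topic>'
		WITH pubsub_sink_config = '{"Flush": {"Frequency": "500ms"}}'`)
	return err
}
```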

100620: pkg/server: move DataDistribution to systemAdminServer r=dhartunian a=abarganier

The DataDistribution endpoint reports replica counts by database and table. When it was built, it operated under the assumption that a range would only ever contain data for a single table.

Now that we have coalesced ranges, a single range can span multiple tables. Unfortunately, the DataDistribution endpoint does not take this fact into account, meaning it reports garbled and inaccurate data, unless the `spanconfig.storage_coalesce_adjacent.enabled` setting is set to false (see #98820).

For secondary tenants, ranges are *always* coalesced, so this endpoint in its current state could never report meaningful data for a tenant.

Given all of this, we have decided to make this endpoint available only for the system tenant. This patch accomplishes that by moving the endpoint from the adminServer to the systemAdminServer, making it effectively unimplemented for secondary tenants.

Release note: none

Informs: #97942

Co-authored-by: Jane Xing <zhouxing@uchicago.edu>
Co-authored-by: Alex Barganier <abarganier@cockroachlabs.com>
Co-authored-by: Shiranka Miskin <shiranka.miskin@gmail.com>
4 people committed Apr 7, 2023
5 parents e21779b + 199b177 + 8032959 + 699b7f4 + f85d864 commit 22ab7ed
Showing 42 changed files with 2,950 additions and 543 deletions.
1 change: 0 additions & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -250,7 +250,6 @@ sql.multiple_modifications_of_table.enabled boolean false if true, allow stateme
sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region tenant-rw
sql.notices.enabled boolean true enable notices in the server/client protocol being sent tenant-rw
sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability tenant-rw
sql.pgwire.multiple_active_portals.enabled boolean false if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan tenant-rw
sql.schema.telemetry.recurrence string @weekly cron-tab recurrence for SQL schema telemetry job tenant-ro
sql.show_ranges_deprecated_behavior.enabled boolean true if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES. tenant-rw
sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators tenant-rw
1 change: 0 additions & 1 deletion docs/generated/settings/settings.html
@@ -202,7 +202,6 @@
<tr><td><div id="setting-sql-multiregion-drop-primary-region-enabled" class="anchored"><code>sql.multiregion.drop_primary_region.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allows dropping the PRIMARY REGION of a database if it is the last region</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-notices-enabled" class="anchored"><code>sql.notices.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>enable notices in the server/client protocol being sent</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-optimizer-uniqueness-checks-for-gen-random-uuid-enabled" class="anchored"><code>sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-pgwire-multiple-active-portals-enabled" class="anchored"><code>sql.pgwire.multiple_active_portals.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-schema-telemetry-recurrence" class="anchored"><code>sql.schema.telemetry.recurrence</code></div></td><td>string</td><td><code>@weekly</code></td><td>cron-tab recurrence for SQL schema telemetry job</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-sql-show-ranges-deprecated-behavior-enabled" class="anchored"><code>sql.show_ranges_deprecated_behavior.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-spatial-experimental-box2d-comparison-operators-enabled" class="anchored"><code>sql.spatial.experimental_box2d_comparison_operators.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>enables the use of certain experimental box2d comparison operators</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -33,6 +33,7 @@ go_library(
"sink_external_connection.go",
"sink_kafka.go",
"sink_pubsub.go",
"sink_pubsub_v2.go",
"sink_sql.go",
"sink_webhook.go",
"sink_webhook_v2.go",
@@ -165,6 +166,8 @@ go_library(
"@com_github_shopify_sarama//:sarama",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
"@org_golang_google_api//impersonate",
"@org_golang_google_api//option",
"@org_golang_google_grpc//codes",
@@ -296,6 +299,7 @@ go_test(
"//pkg/util/mon",
"//pkg/util/parquet",
"//pkg/util/protoutil",
"//pkg/util/randident",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
@@ -317,6 +321,12 @@ go_test(
"@com_github_shopify_sarama//:sarama",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_google_cloud_go_pubsub//apiv1",
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
"@com_google_cloud_go_pubsub//pstest",
"@org_golang_google_api//option",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_x_text//collate",
],
)
85 changes: 62 additions & 23 deletions pkg/ccl/changefeedccl/batching_sink.go
@@ -29,15 +29,16 @@ import (
// into batches as they arrive and once ready are flushed out.
type SinkClient interface {
MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
MakeBatchBuffer() BatchBuffer
// Batches can only hold messages for one unique topic
MakeBatchBuffer(topic string) BatchBuffer
Flush(context.Context, SinkPayload) error
Close() error
}

// BatchBuffer is an interface to aggregate KVs into a payload that can be sent
// to the sink.
type BatchBuffer interface {
Append(key []byte, value []byte, topic string)
Append(key []byte, value []byte)
ShouldFlush() bool

// Once all data has been Append'ed, Close can be called to return a finalized
@@ -90,9 +91,9 @@ type flushReq struct {
}

type rowEvent struct {
key []byte
val []byte
topic string
key []byte
val []byte
topicDescriptor TopicDescriptor

alloc kvevent.Alloc
mvcc hlc.Timestamp
@@ -176,7 +177,7 @@ func (s *batchingSink) EmitRow(
payload := newRowEvent()
payload.key = key
payload.val = value
payload.topic = "" // unimplemented for now
payload.topicDescriptor = topic
payload.mvcc = mvcc
payload.alloc = alloc

@@ -277,7 +278,7 @@ func (sb *sinkBatch) Append(e *rowEvent) {
sb.bufferTime = timeutil.Now()
}

sb.buffer.Append(e.key, e.val, e.topic)
sb.buffer.Append(e.key, e.val)

sb.keys.Add(hashToInt(sb.hasher, e.key))
sb.numMessages += 1
@@ -296,17 +297,22 @@ func (s *batchingSink) handleError(err error) {
}
}

func (s *batchingSink) newBatchBuffer() *sinkBatch {
func (s *batchingSink) newBatchBuffer(topic string) *sinkBatch {
batch := newSinkBatch()
batch.buffer = s.client.MakeBatchBuffer()
batch.buffer = s.client.MakeBatchBuffer(topic)
batch.hasher = s.hasher
return batch
}

// runBatchingWorker combines 1 or more row events into batches, sending the IO
// requests out either once the batch is full or a flush request arrives.
func (s *batchingSink) runBatchingWorker(ctx context.Context) {
batchBuffer := s.newBatchBuffer()
// topicBatches stores per-topic sinkBatches which are flushed individually
// when one reaches its size limit, but are all flushed together if the
// frequency timer triggers. Messages for different topics cannot be allowed
// to be batched together as the data may need to end up at a specific
// endpoint for that topic.
topicBatches := make(map[string]*sinkBatch)

// Once finalized, batches are sent to a parallelIO struct which handles
// performing multiple Flushes in parallel while maintaining Keys() ordering.
@@ -347,14 +353,14 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
freeSinkBatchEvent(batch)
}

tryFlushBatch := func() error {
if batchBuffer.isEmpty() {
tryFlushBatch := func(topic string) error {
batchBuffer, ok := topicBatches[topic]
if !ok || batchBuffer.isEmpty() {
return nil
}
toFlush := batchBuffer
batchBuffer = s.newBatchBuffer()
topicBatches[topic] = s.newBatchBuffer(topic)

if err := toFlush.FinalizePayload(); err != nil {
if err := batchBuffer.FinalizePayload(); err != nil {
return err
}

@@ -364,7 +370,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
select {
case <-ctx.Done():
return ctx.Err()
case ioEmitter.requestCh <- toFlush:
case ioEmitter.requestCh <- batchBuffer:
case result := <-ioEmitter.resultCh:
handleResult(result)
continue
@@ -376,8 +382,22 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
return nil
}

flushAll := func() error {
for topic := range topicBatches {
if err := tryFlushBatch(topic); err != nil {
return err
}
}
return nil
}

// flushTimer is used to ensure messages do not remain batched longer than a
// given timeout. Every minFlushFrequency seconds after the first event for
// any topic has arrived, batches for all topics are flushed out immediately
// and the timer once again waits for the first message to arrive.
flushTimer := s.ts.NewTimer()
defer flushTimer.Stop()
isTimerPending := false

for {
select {
@@ -396,11 +416,29 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {

inflight += 1

// If we're about to append to an empty batch, start the timer to
// guarantee the messages do not stay buffered longer than the
// configured frequency.
if batchBuffer.isEmpty() && s.minFlushFrequency > 0 {
var topic string
var err error
if s.topicNamer != nil {
topic, err = s.topicNamer.Name(r.topicDescriptor)
if err != nil {
s.handleError(err)
continue
}
}

// If the timer isn't pending then this message is the first message to
// arrive either ever or since the timer last triggered a flush,
// therefore we're going from 0 messages batched to 1, and should
// restart the timer.
if !isTimerPending && s.minFlushFrequency > 0 {
flushTimer.Reset(s.minFlushFrequency)
isTimerPending = true
}

batchBuffer, ok := topicBatches[topic]
if !ok {
batchBuffer = s.newBatchBuffer(topic)
topicBatches[topic] = batchBuffer
}

batchBuffer.Append(r)
@@ -414,7 +452,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {

if batchBuffer.buffer.ShouldFlush() {
s.metrics.recordSizeBasedFlush()
if err := tryFlushBatch(); err != nil {
if err := tryFlushBatch(topic); err != nil {
s.handleError(err)
}
}
@@ -423,7 +461,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
close(r.waiter)
} else {
sinkFlushWaiter = r.waiter
if err := tryFlushBatch(); err != nil {
if err := flushAll(); err != nil {
s.handleError(err)
}
}
@@ -434,7 +472,8 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
handleResult(result)
case <-flushTimer.Ch():
flushTimer.MarkRead()
if err := tryFlushBatch(); err != nil {
isTimerPending = false
if err := flushAll(); err != nil {
s.handleError(err)
}
case <-ctx.Done():
41 changes: 41 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -90,6 +90,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randident"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -8366,3 +8367,43 @@ func TestChangefeedExecLocality(t *testing.T) {
test(t, "x", "x=0", []bool{true, true, false, false})
test(t, "y", "y=1", []bool{false, true, false, true})
}

func TestChangefeedTopicNames(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
rand, _ := randutil.NewTestRand()
cfg := randident.DefaultNameGeneratorConfig()
cfg.Noise = true
cfg.Finalize()
ng := randident.NewNameGenerator(&cfg, rand, "table")

names, _ := ng.GenerateMultiple(context.Background(), 100, make(map[string]struct{}))

var escapedNames []string
for _, name := range names {
escapedNames = append(escapedNames, strings.ReplaceAll(name, `"`, `""`))
}

sqlDB := sqlutils.MakeSQLRunner(s.DB)
for _, name := range escapedNames {
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE "%s" (a INT PRIMARY KEY);`, name))
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO "%s" VALUES (1);`, name))
}

var quotedNames []string
for _, name := range escapedNames {
quotedNames = append(quotedNames, "\""+name+"\"")
}
createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR %s`, strings.Join(quotedNames, ", "))
foo := feed(t, f, createStmt)
defer closeFeed(t, foo)

var expected []string
for _, name := range names {
expected = append(expected, fmt.Sprintf(`%s: [1]->{"after": {"a": 1}}`, name))
}
assertPayloads(t, foo, expected)
}

cdcTest(t, testFn, feedTestForceSink("pubsub"))
}
10 changes: 9 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -165,6 +165,7 @@ const (

// OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig).
OptKafkaSinkConfig = `kafka_sink_config`
OptPubsubSinkConfig = `pubsub_sink_config`
OptWebhookSinkConfig = `webhook_sink_config`

// OptSink allows users to alter the Sink URI of an existing changefeed.
@@ -333,6 +334,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
OptProtectDataFromGCOnPause: flagOption,
OptExpirePTSAfter: durationOption.thatCanBeZero(),
OptKafkaSinkConfig: jsonOption,
OptPubsubSinkConfig: jsonOption,
OptWebhookSinkConfig: jsonOption,
OptWebhookAuthHeader: stringOption,
OptWebhookClientTimeout: durationOption,
@@ -369,7 +371,7 @@ var CloudStorageValidOptions = makeStringSet(OptCompression)
var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig)

// PubsubValidOptions is options exclusive to pubsub sink
var PubsubValidOptions = makeStringSet()
var PubsubValidOptions = makeStringSet(OptPubsubSinkConfig)

// ExternalConnectionValidOptions is options exclusive to the external
// connection sink.
@@ -888,6 +890,12 @@ func (s StatementOptions) GetKafkaConfigJSON() SinkSpecificJSONConfig {
return s.getJSONValue(OptKafkaSinkConfig)
}

// GetPubsubConfigJSON returns arbitrary json to be interpreted
// by the pubsub sink.
func (s StatementOptions) GetPubsubConfigJSON() SinkSpecificJSONConfig {
return s.getJSONValue(OptPubsubSinkConfig)
}

// GetResolvedTimestampInterval gets the best-effort interval at which resolved timestamps
// should be emitted. Nil or 0 means emit as often as possible. False means do not emit at all.
// Returns an error for negative or invalid duration value.