diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 91e20abc851c..513c37ef323f 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -168,4 +168,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-28 set the active cluster version in the format '.' +version version 21.2-32 set the active cluster version in the format '.' diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index fb184e62743f..a98b00e0819f 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -32,6 +32,7 @@ ALL_TESTS = [ "//pkg/ccl/serverccl/diagnosticsccl:diagnosticsccl_test", "//pkg/ccl/serverccl/statusccl:statusccl_test", "//pkg/ccl/serverccl:serverccl_test", + "//pkg/ccl/spanconfigccl/spanconfigreconcilerccl:spanconfigreconcilerccl_test", "//pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl:spanconfigsqltranslatorccl_test", "//pkg/ccl/sqlproxyccl/denylist:denylist_test", "//pkg/ccl/sqlproxyccl/idle:idle_test", @@ -184,6 +185,7 @@ ALL_TESTS = [ "//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test", "//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test", "//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test", + "//pkg/spanconfig/spanconfigreconciler:spanconfigreconciler_test", "//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test", "//pkg/spanconfig/spanconfigsqlwatcher:spanconfigsqlwatcher_test", "//pkg/spanconfig/spanconfigstore:spanconfigstore_test", diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index 0df94c368131..4e23cd2763fc 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -455,7 +455,7 @@ func (c *Connector) UpdateSpanConfigEntries( } // WithTxn implements the spanconfig.KVAccessor interface. -func (c *Connector) WithTxn(*kv.Txn) spanconfig.KVAccessor { +func (c *Connector) WithTxn(context.Context, *kv.Txn) spanconfig.KVAccessor { panic("not applicable") } diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel new file mode 100644 index 000000000000..3bb5c6d2d1e0 --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel @@ -0,0 +1,35 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "spanconfigreconcilerccl_test", + srcs = [ + "datadriven_test.go", + "main_test.go", + ], + data = glob(["testdata/**"]), + deps = [ + "//pkg/base", + "//pkg/ccl/kvccl/kvtenantccl", + "//pkg/ccl/partitionccl", + "//pkg/ccl/utilccl", + "//pkg/jobs", + "//pkg/keys", + "//pkg/roachpb:with-mocks", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigtestutils", + "//pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/randutil", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go new file mode 100644 index 000000000000..44c63ecaba6e --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go @@ -0,0 +1,238 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package spanconfigreconcilerccl + +import ( + "context" + "fmt" + "sort" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestDataDriven is a data-driven test for spanconfig.Reconciler. It lets test +// authors spin up secondary tenants, create arbitrary schema objects with +// arbitrary zone configs, and verify that the global span config state is as +// we'd expect. Only fields that differ from the static RANGE DEFAULT are +// printed in the test output for readability. The following syntax is provided: +// +// - "initialize" tenant= +// Initialize a secondary tenant with the given ID. +// +// - "exec-sql" [tenant=] +// Executes the input SQL query for the given tenant. All statements are +// executed in a single transaction. +// +// - "query-sql" [tenant=] +// Executes the input SQL query for the given tenant and print the results. +// +// - "reconcile" [tenant=] +// Start the reconciliation process for the given tenant. +// +// - "mutations" [tenant=] [discard] +// Print the latest set of mutations issued by the reconciler for the given +// tenant. If 'discard' is specified, nothing is printed. +// +// - "state" [offset=] [limit== 0) + require.Truef(t, offset <= len(entries), + "offset (%d) larger than number of entries (%d)", offset, len(entries)) + } + if d.HasArg("limit") { + d.ScanArgs(t, "limit", &limit) + require.True(t, limit >= 0) + } else { + limit = len(entries) + } + var output strings.Builder + if offset > 0 && len(entries) > 0 { + output.WriteString("...\n") // print leading elipses + } + + entries = entries[offset:] + for i, entry := range entries { + if i == limit { + output.WriteString("...\n") // print trailing elipses + break + } + + output.WriteString(fmt.Sprintf("%-42s %s\n", entry.Span, + spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(entry.Config))) + } + return output.String() + + default: + t.Fatalf("unknown command: %s", d.Cmd) + } + return "" + }) + }) +} diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/main_test.go b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/main_test.go new file mode 100644 index 000000000000..b29b2c8217c1 --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/main_test.go @@ -0,0 +1,33 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package spanconfigreconcilerccl + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + security.SetAssetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/basic b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/basic new file mode 100644 index 000000000000..d8a43fc3402e --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/basic @@ -0,0 +1,195 @@ +# Create a database with some tables, types (ignored), and schemas (ignored); +# set some zone configs. Check that the mutations, if starting from a fresh +# slate, are as we'd expect. + +reconcile +---- + +mutations +---- +upsert /{Min-System/NodeLiveness} ttl_seconds=3600 num_replicas=5 +upsert /System/NodeLiveness{-Max} ttl_seconds=600 num_replicas=5 +upsert /System/{NodeLivenessMax-tsd} range system +upsert /System{/tsd-tse} range default +upsert /System{tse-/Max} range system +upsert /Table/{3-4} range system +upsert /Table/{4-5} range system +upsert /Table/{5-6} range system +upsert /Table/{6-7} range system +upsert /Table/{8-9} range system +upsert /Table/1{1-2} range system +upsert /Table/1{2-3} range system +upsert /Table/1{3-4} range system +upsert /Table/1{4-5} range system +upsert /Table/1{5-6} range system +upsert /Table/{19-20} range system +upsert /Table/2{0-1} range system +upsert /Table/2{1-2} range system +upsert /Table/2{3-4} range system +upsert /Table/2{4-5} range system +upsert /Table/2{5-6} ttl_seconds=600 num_replicas=5 +upsert /Table/2{6-7} range system +upsert /Table/2{7-8} ttl_seconds=600 num_replicas=5 +upsert /Table/2{8-9} range system +upsert /NamespaceTable/{30-Max} range system +upsert /{NamespaceTable/Max-Table/32} range system +upsert /Table/3{2-3} range system +upsert /Table/3{3-4} range system +upsert /Table/3{4-5} range system +upsert /Table/3{5-6} range system +upsert /Table/3{6-7} range system +upsert /Table/3{7-8} range system +upsert /Table/{39-40} range system +upsert /Table/4{0-1} range system +upsert /Table/4{1-2} range system +upsert /Table/4{2-3} range system +upsert /Table/4{3-4} range system +upsert /Table/4{4-5} range system +upsert /Table/4{5-6} ttl_seconds=7200 num_replicas=5 +upsert /Table/4{6-7} range system +upsert /Table/4{7-8} range system + +exec-sql +CREATE DATABASE db; +CREATE TABLE db.t1(); +CREATE TABLE db.t2(); +CREATE SCHEMA db.sc; +CREATE TYPE typ AS ENUM(); +---- + +mutations +---- +upsert /Table/5{6-7} range default +upsert /Table/5{7-8} range default + +exec-sql +ALTER DATABASE db CONFIGURE ZONE USING num_replicas = 7; +ALTER TABLE db.t1 CONFIGURE ZONE USING num_voters = 5; +---- + +mutations +---- +delete /Table/5{6-7} +upsert /Table/5{6-7} num_replicas=7 num_voters=5 +delete /Table/5{7-8} +upsert /Table/5{7-8} num_replicas=7 + +state offset=41 +---- +... +/Table/5{6-7} num_replicas=7 num_voters=5 +/Table/5{7-8} num_replicas=7 + +exec-sql +ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = 100; +---- + +mutations +---- +delete /Table/{3-4} +upsert /Table/{3-4} ttl_seconds=100 num_replicas=5 +delete /Table/{4-5} +upsert /Table/{4-5} ttl_seconds=100 num_replicas=5 +delete /Table/{5-6} +upsert /Table/{5-6} ttl_seconds=100 num_replicas=5 +delete /Table/{6-7} +upsert /Table/{6-7} ttl_seconds=100 num_replicas=5 +delete /Table/{8-9} +upsert /Table/{8-9} ttl_seconds=100 num_replicas=5 +delete /Table/1{1-2} +upsert /Table/1{1-2} ttl_seconds=100 num_replicas=5 +delete /Table/1{2-3} +upsert /Table/1{2-3} ttl_seconds=100 num_replicas=5 +delete /Table/1{3-4} +upsert /Table/1{3-4} ttl_seconds=100 num_replicas=5 +delete /Table/1{4-5} +upsert /Table/1{4-5} ttl_seconds=100 num_replicas=5 +delete /Table/1{5-6} +upsert /Table/1{5-6} ttl_seconds=100 num_replicas=5 +delete /Table/{19-20} +upsert /Table/{19-20} ttl_seconds=100 num_replicas=5 +delete /Table/2{0-1} +upsert /Table/2{0-1} ttl_seconds=100 num_replicas=5 +delete /Table/2{1-2} +upsert /Table/2{1-2} ttl_seconds=100 num_replicas=5 +delete /Table/2{3-4} +upsert /Table/2{3-4} ttl_seconds=100 num_replicas=5 +delete /Table/2{4-5} +upsert /Table/2{4-5} ttl_seconds=100 num_replicas=5 +delete /Table/2{6-7} +upsert /Table/2{6-7} ttl_seconds=100 num_replicas=5 +delete /Table/2{8-9} +upsert /Table/2{8-9} ttl_seconds=100 num_replicas=5 +delete /NamespaceTable/{30-Max} +upsert /NamespaceTable/{30-Max} ttl_seconds=100 num_replicas=5 +delete /{NamespaceTable/Max-Table/32} +upsert /{NamespaceTable/Max-Table/32} ttl_seconds=100 num_replicas=5 +delete /Table/3{2-3} +upsert /Table/3{2-3} ttl_seconds=100 num_replicas=5 +delete /Table/3{3-4} +upsert /Table/3{3-4} ttl_seconds=100 num_replicas=5 +delete /Table/3{4-5} +upsert /Table/3{4-5} ttl_seconds=100 num_replicas=5 +delete /Table/3{5-6} +upsert /Table/3{5-6} ttl_seconds=100 num_replicas=5 +delete /Table/3{6-7} +upsert /Table/3{6-7} ttl_seconds=100 num_replicas=5 +delete /Table/3{7-8} +upsert /Table/3{7-8} ttl_seconds=100 num_replicas=5 +delete /Table/{39-40} +upsert /Table/{39-40} ttl_seconds=100 num_replicas=5 +delete /Table/4{0-1} +upsert /Table/4{0-1} ttl_seconds=100 num_replicas=5 +delete /Table/4{1-2} +upsert /Table/4{1-2} ttl_seconds=100 num_replicas=5 +delete /Table/4{2-3} +upsert /Table/4{2-3} ttl_seconds=100 num_replicas=5 +delete /Table/4{3-4} +upsert /Table/4{3-4} ttl_seconds=100 num_replicas=5 +delete /Table/4{4-5} +upsert /Table/4{4-5} ttl_seconds=100 num_replicas=5 +delete /Table/4{6-7} +upsert /Table/4{6-7} ttl_seconds=100 num_replicas=5 +delete /Table/4{7-8} +upsert /Table/4{7-8} ttl_seconds=100 num_replicas=5 + +state offset=5 limit=36 +---- +... +/Table/{3-4} ttl_seconds=100 num_replicas=5 +/Table/{4-5} ttl_seconds=100 num_replicas=5 +/Table/{5-6} ttl_seconds=100 num_replicas=5 +/Table/{6-7} ttl_seconds=100 num_replicas=5 +/Table/{8-9} ttl_seconds=100 num_replicas=5 +/Table/1{1-2} ttl_seconds=100 num_replicas=5 +/Table/1{2-3} ttl_seconds=100 num_replicas=5 +/Table/1{3-4} ttl_seconds=100 num_replicas=5 +/Table/1{4-5} ttl_seconds=100 num_replicas=5 +/Table/1{5-6} ttl_seconds=100 num_replicas=5 +/Table/{19-20} ttl_seconds=100 num_replicas=5 +/Table/2{0-1} ttl_seconds=100 num_replicas=5 +/Table/2{1-2} ttl_seconds=100 num_replicas=5 +/Table/2{3-4} ttl_seconds=100 num_replicas=5 +/Table/2{4-5} ttl_seconds=100 num_replicas=5 +/Table/2{5-6} ttl_seconds=600 num_replicas=5 +/Table/2{6-7} ttl_seconds=100 num_replicas=5 +/Table/2{7-8} ttl_seconds=600 num_replicas=5 +/Table/2{8-9} ttl_seconds=100 num_replicas=5 +/NamespaceTable/{30-Max} ttl_seconds=100 num_replicas=5 +/{NamespaceTable/Max-Table/32} ttl_seconds=100 num_replicas=5 +/Table/3{2-3} ttl_seconds=100 num_replicas=5 +/Table/3{3-4} ttl_seconds=100 num_replicas=5 +/Table/3{4-5} ttl_seconds=100 num_replicas=5 +/Table/3{5-6} ttl_seconds=100 num_replicas=5 +/Table/3{6-7} ttl_seconds=100 num_replicas=5 +/Table/3{7-8} ttl_seconds=100 num_replicas=5 +/Table/{39-40} ttl_seconds=100 num_replicas=5 +/Table/4{0-1} ttl_seconds=100 num_replicas=5 +/Table/4{1-2} ttl_seconds=100 num_replicas=5 +/Table/4{2-3} ttl_seconds=100 num_replicas=5 +/Table/4{3-4} ttl_seconds=100 num_replicas=5 +/Table/4{4-5} ttl_seconds=100 num_replicas=5 +/Table/4{5-6} ttl_seconds=7200 num_replicas=5 +/Table/4{6-7} ttl_seconds=100 num_replicas=5 +/Table/4{7-8} ttl_seconds=100 num_replicas=5 +... diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/dropped_table b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/dropped_table new file mode 100644 index 000000000000..b16d1621b00f --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/dropped_table @@ -0,0 +1,24 @@ +# Check that dropped (really GC-ed) tables get their span configs removed. + +reconcile +---- + +mutations discard +---- + +exec-sql +CREATE DATABASE db; +CREATE TABLE db.t1(); +---- + +mutations +---- +upsert /Table/5{6-7} range default + +exec-sql +DROP TABLE db.t1; +---- + +mutations +---- +delete /Table/5{6-7} diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/indexes b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/indexes new file mode 100644 index 000000000000..77dc632798dd --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/indexes @@ -0,0 +1,107 @@ +# Test behavior of span configs in the presence of indexes (we care about zone +# config inheritance from database -> table -> indexes, with and without +# ancestor nodes with explicit configs). + +reconcile +---- + +mutations discard +---- + +exec-sql +CREATE DATABASE db; +CREATE TABLE db.t(i INT PRIMARY KEY, j INT); +CREATE INDEX idx ON db.t (j); +---- + +mutations +---- +upsert /Table/5{6-7} range default + +state offset=41 +---- +... +/Table/5{6-7} range default + +exec-sql +ALTER DATABASE db CONFIGURE ZONE USING num_replicas = 7; +ALTER INDEX db.t@idx CONFIGURE ZONE USING num_voters = 5; +---- + +# We should expect to find the following entries, in order: +# - primary index (table's config) +# - secondary index (overridden num_voters) +# - any future indexes that may be added to this table (table's config) +mutations +---- +delete /Table/5{6-7} +upsert /Table/56{-/2} num_replicas=7 +upsert /Table/56/{2-3} num_replicas=7 num_voters=5 +upsert /Table/5{6/3-7} num_replicas=7 + +state offset=41 +---- +... +/Table/56{-/2} num_replicas=7 +/Table/56/{2-3} num_replicas=7 num_voters=5 +/Table/5{6/3-7} num_replicas=7 + +# Configure GC ttl on the database and override it for the index. The table +# continues to hold a placeholder zone config. +exec-sql +ALTER DATABASE db CONFIGURE ZONE USING gc.ttlseconds = 3600; +ALTER INDEX db.t@idx CONFIGURE ZONE USING gc.ttlseconds = 25 +---- + +mutations +---- +delete /Table/56{-/2} +upsert /Table/56{-/2} ttl_seconds=3600 num_replicas=7 +delete /Table/56/{2-3} +upsert /Table/56/{2-3} ttl_seconds=25 num_replicas=7 num_voters=5 +delete /Table/5{6/3-7} +upsert /Table/5{6/3-7} ttl_seconds=3600 num_replicas=7 + +state offset=41 +---- +... +/Table/56{-/2} ttl_seconds=3600 num_replicas=7 +/Table/56/{2-3} ttl_seconds=25 num_replicas=7 num_voters=5 +/Table/5{6/3-7} ttl_seconds=3600 num_replicas=7 + +# Configure a zone config field on the table, so that it is no longer a +# placeholder zone config. +exec-sql +ALTER TABLE db.t CONFIGURE ZONE USING num_replicas = 9 +---- + +state offset=41 +---- +... +/Table/56{-/2} ttl_seconds=3600 num_replicas=9 +/Table/56/{2-3} ttl_seconds=25 num_replicas=9 num_voters=5 +/Table/5{6/3-7} ttl_seconds=3600 num_replicas=9 + +mutations +---- +delete /Table/56{-/2} +upsert /Table/56{-/2} ttl_seconds=3600 num_replicas=9 +delete /Table/56/{2-3} +upsert /Table/56/{2-3} ttl_seconds=25 num_replicas=9 num_voters=5 +delete /Table/5{6/3-7} +upsert /Table/5{6/3-7} ttl_seconds=3600 num_replicas=9 + +exec-sql +DROP TABLE db.t; +---- + +mutations +---- +delete /Table/56{-/2} +delete /Table/56/{2-3} +delete /Table/5{6/3-7} + +state offset=40 +---- +... +/Table/4{7-8} range system diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/multitenant/basic b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/multitenant/basic new file mode 100644 index 000000000000..30ee024abf3e --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/multitenant/basic @@ -0,0 +1,125 @@ +# Test span configs in the presence of multiple secondary tenants, each running +# their own reconciliation loop. + +reconcile +---- + +mutations discard +---- + +initialize tenant=10 +---- + +initialize tenant=11 +---- + +# Ensure that the host tenant observes no mutations. +mutations +---- + +# We should observe placeholder entries for both tenants (installed when +# creating tenant records). +state offset=41 +---- +... +/Tenant/10{-"\x00"} range default +/Tenant/11{-"\x00"} range default + +# Start the reconciliation loop for the secondary tenant. +reconcile tenant=10 +---- + +mutations tenant=10 +---- +upsert /Tenant/10/Table/{3-4} range default +upsert /Tenant/10/Table/{4-5} range default +upsert /Tenant/10/Table/{5-6} range default +upsert /Tenant/10/Table/{6-7} range default +upsert /Tenant/10/Table/{7-8} range default +upsert /Tenant/10/Table/1{1-2} range default +upsert /Tenant/10/Table/1{2-3} range default +upsert /Tenant/10/Table/1{3-4} range default +upsert /Tenant/10/Table/1{4-5} range default +upsert /Tenant/10/Table/1{5-6} range default +upsert /Tenant/10/Table/{19-20} range default +upsert /Tenant/10/Table/2{0-1} range default +upsert /Tenant/10/Table/2{1-2} range default +upsert /Tenant/10/Table/2{3-4} range default +upsert /Tenant/10/Table/2{4-5} range default +upsert /Tenant/10/Table/2{5-6} range default +upsert /Tenant/10/Table/2{6-7} range default +upsert /Tenant/10/Table/2{7-8} range default +upsert /Tenant/10/Table/2{8-9} range default +upsert /Tenant/10/NamespaceTable/{30-Max} range default +upsert /Tenant/10/{NamespaceTable/Max-Table/32} range default +upsert /Tenant/10/Table/3{2-3} range default +upsert /Tenant/10/Table/3{3-4} range default +upsert /Tenant/10/Table/3{4-5} range default +upsert /Tenant/10/Table/3{5-6} range default +upsert /Tenant/10/Table/3{6-7} range default +upsert /Tenant/10/Table/3{7-8} range default +upsert /Tenant/10/Table/{39-40} range default +upsert /Tenant/10/Table/4{0-1} range default +upsert /Tenant/10/Table/4{1-2} range default +upsert /Tenant/10/Table/4{2-3} range default +upsert /Tenant/10/Table/4{3-4} range default +upsert /Tenant/10/Table/4{4-5} range default +upsert /Tenant/10/Table/4{6-7} range default +delete /Tenant/10{-"\x00"} + +state offset=41 +---- +... +/Tenant/10/Table/{3-4} range default +/Tenant/10/Table/{4-5} range default +/Tenant/10/Table/{5-6} range default +/Tenant/10/Table/{6-7} range default +/Tenant/10/Table/{7-8} range default +/Tenant/10/Table/1{1-2} range default +/Tenant/10/Table/1{2-3} range default +/Tenant/10/Table/1{3-4} range default +/Tenant/10/Table/1{4-5} range default +/Tenant/10/Table/1{5-6} range default +/Tenant/10/Table/{19-20} range default +/Tenant/10/Table/2{0-1} range default +/Tenant/10/Table/2{1-2} range default +/Tenant/10/Table/2{3-4} range default +/Tenant/10/Table/2{4-5} range default +/Tenant/10/Table/2{5-6} range default +/Tenant/10/Table/2{6-7} range default +/Tenant/10/Table/2{7-8} range default +/Tenant/10/Table/2{8-9} range default +/Tenant/10/NamespaceTable/{30-Max} range default +/Tenant/10/{NamespaceTable/Max-Table/32} range default +/Tenant/10/Table/3{2-3} range default +/Tenant/10/Table/3{3-4} range default +/Tenant/10/Table/3{4-5} range default +/Tenant/10/Table/3{5-6} range default +/Tenant/10/Table/3{6-7} range default +/Tenant/10/Table/3{7-8} range default +/Tenant/10/Table/{39-40} range default +/Tenant/10/Table/4{0-1} range default +/Tenant/10/Table/4{1-2} range default +/Tenant/10/Table/4{2-3} range default +/Tenant/10/Table/4{3-4} range default +/Tenant/10/Table/4{4-5} range default +/Tenant/10/Table/4{6-7} range default +/Tenant/11{-"\x00"} range default + +exec-sql tenant=10 +CREATE DATABASE db; +CREATE TABLE db.t1(); +CREATE TABLE db.t2(); +---- + +mutations tenant=10 +---- +upsert /Tenant/10/Table/5{6-7} range default +upsert /Tenant/10/Table/5{7-8} range default + +state offset=75 +---- +... +/Tenant/10/Table/5{6-7} range default +/Tenant/10/Table/5{7-8} range default +/Tenant/11{-"\x00"} range default diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/named_zones b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/named_zones new file mode 100644 index 000000000000..50d47fa30f75 --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/named_zones @@ -0,0 +1,163 @@ +# Test span config behavior with named zones. These affect reserved key spans, +# can be discarded (except for RANGE DEFAULT), and in the specific case of RANGE +# DEFAULT, is the top most ancestor of the zone config hierarchy. +reconcile +---- + +mutations discard +---- + +state limit=5 +---- +/{Min-System/NodeLiveness} ttl_seconds=3600 num_replicas=5 +/System/NodeLiveness{-Max} ttl_seconds=600 num_replicas=5 +/System/{NodeLivenessMax-tsd} range system +/System{/tsd-tse} range default +/System{tse-/Max} range system +... + +# Adding an explicit zone configuration for the timeseries range should work +# as expected. +exec-sql +ALTER RANGE timeseries CONFIGURE ZONE USING gc.ttlseconds = 42 +---- + +mutations +---- +delete /System{/tsd-tse} +upsert /System{/tsd-tse} ttl_seconds=42 + +# Change a field on the liveness range and ensure it behaves as expected. +exec-sql +ALTER RANGE liveness CONFIGURE ZONE USING num_replicas = 7 +---- + +mutations +---- +delete /System/NodeLiveness{-Max} +upsert /System/NodeLiveness{-Max} ttl_seconds=600 num_replicas=7 + +# Discarding RANGE SYSTEM should re-parent the gaps between RANGE +# {liveness,timeseries} under RANGE DEFAULT. Note that discarding RANGE DEFAULT +# isn't allowed. +exec-sql +ALTER RANGE system CONFIGURE ZONE DISCARD +---- + +mutations +---- +delete /System/{NodeLivenessMax-tsd} +upsert /System/{NodeLivenessMax-tsd} range default +delete /System{tse-/Max} +upsert /System{tse-/Max} range default + +state limit=5 +---- +/{Min-System/NodeLiveness} ttl_seconds=3600 num_replicas=5 +/System/NodeLiveness{-Max} ttl_seconds=600 num_replicas=7 +/System/{NodeLivenessMax-tsd} range default +/System{/tsd-tse} ttl_seconds=42 +/System{tse-/Max} range default +... + +# Ensure that discarding other named zones behave as expected (reparenting them +# under RANGE DEFAULT). +exec-sql +ALTER RANGE meta CONFIGURE ZONE DISCARD; +ALTER RANGE timeseries CONFIGURE ZONE DISCARD; +---- + +mutations +---- +delete /{Min-System/NodeLiveness} +upsert /{Min-System/NodeLiveness} range default +delete /System{/tsd-tse} +upsert /System{/tsd-tse} range default + +state limit=5 +---- +/{Min-System/NodeLiveness} range default +/System/NodeLiveness{-Max} ttl_seconds=600 num_replicas=7 +/System/{NodeLivenessMax-tsd} range default +/System{/tsd-tse} range default +/System{tse-/Max} range default +... + + +# Ensure that changes to RANGE DEFAULT propagate to descendants. +exec-sql +CREATE DATABASE db; +CREATE TABLE db.t1(); +---- + +mutations +---- +upsert /Table/5{6-7} range default + +exec-sql +ALTER RANGE default CONFIGURE ZONE USING gc.ttlseconds = 50; +---- + +mutations +---- +delete /{Min-System/NodeLiveness} +upsert /{Min-System/NodeLiveness} ttl_seconds=50 +delete /System/{NodeLivenessMax-tsd} +upsert /System/{NodeLivenessMax-tsd} ttl_seconds=50 +delete /System{/tsd-tse} +upsert /System{/tsd-tse} ttl_seconds=50 +delete /System{tse-/Max} +upsert /System{tse-/Max} ttl_seconds=50 +delete /Table/5{6-7} +upsert /Table/5{6-7} ttl_seconds=50 + +state limit=5 +---- +/{Min-System/NodeLiveness} ttl_seconds=50 +/System/NodeLiveness{-Max} ttl_seconds=600 num_replicas=7 +/System/{NodeLivenessMax-tsd} ttl_seconds=50 +/System{/tsd-tse} ttl_seconds=50 +/System{tse-/Max} ttl_seconds=50 +... + +state offset=40 +---- +... +/Table/4{7-8} range system +/Table/5{6-7} ttl_seconds=50 + +# Make sure future descendants observe the same. +exec-sql +CREATE TABLE db.t2(); +---- + +mutations +---- +upsert /Table/5{7-8} ttl_seconds=50 + +state offset=40 +---- +... +/Table/4{7-8} range system +/Table/5{6-7} ttl_seconds=50 +/Table/5{7-8} ttl_seconds=50 + +exec-sql +ALTER RANGE system CONFIGURE ZONE USING gc.ttlseconds = 100; +---- + +mutations +---- +delete /System/{NodeLivenessMax-tsd} +upsert /System/{NodeLivenessMax-tsd} ttl_seconds=100 +delete /System{tse-/Max} +upsert /System{tse-/Max} ttl_seconds=100 + +state limit=5 +---- +/{Min-System/NodeLiveness} ttl_seconds=50 +/System/NodeLiveness{-Max} ttl_seconds=600 num_replicas=7 +/System/{NodeLivenessMax-tsd} ttl_seconds=100 +/System{/tsd-tse} ttl_seconds=50 +/System{tse-/Max} ttl_seconds=100 +... diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/partitions b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/partitions new file mode 100644 index 000000000000..995716994612 --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/testdata/partitions @@ -0,0 +1,187 @@ +# Test behavior of span configs in the presence of partitions (we care about zone +# config inheritance from database -> table -> indexes -> partitions, with and +# without ancestor nodes with explicit configs). +# +# TODO(irfansharif): Would be worth capturing partitions on a secondary index. + +reconcile +---- + +mutations discard +---- + +state offset=41 +---- +... + +exec-sql +CREATE DATABASE db; +CREATE TABLE db.t(i INT PRIMARY KEY, j INT) PARTITION BY LIST (i) ( + PARTITION one_two VALUES IN (1, 2), + PARTITION three_four VALUES IN (3, 4), + PARTITION default VALUES IN (default) +); +---- + +# With no explicit zone configs, we should only observe a single span for the +# entire table. +mutations +---- +upsert /Table/5{6-7} range default + +state offset=41 +---- +... +/Table/5{6-7} range default + +# All parent schema zone config changes cascade to the entire table's span. +exec-sql +ALTER DATABASE db CONFIGURE ZONE USING num_replicas = 7; +ALTER TABLE db.t CONFIGURE ZONE USING num_voters = 5; +---- + +mutations +---- +delete /Table/5{6-7} +upsert /Table/5{6-7} num_replicas=7 num_voters=5 + +state offset=41 +---- +... +/Table/5{6-7} num_replicas=7 num_voters=5 + +# Apply a zone configuration on one of the partitions, `one_two`, which +# encompasses two (adjacent) spans -- both with global reads set to true. The +# table's spans before and after the partitions span continue to have the +# table's zone configuration. +exec-sql +ALTER PARTITION one_two OF TABLE db.t CONFIGURE ZONE USING global_reads = true +---- + +mutations +---- +delete /Table/5{6-7} +upsert /Table/56{-/1/1} num_replicas=7 num_voters=5 +upsert /Table/56/1/{1-2} global_reads=true num_replicas=7 num_voters=5 +upsert /Table/56/1/{2-3} global_reads=true num_replicas=7 num_voters=5 +upsert /Table/5{6/1/3-7} num_replicas=7 num_voters=5 + +state offset=41 +---- +... +/Table/56{-/1/1} num_replicas=7 num_voters=5 +/Table/56/1/{1-2} global_reads=true num_replicas=7 num_voters=5 +/Table/56/1/{2-3} global_reads=true num_replicas=7 num_voters=5 +/Table/5{6/1/3-7} num_replicas=7 num_voters=5 + +# Apply a zone config on the second partition; we should expect two more +# adjacent spans with ttl_seconds = 5. +exec-sql +ALTER PARTITION three_four OF TABLE db.t CONFIGURE ZONE USING gc.ttlseconds = 5 +---- + +mutations +---- +delete /Table/5{6/1/3-7} +upsert /Table/56/1/{3-4} ttl_seconds=5 num_replicas=7 num_voters=5 +upsert /Table/56/1/{4-5} ttl_seconds=5 num_replicas=7 num_voters=5 +upsert /Table/5{6/1/5-7} num_replicas=7 num_voters=5 + +state offset=41 +---- +... +/Table/56{-/1/1} num_replicas=7 num_voters=5 +/Table/56/1/{1-2} global_reads=true num_replicas=7 num_voters=5 +/Table/56/1/{2-3} global_reads=true num_replicas=7 num_voters=5 +/Table/56/1/{3-4} ttl_seconds=5 num_replicas=7 num_voters=5 +/Table/56/1/{4-5} ttl_seconds=5 num_replicas=7 num_voters=5 +/Table/5{6/1/5-7} num_replicas=7 num_voters=5 + +# Set a zone config for the default partition for the primary index. We should +# expect: +# - the (empty) span before the primary index to have inherit from the +# table's config (num_voters = 5); +# - each partition of the primary index with an explicit zone config set to +# have the specified attributes (num_voters = 5); +# - the keyspace starting after the primary index's last partition with an +# explicit config, and ending before the secondary index, to have the new +# partition default config (num_voters = 6); +# - the keyspace starting at the first key of the primary index, and ending +# before the primary index's first partition with an explicit config, to have +# the new partition default config (num_voters = 6). +exec-sql +ALTER PARTITION default OF TABLE db.t CONFIGURE ZONE USING num_voters = 6 +---- + +mutations +---- +delete /Table/56{-/1/1} +upsert /Table/56{-/1} num_replicas=7 num_voters=5 +upsert /Table/56/1{-/1} num_replicas=7 num_voters=6 +delete /Table/5{6/1/5-7} +upsert /Table/56/{1/5-2} num_replicas=7 num_voters=6 +upsert /Table/5{6/2-7} num_replicas=7 num_voters=5 + +state offset=41 +---- +... +/Table/56{-/1} num_replicas=7 num_voters=5 +/Table/56/1{-/1} num_replicas=7 num_voters=6 +/Table/56/1/{1-2} global_reads=true num_replicas=7 num_voters=5 +/Table/56/1/{2-3} global_reads=true num_replicas=7 num_voters=5 +/Table/56/1/{3-4} ttl_seconds=5 num_replicas=7 num_voters=5 +/Table/56/1/{4-5} ttl_seconds=5 num_replicas=7 num_voters=5 +/Table/56/{1/5-2} num_replicas=7 num_voters=6 +/Table/5{6/2-7} num_replicas=7 num_voters=5 + +# Discard the table's zone configuration, dropping all the num_voters = 5 +# overrides. +exec-sql +ALTER TABLE db.t CONFIGURE ZONE DISCARD +---- + +mutations +---- +delete /Table/56{-/1} +upsert /Table/56{-/1} num_replicas=7 +delete /Table/56/1/{1-2} +upsert /Table/56/1/{1-2} global_reads=true num_replicas=7 +delete /Table/56/1/{2-3} +upsert /Table/56/1/{2-3} global_reads=true num_replicas=7 +delete /Table/56/1/{3-4} +upsert /Table/56/1/{3-4} ttl_seconds=5 num_replicas=7 +delete /Table/56/1/{4-5} +upsert /Table/56/1/{4-5} ttl_seconds=5 num_replicas=7 +delete /Table/5{6/2-7} +upsert /Table/5{6/2-7} num_replicas=7 + +state offset=41 +---- +... +/Table/56{-/1} num_replicas=7 +/Table/56/1{-/1} num_replicas=7 num_voters=6 +/Table/56/1/{1-2} global_reads=true num_replicas=7 +/Table/56/1/{2-3} global_reads=true num_replicas=7 +/Table/56/1/{3-4} ttl_seconds=5 num_replicas=7 +/Table/56/1/{4-5} ttl_seconds=5 num_replicas=7 +/Table/56/{1/5-2} num_replicas=7 num_voters=6 +/Table/5{6/2-7} num_replicas=7 + +exec-sql +DROP TABLE db.t +---- + +mutations +---- +delete /Table/56{-/1} +delete /Table/56/1{-/1} +delete /Table/56/1/{1-2} +delete /Table/56/1/{2-3} +delete /Table/56/1/{3-4} +delete /Table/56/1/{4-5} +delete /Table/56/{1/5-2} +delete /Table/5{6/2-7} + +state offset=41 +---- +... diff --git a/pkg/migration/migrations/seed_tenant_span_configs.go b/pkg/migration/migrations/seed_tenant_span_configs.go index a856f374de34..d3ced358ed9e 100644 --- a/pkg/migration/migrations/seed_tenant_span_configs.go +++ b/pkg/migration/migrations/seed_tenant_span_configs.go @@ -51,7 +51,7 @@ func seedTenantSpanConfigsMigration( return err } - scKVAccessor := d.SpanConfig.KVAccessor.WithTxn(txn) + scKVAccessor := d.SpanConfig.KVAccessor.WithTxn(ctx, txn) for _, tenantID := range tenantIDs { // Install a single key span config at the start of tenant's // keyspace; elsewhere this ensures that we split on the tenant @@ -79,7 +79,7 @@ func seedTenantSpanConfigsMigration( if len(scEntries) != 0 { // This tenant already has span config entries. It was either // already migrated (migrations need to be idempotent) or it was - // created after PreSeedTenantSpanConfigs was activated. THere's + // created after PreSeedTenantSpanConfigs was activated. There's // nothing left to do here. continue } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 8682e91d0151..d0d66862a7ac 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -115,6 +115,7 @@ go_library( "//pkg/spanconfig/spanconfigkvaccessor", "//pkg/spanconfig/spanconfigkvsubscriber", "//pkg/spanconfig/spanconfigmanager", + "//pkg/spanconfig/spanconfigreconciler", "//pkg/spanconfig/spanconfigsqltranslator", "//pkg/spanconfig/spanconfigsqlwatcher", "//pkg/sql", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 664f984f5878..2fbf18cba4eb 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -52,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigreconciler" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqltranslator" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" "github.com/cockroachdb/cockroach/pkg/sql" @@ -137,12 +138,14 @@ type SQLServer struct { stmtDiagnosticsRegistry *stmtdiagnostics.Registry // sqlLivenessSessionID will be populated with a non-zero value for non-system // tenants. - sqlLivenessSessionID sqlliveness.SessionID - sqlLivenessProvider sqlliveness.Provider - sqlInstanceProvider sqlinstance.Provider - metricsRegistry *metric.Registry - diagnosticsReporter *diagnostics.Reporter - spanconfigMgr *spanconfigmanager.Manager + sqlLivenessSessionID sqlliveness.SessionID + sqlLivenessProvider sqlliveness.Provider + sqlInstanceProvider sqlinstance.Provider + metricsRegistry *metric.Registry + diagnosticsReporter *diagnostics.Reporter + spanconfigMgr *spanconfigmanager.Manager + spanconfigSQLTranslator *spanconfigsqltranslator.SQLTranslator + spanconfigSQLWatcher *spanconfigsqlwatcher.SQLWatcher // settingsWatcher is utilized by secondary tenants to watch for settings // changes. It is nil on the system tenant. @@ -855,13 +858,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { execCfg.MigrationTestingKnobs = knobs } - var spanConfigMgr *spanconfigmanager.Manager + spanConfig := struct { + manager *spanconfigmanager.Manager + sqlTranslator *spanconfigsqltranslator.SQLTranslator + sqlWatcher *spanconfigsqlwatcher.SQLWatcher + }{} if !codec.ForSystemTenant() || cfg.SpanConfigsEnabled { // Instantiate a span config manager. If we're the host tenant we'll // only do it if COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is set. spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs) - sqlTranslator := spanconfigsqltranslator.New(execCfg, codec, spanConfigKnobs) - sqlWatcher := spanconfigsqlwatcher.New( + spanConfig.sqlTranslator = spanconfigsqltranslator.New(execCfg, codec, spanConfigKnobs) + spanConfig.sqlWatcher = spanconfigsqlwatcher.New( codec, cfg.Settings, cfg.rangeFeedFactory, @@ -871,19 +878,26 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { 30*time.Second, /* checkpointNoopsEvery */ spanConfigKnobs, ) - spanConfigMgr = spanconfigmanager.New( + spanConfigReconciler := spanconfigreconciler.New( + spanConfig.sqlWatcher, + spanConfig.sqlTranslator, + cfg.spanConfigAccessor, + execCfg, + codec, + cfg.TenantID, + spanConfigKnobs, + ) + spanConfig.manager = spanconfigmanager.New( cfg.db, jobRegistry, cfg.circularInternalExecutor, cfg.stopper, cfg.Settings, - cfg.spanConfigAccessor, - sqlWatcher, - sqlTranslator, + spanConfigReconciler, spanConfigKnobs, ) - execCfg.SpanConfigReconciliationJobDeps = spanConfigMgr + execCfg.SpanConfigReconciliationJobDeps = spanConfig.manager } execCfg.SpanConfigKVAccessor = cfg.sqlServerOptionalKVArgs.spanConfigKVAccessor @@ -948,7 +962,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { sqlInstanceProvider: cfg.sqlInstanceProvider, metricsRegistry: cfg.registry, diagnosticsReporter: reporter, - spanconfigMgr: spanConfigMgr, + spanconfigMgr: spanConfig.manager, + spanconfigSQLTranslator: spanConfig.sqlTranslator, + spanconfigSQLWatcher: spanConfig.sqlWatcher, settingsWatcher: settingsWatcher, }, nil } diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 41c7f6193ced..da01b5a48f94 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -588,14 +588,19 @@ func (t *TestTenant) SpanConfigKVAccessor() interface{} { return t.SQLServer.tenantConnect } +// SpanConfigReconciler is part TestTenantInterface. +func (t *TestTenant) SpanConfigReconciler() interface{} { + return t.SQLServer.spanconfigMgr.Reconciler +} + // SpanConfigSQLTranslator is part TestTenantInterface. func (t *TestTenant) SpanConfigSQLTranslator() interface{} { - return t.SQLServer.spanconfigMgr.SQLTranslator + return t.SQLServer.spanconfigSQLTranslator } // SpanConfigSQLWatcher is part TestTenantInterface. func (t *TestTenant) SpanConfigSQLWatcher() interface{} { - return t.SQLServer.spanconfigMgr.SQLTranslator + return t.SQLServer.spanconfigSQLWatcher } // StartTenant starts a SQL tenant communicating with this TestServer. @@ -1043,24 +1048,28 @@ func (ts *TestServer) SpanConfigKVAccessor() interface{} { return ts.Server.node.spanConfigAccessor } +// SpanConfigReconciler is part of TestServerInterface. +func (ts *TestServer) SpanConfigReconciler() interface{} { + if ts.sqlServer.spanconfigMgr == nil { + panic("uninitialized; see EnableSpanConfigs testing knob to use span configs") + } + return ts.sqlServer.spanconfigMgr.Reconciler +} + // SpanConfigSQLTranslator is part of TestServerInterface. func (ts *TestServer) SpanConfigSQLTranslator() interface{} { - if ts.sqlServer.spanconfigMgr == nil { - panic( - "span config manager uninitialized; see EnableSpanConfigs testing knob to use span configs", - ) + if ts.sqlServer.spanconfigSQLTranslator == nil { + panic("uninitialized; see EnableSpanConfigs testing knob to use span configs") } - return ts.sqlServer.spanconfigMgr.SQLTranslator + return ts.sqlServer.spanconfigSQLTranslator } // SpanConfigSQLWatcher is part of TestServerInterface. func (ts *TestServer) SpanConfigSQLWatcher() interface{} { - if ts.sqlServer.spanconfigMgr == nil { - panic( - "span config manager uninitialized; see EnableSpanConfigs testing knob to use span configs", - ) + if ts.sqlServer.spanconfigSQLWatcher == nil { + panic("uninitialized; see EnableSpanConfigs testing knob to use span configs") } - return ts.sqlServer.spanconfigMgr.SQLWatcher + return ts.sqlServer.spanconfigSQLWatcher } // SQLServer is part of TestServerInterface. diff --git a/pkg/spanconfig/spanconfig.go b/pkg/spanconfig/spanconfig.go index 6743c93c15f3..c50b455d9c7f 100644 --- a/pkg/spanconfig/spanconfig.go +++ b/pkg/spanconfig/spanconfig.go @@ -46,9 +46,8 @@ type KVAccessor interface { // WithTxn returns a KVAccessor that runs using the given transaction (with // its operations discarded if aborted, valid only if committed). If nil, a - // transaction is created internally for every operation. If chained, - // (.WithTxn(txnA).WithTxn(txnB)), only the last transaction is used. - WithTxn(*kv.Txn) KVAccessor + // transaction is created internally for every operation. + WithTxn(context.Context, *kv.Txn) KVAccessor } // KVSubscriber presents a consistent[1] snapshot of a StoreReader that's @@ -150,14 +149,34 @@ type SQLWatcher interface { ) error } +// Reconciler is responsible for reconciling a tenant's zone configs (SQL +// construct) with the cluster's span configs (KV construct). It's the central +// engine for the span configs infrastructure; a single Reconciler instance is +// active for every tenant in the system. +type Reconciler interface { + // Reconcile starts the incremental reconciliation process from the given + // timestamp. If it does not find MVCC history going far back enough[1], it + // falls back to a scan of all descriptors and zone configs before being + // able to do more incremental work. The provided callback is invoked + // with timestamps that can be safely checkpointed. A future Reconciliation + // attempt can make use of this timestamp to reduce the amount of necessary + // work (provided the MVCC history is still available). + // + // [1]: It's possible for system.{zones,descriptor} to have been GC-ed away; + // think suspended tenants. + Reconcile( + ctx context.Context, + startTS hlc.Timestamp, + callback func(checkpoint hlc.Timestamp) error, + ) error +} + // ReconciliationDependencies captures what's needed by the span config // reconciliation job to perform its task. The job is responsible for // reconciling a tenant's zone configurations with the clusters span // configurations. type ReconciliationDependencies interface { - KVAccessor - SQLTranslator - SQLWatcher + Reconciler } // Store is a data structure used to store spans and their corresponding @@ -171,8 +190,7 @@ type Store interface { type StoreWriter interface { // Apply applies a batch of non-overlapping updates atomically[1] and // returns (i) the existing spans that were deleted, and (ii) the entries - // that were newly added to make room for the batch. The deleted list can - // also double as a list of overlapping spans in the Store[2]. + // that were newly added to make room for the batch. // // Span configs are stored in non-overlapping fashion. When an update // overlaps with existing configs, the existing configs are deleted. If the @@ -199,49 +217,8 @@ type StoreWriter interface { // Added | [--- D ----)[-- B --) [-- C -)[--- E ---) // Store* | [--- A ----)[--- D ----)[-- B --) [-- C -)[--- E ---) // - // TODO(irfansharif): We'll make use of the dryrun option in a future PR - // when wiring up the reconciliation job to use the KVAccessor. Since the - // KVAccessor is a "targeted" API (the spans being deleted/upserted - // have to already be present with the exact same bounds), we'll dryrun an - // update against a StoreWriter (pre-populated with the entries present in - // KV) to generate the targeted deletes and upserts we'd need to issue. - // After successfully installing them in KV, we can keep our StoreWriter - // up-to-date by actually applying the update. - // - // There's also the question of a "full reconciliation pass". We'll be - // generating updates reactively listening in on changes to - // system.{descriptor,zones} (see SQLWatcher). It's possible then for a - // suspended tenant's table history to be GC-ed away and for its SQLWatcher - // to never detect that a certain table/index/partition has been deleted. - // Left as is, this results in us never issuing a corresponding span config - // deletion request. We'd be leaving a bunch of delete-able span configs - // lying around, and a bunch of empty ranges as a result of those. A "full - // reconciliation pass" is our attempt to find all these extraneous entries - // in KV and to delete them. - // - // We can use a StoreWriter here too (one that's pre-populated with the - // contents of KVAccessor, as before). We'd iterate through all descriptors, - // find all overlapping spans, issue KVAccessor deletes for them, and upsert - // the descriptor's span config[3]. As for the StoreWriter itself, we'd - // simply delete the overlapping entries. After iterating through all the - // descriptors, we'd finally issue KVAccessor deletes for all span configs - // still remaining in the Store. - // - // TODO(irfansharif): The descriptions above presume holding the entire set - // of span configs in memory, but we could break away from that by adding - // pagination + retrieval limit to the GetSpanConfigEntriesFor API. We'd - // then paginate through chunks of the keyspace at a time, do a "full - // reconciliation pass" over just that chunk, and continue. - // // [1]: Unless dryrun is true. We'll still generate the same {deleted,added} // lists. - // [2]: We could alternatively expose a GetAllOverlapping() API to make - // things clearer. - // [3]: We could skip the delete + upsert dance if the descriptor's exact - // span config entry already exists in KV. Using Apply (dryrun=true) - // against a StoreWriter (populated using KVAccessor contents) using - // the descriptor's span config entry would return empty lists, - // indicating a no-op. Apply(ctx context.Context, dryrun bool, updates ...Update) ( deleted []roachpb.Span, added []roachpb.SpanConfigEntry, ) diff --git a/pkg/spanconfig/spanconfigjob/BUILD.bazel b/pkg/spanconfig/spanconfigjob/BUILD.bazel index a547ebd5ee55..f814a5f32fd4 100644 --- a/pkg/spanconfig/spanconfigjob/BUILD.bazel +++ b/pkg/spanconfig/spanconfigjob/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/jobs/jobspb", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/spanconfig/spanconfigjob/job.go b/pkg/spanconfig/spanconfigjob/job.go index fa461a0d11cb..5d73ee97e042 100644 --- a/pkg/spanconfig/spanconfigjob/job.go +++ b/pkg/spanconfig/spanconfigjob/job.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -31,11 +32,25 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) error { execCtx := execCtxI.(sql.JobExecContext) rc := execCtx.SpanConfigReconciliationJobDeps() - // TODO(irfansharif): Actually make use of these dependencies. - _ = rc + // TODO(irfansharif): #73086 bubbles up retryable errors from the + // reconciler/underlying watcher in the (very) unlikely event that it's + // unable to generate incremental updates from the given timestamp (things + // could've been GC-ed from underneath us). For such errors, instead of + // failing this entire job, we should simply retry the reconciliation + // process here. Not doing so is still fine, the spanconfig.Manager starts + // the job all over again after some time, it's just that the checks for + // failed jobs happen infrequently. - <-ctx.Done() - return ctx.Err() + if err := rc.Reconcile(ctx, hlc.Timestamp{}, func(checkpoint hlc.Timestamp) error { + // TODO(irfansharif): Stash this checkpoint somewhere and use it when + // starting back up. + _ = checkpoint + return nil + }); err != nil { + return err + } + + return nil } // OnFailOrCancel implements the jobs.Resumer interface. diff --git a/pkg/spanconfig/spanconfigkvaccessor/dummy.go b/pkg/spanconfig/spanconfigkvaccessor/dummy.go index fd46b89d474e..b6171758fd45 100644 --- a/pkg/spanconfig/spanconfigkvaccessor/dummy.go +++ b/pkg/spanconfig/spanconfigkvaccessor/dummy.go @@ -53,6 +53,6 @@ func (k dummyKVAccessor) UpdateSpanConfigEntries( return k.error } -func (k dummyKVAccessor) WithTxn(*kv.Txn) spanconfig.KVAccessor { +func (k dummyKVAccessor) WithTxn(context.Context, *kv.Txn) spanconfig.KVAccessor { return k } diff --git a/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go b/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go index 4daa6e5f05a9..fba4f1397a48 100644 --- a/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go +++ b/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go @@ -61,7 +61,10 @@ func New( } // WithTxn is part of the KVAccessor interface. -func (k *KVAccessor) WithTxn(txn *kv.Txn) spanconfig.KVAccessor { +func (k *KVAccessor) WithTxn(ctx context.Context, txn *kv.Txn) spanconfig.KVAccessor { + if k.optionalTxn != nil { + log.Fatalf(ctx, "KVAccessor already scoped to txn (was .WithTxn(...) chained multiple times?)") + } return newKVAccessor(k.db, k.ie, k.settings, k.configurationsTableFQN, txn) } diff --git a/pkg/spanconfig/spanconfigmanager/manager.go b/pkg/spanconfig/spanconfigmanager/manager.go index 6ca2f58bf9e9..42f4e7cd26fd 100644 --- a/pkg/spanconfig/spanconfigmanager/manager.go +++ b/pkg/spanconfig/spanconfigmanager/manager.go @@ -59,9 +59,7 @@ type Manager struct { settings *cluster.Settings knobs *spanconfig.TestingKnobs - spanconfig.KVAccessor - spanconfig.SQLWatcher - spanconfig.SQLTranslator + spanconfig.Reconciler } var _ spanconfig.ReconciliationDependencies = &Manager{} @@ -73,24 +71,20 @@ func New( ie sqlutil.InternalExecutor, stopper *stop.Stopper, settings *cluster.Settings, - kvAccessor spanconfig.KVAccessor, - sqlWatcher spanconfig.SQLWatcher, - sqlTranslator spanconfig.SQLTranslator, + reconciler spanconfig.Reconciler, knobs *spanconfig.TestingKnobs, ) *Manager { if knobs == nil { knobs = &spanconfig.TestingKnobs{} } return &Manager{ - db: db, - jr: jr, - ie: ie, - stopper: stopper, - settings: settings, - KVAccessor: kvAccessor, - SQLWatcher: sqlWatcher, - SQLTranslator: sqlTranslator, - knobs: knobs, + db: db, + jr: jr, + ie: ie, + stopper: stopper, + settings: settings, + Reconciler: reconciler, + knobs: knobs, } } diff --git a/pkg/spanconfig/spanconfigmanager/manager_test.go b/pkg/spanconfig/spanconfigmanager/manager_test.go index 1b7525121c5d..f3319da76e83 100644 --- a/pkg/spanconfig/spanconfigmanager/manager_test.go +++ b/pkg/spanconfig/spanconfigmanager/manager_test.go @@ -73,9 +73,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) { ts.InternalExecutor().(*sql.InternalExecutor), ts.Stopper(), ts.ClusterSettings(), - ts.SpanConfigKVAccessor().(spanconfig.KVAccessor), - ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher), - ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator), + ts.SpanConfigReconciler().(spanconfig.Reconciler), &spanconfig.TestingKnobs{ ManagerCreatedJobInterceptor: func(jobI interface{}) { job := jobI.(*jobs.Job) @@ -162,9 +160,7 @@ func TestManagerStartsJobIfFailed(t *testing.T) { ts.InternalExecutor().(*sql.InternalExecutor), ts.Stopper(), ts.ClusterSettings(), - ts.SpanConfigKVAccessor().(spanconfig.KVAccessor), - ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher), - ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator), + ts.SpanConfigReconciler().(spanconfig.Reconciler), &spanconfig.TestingKnobs{ ManagerAfterCheckedReconciliationJobExistsInterceptor: func(exists bool) { require.False(t, exists) @@ -238,9 +234,7 @@ func TestManagerCheckJobConditions(t *testing.T) { ts.InternalExecutor().(*sql.InternalExecutor), ts.Stopper(), ts.ClusterSettings(), - ts.SpanConfigKVAccessor().(spanconfig.KVAccessor), - ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher), - ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator), + ts.SpanConfigReconciler().(spanconfig.Reconciler), &spanconfig.TestingKnobs{ ManagerDisableJobCreation: true, ManagerCheckJobInterceptor: func() { diff --git a/pkg/spanconfig/spanconfigreconciler/BUILD.bazel b/pkg/spanconfig/spanconfigreconciler/BUILD.bazel new file mode 100644 index 000000000000..48eb36f5a59e --- /dev/null +++ b/pkg/spanconfig/spanconfigreconciler/BUILD.bazel @@ -0,0 +1,28 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "spanconfigreconciler", + srcs = ["reconciler.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigreconciler", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv", + "//pkg/roachpb:with-mocks", + "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigstore", + "//pkg/sql", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/descs", + "//pkg/sql/sem/tree", + "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "spanconfigreconciler_test", + srcs = ["reconciler_test.go"], + data = glob(["testdata/**"]), +) diff --git a/pkg/spanconfig/spanconfigreconciler/reconciler.go b/pkg/spanconfig/spanconfigreconciler/reconciler.go new file mode 100644 index 000000000000..d6e0e5743eac --- /dev/null +++ b/pkg/spanconfig/spanconfigreconciler/reconciler.go @@ -0,0 +1,438 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigreconciler + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// Reconciler is a concrete implementation of the spanconfig.Reconciler +// interface. +type Reconciler struct { + sqlWatcher spanconfig.SQLWatcher + sqlTranslator spanconfig.SQLTranslator + kvAccessor spanconfig.KVAccessor + + execCfg *sql.ExecutorConfig + codec keys.SQLCodec + tenID roachpb.TenantID + knobs *spanconfig.TestingKnobs +} + +var _ spanconfig.Reconciler = &Reconciler{} + +// New constructs a new Reconciler. +func New( + sqlWatcher spanconfig.SQLWatcher, + sqlTranslator spanconfig.SQLTranslator, + kvAccessor spanconfig.KVAccessor, + execCfg *sql.ExecutorConfig, + codec keys.SQLCodec, + tenID roachpb.TenantID, + knobs *spanconfig.TestingKnobs, +) *Reconciler { + if knobs == nil { + knobs = &spanconfig.TestingKnobs{} + } + return &Reconciler{ + sqlWatcher: sqlWatcher, + sqlTranslator: sqlTranslator, + kvAccessor: kvAccessor, + + execCfg: execCfg, + codec: codec, + tenID: tenID, + knobs: knobs, + } +} + +// Reconcile is part of the spanconfig.Reconciler interface; it's responsible +// for reconciling a tenant's zone configs with the cluster's span configs (KV +// construct). It does so incrementally and continuously, internally leveraging +// SQL{Watcher,Translator}, KVAccessor, and Store to make progress. +// +// Let's walk through what it does. At a high-level, we maintain an in-memory +// data structure that's up-to-date with the contents of the KV (for the +// subset of spans we have access to, i.e. the keyspace carved out for our +// tenant ID, host or otherwise). We watch for changes to SQL state +// (descriptors, zone configs), translate the SQL updates to the flattened +// span+config form, "diff" the updates against the data structure to see if +// there are any changes we need to inform KV of. If so, we do, and ensure that +// the data structure is kept up-to-date. We continue watching for future +// updates, repeating as necessary. +// +// There's a single instance of the Reconciler running for a given tenant at a +// given point it time (mutual exclusion/leasing is provided by the jobs +// subsystem and the span config manager). We needn't worry about contending +// writers, or the KV state being changed from underneath us. What we do have to +// worry about, however, is suspended tenants' not being reconciling while +// suspended. It's possible for a suspended tenant's SQL state to be GC-ed away +// at older MVCC timestamps; when watching for changes, we could fail to observe +// tables/indexes/partitions getting deleted[1]. Left as is, this would result +// in us never issuing a corresponding deletion requests for the dropped span +// configs -- we'd be leaving orphaned span configs lying around (taking up +// storage space and creating pointless empty ranges). A "full reconciliation +// pass" is our attempt to find all these extraneous entries in KV and to delete +// them. +// +// We can use our span config data structure here too, one that's pre-populated +// with the contents of KV. We translate the entire SQL state into constituent +// spans and configs, diff against our data structure to generate KV updates +// that we then apply. We follow this with clearing out all these spans in our +// data structure, leaving behind all extraneous entries to be found in KV -- +// entries we can then simply issue deletes for. +// +// [1]: #73399 proposes a new KV request type that would let us more rapidly +// trigger reconciliation after a tenant's SQL transaction. If we're able +// to do this fast enough, it would be reasonable to wait for +// reconciliation to happen before returning to the client. We could +// alternatively use it as part of a handshake protocol during pod +// suspension, to ensure that all outstanding work ("reconciliation" has +// been done before a pod is suspended. +// +// TODO(irfansharif): The descriptions above presume holding the entire set of +// span configs in memory, but we could break away from that by adding +// pagination + retrieval limit to the GetSpanConfigEntriesFor API. We'd then +// paginate through chunks of the keyspace at a time, do a "full reconciliation +// pass" over just that chunk, and continue. +// +// TODO(irfansharif): We probably want some degree of batching when issuing RPCs +// to KV -- right now we sent forth the entire set of updates since our last +// checkpoint. For changes to, say, RANGE DEFAULT, the RPC request proto is +// proportional to the number of schema objects. +func (r *Reconciler) Reconcile( + ctx context.Context, startTS hlc.Timestamp, callback func(checkpoint hlc.Timestamp) error, +) error { + // TODO(irfansharif): Check system.{zones,descriptors} for last GC timestamp + // and avoid the full reconciliation pass if the startTS provided is + // visible to the rangefeed. Right now we're doing a full reconciliation + // pass every time the reconciliation job kicks us off. + _ = startTS + + full := fullReconciler{ + sqlTranslator: r.sqlTranslator, + kvAccessor: r.kvAccessor, + codec: r.codec, + tenID: r.tenID, + } + latestStore, reconciledUpto, err := full.reconcile(ctx) + if err != nil { + return err + } + + if err := callback(reconciledUpto); err != nil { + return err + } + + incremental := incrementalReconciler{ + sqlTranslator: r.sqlTranslator, + sqlWatcher: r.sqlWatcher, + kvAccessor: r.kvAccessor, + storeWithKVContents: latestStore, + execCfg: r.execCfg, + codec: r.codec, + knobs: r.knobs, + } + return incremental.reconcile(ctx, reconciledUpto, callback) +} + +// fullReconciler orchestrates the full reconciliation process. +type fullReconciler struct { + sqlTranslator spanconfig.SQLTranslator + kvAccessor spanconfig.KVAccessor + + codec keys.SQLCodec + tenID roachpb.TenantID +} + +// reconcile runs the full reconciliation process, returning: +// - a store with the latest set of span configs under our purview; +// - the timestamp we've reconciled up until. +func (f *fullReconciler) reconcile( + ctx context.Context, +) (storeWithLatestSpanConfigs *spanconfigstore.Store, reconciledUpUntil hlc.Timestamp, _ error) { + storeWithExistingSpanConfigs, err := f.fetchExistingSpanConfigs(ctx) + if err != nil { + return nil, hlc.Timestamp{}, err + } + + // Translate the entire SQL state to ensure KV reflects the most up-to-date + // view of things. + var entries []roachpb.SpanConfigEntry + entries, reconciledUpUntil, err = spanconfig.FullTranslate(ctx, f.sqlTranslator) + if err != nil { + return nil, hlc.Timestamp{}, err + } + + updates := make([]spanconfig.Update, len(entries)) + for i, entry := range entries { + updates[i] = spanconfig.Update{ + Span: entry.Span, + Config: entry.Config, + } + } + + toDelete, toUpsert := storeWithExistingSpanConfigs.Apply(ctx, false /* dryrun */, updates...) + if len(toDelete) != 0 || len(toUpsert) != 0 { + if err := f.kvAccessor.UpdateSpanConfigEntries(ctx, toDelete, toUpsert); err != nil { + return nil, hlc.Timestamp{}, err + } + } + + // Keep a copy of the current view of the world (i.e. KVAccessor + // contents). We could also fetch everything from KV, but making a copy here + // is cheaper (and saves an RTT). We'll later mutate + // storeWithExistingSpanConfigs to determine what extraneous entries are in + // KV, in order to delete them. After doing so, we'll issue those same + // deletions against this copy in order for it to reflect an up-to-date view + // of span configs. + storeWithLatestSpanConfigs = storeWithExistingSpanConfigs.Copy(ctx) + + // Delete all updated spans in a store populated with all current entries. + // Because our translation above captures the entire SQL state, deleting all + // "updates" will leave behind only the extraneous entries in KV -- we'll + // get rid of them below. + var storeWithExtraneousSpanConfigs *spanconfigstore.Store + { + for _, update := range updates { + storeWithExistingSpanConfigs.Apply(ctx, false /* dryrun */, spanconfig.Update{ + Span: update.Span, + Config: roachpb.SpanConfig{}, // delete + }) + } + storeWithExtraneousSpanConfigs = storeWithExistingSpanConfigs + } + + deletedSpans, err := f.deleteExtraneousSpanConfigs(ctx, storeWithExtraneousSpanConfigs) + if err != nil { + return nil, hlc.Timestamp{}, err + } + + // Update the store that's supposed to reflect the latest span config + // contents. As before, we could've fetched this state from KV directly, but + // doing it this way is cheaper. + for _, deletedSpan := range deletedSpans { + storeWithLatestSpanConfigs.Apply(ctx, false /* dryrun */, spanconfig.Update{ + Span: deletedSpan, + Config: roachpb.SpanConfig{}, // delete + }) + } + + return storeWithLatestSpanConfigs, reconciledUpUntil, nil +} + +// fetchExistingSpanConfigs returns a store populated with all span configs +// under our purview. +func (f *fullReconciler) fetchExistingSpanConfigs( + ctx context.Context, +) (*spanconfigstore.Store, error) { + var tenantSpan roachpb.Span + if f.codec.ForSystemTenant() { + // The system tenant governs all system keys (meta, liveness, timeseries + // ranges, etc.) and system tenant tables. + // + // TODO(irfansharif): Should we include the scratch range here? Some + // tests make use of it; we may want to declare configs over it and have + // it considered all the same. + tenantSpan = roachpb.Span{ + Key: keys.EverythingSpan.Key, + EndKey: keys.TableDataMax, + } + } else { + // Secondary tenants govern everything prefixed by their tenant ID. + tenPrefix := keys.MakeTenantPrefix(f.tenID) + tenantSpan = roachpb.Span{ + Key: tenPrefix, + EndKey: tenPrefix.PrefixEnd(), + } + } + + store := spanconfigstore.New(roachpb.SpanConfig{}) + { + // Fully populate the store with KVAccessor contents. + entries, err := f.kvAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{ + tenantSpan, + }) + if err != nil { + return nil, err + } + + for _, entry := range entries { + store.Apply(ctx, false /* dryrun */, spanconfig.Update{ + Span: entry.Span, + Config: entry.Config, + }) + } + } + return store, nil +} + +// deleteExtraneousSpanConfigs deletes all extraneous span configs from KV. +func (f *fullReconciler) deleteExtraneousSpanConfigs( + ctx context.Context, storeWithExtraneousSpanConfigs *spanconfigstore.Store, +) ([]roachpb.Span, error) { + var extraneousSpans []roachpb.Span + if err := storeWithExtraneousSpanConfigs.ForEachOverlapping(ctx, keys.EverythingSpan, + func(entry roachpb.SpanConfigEntry) error { + extraneousSpans = append(extraneousSpans, entry.Span) + return nil + }, + ); err != nil { + return nil, err + } + + // Delete the extraneous entries, if any. + if len(extraneousSpans) != 0 { + if err := f.kvAccessor.UpdateSpanConfigEntries(ctx, extraneousSpans, nil); err != nil { + return nil, err + } + } + return extraneousSpans, nil +} + +// incrementalReconciler orchestrates the incremental reconciliation process. +type incrementalReconciler struct { + sqlTranslator spanconfig.SQLTranslator + sqlWatcher spanconfig.SQLWatcher + kvAccessor spanconfig.KVAccessor + storeWithKVContents *spanconfigstore.Store + + execCfg *sql.ExecutorConfig + codec keys.SQLCodec + knobs *spanconfig.TestingKnobs +} + +// reconcile runs the incremental reconciliation process. It takes in: +// - the timestamp to start the incremental process from (typically a timestamp +// we've already reconciled up until); +// - a callback that it invokes periodically with timestamps that it's +// reconciled up until. +func (r *incrementalReconciler) reconcile( + ctx context.Context, startTS hlc.Timestamp, callback func(reconciledUpUntil hlc.Timestamp) error, +) error { + // Watch for incremental updates, applying KV as things change. + return r.sqlWatcher.WatchForSQLUpdates(ctx, startTS, + func(ctx context.Context, descriptorUpdates []spanconfig.DescriptorUpdate, checkpoint hlc.Timestamp) error { + if len(descriptorUpdates) == 0 { + return callback(checkpoint) // nothing to do; propagate the checkpoint + } + + var allIDs descpb.IDs + for _, update := range descriptorUpdates { + allIDs = append(allIDs, update.ID) + } + + // TODO(irfansharif): Would it be easier to just have the translator + // return the set of missing table IDs? We're using two transactions + // here, somewhat wastefully. An alternative would be to have a + // txn-scoped translator. + + missingTableIDs, err := r.filterForMissingTableIDs(ctx, descriptorUpdates) + if err != nil { + return err + } + + entries, _, err := r.sqlTranslator.Translate(ctx, allIDs) + if err != nil { + return err + } + + updates := make([]spanconfig.Update, 0, len(missingTableIDs)+len(entries)) + for _, entry := range entries { + // Update span configs for SQL state that changed. + updates = append(updates, spanconfig.Update{ + Span: entry.Span, + Config: entry.Config, + }) + } + for _, missingID := range missingTableIDs { + // Delete span configs for missing tables. + tableSpan := roachpb.Span{ + Key: r.codec.TablePrefix(uint32(missingID)), + EndKey: r.codec.TablePrefix(uint32(missingID)).PrefixEnd(), + } + updates = append(updates, spanconfig.Update{ + Span: tableSpan, + Config: roachpb.SpanConfig{}, // delete + }) + } + + toDelete, toUpsert := r.storeWithKVContents.Apply(ctx, false /* dryrun */, updates...) + if len(toDelete) != 0 || len(toUpsert) != 0 { + if err := r.kvAccessor.UpdateSpanConfigEntries(ctx, toDelete, toUpsert); err != nil { + return err + } + } + + return callback(checkpoint) + }, + ) +} + +// filterForMissingTableIDs filters the set of updates returning only the set of +// "missing" table IDs. These are descriptors that are no longer found, because +// they've been GC-ed away[1]. +// +// [1]: Or if the ExcludeDroppedDescriptorsFromLookup testing knob is used, +// this includes dropped descriptors. +func (r *incrementalReconciler) filterForMissingTableIDs( + ctx context.Context, updates []spanconfig.DescriptorUpdate, +) (descpb.IDs, error) { + var missingIDs descpb.IDs + if err := sql.DescsTxn(ctx, r.execCfg, + func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { + for _, update := range updates { + if update.DescriptorType != catalog.Table { + continue // nothing to do + } + + desc, err := descsCol.GetImmutableDescriptorByID(ctx, txn, update.ID, tree.CommonLookupFlags{ + Required: true, // we want to error out for missing descriptors + IncludeDropped: true, + IncludeOffline: true, + AvoidLeased: true, // we want consistent reads + }) + + considerAsMissing := false + if errors.Is(err, catalog.ErrDescriptorNotFound) { + considerAsMissing = true + } else if err != nil { + return err + } else if r.knobs.ExcludeDroppedDescriptorsFromLookup && desc.Dropped() { + considerAsMissing = true + } + + if considerAsMissing { + missingIDs = append(missingIDs, update.ID) // accumulate the set of missing table IDs + } + } + + return nil + }, + ); err != nil { + return nil, err + } + + return missingIDs, nil +} diff --git a/pkg/spanconfig/spanconfigreconciler/reconciler_test.go b/pkg/spanconfig/spanconfigreconciler/reconciler_test.go new file mode 100644 index 000000000000..b78811f9f2ac --- /dev/null +++ b/pkg/spanconfig/spanconfigreconciler/reconciler_test.go @@ -0,0 +1,14 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigreconciler_test + +// Tests for this package can be found under +// pkg/ccl/spanconfigccl/spanconfigreconcilerccl. diff --git a/pkg/spanconfig/spanconfigtestutils/BUILD.bazel b/pkg/spanconfig/spanconfigtestutils/BUILD.bazel index 2040b2a40436..b99ecea62455 100644 --- a/pkg/spanconfig/spanconfigtestutils/BUILD.bazel +++ b/pkg/spanconfig/spanconfigtestutils/BUILD.bazel @@ -2,13 +2,17 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "spanconfigtestutils", - srcs = ["utils.go"], + srcs = [ + "recorder.go", + "utils.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils", visibility = ["//visibility:public"], deps = [ "//pkg/kv", "//pkg/roachpb:with-mocks", "//pkg/spanconfig", + "//pkg/util/syncutil", ], ) diff --git a/pkg/spanconfig/spanconfigtestutils/recorder.go b/pkg/spanconfig/spanconfigtestutils/recorder.go new file mode 100644 index 000000000000..7782badbed33 --- /dev/null +++ b/pkg/spanconfig/spanconfigtestutils/recorder.go @@ -0,0 +1,128 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigtestutils + +import ( + "context" + "fmt" + "sort" + "strings" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// KVAccessorRecorder wraps around a KVAccessor and records the mutations +// applied to it. +type KVAccessorRecorder struct { + underlying spanconfig.KVAccessor + + mu struct { + syncutil.Mutex + mutations []mutation + batchCount int + } +} + +var _ spanconfig.KVAccessor = &KVAccessorRecorder{} + +// NewKVAccessorRecorder returns a new KVAccessorRecorder. +func NewKVAccessorRecorder(underlying spanconfig.KVAccessor) *KVAccessorRecorder { + return &KVAccessorRecorder{ + underlying: underlying, + } +} + +type mutation struct { + update spanconfig.Update + batchIdx int +} + +// GetSpanConfigEntriesFor is part of the KVAccessor interface. +func (r *KVAccessorRecorder) GetSpanConfigEntriesFor( + ctx context.Context, spans []roachpb.Span, +) ([]roachpb.SpanConfigEntry, error) { + return r.underlying.GetSpanConfigEntriesFor(ctx, spans) +} + +// UpdateSpanConfigEntries is part of the KVAccessor interface. +func (r *KVAccessorRecorder) UpdateSpanConfigEntries( + ctx context.Context, toDelete []roachpb.Span, toUpsert []roachpb.SpanConfigEntry, +) error { + if err := r.underlying.UpdateSpanConfigEntries(ctx, toDelete, toUpsert); err != nil { + return err + } + + r.mu.Lock() + defer r.mu.Unlock() + + for _, d := range toDelete { + r.mu.mutations = append(r.mu.mutations, mutation{ + update: spanconfig.Update{Span: d}, + batchIdx: r.mu.batchCount, + }) + } + for _, u := range toUpsert { + r.mu.mutations = append(r.mu.mutations, mutation{ + update: spanconfig.Update{Span: u.Span, Config: u.Config}, + batchIdx: r.mu.batchCount, + }) + } + r.mu.batchCount++ + return nil +} + +// WithTxn is part of the KVAccessor interface. +func (r *KVAccessorRecorder) WithTxn(context.Context, *kv.Txn) spanconfig.KVAccessor { + panic("unimplemented") +} + +// Recording returns a string-ified form of the mutations made so far, i.e. list +// of spans that were deleted and entries that were upserted. It optionally +// clears out the recording. +func (r *KVAccessorRecorder) Recording(clear bool) string { + r.mu.Lock() + defer r.mu.Unlock() + + sort.Slice(r.mu.mutations, func(i, j int) bool { + mi, mj := r.mu.mutations[i], r.mu.mutations[j] + if mi.batchIdx != mj.batchIdx { // sort by batch/ts order + return mi.batchIdx < mj.batchIdx + } + if !mi.update.Span.Key.Equal(mj.update.Span.Key) { // sort by key order + return mi.update.Span.Key.Compare(mj.update.Span.Key) < 0 + } + + return mi.update.Deletion() // sort deletes before upserts + }) + + // TODO(irfansharif): We could also print out separators to distinguish + // between different batches, if any. + + var output strings.Builder + for _, m := range r.mu.mutations { + if m.update.Deletion() { + output.WriteString(fmt.Sprintf("delete %s\n", m.update.Span)) + } else { + output.WriteString(fmt.Sprintf("upsert %-35s %s\n", m.update.Span, + PrintSpanConfigDiffedAgainstDefaults(m.update.Config))) + } + } + + if clear { + r.mu.mutations = r.mu.mutations[:0] + r.mu.batchCount = 0 + } + + return output.String() +} diff --git a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/BUILD.bazel b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/BUILD.bazel index f0ee625179be..95b65d615f22 100644 --- a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/BUILD.bazel +++ b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/BUILD.bazel @@ -13,6 +13,9 @@ go_library( "//pkg/kv", "//pkg/roachpb:with-mocks", "//pkg/security", + "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigreconciler", + "//pkg/spanconfig/spanconfigtestutils", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/descs", @@ -21,7 +24,9 @@ go_library( "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/hlc", "//pkg/util/log", + "//pkg/util/syncutil", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go index 6e06f97b885b..92531eb76dd3 100644 --- a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go +++ b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go @@ -19,6 +19,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigreconciler" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -73,6 +77,26 @@ func (h *Handle) InitializeTenant(ctx context.Context, tenID roachpb.TenantID) * } } + var tenKnobs *spanconfig.TestingKnobs + if scKnobs := tenantState.TestingKnobs().SpanConfig; scKnobs != nil { + tenKnobs = scKnobs.(*spanconfig.TestingKnobs) + } + tenExecCfg := tenantState.ExecutorConfig().(sql.ExecutorConfig) + tenKVAccessor := tenantState.SpanConfigKVAccessor().(spanconfig.KVAccessor) + tenSQLTranslator := tenantState.SpanConfigSQLTranslator().(spanconfig.SQLTranslator) + tenSQLWatcher := tenantState.SpanConfigSQLWatcher().(spanconfig.SQLWatcher) + + tenantState.recorder = spanconfigtestutils.NewKVAccessorRecorder(tenKVAccessor) + tenantState.reconciler = spanconfigreconciler.New( + tenSQLWatcher, + tenSQLTranslator, + tenantState.recorder, + &tenExecCfg, + tenExecCfg.Codec, + tenID, + tenKnobs, + ) + h.ts[tenID] = tenantState return tenantState } diff --git a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/tenant_state.go b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/tenant_state.go index 4f9d8f2121cb..1e84f18a5e81 100644 --- a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/tenant_state.go +++ b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/tenant_state.go @@ -16,6 +16,9 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigreconciler" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" @@ -23,6 +26,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/stretchr/testify/require" ) @@ -31,14 +36,49 @@ import ( type Tenant struct { serverutils.TestTenantInterface - t *testing.T - db *sqlutils.SQLRunner - cleanup func() + t *testing.T + db *sqlutils.SQLRunner + reconciler *spanconfigreconciler.Reconciler + recorder *spanconfigtestutils.KVAccessorRecorder + cleanup func() + + mu struct { + syncutil.Mutex + lastCheckpoint, tsAfterLastExec hlc.Timestamp + } } -// Exec is a wrapper around gosql.Exec that kills the test on error. +// Exec is a wrapper around gosql.Exec that kills the test on error. It records +// the execution timestamp for subsequent use. func (s *Tenant) Exec(query string, args ...interface{}) { s.db.Exec(s.t, query, args...) + + s.mu.Lock() + defer s.mu.Unlock() + s.mu.tsAfterLastExec = s.Clock().Now() +} + +// TimestampAfterLastExec returns a timestamp after the last time Exec was +// invoked. It can be used for transactional ordering guarantees. +func (s *Tenant) TimestampAfterLastExec() hlc.Timestamp { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.tsAfterLastExec +} + +// Checkpoint is used to record a checkpointed timestamp, retrievable via +// LastCheckpoint. +func (s *Tenant) Checkpoint(ts hlc.Timestamp) { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.lastCheckpoint = ts +} + +// LastCheckpoint returns the last recorded checkpoint timestamp. +func (s *Tenant) LastCheckpoint() hlc.Timestamp { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.lastCheckpoint } // Query is a wrapper around gosql.Query that kills the test on error. @@ -46,6 +86,17 @@ func (s *Tenant) Query(query string, args ...interface{}) *gosql.Rows { return s.db.Query(s.t, query, args...) } +// Reconciler returns the reconciler associated with the given tenant. +func (s *Tenant) Reconciler() spanconfig.Reconciler { + return s.reconciler +} + +// KVAccessorRecorder returns the underlying recorder capturing KVAccessor +// mutations made by the tenant. +func (s *Tenant) KVAccessorRecorder() *spanconfigtestutils.KVAccessorRecorder { + return s.recorder +} + // WithMutableTableDescriptor invokes the provided callback with a mutable table // descriptor, changes to which are then committed back to the system. The // callback needs to be idempotent. diff --git a/pkg/sql/tenant.go b/pkg/sql/tenant.go index a02074abb6e6..e21381e3a265 100644 --- a/pkg/sql/tenant.go +++ b/pkg/sql/tenant.go @@ -125,7 +125,7 @@ func CreateTenantRecord( } if !execCfg.Settings.Version.IsActive(ctx, clusterversion.PreSeedTenantSpanConfigs) { - return err + return nil } // Install a single key[1] span config at the start of tenant's keyspace; @@ -158,7 +158,7 @@ func CreateTenantRecord( Config: tenantSpanConfig, }, } - scKVAccessor := execCfg.SpanConfigKVAccessor.WithTxn(txn) + scKVAccessor := execCfg.SpanConfigKVAccessor.WithTxn(ctx, txn) return scKVAccessor.UpdateSpanConfigEntries( ctx, nil /* toDelete */, toUpsert, ) @@ -440,7 +440,7 @@ func GCTenantSync(ctx context.Context, execCfg *ExecutorConfig, info *descpb.Ten EndKey: tenantPrefix.PrefixEnd(), } - scKVAccessor := execCfg.SpanConfigKVAccessor.WithTxn(txn) + scKVAccessor := execCfg.SpanConfigKVAccessor.WithTxn(ctx, txn) entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{tenantSpan}) if err != nil { return err diff --git a/pkg/testutils/serverutils/test_tenant_shim.go b/pkg/testutils/serverutils/test_tenant_shim.go index 37dd1ddaece6..1d6ca76392f1 100644 --- a/pkg/testutils/serverutils/test_tenant_shim.go +++ b/pkg/testutils/serverutils/test_tenant_shim.go @@ -84,6 +84,10 @@ type TestTenantInterface interface { // interface{}. SpanConfigKVAccessor() interface{} + // SpanConfigReconciler returns the underlying spanconfig.Reconciler as an + // interface{}. + SpanConfigReconciler() interface{} + // SpanConfigSQLTranslator returns the underlying spanconfig.SQLTranslator as // an interface{}. SpanConfigSQLTranslator() interface{}