spanconfig: introduce spanconfig.Reconciler
Reconciler is responsible for reconciling a tenant's zone configs (SQL
construct) with the cluster's span configs (KV construct). It's the
central engine for the span configs infrastructure; a single Reconciler
instance is active for every tenant in the system.

    type Reconciler interface {
      // Reconcile starts the incremental reconciliation process from
      // the given checkpoint. If it does not find MVCC history going
      // far back enough[1], it falls back to a scan of all
      // descriptors and zone configs before being able to do more
      // incremental work. The provided callback is invoked with
      // timestamps that can be safely checkpointed. A future
      // Reconciliation attempt can make use of this timestamp to
      // reduce the amount of necessary work (provided the MVCC
      // history is still available).
      //
      // [1]: It's possible for system.{zones,descriptor} to have been
      //      GC-ed away; think suspended tenants.
      Reconcile(
        ctx context.Context,
        checkpoint hlc.Timestamp,
        callback func(checkpoint hlc.Timestamp) error,
      ) error
    }
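
A caller -- in practice, a per-tenant reconciliation job -- might drive it
roughly like so (hypothetical sketch; loadPersistedCheckpoint and
persistCheckpoint are stand-ins for however the caller durably records
progress, not actual APIs):

    lastCheckpoint := loadPersistedCheckpoint() // empty on the very first run
    err := reconciler.Reconcile(ctx, lastCheckpoint, func(checkpoint hlc.Timestamp) error {
      return persistCheckpoint(checkpoint) // a future attempt can resume from here
    })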

Let's walk through what it does. At a high level, we maintain an
in-memory data structure that's up-to-date with the contents of KV (at
least the subset of spans we have access to, i.e. the keyspace carved
out for our tenant ID). We watch for changes to SQL state (descriptors,
zone configs), translate the SQL updates to the flattened span+config
form, and "diff" the updates against our data structure to see if there
are any changes we need to inform KV of. If so, we do, and ensure that
our data structure is kept up-to-date. We continue watching for future
updates and repeat as necessary.
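
To make the shape of that loop concrete, here's a rough sketch
(hypothetical; the real interfaces live under pkg/spanconfig, and the
Entry, Update, Watcher, Translator, Store and KVAccessor types below are
simplified stand-ins rather than the actual APIs, with package/import
boilerplate omitted):

    // Entry is a span and the config applied to it, in flattened form.
    type Entry struct {
      Span   string
      Config string
    }

    // Update is an observed change to SQL state (a descriptor or zone
    // config), along with the timestamp it was observed at.
    type Update struct {
      Timestamp hlc.Timestamp
    }

    type Watcher interface{ Next() Update }                // watches SQL state
    type Translator interface{ Translate(Update) []Entry } // SQL -> span+config
    type KVAccessor interface {
      // Update applies deletions and upserts to KV's span config state.
      Update(toDelete []string, toUpsert []Entry) error
    }
    type Store interface {
      // Diff returns what must change in KV for it to match 'entries'.
      Diff(entries []Entry) (toUpsert []Entry, toDelete []string)
      // Apply records updates that were successfully written to KV.
      Apply(toUpsert []Entry, toDelete []string)
    }

    func reconcileIncrementally(
      w Watcher, t Translator, s Store, kv KVAccessor,
      checkpoint func(hlc.Timestamp) error,
    ) error {
      for {
        update := w.Next()                    // SQL state changed
        entries := t.Translate(update)        // flattened span+config form
        toUpsert, toDelete := s.Diff(entries) // anything KV needs to hear about?
        if len(toUpsert)+len(toDelete) > 0 {
          if err := kv.Update(toDelete, toUpsert); err != nil {
            return err
          }
          s.Apply(toUpsert, toDelete) // keep the in-memory copy up-to-date
        }
        // The update's timestamp is now safe to checkpoint; a future
        // reconciliation attempt can resume from it.
        if err := checkpoint(update.Timestamp); err != nil {
          return err
        }
      }
    }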

There's only a single instance of the Reconciler running for a given
tenant at a given point in time (mutual exclusion/leasing is provided by
the jobs subsystem). We needn't worry about contending writers, or the
KV state being changed from underneath us. What we do have to worry
about, however, is that suspended tenants aren't reconciling while
suspended. It's possible for a suspended tenant's SQL state to be GC-ed
away at older MVCC timestamps; when watching for changes, we could fail
to observe tables/indexes/partitions getting deleted. Left as is, this
would result in us never issuing corresponding deletion requests for
the dropped span configs -- we'd be leaving orphaned span configs lying
around (taking up storage space and creating pointless empty ranges). A
"full reconciliation pass" is our attempt to find all these extraneous
entries in KV and to delete them.

We can use our span config data structure here too, one that's
pre-populated with the contents of KV. We translate the entire SQL state
into its constituent spans and configs, and diff that against our data
structure to generate the KV updates we then apply. We follow this by
clearing out all these spans from our data structure; whatever entries
are left behind exist only in KV and are extraneous -- entries we can
then simply issue deletes for.
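
Sticking with the illustrative types from the earlier sketch, the full
pass might look something like this (again hypothetical, not the actual
implementation):

    // reconcileFully diffs the full SQL translation against KV's current
    // contents and then deletes whatever KV has that SQL no longer accounts
    // for. kvContents mirrors KV's span config state (span -> config);
    // sqlEntries is the translation of the entire SQL state.
    func reconcileFully(
      kvContents map[string]string, sqlEntries []Entry, kv KVAccessor,
    ) error {
      var toUpsert []Entry
      for _, e := range sqlEntries {
        if cfg, found := kvContents[e.Span]; !found || cfg != e.Config {
          toUpsert = append(toUpsert, e) // KV is missing or stale for this span
        }
        delete(kvContents, e.Span) // this span is accounted for by SQL state
      }
      if err := kv.Update(nil /* toDelete */, toUpsert); err != nil {
        return err
      }
      // Whatever remains exists only in KV -- e.g. configs for tables dropped
      // while the tenant was suspended -- and can simply be deleted.
      var orphans []string
      for span := range kvContents {
        orphans = append(orphans, span)
      }
      return kv.Update(orphans, nil /* toUpsert */)
    }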

Release note: None
irfansharif committed Dec 14, 2021
1 parent a9bdfac commit 7d097c3
Showing 33 changed files with 1,930 additions and 118 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -168,4 +168,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
-version version 21.2-28 set the active cluster version in the format '<major>.<minor>'
+version version 21.2-32 set the active cluster version in the format '<major>.<minor>'
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -32,6 +32,7 @@ ALL_TESTS = [
"//pkg/ccl/serverccl/diagnosticsccl:diagnosticsccl_test",
"//pkg/ccl/serverccl/statusccl:statusccl_test",
"//pkg/ccl/serverccl:serverccl_test",
"//pkg/ccl/spanconfigccl/spanconfigreconcilerccl:spanconfigreconcilerccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl:spanconfigsqltranslatorccl_test",
"//pkg/ccl/sqlproxyccl/denylist:denylist_test",
"//pkg/ccl/sqlproxyccl/idle:idle_test",
@@ -184,6 +185,7 @@ ALL_TESTS = [
"//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test",
"//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig/spanconfigreconciler:spanconfigreconciler_test",
"//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test",
"//pkg/spanconfig/spanconfigsqlwatcher:spanconfigsqlwatcher_test",
"//pkg/spanconfig/spanconfigstore:spanconfigstore_test",
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvtenantccl/connector.go
@@ -455,7 +455,7 @@ func (c *Connector) UpdateSpanConfigEntries(
}

// WithTxn implements the spanconfig.KVAccessor interface.
-func (c *Connector) WithTxn(*kv.Txn) spanconfig.KVAccessor {
+func (c *Connector) WithTxn(context.Context, *kv.Txn) spanconfig.KVAccessor {
panic("not applicable")
}

35 changes: 35 additions & 0 deletions pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel
@@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "spanconfigreconcilerccl_test",
srcs = [
"datadriven_test.go",
"main_test.go",
],
data = glob(["testdata/**"]),
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/partitionccl",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigtestutils",
"//pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
238 changes: 238 additions & 0 deletions pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go
@@ -0,0 +1,238 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package spanconfigreconcilerccl

import (
"context"
"fmt"
"sort"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestDataDriven is a data-driven test for spanconfig.Reconciler. It lets test
// authors spin up secondary tenants, create arbitrary schema objects with
// arbitrary zone configs, and verify that the global span config state is as
// we'd expect. Only fields that differ from the static RANGE DEFAULT are
// printed in the test output for readability. The following syntax is provided:
//
// - "initialize" tenant=<int>
// Initialize a secondary tenant with the given ID.
//
// - "exec-sql" [tenant=<int>]
// Executes the input SQL statements for the given tenant. All statements are
// executed in a single transaction.
//
// - "query-sql" [tenant=<int>]
// Executes the input SQL query for the given tenant and prints the results.
//
// - "reconcile" [tenant=<int>]
// Start the reconciliation process for the given tenant.
//
// - "mutations" [tenant=<int>] [discard]
// Print the latest set of mutations issued by the reconciler for the given
// tenant. If 'discard' is specified, nothing is printed.
//
// - "state" [offset=<int>] [limit=<int]
// Print out the contents of KVAccessor directly, skipping 'offset' entries,
// returning up to the specified limit if any.
//
// TODO(irfansharif): Provide a way to stop reconcilers and/or start them back
// up again. It would let us simulate suspended tenants, and the behavior of
// the reconciler with existing kvaccessor state (populated by an earlier
// incarnation). When tearing existing reconcilers down, we'd need to safely
// synchronize with the "last-exec" timestamp, seeing as how no further
// checkpoints are expected.
//
// TODO(irfansharif): Test tenant teardown/GC -- all tenant-scoped span configs
// must be cleared out.
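//
// As a purely illustrative (hypothetical) example -- not an actual testdata
// file from this package -- a test written against the directives above might
// look like:
//
//     initialize tenant=10
//     ----
//
//     exec-sql tenant=10
//     CREATE DATABASE db;
//     CREATE TABLE db.t();
//     ALTER TABLE db.t CONFIGURE ZONE USING num_replicas = 5;
//     ----
//
//     reconcile tenant=10
//     ----
//
//     mutations tenant=10
//     ----
//     ...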
func TestDataDriven(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
datadriven.Walk(t, testutils.TestDataPath(t), func(t *testing.T, path string) {
scKnobs := &spanconfig.TestingKnobs{
// Instead of relying on the GC job to wait out TTLs and clear out
// descriptors, let's simply exclude dropped tables to simulate
// descriptors no longer existing. See comment on
// ExcludeDroppedDescriptorsFromLookup for more details.
ExcludeDroppedDescriptorsFromLookup: true,
// We run the reconciler manually in this test.
ManagerDisableJobCreation: true,
// Checkpoint noops frequently; speeds this test up.
SQLWatcherCheckpointNoopsEveryDurationOverride: 100 * time.Millisecond,
}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
EnableSpanConfigs: true,
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test
SpanConfig: scKnobs,
},
},
})
defer tc.Stopper().Stop(ctx)

{
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
}

spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc)
defer spanConfigTestCluster.Cleanup()

systemTenant := spanConfigTestCluster.InitializeTenant(ctx, roachpb.SystemTenantID)
kvAccessor := systemTenant.SpanConfigKVAccessor().(spanconfig.KVAccessor)
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
tenantID := roachpb.SystemTenantID
if d.HasArg("tenant") {
if d.Cmd == "state" {
d.Fatalf(t, "unexpected argument 'tenant' for command %q", d.Cmd)
}

var id uint64
d.ScanArgs(t, "tenant", &id)
tenantID = roachpb.MakeTenantID(id)
}

tenant, found := spanConfigTestCluster.LookupTenant(tenantID)
if d.Cmd != "initialize" {
require.Truef(t, found, "tenant %s not found (was it initialized?)", tenantID)
}

switch d.Cmd {
case "initialize":
secondaryTenant := spanConfigTestCluster.InitializeTenant(ctx, tenantID)
secondaryTenant.Exec(`SET CLUSTER SETTING sql.zone_configs.experimental_allow_for_secondary_tenant.enabled = true`)

case "exec-sql":
// Run under an explicit transaction -- we rely on having a
// single timestamp for the statements (see
// tenant.TimestampAfterLastExec) for ordering guarantees.
tenant.Exec(fmt.Sprintf("BEGIN; %s; COMMIT;", d.Input))

case "query-sql":
rows := tenant.Query(d.Input)
output, err := sqlutils.RowsToDataDrivenOutput(rows)
require.NoError(t, err)
return output

case "reconcile":
tsBeforeReconcilerStart := tenant.Clock().Now()
go func() {
err := tenant.Reconciler().Reconcile(ctx, hlc.Timestamp{}, func(checkpoint hlc.Timestamp) error {
tenant.Checkpoint(checkpoint)
return nil
})
require.NoError(t, err)
}()

testutils.SucceedsSoon(t, func() error {
if tenant.LastCheckpoint().Less(tsBeforeReconcilerStart) {
return errors.New("expected reconciler to have started")
}
return nil
})

case "mutations":
testutils.SucceedsSoon(t, func() error {
lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastExec()
if lastCheckpoint.Less(lastExec) {
return errors.Newf("last checkpoint timestamp (%s) lagging last sql execution (%s)",
lastCheckpoint.GoTime(), lastExec.GoTime())
}
return nil
})

output := tenant.KVAccessorRecorder().Recording(true /* clear */)
if d.HasArg("discard") {
return ""
}
return output

case "state":
testutils.SucceedsSoon(t, func() error {
// To observe up-to-date KVAccess state, we wait for all
// tenant checkpoints to cross their last execution
// timestamp.
for _, tenant := range spanConfigTestCluster.Tenants() {
lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastExec()
if lastCheckpoint.IsEmpty() {
continue // reconciler wasn't started
}
if lastCheckpoint.Less(lastExec) {
return errors.Newf("last checkpoint timestamp (%s) lagging last sql execution (%s)",
lastCheckpoint.GoTime(), lastExec.GoTime())
}
}
return nil
})
entries, err := kvAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{keys.EverythingSpan})
require.NoError(t, err)
sort.Slice(entries, func(i, j int) bool {
return entries[i].Span.Key.Compare(entries[j].Span.Key) < 0
})

var offset, limit int
if d.HasArg("offset") {
d.ScanArgs(t, "offset", &offset)
require.True(t, offset >= 0)
require.Truef(t, offset <= len(entries),
"offset (%d) larger than number of entries (%d)", offset, len(entries))
}
if d.HasArg("limit") {
d.ScanArgs(t, "limit", &limit)
require.True(t, limit >= 0)
} else {
limit = len(entries)
}
var output strings.Builder
if offset > 0 && len(entries) > 0 {
output.WriteString("...\n") // print leading elipses
}

entries = entries[offset:]
for i, entry := range entries {
if i == limit {
output.WriteString("...\n") // print trailing elipses
break
}

output.WriteString(fmt.Sprintf("%-42s %s\n", entry.Span,
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(entry.Config)))
}
return output.String()

default:
t.Fatalf("unknown command: %s", d.Cmd)
}
return ""
})
})
}
33 changes: 33 additions & 0 deletions pkg/ccl/spanconfigccl/spanconfigreconcilerccl/main_test.go
@@ -0,0 +1,33 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package spanconfigreconcilerccl

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go

func TestMain(m *testing.M) {
defer utilccl.TestingEnableEnterprise()()
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}