Skip to content

Commit

Permalink
Merge #73171 #73290
Browse files Browse the repository at this point in the history
73171: spanconfig/sqlwatcher: hide factory pattern, speed up tests, surface no-op checkpoints periodically r=irfansharif a=irfansharif

See individual commits.

---

**spanconfig/sqlwatcher: speed up tests**

Using a more aggressive closed timestamp target duration brings the runtime for
TestSQLWatcherReactsToUpdate down from 45s+ to single digits. To speed it up,
we also just re-use the same SQLWatcher instead of creating a new one for every
test case.

Other tests in the package benefit from a similar treatment. Using the default
target duration in fact masked a buggy test; in a future commit we end up
rewrite that test so it's skipped for now.

**spanconfig/sqlwatcher: hide the factory pattern**

The only mutable state across concurrent WatchForSQLUpdate calls was the
internal buffer, which does not need to hang off the surrounding struct.  This
obviates the need for the factory pattern we were using -- callers can set up
multiple SQLWatchers concurrently as is (see TestSQLWatcherMultiple).

This PR first hides the factory under the package boundary; in a later commit
it shed it altogether. This has the benefit of reducing the number of symbols
in pkg/spanconfig and making it symmetric with the other spanconfig
dependencies typically found together (KVAccessor, SQLTranslator). It's also
every-so-slightly less verbose to use in the upcoming spanconfig.Reconciler
(#71994).

**spanconfig/sqlwatcher: checkpoint, even without updates**

If `system.{descriptor,zones}` is static, we would previously discard
all checkpoint events. This is not what we want -- we'd still like to
know how caught up we are, with ongoing updates or otherwise.

MVCC GC, after all, can still happen; if our last checkpoint was when
the last update occurred, we may end up doing a lot of unnecessary work
when finding out our last checkpoint was GC-ed from underneath us. Lack
of periodic checkpoints also makes tests difficult to write -- you'd
have to induce a benign update to flush out all earlier ones. This
latent flakiness was uncovered after speeding up some existing tests in
an earlier commit.

NB: For incremental (possibly noop) checkpoints, we need to ensure that
the sqlwatcher's buffer is initialized with a low watermark. When
flushing events, we take the most conservative timestamp. If not
initialized to a high water, this might be 0 -- violating the
sqlwatcher's monotonically increasing checkpoint ts invariant.

Release note: None


73290: sql/schemachanger: remove cycle DepEdge rules, add SameStage kind r=ajwerner a=ajwerner

First few commits are mostly minor cleanup. Last commit is the meat of the change.

This commit seeks to rectify an early mistake in the architecture of the
declarative schema changer. In the original design, we knew we wanted certain
transitions to happen in the same stage. In order to deal with that, we created
rules that allowed for special types of cycles in dependencies to exist. This
was a mistake. Instead, we replace this by a `Kind` property of `DepEdge`s
which indicates whether the target pointed to merely needs to `HappenBefore`
or whether it also needs to happen in the `SameStage`. This allows us to
express exactly what we meant.

This change also uncovered some broken cycles which never were intended to
exist. The resultant plans generally look better.

Release note: None


Co-authored-by: irfan sharif <irfanmahmoudsharif@gmail.com>
Co-authored-by: Andrew Werner <awerner32@gmail.com>
  • Loading branch information
3 people committed Nov 30, 2021
3 parents 80a2beb + d00baeb + 05080aa commit 3d2e539
Show file tree
Hide file tree
Showing 33 changed files with 1,486 additions and 1,055 deletions.
54 changes: 27 additions & 27 deletions pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,33 @@ func (t *testServerShim) ServingSQLAddr() string {
return t.SQLAddr()
}

func (t *testServerShim) Stopper() *stop.Stopper { panic(unsupportedShimMethod) }
func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) }
func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) }
func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) }
func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCContext() *rpc.Context { panic(unsupportedShimMethod) }
func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) }
func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLWatcherFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeLiveness() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) HeartbeatNodeLiveness() error { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeDialer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) Stopper() *stop.Stopper { panic(unsupportedShimMethod) }
func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) }
func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) }
func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) }
func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCContext() *rpc.Context { panic(unsupportedShimMethod) }
func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) }
func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLWatcher() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeLiveness() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) HeartbeatNodeLiveness() error { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeDialer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SetDistSQLSpanResolver(spanResolver interface{}) {
panic(unsupportedShimMethod)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,12 +857,14 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// only do it if COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is set.
spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs)
sqlTranslator := spanconfigsqltranslator.New(execCfg, codec)
sqlWatcherFactory := spanconfigsqlwatcher.NewFactory(
sqlWatcher := spanconfigsqlwatcher.New(
codec,
cfg.Settings,
cfg.rangeFeedFactory,
1<<20, /* 1 MB bufferMemLimit */
cfg.stopper,
// TODO(irfansharif): What should this no-op cadence be?
30*time.Second, /* checkpointNoopsEvery */
spanConfigKnobs,
)
spanConfigMgr = spanconfigmanager.New(
Expand All @@ -872,7 +874,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.stopper,
cfg.Settings,
cfg.spanConfigAccessor,
sqlWatcherFactory,
sqlWatcher,
sqlTranslator,
spanConfigKnobs,
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,14 +999,14 @@ func (ts *TestServer) SpanConfigSQLTranslator() interface{} {
return ts.sqlServer.spanconfigMgr.SQLTranslator
}

// SpanConfigSQLWatcherFactory is part of TestServerInterface.
func (ts *TestServer) SpanConfigSQLWatcherFactory() interface{} {
// SpanConfigSQLWatcher is part of TestServerInterface.
func (ts *TestServer) SpanConfigSQLWatcher() interface{} {
if ts.sqlServer.spanconfigMgr == nil {
panic(
"span config manager uninitialized; see EnableSpanConfigs testing knob to use span configs",
)
}
return ts.sqlServer.spanconfigMgr.SQLWatcherFactory
return ts.sqlServer.spanconfigMgr.SQLWatcher
}

// SQLServer is part of TestServerInterface.
Expand Down
45 changes: 13 additions & 32 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,32 +120,21 @@ func FullTranslate(

// SQLWatcher watches for events on system.zones and system.descriptors.
type SQLWatcher interface {
// WatchForSQLUpdates watches for updates to zones and descriptors starting at
// the given timestamp (exclusive), informing callers using the handler
// callback.
// WatchForSQLUpdates watches for updates to zones and descriptors starting
// at the given timestamp (exclusive), informing callers periodically using
// the given handler[1] and a checkpoint timestamp. The handler is invoked:
// - serially, in the same thread where WatchForSQLUpdates was called;
// - with a monotonically increasing timestamp;
// - with updates from the last provided timestamp (exclusive) to the
// current one (inclusive).
//
// The handler callback[1] is invoked from time to time with a list of updates
// and a checkpointTS. Invocations of the handler callback provide the
// following semantics:
// 1. Calls to the handler are serial.
// 2. The timestamp supplied to the handler is monotonically increasing.
// 3. The list of DescriptorUpdates supplied to handler includes all events
// in the window (prevInvocationCheckpointTS, checkpointTS].
// 4. No further calls to the handler are made if a call to the handler
// returns an error.
// If the handler errors out, it's not invoked subsequently (and internal
// processes are wound down accordingly). Callers are free to persist the
// checkpoint timestamps and use it to re-establish the watcher without
// missing any updates.
//
// These guarantees mean that users of this interface are free to persist the
// checkpointTS and later use it to re-establish a SQLWatcher without missing
// any updates.
// [1]: Users should avoid doing expensive work in the handler.
//
// WatchForSQLUpdates can only ever be called once, effectively making the
// SQLWatcher a single use interface.
//
// WatchForSQLUpdates may run out of memory and return an error if it is
// tracking too many events between two checkpoints.
//
// [1] Users of this interface should not intend to do expensive work in the
// handler callback.
// TODO(arul): Possibly get rid of this limitation.
WatchForSQLUpdates(
ctx context.Context,
Expand All @@ -165,22 +154,14 @@ type DescriptorUpdate struct {
DescriptorType catalog.DescriptorType
}

// SQLWatcherFactory is used to construct new SQLWatchers.
type SQLWatcherFactory interface {
// New returns a new SQLWatcher.
New() SQLWatcher
}

// ReconciliationDependencies captures what's needed by the span config
// reconciliation job to perform its task. The job is responsible for
// reconciling a tenant's zone configurations with the clusters span
// configurations.
type ReconciliationDependencies interface {
KVAccessor

SQLTranslator

SQLWatcherFactory
SQLWatcher
}

// Store is a data structure used to store spans and their corresponding
Expand Down
22 changes: 11 additions & 11 deletions pkg/spanconfig/spanconfigmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Manager struct {
knobs *spanconfig.TestingKnobs

spanconfig.KVAccessor
spanconfig.SQLWatcherFactory
spanconfig.SQLWatcher
spanconfig.SQLTranslator
}

Expand All @@ -74,23 +74,23 @@ func New(
stopper *stop.Stopper,
settings *cluster.Settings,
kvAccessor spanconfig.KVAccessor,
sqlWatcherFactory spanconfig.SQLWatcherFactory,
sqlWatcher spanconfig.SQLWatcher,
sqlTranslator spanconfig.SQLTranslator,
knobs *spanconfig.TestingKnobs,
) *Manager {
if knobs == nil {
knobs = &spanconfig.TestingKnobs{}
}
return &Manager{
db: db,
jr: jr,
ie: ie,
stopper: stopper,
settings: settings,
KVAccessor: kvAccessor,
SQLWatcherFactory: sqlWatcherFactory,
SQLTranslator: sqlTranslator,
knobs: knobs,
db: db,
jr: jr,
ie: ie,
stopper: stopper,
settings: settings,
KVAccessor: kvAccessor,
SQLWatcher: sqlWatcher,
SQLTranslator: sqlTranslator,
knobs: knobs,
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/spanconfig/spanconfigmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerCreatedJobInterceptor: func(jobI interface{}) {
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestManagerStartsJobIfFailed(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerAfterCheckedReconciliationJobExistsInterceptor: func(exists bool) {
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestManagerCheckJobConditions(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerDisableJobCreation: true,
Expand Down
2 changes: 2 additions & 0 deletions pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
8 changes: 6 additions & 2 deletions pkg/spanconfig/spanconfigsqlwatcher/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,15 @@ const (
numRangefeeds int = iota
)

// newBuffer constructs and returns a new buffer.
func newBuffer(limit int) *buffer {
// newBuffer constructs a new buffer initialized with a starting frontier
// timestamp.
func newBuffer(limit int, initialFrontierTS hlc.Timestamp) *buffer {
rangefeedBuffer := rangefeedbuffer.New(limit)
eventBuffer := &buffer{}
eventBuffer.mu.buffer = rangefeedBuffer
for i := range eventBuffer.mu.rangefeedFrontiers {
eventBuffer.mu.rangefeedFrontiers[i].Forward(initialFrontierTS)
}
return eventBuffer
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func TestBuffer(t *testing.T) {
}

ctx := context.Background()
buffer := newBuffer(10 /* limit */)
buffer := newBuffer(10 /* limit */, ts(1))

// Sanity check the newly initialized event buffer.
updates, combinedFrontierTS, err := buffer.flush(ctx)
require.NoError(t, err)
require.Equal(t, ts(0), combinedFrontierTS)
require.Equal(t, ts(1), combinedFrontierTS)
require.True(t, len(updates) == 0)

// Add a few events without advancing any of the frontiers. We don't expect
Expand All @@ -69,15 +69,15 @@ func TestBuffer(t *testing.T) {
require.NoError(t, err)
updates, combinedFrontierTS, err = buffer.flush(ctx)
require.NoError(t, err)
require.Equal(t, ts(0), combinedFrontierTS)
require.Equal(t, ts(1), combinedFrontierTS)
require.True(t, len(updates) == 0)

// Advance the zones frontier. We expect flush to still not return any results
// as the descriptors frontier hasn't been advanced yet.
buffer.advance(zonesRangefeed, ts(11))
updates, combinedFrontierTS, err = buffer.flush(ctx)
require.NoError(t, err)
require.Equal(t, ts(0), combinedFrontierTS)
require.Equal(t, ts(1), combinedFrontierTS)
require.True(t, len(updates) == 0)

// Advance the descriptors frontier to a lower timestamp than the zones
Expand Down Expand Up @@ -150,14 +150,15 @@ func TestBuffer(t *testing.T) {
func TestBufferCombinesDescriptorTypes(t *testing.T) {
defer leaktest.AfterTest(t)()

buffer := newBuffer(10 /* limit */)
ctx := context.Background()

ts := func(nanos int) hlc.Timestamp {
return hlc.Timestamp{
WallTime: int64(nanos),
}
}
buffer := newBuffer(10 /* limit */, ts(0))

makeEvent := func(descID int, descType catalog.DescriptorType, timestamp hlc.Timestamp) event {
return event{
update: spanconfig.DescriptorUpdate{
Expand Down
Loading

0 comments on commit 3d2e539

Please sign in to comment.