Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spanconfig/sqlwatcher: hide factory pattern, speed up tests, surface no-op checkpoints periodically #73171

Merged
merged 4 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,33 @@ func (t *testServerShim) ServingSQLAddr() string {
return t.SQLAddr()
}

func (t *testServerShim) Stopper() *stop.Stopper { panic(unsupportedShimMethod) }
func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) }
func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) }
func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) }
func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCContext() *rpc.Context { panic(unsupportedShimMethod) }
func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) }
func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLWatcherFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeLiveness() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) HeartbeatNodeLiveness() error { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeDialer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) Stopper() *stop.Stopper { panic(unsupportedShimMethod) }
func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) }
func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) }
func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) }
func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCContext() *rpc.Context { panic(unsupportedShimMethod) }
func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) }
func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLWatcher() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeLiveness() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) HeartbeatNodeLiveness() error { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeDialer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SetDistSQLSpanResolver(spanResolver interface{}) {
panic(unsupportedShimMethod)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,12 +855,14 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// only do it if COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is set.
spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs)
sqlTranslator := spanconfigsqltranslator.New(execCfg, codec)
sqlWatcherFactory := spanconfigsqlwatcher.NewFactory(
sqlWatcher := spanconfigsqlwatcher.New(
codec,
cfg.Settings,
cfg.rangeFeedFactory,
1<<20, /* 1 MB bufferMemLimit */
cfg.stopper,
// TODO(irfansharif): What should this no-op cadence be?
30*time.Second, /* checkpointNoopsEvery */
spanConfigKnobs,
)
spanConfigMgr = spanconfigmanager.New(
Expand All @@ -870,7 +872,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.stopper,
cfg.Settings,
cfg.spanConfigAccessor,
sqlWatcherFactory,
sqlWatcher,
sqlTranslator,
spanConfigKnobs,
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,14 +978,14 @@ func (ts *TestServer) SpanConfigSQLTranslator() interface{} {
return ts.sqlServer.spanconfigMgr.SQLTranslator
}

// SpanConfigSQLWatcherFactory is part of TestServerInterface.
func (ts *TestServer) SpanConfigSQLWatcherFactory() interface{} {
// SpanConfigSQLWatcher is part of TestServerInterface.
func (ts *TestServer) SpanConfigSQLWatcher() interface{} {
if ts.sqlServer.spanconfigMgr == nil {
panic(
"span config manager uninitialized; see EnableSpanConfigs testing knob to use span configs",
)
}
return ts.sqlServer.spanconfigMgr.SQLWatcherFactory
return ts.sqlServer.spanconfigMgr.SQLWatcher
}

// SQLServer is part of TestServerInterface.
Expand Down
45 changes: 13 additions & 32 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,32 +120,21 @@ func FullTranslate(

// SQLWatcher watches for events on system.zones and system.descriptors.
type SQLWatcher interface {
// WatchForSQLUpdates watches for updates to zones and descriptors starting at
// the given timestamp (exclusive), informing callers using the handler
// callback.
// WatchForSQLUpdates watches for updates to zones and descriptors starting
// at the given timestamp (exclusive), informing callers periodically using
// the given handler[1] and a checkpoint timestamp. The handler is invoked:
// - serially, in the same thread where WatchForSQLUpdates was called;
// - with a monotonically increasing timestamp;
// - with updates from the last provided timestamp (exclusive) to the
// current one (inclusive).
//
// The handler callback[1] is invoked from time to time with a list of updates
// and a checkpointTS. Invocations of the handler callback provide the
// following semantics:
// 1. Calls to the handler are serial.
// 2. The timestamp supplied to the handler is monotonically increasing.
// 3. The list of DescriptorUpdates supplied to handler includes all events
// in the window (prevInvocationCheckpointTS, checkpointTS].
// 4. No further calls to the handler are made if a call to the handler
// returns an error.
// If the handler errors out, it's not invoked subsequently (and internal
// processes are wound down accordingly). Callers are free to persist the
// checkpoint timestamps and use it to re-establish the watcher without
// missing any updates.
//
// These guarantees mean that users of this interface are free to persist the
// checkpointTS and later use it to re-establish a SQLWatcher without missing
// any updates.
// [1]: Users should avoid doing expensive work in the handler.
//
// WatchForSQLUpdates can only ever be called once, effectively making the
// SQLWatcher a single use interface.
//
// WatchForSQLUpdates may run out of memory and return an error if it is
// tracking too many events between two checkpoints.
//
// [1] Users of this interface should not intend to do expensive work in the
// handler callback.
// TODO(arul): Possibly get rid of this limitation.
WatchForSQLUpdates(
ctx context.Context,
Expand All @@ -165,22 +154,14 @@ type DescriptorUpdate struct {
DescriptorType catalog.DescriptorType
}

// SQLWatcherFactory is used to construct new SQLWatchers.
type SQLWatcherFactory interface {
// New returns a new SQLWatcher.
New() SQLWatcher
}

// ReconciliationDependencies captures what's needed by the span config
// reconciliation job to perform its task. The job is responsible for
// reconciling a tenant's zone configurations with the clusters span
// configurations.
type ReconciliationDependencies interface {
KVAccessor

SQLTranslator

SQLWatcherFactory
SQLWatcher
}

// Store is a data structure used to store spans and their corresponding
Expand Down
22 changes: 11 additions & 11 deletions pkg/spanconfig/spanconfigmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Manager struct {
knobs *spanconfig.TestingKnobs

spanconfig.KVAccessor
spanconfig.SQLWatcherFactory
spanconfig.SQLWatcher
spanconfig.SQLTranslator
}

Expand All @@ -74,23 +74,23 @@ func New(
stopper *stop.Stopper,
settings *cluster.Settings,
kvAccessor spanconfig.KVAccessor,
sqlWatcherFactory spanconfig.SQLWatcherFactory,
sqlWatcher spanconfig.SQLWatcher,
sqlTranslator spanconfig.SQLTranslator,
knobs *spanconfig.TestingKnobs,
) *Manager {
if knobs == nil {
knobs = &spanconfig.TestingKnobs{}
}
return &Manager{
db: db,
jr: jr,
ie: ie,
stopper: stopper,
settings: settings,
KVAccessor: kvAccessor,
SQLWatcherFactory: sqlWatcherFactory,
SQLTranslator: sqlTranslator,
knobs: knobs,
db: db,
jr: jr,
ie: ie,
stopper: stopper,
settings: settings,
KVAccessor: kvAccessor,
SQLWatcher: sqlWatcher,
SQLTranslator: sqlTranslator,
knobs: knobs,
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/spanconfig/spanconfigmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerCreatedJobInterceptor: func(jobI interface{}) {
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestManagerStartsJobIfFailed(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerAfterCheckedReconciliationJobExistsInterceptor: func(exists bool) {
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestManagerCheckJobConditions(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerDisableJobCreation: true,
Expand Down
2 changes: 2 additions & 0 deletions pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
8 changes: 6 additions & 2 deletions pkg/spanconfig/spanconfigsqlwatcher/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,15 @@ const (
numRangefeeds int = iota
)

// newBuffer constructs and returns a new buffer.
func newBuffer(limit int) *buffer {
// newBuffer constructs a new buffer initialized with a starting frontier
// timestamp.
func newBuffer(limit int, initialFrontierTS hlc.Timestamp) *buffer {
rangefeedBuffer := rangefeedbuffer.New(limit)
eventBuffer := &buffer{}
eventBuffer.mu.buffer = rangefeedBuffer
for i := range eventBuffer.mu.rangefeedFrontiers {
eventBuffer.mu.rangefeedFrontiers[i].Forward(initialFrontierTS)
}
return eventBuffer
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func TestBuffer(t *testing.T) {
}

ctx := context.Background()
buffer := newBuffer(10 /* limit */)
buffer := newBuffer(10 /* limit */, ts(1))

// Sanity check the newly initialized event buffer.
updates, combinedFrontierTS, err := buffer.flush(ctx)
require.NoError(t, err)
require.Equal(t, ts(0), combinedFrontierTS)
require.Equal(t, ts(1), combinedFrontierTS)
require.True(t, len(updates) == 0)

// Add a few events without advancing any of the frontiers. We don't expect
Expand All @@ -69,15 +69,15 @@ func TestBuffer(t *testing.T) {
require.NoError(t, err)
updates, combinedFrontierTS, err = buffer.flush(ctx)
require.NoError(t, err)
require.Equal(t, ts(0), combinedFrontierTS)
require.Equal(t, ts(1), combinedFrontierTS)
require.True(t, len(updates) == 0)

// Advance the zones frontier. We expect flush to still not return any results
// as the descriptors frontier hasn't been advanced yet.
buffer.advance(zonesRangefeed, ts(11))
updates, combinedFrontierTS, err = buffer.flush(ctx)
require.NoError(t, err)
require.Equal(t, ts(0), combinedFrontierTS)
require.Equal(t, ts(1), combinedFrontierTS)
require.True(t, len(updates) == 0)

// Advance the descriptors frontier to a lower timestamp than the zones
Expand Down Expand Up @@ -150,14 +150,15 @@ func TestBuffer(t *testing.T) {
func TestBufferCombinesDescriptorTypes(t *testing.T) {
defer leaktest.AfterTest(t)()

buffer := newBuffer(10 /* limit */)
ctx := context.Background()

ts := func(nanos int) hlc.Timestamp {
return hlc.Timestamp{
WallTime: int64(nanos),
}
}
buffer := newBuffer(10 /* limit */, ts(0))

makeEvent := func(descID int, descType catalog.DescriptorType, timestamp hlc.Timestamp) event {
return event{
update: spanconfig.DescriptorUpdate{
Expand Down
Loading