Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spanconfig: introduce the spanconfig.SQLWatcher #71968

Merged
merged 1 commit into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ ALL_TESTS = [
"//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test",
"//pkg/spanconfig/spanconfigsqlwatcher:spanconfigsqlwatcher_test",
"//pkg/spanconfig/spanconfigstore:spanconfigstore_test",
"//pkg/spanconfig/spanconfigtestutils:spanconfigtestutils_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
Expand Down
53 changes: 27 additions & 26 deletions pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,33 @@ func (t *testServerShim) ServingSQLAddr() string {
return t.SQLAddr()
}

func (t *testServerShim) Stopper() *stop.Stopper { panic(unsupportedShimMethod) }
func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) }
func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) }
func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) }
func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCContext() *rpc.Context { panic(unsupportedShimMethod) }
func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) }
func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeLiveness() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) HeartbeatNodeLiveness() error { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeDialer() interface{} { panic(unsupportedShimMethod) }
// The testServerShim methods below are all unsupported: each one panics with
// unsupportedShimMethod as soon as it is called, so none of them ever returns
// a value. NOTE(review): presumably they exist only so that the shim satisfies
// the test server interface -- confirm against the shim's declaration.
func (t *testServerShim) Stopper() *stop.Stopper { panic(unsupportedShimMethod) }
func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) }
func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) }
func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) }
func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCContext() *rpc.Context { panic(unsupportedShimMethod) }
func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) }
func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLWatcherFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeLiveness() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) HeartbeatNodeLiveness() error { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeDialer() interface{} { panic(unsupportedShimMethod) }
// SetDistSQLSpanResolver is not supported by the shim: the spanResolver
// argument is ignored and the call panics with unsupportedShimMethod.
func (t *testServerShim) SetDistSQLSpanResolver(spanResolver interface{}) {
panic(unsupportedShimMethod)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func New(limit int) *Buffer {
}

// Add adds the given entry to the buffer.
func (b *Buffer) Add(ctx context.Context, ev Event) error {
func (b *Buffer) Add(ev Event) error {
b.mu.Lock()
defer b.mu.Unlock()

Expand Down
26 changes: 13 additions & 13 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func TestBuffer(t *testing.T) {
}

{ // Flushing at a timestamp lower than buffered events should return nothing.
require.NoError(t, buffer.Add(ctx, makeEvent("a", ts(13)))) // a@13
require.NoError(t, buffer.Add(ctx, makeEvent("b", ts(11)))) // b@11
require.NoError(t, buffer.Add(ctx, makeEvent("c", ts(15)))) // c@15
require.NoError(t, buffer.Add(ctx, makeEvent("d", ts(12)))) // d@12
require.NoError(t, buffer.Add(ctx, makeEvent("e", ts(18)))) // e@18
require.NoError(t, buffer.Add(makeEvent("a", ts(13)))) // a@13
require.NoError(t, buffer.Add(makeEvent("b", ts(11)))) // b@11
require.NoError(t, buffer.Add(makeEvent("c", ts(15)))) // c@15
require.NoError(t, buffer.Add(makeEvent("d", ts(12)))) // d@12
require.NoError(t, buffer.Add(makeEvent("e", ts(18)))) // e@18

events := buffer.Flush(ctx, ts(10))
require.True(t, len(events) == 0)
Expand All @@ -75,18 +75,18 @@ func TestBuffer(t *testing.T) {
}

{ // Adding events with timestamps <= the last flush are discarded.
require.NoError(t, buffer.Add(ctx, makeEvent("a", ts(13)))) // a@13
require.NoError(t, buffer.Add(ctx, makeEvent("b", ts(11)))) // b@11
require.NoError(t, buffer.Add(ctx, makeEvent("c", ts(15)))) // c@15
require.NoError(t, buffer.Add(ctx, makeEvent("d", ts(12)))) // d@12
require.NoError(t, buffer.Add(makeEvent("a", ts(13)))) // a@13
require.NoError(t, buffer.Add(makeEvent("b", ts(11)))) // b@11
require.NoError(t, buffer.Add(makeEvent("c", ts(15)))) // c@15
require.NoError(t, buffer.Add(makeEvent("d", ts(12)))) // d@12

events := buffer.Flush(ctx, ts(15))
require.True(t, len(events) == 0)
}

{ // Additional events are flushed out at appropriate points.
require.NoError(t, buffer.Add(ctx, makeEvent("f", ts(19)))) // f@19
require.NoError(t, buffer.Add(ctx, makeEvent("g", ts(21)))) // g@21
require.NoError(t, buffer.Add(makeEvent("f", ts(19)))) // f@19
require.NoError(t, buffer.Add(makeEvent("g", ts(21)))) // g@21

events := buffer.Flush(ctx, ts(20))
require.True(t, len(events) == 2)
Expand All @@ -107,10 +107,10 @@ func TestBuffer(t *testing.T) {

{ // Ensure that buffer limits are respected.
for i := 0; i < limit; i++ {
require.NoError(t, buffer.Add(ctx, makeEvent("x", ts(101)))) // x@101
require.NoError(t, buffer.Add(makeEvent("x", ts(101)))) // x@101
}

err := buffer.Add(ctx, makeEvent("x", ts(101)))
err := buffer.Add(makeEvent("x", ts(101)))
require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ go_library(
"//pkg/spanconfig/spanconfigkvsubscriber",
"//pkg/spanconfig/spanconfigmanager",
"//pkg/spanconfig/spanconfigsqltranslator",
"//pkg/spanconfig/spanconfigsqlwatcher",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
Expand Down
10 changes: 10 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqltranslator"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
Expand Down Expand Up @@ -842,13 +843,22 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// only do it if COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is set.
spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs)
sqlTranslator := spanconfigsqltranslator.New(execCfg, codec)
sqlWatcherFactory := spanconfigsqlwatcher.NewFactory(
codec,
cfg.Settings,
cfg.rangeFeedFactory,
1<<20, /* 1 MB bufferMemLimit */
cfg.stopper,
spanConfigKnobs,
)
spanConfigMgr = spanconfigmanager.New(
cfg.db,
jobRegistry,
cfg.circularInternalExecutor,
cfg.stopper,
cfg.Settings,
cfg.spanConfigAccessor,
sqlWatcherFactory,
sqlTranslator,
spanConfigKnobs,
)
Expand Down
10 changes: 10 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,16 @@ func (ts *TestServer) SpanConfigSQLTranslator() interface{} {
return ts.sqlServer.spanconfigMgr.SQLTranslator
}

// SpanConfigSQLWatcherFactory is part of TestServerInterface. It hands back
// the SQLWatcherFactory held by the server's span config manager, and panics
// if that manager was never initialized.
func (ts *TestServer) SpanConfigSQLWatcherFactory() interface{} {
	mgr := ts.sqlServer.spanconfigMgr
	if mgr == nil {
		panic("span config manager uninitialized; see EnableSpanConfigs testing knob to use span configs")
	}
	return mgr.SQLWatcherFactory
}

// SQLServer is part of TestServerInterface.
func (ts *TestServer) SQLServer() interface{} {
return ts.PGServer().SQLServer
Expand Down
1 change: 1 addition & 0 deletions pkg/spanconfig/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//pkg/base",
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/util/hlc",
],
Expand Down
63 changes: 56 additions & 7 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)
Expand Down Expand Up @@ -117,6 +118,59 @@ func FullTranslate(
return s.Translate(ctx, descpb.IDs{keys.RootNamespaceID})
}

// SQLWatcher watches for events on system.zones and system.descriptors.
type SQLWatcher interface {
// WatchForSQLUpdates watches for updates to zones and descriptors starting at
// the given timestamp (exclusive), informing callers using the handler
// callback.
//
// The handler callback[1] is invoked from time to time with a list of updates
// and a checkpointTS. Invocations of the handler callback provide the
// following semantics:
// 1. Calls to the handler are serial.
// 2. The timestamp supplied to the handler is monotonically increasing.
// 3. The list of DescriptorUpdates supplied to handler includes all events
// in the window (prevInvocationCheckpointTS, checkpointTS].
// 4. No further calls to the handler are made if a call to the handler
// returns an error.
//
// These guarantees mean that users of this interface are free to persist the
// checkpointTS and later use it to re-establish a SQLWatcher without missing
// any updates.
//
// WatchForSQLUpdates can only ever be called once, effectively making the
// SQLWatcher a single use interface.
//
// WatchForSQLUpdates may run out of memory and return an error if it is
// tracking too many events between two checkpoints.
//
// [1] Users of this interface should avoid doing expensive work in the
// handler callback.
// TODO(arul): Possibly get rid of this limitation.
WatchForSQLUpdates(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should do more to explain the semantics of the memory errors. The caller is going to need to deal with them and restart. That's now part of the contract.

ctx context.Context,
startTS hlc.Timestamp,
handler func(ctx context.Context, updates []DescriptorUpdate, checkpointTS hlc.Timestamp) error,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: give this a type

) error
}

// DescriptorUpdate captures the ID and type of a descriptor or zone that the
// SQLWatcher has observed being updated.
type DescriptorUpdate struct {
// ID of the descriptor/zone that has been updated.
ID descpb.ID

// DescriptorType of the descriptor/zone that has been updated. Could be either
// the specific type or catalog.Any if no information is available.
DescriptorType catalog.DescriptorType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be a bitmask so that the events corresponding to a single ID can be combined?

}

// SQLWatcherFactory is used to construct new SQLWatchers. A SQLWatcher is
// single use (WatchForSQLUpdates can only ever be called once on it), so a
// new SQLWatcher is needed every time a watch has to be established.
type SQLWatcherFactory interface {
// New returns a new SQLWatcher.
New() SQLWatcher
}

// ReconciliationDependencies captures what's needed by the span config
// reconciliation job to perform its task. The job is responsible for
// reconciling a tenant's zone configurations with the cluster's span
Expand All @@ -126,11 +180,7 @@ type ReconciliationDependencies interface {

SQLTranslator

// TODO(arul): We'll also want access to a "SQLWatcher", something that
// watches for changes to system.{descriptors, zones} to feed IDs to the
// SQLTranslator. These interfaces will be used by the "Reconciler to perform
// full/partial reconciliation, checkpoint the span config job, and update KV
// with the tenants span config state.
SQLWatcherFactory
}

// Store is a data structure used to store spans and their corresponding
Expand Down Expand Up @@ -235,8 +285,7 @@ func (u Update) Deletion() bool {
return u.Config.IsEmpty()
}

// Addition returns true if the update corresponds to a span config being
// added.
// Addition returns true if the update corresponds to a span config being added.
// It is defined as the negation of Deletion, so every update is exactly one of
// the two.
func (u Update) Addition() bool {
return !u.Deletion()
}
2 changes: 1 addition & 1 deletion pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (s *KVSubscriber) run(ctx context.Context) error {
update.Config = entry.Config
}

if err := buffer.Add(ctx, &bufferEvent{update, ev.Value.Timestamp}); err != nil {
if err := buffer.Add(&bufferEvent{update, ev.Value.Timestamp}); err != nil {
select {
case <-ctx.Done():
// The context is canceled when the rangefeed is closed by the
Expand Down
19 changes: 11 additions & 8 deletions pkg/spanconfig/spanconfigmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Manager struct {
knobs *spanconfig.TestingKnobs

spanconfig.KVAccessor
spanconfig.SQLWatcherFactory
spanconfig.SQLTranslator
}

Expand All @@ -73,21 +74,23 @@ func New(
stopper *stop.Stopper,
settings *cluster.Settings,
kvAccessor spanconfig.KVAccessor,
sqlWatcherFactory spanconfig.SQLWatcherFactory,
sqlTranslator spanconfig.SQLTranslator,
knobs *spanconfig.TestingKnobs,
) *Manager {
if knobs == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was intentional; I find it better to fold in the knobs nil check into the constructor for cases where the caller doesn't care about plumbing in a testing knob. Can we revert?

knobs = &spanconfig.TestingKnobs{}
}
return &Manager{
db: db,
jr: jr,
ie: ie,
stopper: stopper,
settings: settings,
KVAccessor: kvAccessor,
SQLTranslator: sqlTranslator,
knobs: knobs,
db: db,
jr: jr,
ie: ie,
stopper: stopper,
settings: settings,
KVAccessor: kvAccessor,
SQLWatcherFactory: sqlWatcherFactory,
SQLTranslator: sqlTranslator,
knobs: knobs,
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/spanconfig/spanconfigmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerCreatedJobInterceptor: func(jobI interface{}) {
Expand Down Expand Up @@ -162,6 +163,7 @@ func TestManagerStartsJobIfFailed(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerAfterCheckedReconciliationJobExistsInterceptor: func(exists bool) {
Expand Down Expand Up @@ -237,6 +239,7 @@ func TestManagerCheckJobConditions(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerDisableJobCreation: true,
Expand Down
Loading