diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index cc9bcfac9536..6a913b046e52 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -861,6 +861,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
         cfg.rangeFeedFactory,
         1<<20, /* 1 MB bufferMemLimit */
         cfg.stopper,
+        // TODO(irfansharif): What should this no-op cadence be?
+        30*time.Second, /* checkpointNoopsEvery */
         spanConfigKnobs,
     )
     spanConfigMgr = spanconfigmanager.New(
diff --git a/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel b/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel
index bb47eba6a7c7..976946678e7d 100644
--- a/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel
+++ b/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel
@@ -22,11 +22,13 @@ go_library(
         "//pkg/sql/rowenc",
         "//pkg/sql/sem/tree",
         "//pkg/sql/types",
+        "//pkg/util",
         "//pkg/util/hlc",
         "//pkg/util/log",
         "//pkg/util/log/logcrash",
         "//pkg/util/stop",
         "//pkg/util/syncutil",
+        "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
     ],
 )
diff --git a/pkg/spanconfig/spanconfigsqlwatcher/buffer.go b/pkg/spanconfig/spanconfigsqlwatcher/buffer.go
index 8a50ecd2cba1..c6d64f2356dc 100644
--- a/pkg/spanconfig/spanconfigsqlwatcher/buffer.go
+++ b/pkg/spanconfig/spanconfigsqlwatcher/buffer.go
@@ -71,11 +71,15 @@ const (
     numRangefeeds int = iota
 )
 
-// newBuffer constructs and returns a new buffer.
-func newBuffer(limit int) *buffer {
+// newBuffer constructs a new buffer initialized with a starting frontier
+// timestamp.
+func newBuffer(limit int, initialFrontierTS hlc.Timestamp) *buffer {
     rangefeedBuffer := rangefeedbuffer.New(limit)
     eventBuffer := &buffer{}
     eventBuffer.mu.buffer = rangefeedBuffer
+    for i := range eventBuffer.mu.rangefeedFrontiers {
+        eventBuffer.mu.rangefeedFrontiers[i].Forward(initialFrontierTS)
+    }
     return eventBuffer
 }
diff --git a/pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go b/pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go
index 1005940394b7..986340bcbccf 100644
--- a/pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go
+++ b/pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go
@@ -52,12 +52,12 @@ func TestBuffer(t *testing.T) {
     }
     ctx := context.Background()
-    buffer := newBuffer(10 /* limit */)
+    buffer := newBuffer(10 /* limit */, ts(1))
 
     // Sanity check the newly initialized event buffer.
     updates, combinedFrontierTS, err := buffer.flush(ctx)
     require.NoError(t, err)
-    require.Equal(t, ts(0), combinedFrontierTS)
+    require.Equal(t, ts(1), combinedFrontierTS)
     require.True(t, len(updates) == 0)
 
     // Add a few events without advancing any of the frontiers. We don't expect
@@ -69,7 +69,7 @@ func TestBuffer(t *testing.T) {
     require.NoError(t, err)
     updates, combinedFrontierTS, err = buffer.flush(ctx)
     require.NoError(t, err)
-    require.Equal(t, ts(0), combinedFrontierTS)
+    require.Equal(t, ts(1), combinedFrontierTS)
     require.True(t, len(updates) == 0)
 
     // Advance the zones frontier. We expect flush to still not return any results
@@ -77,7 +77,7 @@ func TestBuffer(t *testing.T) {
     buffer.advance(zonesRangefeed, ts(11))
     updates, combinedFrontierTS, err = buffer.flush(ctx)
     require.NoError(t, err)
-    require.Equal(t, ts(0), combinedFrontierTS)
+    require.Equal(t, ts(1), combinedFrontierTS)
     require.True(t, len(updates) == 0)
 
     // Advance the descriptors frontier to a lower timestamp than the zones
@@ -150,7 +150,6 @@ func TestBuffer(t *testing.T) {
 
 func TestBufferCombinesDescriptorTypes(t *testing.T) {
     defer leaktest.AfterTest(t)()
-    buffer := newBuffer(10 /* limit */)
     ctx := context.Background()
 
     ts := func(nanos int) hlc.Timestamp {
@@ -158,6 +157,8 @@ func TestBufferCombinesDescriptorTypes(t *testing.T) {
             WallTime: int64(nanos),
         }
     }
+    buffer := newBuffer(10 /* limit */, ts(0))
+
     makeEvent := func(descID int, descType catalog.DescriptorType, timestamp hlc.Timestamp) event {
         return event{
             update: spanconfig.DescriptorUpdate{
diff --git a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go
index 5d3a1d6ac8dd..472a41e735da 100644
--- a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go
+++ b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go
@@ -12,6 +12,7 @@ package spanconfigsqlwatcher
 
 import (
     "context"
+    "time"
     "unsafe"
 
     "github.com/cockroachdb/cockroach/pkg/keys"
@@ -22,10 +23,12 @@ import (
     "github.com/cockroachdb/cockroach/pkg/spanconfig"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+    "github.com/cockroachdb/cockroach/pkg/util"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
     "github.com/cockroachdb/cockroach/pkg/util/stop"
+    "github.com/cockroachdb/cockroach/pkg/util/timeutil"
     "github.com/cockroachdb/errors"
 )
 
@@ -36,12 +39,13 @@ var _ spanconfig.SQLWatcher = &SQLWatcher{}
 // establishes rangefeeds over system.zones and system.descriptors to
 // incrementally watch for SQL updates.
 type SQLWatcher struct {
-    codec            keys.SQLCodec
-    settings         *cluster.Settings
-    stopper          *stop.Stopper
-    knobs            *spanconfig.TestingKnobs
-    rangeFeedFactory *rangefeed.Factory
-    bufferMemLimit   int64
+    codec                keys.SQLCodec
+    settings             *cluster.Settings
+    stopper              *stop.Stopper
+    knobs                *spanconfig.TestingKnobs
+    rangeFeedFactory     *rangefeed.Factory
+    bufferMemLimit       int64
+    checkpointNoopsEvery time.Duration
 }
 
 // New constructs a new SQLWatcher.
@@ -51,18 +55,20 @@ func New(
     rangeFeedFactory *rangefeed.Factory,
     bufferMemLimit int64,
     stopper *stop.Stopper,
+    checkpointNoopsEvery time.Duration,
     knobs *spanconfig.TestingKnobs,
 ) *SQLWatcher {
     if knobs == nil {
         knobs = &spanconfig.TestingKnobs{}
     }
     return &SQLWatcher{
-        codec:            codec,
-        settings:         settings,
-        rangeFeedFactory: rangeFeedFactory,
-        stopper:          stopper,
-        bufferMemLimit:   bufferMemLimit,
-        knobs:            knobs,
+        codec:                codec,
+        settings:             settings,
+        rangeFeedFactory:     rangeFeedFactory,
+        stopper:              stopper,
+        bufferMemLimit:       bufferMemLimit,
+        checkpointNoopsEvery: checkpointNoopsEvery,
+        knobs:                knobs,
     }
 }
 
@@ -73,15 +79,15 @@ const sqlWatcherBufferEntrySize = int64(unsafe.Sizeof(event{}) + unsafe.Sizeof(r
 // WatchForSQLUpdates is part of the spanconfig.SQLWatcher interface.
 func (s *SQLWatcher) WatchForSQLUpdates(
     ctx context.Context,
-    timestamp hlc.Timestamp,
+    startTS hlc.Timestamp,
     handler func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error,
 ) error {
-    return s.watch(ctx, timestamp, handler)
+    return s.watch(ctx, startTS, handler)
 }
 
 func (s *SQLWatcher) watch(
     ctx context.Context,
-    timestamp hlc.Timestamp,
+    startTS hlc.Timestamp,
     handler func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error,
 ) error {
@@ -101,7 +107,7 @@ func (s *SQLWatcher) watch(
     // serial semantics.
     errCh := make(chan error)
     frontierAdvanced := make(chan struct{})
-    buf := newBuffer(int(s.bufferMemLimit / sqlWatcherBufferEntrySize))
+    buf := newBuffer(int(s.bufferMemLimit/sqlWatcherBufferEntrySize), startTS)
     onFrontierAdvance := func(ctx context.Context, rangefeed rangefeedKind, timestamp hlc.Timestamp) {
         buf.advance(rangefeed, timestamp)
         select {
@@ -131,17 +137,18 @@ func (s *SQLWatcher) watch(
         }
     }
 
-    descriptorsRF, err := s.watchForDescriptorUpdates(ctx, timestamp, onEvent, onFrontierAdvance)
+    descriptorsRF, err := s.watchForDescriptorUpdates(ctx, startTS, onEvent, onFrontierAdvance)
     if err != nil {
         return errors.Wrapf(err, "error establishing rangefeed over system.descriptors")
     }
     defer descriptorsRF.Close()
 
-    zonesRF, err := s.watchForZoneConfigUpdates(ctx, timestamp, onEvent, onFrontierAdvance)
+    zonesRF, err := s.watchForZoneConfigUpdates(ctx, startTS, onEvent, onFrontierAdvance)
     if err != nil {
         return errors.Wrapf(err, "error establishing rangefeed over system.zones")
     }
     defer zonesRF.Close()
 
+    checkpointNoops := util.Every(s.checkpointNoopsEvery)
     for {
         select {
         case <-ctx.Done():
@@ -155,7 +162,7 @@ func (s *SQLWatcher) watch(
             if err != nil {
                 return err
             }
-            if len(events) == 0 {
+            if len(events) == 0 && !checkpointNoops.ShouldProcess(timeutil.Now()) {
                 continue
             }
             if err := handler(ctx, events, combinedFrontierTS); err != nil {
@@ -170,7 +177,7 @@
 // callback is invoked whenever the rangefeed frontier is advanced as well.
 func (s *SQLWatcher) watchForDescriptorUpdates(
     ctx context.Context,
-    timestamp hlc.Timestamp,
+    startTS hlc.Timestamp,
     onEvent func(context.Context, event),
     onFrontierAdvance func(context.Context, rangefeedKind, hlc.Timestamp),
 ) (*rangefeed.RangeFeed, error) {
@@ -239,7 +246,7 @@ func (s *SQLWatcher) watchForDescriptorUpdates(
         ctx,
         "sql-watcher-descriptor-rangefeed",
         descriptorTableSpan,
-        timestamp,
+        startTS,
         handleEvent,
         rangefeed.WithDiff(),
         rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) {
@@ -250,7 +257,7 @@ func (s *SQLWatcher) watchForDescriptorUpdates(
         return nil, err
     }
 
-    log.Infof(ctx, "established range feed over system.descriptors table starting at time %s", timestamp)
+    log.Infof(ctx, "established range feed over system.descriptors table starting at time %s", startTS)
     return rf, nil
 }
 
@@ -260,7 +267,7 @@ func (s *SQLWatcher) watchForDescriptorUpdates(
 // advanced.
 func (s *SQLWatcher) watchForZoneConfigUpdates(
     ctx context.Context,
-    timestamp hlc.Timestamp,
+    startTS hlc.Timestamp,
     onEvent func(context.Context, event),
     onFrontierAdvance func(context.Context, rangefeedKind, hlc.Timestamp),
 ) (*rangefeed.RangeFeed, error) {
@@ -296,7 +303,7 @@ func (s *SQLWatcher) watchForZoneConfigUpdates(
         ctx,
         "sql-watcher-zones-rangefeed",
         zoneTableSpan,
-        timestamp,
+        startTS,
         handleEvent,
         rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) {
             onFrontierAdvance(ctx, zonesRangefeed, resolvedTS)
@@ -306,6 +313,6 @@ func (s *SQLWatcher) watchForZoneConfigUpdates(
         return nil, err
     }
 
-    log.Infof(ctx, "established range feed over system.zones table starting at time %s", timestamp)
+    log.Infof(ctx, "established range feed over system.zones table starting at time %s", startTS)
     return rf, nil
 }
diff --git a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go
index 75d35a7a7c57..a5bead9a0dab 100644
--- a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go
+++ b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go
@@ -121,12 +121,14 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {
     tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
     tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
 
+    noopCheckpointDuration := 100 * time.Millisecond
     sqlWatcher := spanconfigsqlwatcher.New(
         keys.SystemSQLCodec,
         ts.ClusterSettings(),
         ts.RangeFeedFactory().(*rangefeed.Factory),
         1<<20, /* 1 MB, bufferMemLimit */
         ts.Stopper(),
+        noopCheckpointDuration,
         nil, /* knobs */
     )
 
@@ -138,22 +140,19 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {
     mu.receivedIDs = make(map[descpb.ID]struct{})
 
     var wg sync.WaitGroup
+    watcherStartTS := ts.Clock().Now()
+
     watcherCtx, watcherCancel := context.WithCancel(context.Background())
     wg.Add(1)
     go func() {
         defer wg.Done()
 
-        startTS := ts.Clock().Now()
-        mu.Lock()
-        mu.lastCheckpoint = startTS
-        mu.Unlock()
-
-        _ = sqlWatcher.WatchForSQLUpdates(watcherCtx, startTS,
+        _ = sqlWatcher.WatchForSQLUpdates(watcherCtx, watcherStartTS,
             func(ctx context.Context, updates []spanconfig.DescriptorUpdate, checkpointTS hlc.Timestamp) error {
                 mu.Lock()
                 defer mu.Unlock()
 
-                require.True(t, mu.lastCheckpoint.Less(checkpointTS))
+                require.True(t, mu.lastCheckpoint.LessEq(checkpointTS))
                 mu.lastCheckpoint = checkpointTS
 
                 for _, update := range updates {
@@ -216,12 +215,14 @@ func TestSQLWatcherMultiple(t *testing.T) {
     tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
     tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
 
+    noopCheckpointDuration := 100 * time.Millisecond
     sqlWatcher := spanconfigsqlwatcher.New(
         keys.SystemSQLCodec,
         ts.ClusterSettings(),
         ts.RangeFeedFactory().(*rangefeed.Factory),
         1<<20, /* 1 MB, bufferMemLimit */
         ts.Stopper(),
+        noopCheckpointDuration,
         nil, /* knobs */
     )
 
@@ -315,9 +316,8 @@ func TestSQLWatcherMultiple(t *testing.T) {
 
         watcher2Cancel()
         watcher3Cancel()
+        wg.Wait()
     }
-
-    wg.Wait()
 }
 
 // TestSQLWatcherOnEventError ensures that if there is an error processing a
@@ -343,12 +343,14 @@ func TestSQLWatcherOnEventError(t *testing.T) {
     tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
     tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
 
+    noopCheckpointDuration := 100 * time.Millisecond
     sqlWatcher := spanconfigsqlwatcher.New(
         keys.SystemSQLCodec,
         ts.ClusterSettings(),
         ts.RangeFeedFactory().(*rangefeed.Factory),
         1<<20, /* 1 MB, bufferMemLimit */
         ts.Stopper(),
+        noopCheckpointDuration,
         &spanconfig.TestingKnobs{
             SQLWatcherOnEventInterceptor: func() error {
                 return errors.New("boom")
@@ -389,12 +391,14 @@ func TestSQLWatcherHandlerError(t *testing.T) {
     tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
     tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
 
+    noopCheckpointDuration := 100 * time.Millisecond
     sqlWatcher := spanconfigsqlwatcher.New(
         keys.SystemSQLCodec,
         ts.ClusterSettings(),
         ts.RangeFeedFactory().(*rangefeed.Factory),
         1<<20, /* 1 MB, bufferMemLimit */
         ts.Stopper(),
+        noopCheckpointDuration,
         nil, /* knobs */
     )
 
@@ -441,3 +445,95 @@ func TestSQLWatcherHandlerError(t *testing.T) {
     // Ensure that the handler was called only once.
     require.Equal(t, int32(1), atomic.LoadInt32(&numCalled))
 }
+
+func TestWatcherReceivesNoopCheckpoints(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+
+    tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+        ServerArgs: base.TestServerArgs{
+            Knobs: base.TestingKnobs{
+                SpanConfig: &spanconfig.TestingKnobs{
+                    ManagerDisableJobCreation: true, // disable the automatic job creation.
+                },
+                JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes.
+            },
+        },
+    })
+    ctx := context.Background()
+    defer tc.Stopper().Stop(ctx)
+    ts := tc.Server(0 /* idx */)
+    tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */))
+    tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
+    tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
+
+    noopCheckpointDuration := 25 * time.Millisecond
+    sqlWatcher := spanconfigsqlwatcher.New(
+        keys.SystemSQLCodec,
+        ts.ClusterSettings(),
+        ts.RangeFeedFactory().(*rangefeed.Factory),
+        1<<20, /* 1 MB, bufferMemLimit */
+        ts.Stopper(),
+        noopCheckpointDuration,
+        nil, /* knobs */
+    )
+
+    beforeStmtTS := ts.Clock().Now()
+    tdb.Exec(t, "CREATE TABLE t()")
+    afterStmtTS := ts.Clock().Now()
+    const expDescID descpb.ID = 52
+
+    var wg sync.WaitGroup
+    mu := struct {
+        syncutil.Mutex
+        numCheckpoints int
+        lastCheckpoint hlc.Timestamp
+    }{}
+
+    watch := func(ctx context.Context, onExit func(), onCheckpoint func(hlc.Timestamp)) {
+        defer wg.Done()
+
+        receivedIDs := make(map[descpb.ID]struct{})
+        err := sqlWatcher.WatchForSQLUpdates(ctx, beforeStmtTS,
+            func(_ context.Context, updates []spanconfig.DescriptorUpdate, checkpointTS hlc.Timestamp) error {
+                onCheckpoint(checkpointTS)
+
+                for _, update := range updates {
+                    receivedIDs[update.ID] = struct{}{}
+                }
+                return nil
+            })
+        require.True(t, testutils.IsError(err, "context canceled"))
+        require.Equal(t, 1, len(receivedIDs))
+        _, seen := receivedIDs[expDescID]
+        require.True(t, seen)
+    }
+
+    {
+        // Run the first watcher; wait for it to observe the update before
+        // tearing it down.
+
+        watcherCtx, watcherCancel := context.WithCancel(ctx)
+        wg.Add(1)
+        go watch(watcherCtx, func() { wg.Done() }, func(ts hlc.Timestamp) {
+            mu.Lock()
+            mu.lastCheckpoint = ts
+            mu.numCheckpoints++
+            mu.Unlock()
+        })
+
+        testutils.SucceedsSoon(t, func() error {
+            mu.Lock()
+            defer mu.Unlock()
+
+            if mu.lastCheckpoint.Less(afterStmtTS) {
+                return errors.New("last checkpoint precedes statement timestamp")
+            }
+            if mu.numCheckpoints < 3 {
+                return errors.New("didn't receive no-op checkpoints")
+            }
+            return nil
+        })
+        watcherCancel()
+        wg.Wait()
+    }
+}
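
The behavioral core of this change is the flush loop in watch: previously a flush that produced no events always skipped the handler, so a subscriber could go arbitrarily long without hearing that the combined frontier had advanced. With checkpointNoopsEvery, empty flushes are still surfaced as checkpoint-only callbacks, throttled to at most one per interval via util.Every and ShouldProcess. The sketch below illustrates that throttling pattern using only the standard library; the tiny everyN limiter and the flush/handler stand-ins are placeholders for illustration, not the CockroachDB APIs.

package main

import (
    "fmt"
    "time"
)

// everyN allows an action at most once per interval. It mirrors the behavior
// the diff gets from util.Every/ShouldProcess, reimplemented here only so the
// sketch has no CockroachDB dependencies.
type everyN struct {
    interval time.Duration
    last     time.Time
}

func (e *everyN) shouldProcess(now time.Time) bool {
    if now.Sub(e.last) >= e.interval {
        e.last = now
        return true
    }
    return false
}

func main() {
    // Placeholder stand-ins for the watcher's buffer flush and update handler.
    flush := func() (events []string, frontier time.Time) { return nil, time.Now() }
    handler := func(events []string, frontier time.Time) {
        fmt.Printf("checkpoint at %s with %d event(s)\n", frontier.Format(time.StampMilli), len(events))
    }

    checkpointNoops := everyN{interval: 25 * time.Millisecond}
    for i := 0; i < 10; i++ {
        time.Sleep(10 * time.Millisecond) // pretend a frontier advance arrived

        events, frontier := flush()
        // Skip empty flushes, unless enough time has passed that it's worth
        // emitting a checkpoint-only ("no-op") callback anyway.
        if len(events) == 0 && !checkpointNoops.shouldProcess(time.Now()) {
            continue
        }
        handler(events, frontier)
    }
}

Run as-is, it emits a checkpoint immediately and then roughly every 25ms, even though empty flushes arrive every 10ms.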
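Relatedly, the reason newBuffer now takes an initial frontier timestamp: the per-rangefeed frontiers used to start at the zero timestamp, and, so far as the diff shows, checkpoint-only callbacks mean the handler can now fire before either rangefeed has checkpointed, at which point the combined (minimum) frontier would read as zero rather than something meaningful. Seeding each frontier with the caller's start timestamp keeps the combined frontier from ever reading below startTS, which is exactly what the updated TestBuffer expectations (ts(1) instead of ts(0)) assert. Below is a toy model of that bookkeeping, with illustrative names and plain int64 timestamps standing in for hlc.Timestamp.

package main

import "fmt"

// frontiers is a toy model of the buffer's per-rangefeed frontier tracking:
// two rangefeeds (descriptors and zones) with int64 "timestamps". The names
// here are illustrative, not the spanconfigsqlwatcher API.
type frontiers struct {
    ts [2]int64 // index 0: descriptors, index 1: zones
}

// newFrontiers seeds every frontier with the caller's start timestamp so the
// combined frontier never reads below startTS, even before any checkpoints.
func newFrontiers(startTS int64) *frontiers {
    f := &frontiers{}
    for i := range f.ts {
        f.ts[i] = startTS
    }
    return f
}

// advance forwards one rangefeed's frontier, never moving it backwards.
func (f *frontiers) advance(rangefeed int, ts int64) {
    if ts > f.ts[rangefeed] {
        f.ts[rangefeed] = ts
    }
}

// combined is the timestamp below which both rangefeeds have resolved,
// i.e. the minimum of the individual frontiers.
func (f *frontiers) combined() int64 {
    min := f.ts[0]
    if f.ts[1] < min {
        min = f.ts[1]
    }
    return min
}

func main() {
    f := newFrontiers(1)      // analogous to newBuffer(limit, ts(1)) in the test
    fmt.Println(f.combined()) // 1: the start timestamp, not 0

    f.advance(1, 11)          // only the zones frontier advances
    fmt.Println(f.combined()) // still 1: the descriptors frontier lags behind
}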