Skip to content

Commit

Permalink
spanconfig/sqlwatcher: checkpoint, even without updates
Browse files Browse the repository at this point in the history
If `system.{descriptor,zones}` is static, we would previously discard
all checkpoint events. This is not what we want -- we'd still like to
know how caught up we are, with ongoing updates or otherwise.

MVCC GC, after all, can still happen; if our last checkpoint was when
the last update occurred, we may end up doing a lot of unnecessary work
when finding out our last checkpoint was GC-ed from underneath us. Lack
of periodic checkpoints also makes tests difficult to write -- you'd
have to induce a benign update to flush out all earlier ones. This
latent flakiness was uncovered after speeding up some existing tests in
an earlier commit.

NB: For incremental (possibly noop) checkpoints, we need to ensure that
the sqlwatcher's buffer is initialized with a low watermark. When
flushing events, we take the most conservative timestamp. If the buffer
is not initialized with such a watermark, this might be 0 -- violating
the sqlwatcher's monotonically increasing checkpoint ts invariant.

Release note: None
  • Loading branch information
irfansharif committed Nov 30, 2021
1 parent 6fdc475 commit d00baeb
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 41 deletions.
2 changes: 2 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.rangeFeedFactory,
1<<20, /* 1 MB bufferMemLimit */
cfg.stopper,
// TODO(irfansharif): What should this no-op cadence be?
30*time.Second, /* checkpointNoopsEvery */
spanConfigKnobs,
)
spanConfigMgr = spanconfigmanager.New(
Expand Down
2 changes: 2 additions & 0 deletions pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
8 changes: 6 additions & 2 deletions pkg/spanconfig/spanconfigsqlwatcher/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,15 @@ const (
numRangefeeds int = iota
)

// newBuffer constructs and returns a new buffer.
func newBuffer(limit int) *buffer {
// newBuffer returns an empty event buffer backed by a rangefeed buffer that
// is constructed with the given limit. Every rangefeed frontier is forwarded
// to initialFrontierTS up front, so a flush performed before any rangefeed
// has advanced reports initialFrontierTS as the combined frontier instead of
// the zero timestamp.
func newBuffer(limit int, initialFrontierTS hlc.Timestamp) *buffer {
	b := &buffer{}
	b.mu.buffer = rangefeedbuffer.New(limit)
	// Seed each per-rangefeed frontier in place with the starting timestamp.
	for idx := 0; idx < len(b.mu.rangefeedFrontiers); idx++ {
		b.mu.rangefeedFrontiers[idx].Forward(initialFrontierTS)
	}
	return b
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func TestBuffer(t *testing.T) {
}

ctx := context.Background()
buffer := newBuffer(10 /* limit */)
buffer := newBuffer(10 /* limit */, ts(1))

// Sanity check the newly initialized event buffer.
updates, combinedFrontierTS, err := buffer.flush(ctx)
require.NoError(t, err)
require.Equal(t, ts(0), combinedFrontierTS)
require.Equal(t, ts(1), combinedFrontierTS)
require.True(t, len(updates) == 0)

// Add a few events without advancing any of the frontiers. We don't expect
Expand All @@ -69,15 +69,15 @@ func TestBuffer(t *testing.T) {
require.NoError(t, err)
updates, combinedFrontierTS, err = buffer.flush(ctx)
require.NoError(t, err)
require.Equal(t, ts(0), combinedFrontierTS)
require.Equal(t, ts(1), combinedFrontierTS)
require.True(t, len(updates) == 0)

// Advance the zones frontier. We expect flush to still not return any results
// as the descriptors frontier hasn't been advanced yet.
buffer.advance(zonesRangefeed, ts(11))
updates, combinedFrontierTS, err = buffer.flush(ctx)
require.NoError(t, err)
require.Equal(t, ts(0), combinedFrontierTS)
require.Equal(t, ts(1), combinedFrontierTS)
require.True(t, len(updates) == 0)

// Advance the descriptors frontier to a lower timestamp than the zones
Expand Down Expand Up @@ -150,14 +150,15 @@ func TestBuffer(t *testing.T) {
func TestBufferCombinesDescriptorTypes(t *testing.T) {
defer leaktest.AfterTest(t)()

buffer := newBuffer(10 /* limit */)
ctx := context.Background()

ts := func(nanos int) hlc.Timestamp {
return hlc.Timestamp{
WallTime: int64(nanos),
}
}
buffer := newBuffer(10 /* limit */, ts(0))

makeEvent := func(descID int, descType catalog.DescriptorType, timestamp hlc.Timestamp) event {
return event{
update: spanconfig.DescriptorUpdate{
Expand Down
57 changes: 32 additions & 25 deletions pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package spanconfigsqlwatcher

import (
"context"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -22,10 +23,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand All @@ -36,12 +39,13 @@ var _ spanconfig.SQLWatcher = &SQLWatcher{}
// establishes rangefeeds over system.zones and system.descriptors to
// incrementally watch for SQL updates.
type SQLWatcher struct {
codec keys.SQLCodec
settings *cluster.Settings
stopper *stop.Stopper
knobs *spanconfig.TestingKnobs
rangeFeedFactory *rangefeed.Factory
bufferMemLimit int64
codec keys.SQLCodec
settings *cluster.Settings
stopper *stop.Stopper
knobs *spanconfig.TestingKnobs
rangeFeedFactory *rangefeed.Factory
bufferMemLimit int64
checkpointNoopsEvery time.Duration
}

// New constructs a new SQLWatcher.
Expand All @@ -51,18 +55,20 @@ func New(
rangeFeedFactory *rangefeed.Factory,
bufferMemLimit int64,
stopper *stop.Stopper,
checkpointNoopsEvery time.Duration,
knobs *spanconfig.TestingKnobs,
) *SQLWatcher {
if knobs == nil {
knobs = &spanconfig.TestingKnobs{}
}
return &SQLWatcher{
codec: codec,
settings: settings,
rangeFeedFactory: rangeFeedFactory,
stopper: stopper,
bufferMemLimit: bufferMemLimit,
knobs: knobs,
codec: codec,
settings: settings,
rangeFeedFactory: rangeFeedFactory,
stopper: stopper,
bufferMemLimit: bufferMemLimit,
checkpointNoopsEvery: checkpointNoopsEvery,
knobs: knobs,
}
}

Expand All @@ -73,15 +79,15 @@ const sqlWatcherBufferEntrySize = int64(unsafe.Sizeof(event{}) + unsafe.Sizeof(r
// WatchForSQLUpdates is part of the spanconfig.SQLWatcher interface.
func (s *SQLWatcher) WatchForSQLUpdates(
ctx context.Context,
timestamp hlc.Timestamp,
startTS hlc.Timestamp,
handler func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error,
) error {
return s.watch(ctx, timestamp, handler)
return s.watch(ctx, startTS, handler)
}

func (s *SQLWatcher) watch(
ctx context.Context,
timestamp hlc.Timestamp,
startTS hlc.Timestamp,
handler func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error,
) error {

Expand All @@ -101,7 +107,7 @@ func (s *SQLWatcher) watch(
// serial semantics.
errCh := make(chan error)
frontierAdvanced := make(chan struct{})
buf := newBuffer(int(s.bufferMemLimit / sqlWatcherBufferEntrySize))
buf := newBuffer(int(s.bufferMemLimit/sqlWatcherBufferEntrySize), startTS)
onFrontierAdvance := func(ctx context.Context, rangefeed rangefeedKind, timestamp hlc.Timestamp) {
buf.advance(rangefeed, timestamp)
select {
Expand Down Expand Up @@ -131,17 +137,18 @@ func (s *SQLWatcher) watch(
}
}

descriptorsRF, err := s.watchForDescriptorUpdates(ctx, timestamp, onEvent, onFrontierAdvance)
descriptorsRF, err := s.watchForDescriptorUpdates(ctx, startTS, onEvent, onFrontierAdvance)
if err != nil {
return errors.Wrapf(err, "error establishing rangefeed over system.descriptors")
}
defer descriptorsRF.Close()
zonesRF, err := s.watchForZoneConfigUpdates(ctx, timestamp, onEvent, onFrontierAdvance)
zonesRF, err := s.watchForZoneConfigUpdates(ctx, startTS, onEvent, onFrontierAdvance)
if err != nil {
return errors.Wrapf(err, "error establishing rangefeed over system.zones")
}
defer zonesRF.Close()

checkpointNoops := util.Every(s.checkpointNoopsEvery)
for {
select {
case <-ctx.Done():
Expand All @@ -155,7 +162,7 @@ func (s *SQLWatcher) watch(
if err != nil {
return err
}
if len(events) == 0 {
if len(events) == 0 && !checkpointNoops.ShouldProcess(timeutil.Now()) {
continue
}
if err := handler(ctx, events, combinedFrontierTS); err != nil {
Expand All @@ -170,7 +177,7 @@ func (s *SQLWatcher) watch(
// callback is invoked whenever the rangefeed frontier is advanced as well.
func (s *SQLWatcher) watchForDescriptorUpdates(
ctx context.Context,
timestamp hlc.Timestamp,
startTS hlc.Timestamp,
onEvent func(context.Context, event),
onFrontierAdvance func(context.Context, rangefeedKind, hlc.Timestamp),
) (*rangefeed.RangeFeed, error) {
Expand Down Expand Up @@ -239,7 +246,7 @@ func (s *SQLWatcher) watchForDescriptorUpdates(
ctx,
"sql-watcher-descriptor-rangefeed",
descriptorTableSpan,
timestamp,
startTS,
handleEvent,
rangefeed.WithDiff(),
rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) {
Expand All @@ -250,7 +257,7 @@ func (s *SQLWatcher) watchForDescriptorUpdates(
return nil, err
}

log.Infof(ctx, "established range feed over system.descriptors table starting at time %s", timestamp)
log.Infof(ctx, "established range feed over system.descriptors table starting at time %s", startTS)
return rf, nil
}

Expand All @@ -260,7 +267,7 @@ func (s *SQLWatcher) watchForDescriptorUpdates(
// advanced.
func (s *SQLWatcher) watchForZoneConfigUpdates(
ctx context.Context,
timestamp hlc.Timestamp,
startTS hlc.Timestamp,
onEvent func(context.Context, event),
onFrontierAdvance func(context.Context, rangefeedKind, hlc.Timestamp),
) (*rangefeed.RangeFeed, error) {
Expand Down Expand Up @@ -296,7 +303,7 @@ func (s *SQLWatcher) watchForZoneConfigUpdates(
ctx,
"sql-watcher-zones-rangefeed",
zoneTableSpan,
timestamp,
startTS,
handleEvent,
rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) {
onFrontierAdvance(ctx, zonesRangefeed, resolvedTS)
Expand All @@ -306,6 +313,6 @@ func (s *SQLWatcher) watchForZoneConfigUpdates(
return nil, err
}

log.Infof(ctx, "established range feed over system.zones table starting at time %s", timestamp)
log.Infof(ctx, "established range feed over system.zones table starting at time %s", startTS)
return rf, nil
}
Loading

0 comments on commit d00baeb

Please sign in to comment.