Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spanconfig: introduce the spanconfig.SQLWatcher #71968

Merged
merged 1 commit into from
Nov 11, 2021

Conversation

arulajmani
Copy link
Collaborator

This patch introduces the SQLWatcher, which is intended to incrementally
watch for updates to system.zones and system.descriptors. It does so by
establishing rangefeeds at a given timestamp.

The SQLWatcher periodically invokes a callback with a list of updates
that have been observed in the window
(previous checkpointTS, checkpointTS]. The checkpointTS is also
provided to the callback.

Internally, the SQLWatcher uses a buffer to keep track of events
generated by the SQLWatcher's rangefeeds. It also tracks the individual
frontier timestamps of both the rangefeeds. This helps to maintain the
notion of the combined frontier timestamp, which is computed as the
minimum of the two. This combined frontier timestamp serves as the
checkpoint to the SQLWatcher's callback function.

This interface isn't hooked up to anything yet. It'll be used by the
sponconfig.Reconciler soon to perform partial reconciliation once
full reconciliation is done. It is intended that the IDs from the
updates produced by the SQLWatcher will be fed into the SQLTranslator.

References #67679
Carved from #69661

Release note: None

@arulajmani arulajmani requested review from a team as code owners October 26, 2021 00:15
@arulajmani arulajmani requested review from otan and removed request for a team October 26, 2021 00:15
@cockroach-teamcity
Copy link
Member

This change is Reviewable

Copy link
Contributor

@irfansharif irfansharif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet! This looks pretty good. I'm marked the important comments with "[Q]", everything else is minor. I haven't closely gone through the open questions in #69661 to make sure they're addressed here, I encourage you to close that PR after convincing yourself the comments no longer apply/are done.

Reviewed 19 of 20 files at r1.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @nvanbenschoten, and @otan)


pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 53 at r1 (raw file):

// Add adds the given entry to the buffer.
func (b *Buffer) Add(ctx context.Context, ev Event) error {

We can keep this diff but FWIW this was intentional, even if we're not currently using it, for symmetry with Flush and make it easy to add logging/debugging.


pkg/spanconfig/spanconfig.go, line 85 at r1 (raw file):

type SQLWatcher interface {
	// WatchForSQLUpdates watches for changes to zones and descriptors starting at
	// the given timestamp (exclusive) by establishing rangefeeds over

See our earlier discussion, I think this is an inclusive timestamp. Lets update the text+invariants below accordingly.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 45 at r1 (raw file):

// newBuffer constructs and returns a new buffer.
func newBuffer() *buffer {
	rangefeedBuffer := rangefeedbuffer.New(100 /* limit */)

Should the limit be passed in instead?


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 59 at r1 (raw file):

// add records the given event in the buffer.
func (b *buffer) add(ev event) error {

[nit] It would be a bit clearer to bring the definitions of event and rangefeedKind into this file, where they're primarily used.


pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go, line 55 at r1 (raw file):

	err = buffer.add(event{
		update: spanconfig.SQLWatcherUpdate{

[nit] Make a shorthand for constructing this update? Would let you inline a require.NoError(t, buffer.add(update(descID), ts(11)). Ditto for the array of updates: require.Equal(t, updates(1, 2), events).


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 75 at r1 (raw file):

	onEvent := func(event event) {
		err := s.buffer.add(event)
		log.Warningf(ctx, "error adding event %v: %v", event, err)

[Q] Did we mean to log unconditionally? We'd get an error here if the buffer reaches its limit without us having observed the frontier bumps from either rangefeed. That feels important to deal with. In #69614 I opted to transparently re-establish the rangefeeds, what do we want to have happen here? Does the caller want to be informed of the (unrecoverable) error through a callback, and be responsible for tear down and re-establishment? Something else?


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 87 at r1 (raw file):

	}

	for {

[Q] Is this a busy loop? We're polling pretty aggressively here. Would it be better if we made it all a bit more callback friendly by calling into the user-provided handle as part of the onFrontierAdvance callbacks? It would mean collapsing .advance and .flush into one thing:

func (s* SQLWatcher) WatchForSQLUpdates(..., handle spanconfig.SQLWatcherHandleFunc) {
  // ...
  onFrontierAdvance := func(kind rangefeedKind, timestamp hlc.Timestamp) {
    events, frontierTS := s.buffer.flush(kind, timestamp)
    if len(events) == 0 {
      return // nothing to do
    }
    handle(ctx, events)
  } 
}

I think it also lets us change the somewhat surprising fact that WatchForUpdates is a blocking call -- do we want it to be? The rangefeeds are already being run on separate threads, making it blocking forces callers to run yet another another thread.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 157 at r1 (raw file):

		if !ev.Value.IsPresent() {
			// The descriptor was deleted.
			value = ev.PrevValue

I think it's also possible for ev.PrevValue to not be present. At which point we have nothing to do.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher_test.go, line 104 at r1 (raw file):

					ManagerDisableJobCreation: true, // disable the automatic job creation.
				},
				JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),

Do we need these testing knobs?


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher_test.go, line 124 at r1 (raw file):

		require.NoError(t, err)

		prevCheckpointTS := ts.Clock().Now()

Should we define prevCheckpointTS within the goroutine using it?


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher_test.go, line 153 at r1 (raw file):

			mu.Lock()
			defer mu.Unlock()
			if len(receivedIDs) == len(tc.expectedIDs) {

Is it possible for receivedIDs to have duplicate IDs? The de-duplication is guaranteed on a per-handle-invocation basis, this might be flakey as a result.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher_test.go, line 167 at r1 (raw file):

		// Stop the watcher.
		cancel()

If WatchForUpdates wasn't a blocking call, we could wire up the handlers and simply defer sqlWatcher.Close() after sqlWatcher.WatchForSQLUpdates. We wouldn't need to invoke the latter in a separate goroutine either.


pkg/spanconfig/spanconfigsqlwatcher/zones_decoder.go, line 26 at r1 (raw file):

// ZonesDecoder decodes the zone ID (primary key) of rows from system.zones.
// It's not safe for concurrent use.
type ZonesDecoder struct {

Does this need to be exported? Ditto for methods below.


pkg/spanconfig/spanconfigsqlwatcher/zones_decoder.go, line 50 at r1 (raw file):

	)
	if err != nil || !matches {
		return descpb.InvalidID, errors.Newf("failed to decode key in system.zones %v", key)

AssertionFailedf? Here and below.


pkg/spanconfig/spanconfigsqlwatcher/zones_decoder_test.go, line 79 at r1 (raw file):
[Q] Copying over a review comment from #69661:

We can do better than writing to the actual system.zones table. See TestSpanConfigDecoder from #69614; it's better to create a throwaway table with the same schema, mutate that as needed, and check your decoding routines, etc. It should let you avoid skipping over all the initial system populated data, and makes the test more future proof.

It would also let you avoid having to ManagerDisableJobCreation given you'd be operating with a clean slate.


// newBuffer constructs and returns a new buffer.
func newBuffer() *buffer {
rangefeedBuffer := rangefeedbuffer.New(100 /* limit */)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry this is too small. Consider a drop database which has 100 tables.


closeOnce sync.Once

descriptorsRF *rangefeed.RangeFeed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@miretskiy and I have been talking about making the rangefeed library support multiple spans. The primary sticking point has been about the initial scan. That wouldn't apply here. I think it's easy to extend the distsender to take multiple spans. Not need to move on this, just food for thought.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I remember you mentioning this way back when on Slack. I felt two rangefeeds were more natural here instead because of the decoding semantics, as decoding is specific to which rangefeed produced the event.


closeOnce sync.Once

descriptorsRF *rangefeed.RangeFeed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@miretskiy and I have been talking about making the rangefeed library support multiple spans. The primary sticking point has been about the initial scan. That wouldn't apply here. I think it's easy to extend the distsender to take multiple spans. Not need to move on this, just food for thought.

return nil
default:
}
events, combinedFrontierTS := s.buffer.flush(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it's missing some important synchronization. This is just going to spin in a busy loop. Instead I think it'd be better to have a blocking call.

return nil
default:
}
events, combinedFrontierTS := s.buffer.flush(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it's missing some important synchronization. This is just going to spin in a busy loop. Instead I think it'd be better to have a blocking call.

Copy link
Collaborator Author

@arulajmani arulajmani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm marked the important comments with "[Q]"

💯 💯

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, @miretskiy, @nvanbenschoten, and @otan)


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 75 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

[Q] Did we mean to log unconditionally? We'd get an error here if the buffer reaches its limit without us having observed the frontier bumps from either rangefeed. That feels important to deal with. In #69614 I opted to transparently re-establish the rangefeeds, what do we want to have happen here? Does the caller want to be informed of the (unrecoverable) error through a callback, and be responsible for tear down and re-establishment? Something else?

Good catch, see below.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 87 at r1 (raw file):

Is this a busy loop? We're polling pretty aggressively here.

Good point, I like the idea of combining the of passing in the handle function to the onFrontierAdvance callback.

I think it also lets us change the somewhat surprising fact that WatchForUpdates is a blocking call -- do we want it to be?

I was hoping to keep this call blocking so that errors can be returned to the caller. This also ties into your comment above about surfacing errors if the buffer limit is reached. How do you feel about returning errors from handle and s.buffer.add on an error channel and blocking on that instead? If there is an error it can be returned to the caller and the caller can then tear down these rangefeeds and start a full reconciliation?

Similar to the suggestion @nvanbenschoten had on a spanconfig manager PR, if the caller wants this to be a non-blocking call, it is free to do so.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 93 at r1 (raw file):

Previously, ajwerner wrote…

This feels like it's missing some important synchronization. This is just going to spin in a busy loop. Instead I think it'd be better to have a blocking call.

@irfansharif pointed this out above as well, I'll fix it with his suggestion.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 157 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

I think it's also possible for ev.PrevValue to not be present. At which point we have nothing to do.

Could you remind me how this comes up? I know we had it in our prototype, but I forget what test case triggered it -- is it when a key is deleted?


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher_test.go, line 104 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Do we need these testing knobs?

Yeah, it speeds up the schema changes this test does.


pkg/spanconfig/spanconfigsqlwatcher/zones_decoder_test.go, line 79 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

[Q] Copying over a review comment from #69661:

We can do better than writing to the actual system.zones table. See TestSpanConfigDecoder from #69614; it's better to create a throwaway table with the same schema, mutate that as needed, and check your decoding routines, etc. It should let you avoid skipping over all the initial system populated data, and makes the test more future proof.

It would also let you avoid having to ManagerDisableJobCreation given you'd be operating with a clean slate.

Oops, I missed this comment. Will address when I push it out.

Copy link
Contributor

@irfansharif irfansharif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The complexity around error handling is worth discussing in person today. I don't have a clear picture for what to do here, though left some ideas below (mostly grafting from #69614 where the same problems exist). Would love to get other takes.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, @miretskiy, @nvanbenschoten, and @otan)


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 87 at r1 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Is this a busy loop? We're polling pretty aggressively here.

Good point, I like the idea of combining the of passing in the handle function to the onFrontierAdvance callback.

I think it also lets us change the somewhat surprising fact that WatchForUpdates is a blocking call -- do we want it to be?

I was hoping to keep this call blocking so that errors can be returned to the caller. This also ties into your comment above about surfacing errors if the buffer limit is reached. How do you feel about returning errors from handle and s.buffer.add on an error channel and blocking on that instead? If there is an error it can be returned to the caller and the caller can then tear down these rangefeeds and start a full reconciliation?

Similar to the suggestion @nvanbenschoten had on a spanconfig manager PR, if the caller wants this to be a non-blocking call, it is free to do so.

a) Do we need handle to return an error? Do we want the error from the handler to bubble all the way back up to where the watcher was started? It makes more sense to me to let the user of the library of capture the error in their own scope and teardown/re-establish the watch at that level, instead of bubbling it through the watcher.

b) This error handling complexity because of hard (unrecoverable) limits is something Andrew's complaining about elsewhere -- I need to sit with this for a bit. I think the rangefeedbuffer's API make it pretty difficult to work with; we want to be able to easily tear + restablish the rangefeed on these hard errors and there's re-usable code to be written to make this a convenient pattern.

I was imagining that these hard errors wouldn't need to be surfaced directly to the caller and that the SQLWatcher would internally re-establish rangefeeds transparently. Something like:

onEvent := func(event event) {
	s.Lock() // both rangefeed handlers serialize here
	defer s.Unlock()
	if err := s.buffer.add(event); err != nil {
		// close existing rangefeeds
		s.descriptorsRF.Close()
		s.zonesRF.Close()

		// re-establish feeds
		err := s.watchForDescriptorUpdates(ctx, timestamp, onEvent, onFrontierAdvance)
		err := s.watchForZoneConfigUpdates(ctx, timestamp, onEvent, onFrontierAdvance)
	}
}

If errors occur during re-establishment, perhaps we first retry according to some retry.Options? If we're still stuck past that point, I guess the Watcher as a whole needs to be bounced by the user. That could happen through either the error channel they could block on, or through a specific error callback. I think we'd get a lot of clarity by writing a test that exercises these hard error conditions and seeing what the resulting usage looks like. Perhaps we could state as part of the API that the handler may be invoked with descriptor IDs despite there being no change (that's already true with the full reconciliation).

Something to consider is an explicit .Start method on the SQLWatcher type (not the interface) that establishes the rangefeed, and having the WatchForUpdates method simply install the callback that's later invoked. This way only the callsite around .Start would have to deal with bouncing the SQLWatcher (basically lifecycle management); the callsite around WatchForUpdates can be blissfully unaware of things being bounced elsewhere.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 157 at r1 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Could you remind me how this comes up? I know we had it in our prototype, but I forget what test case triggered it -- is it when a key is deleted?

It's possible to write a KV tombstone on top of another KV tombstone. In those cases, the new and old value will be empty.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher_test.go, line 104 at r1 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Yeah, it speeds up the schema changes this test does.

Oh, ok, worth adding as a comment.

Copy link
Collaborator Author

@arulajmani arulajmani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for brainstorming some of this stuff earlier today, I've tried my best to capture some the discussions I had around this with @irfansharif and @nvanbenschoten today around error handling/serializing calls to the handler callback. I've also tried to enumerate the guarantees this interface hopes to provide explicitly in the main package. Should be good for another look.

Dismissed @irfansharif from 5 discussions.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @irfansharif, @miretskiy, @nvanbenschoten, and @otan)


pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 53 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

We can keep this diff but FWIW this was intentional, even if we're not currently using it, for symmetry with Flush and make it easy to add logging/debugging.

Not a fan of unused params, whoever wants it can plumb it down.


pkg/spanconfig/spanconfig.go, line 85 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

See our earlier discussion, I think this is an inclusive timestamp. Lets update the text+invariants below accordingly.

Sorry for not closing this out earlier, let's move the discussion here instead. I'm not sure if the code snippet you linked there applies to us, as we're not doing an initial scan using the rangfeed client library in the SQLWatcher.

Without an initial scan the timestamp eventually makes its way down here (from what I can tell):

// If the RangeFeed is performing a catch-up scan then it will observe all
// values above args.Timestamp. If the RangeFeed is requesting previous
// values for every update then it will also need to look for the version
// proceeding each value observed during the catch-up scan timestamp. This
// means that the earliest value observed by the catch-up scan will be
// args.Timestamp.Next and the earliest timestamp used to retrieve the
// previous version of a value will be args.Timestamp, so this is the
// timestamp we must check against the GCThreshold.

My read from this is that the timestamp is exclusive, but I'd appreciate if someone more familiar with this stuff could confirm or correct me if I'm wrong.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 45 at r1 (raw file):

Previously, ajwerner wrote…

I worry this is too small. Consider a drop database which has 100 tables.

I hardcoded this thing just to make sure my tests pass and then forgot to come back to it.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 45 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Should the limit be passed in instead?

Done


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 59 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

[nit] It would be a bit clearer to bring the definitions of event and rangefeedKind into this file, where they're primarily used.

Done.


pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go, line 55 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

[nit] Make a shorthand for constructing this update? Would let you inline a require.NoError(t, buffer.add(update(descID), ts(11)). Ditto for the array of updates: require.Equal(t, updates(1, 2), events).

Done.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 87 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

a) Do we need handle to return an error? Do we want the error from the handler to bubble all the way back up to where the watcher was started? It makes more sense to me to let the user of the library of capture the error in their own scope and teardown/re-establish the watch at that level, instead of bubbling it through the watcher.

b) This error handling complexity because of hard (unrecoverable) limits is something Andrew's complaining about elsewhere -- I need to sit with this for a bit. I think the rangefeedbuffer's API make it pretty difficult to work with; we want to be able to easily tear + restablish the rangefeed on these hard errors and there's re-usable code to be written to make this a convenient pattern.

I was imagining that these hard errors wouldn't need to be surfaced directly to the caller and that the SQLWatcher would internally re-establish rangefeeds transparently. Something like:

onEvent := func(event event) {
	s.Lock() // both rangefeed handlers serialize here
	defer s.Unlock()
	if err := s.buffer.add(event); err != nil {
		// close existing rangefeeds
		s.descriptorsRF.Close()
		s.zonesRF.Close()

		// re-establish feeds
		err := s.watchForDescriptorUpdates(ctx, timestamp, onEvent, onFrontierAdvance)
		err := s.watchForZoneConfigUpdates(ctx, timestamp, onEvent, onFrontierAdvance)
	}
}

If errors occur during re-establishment, perhaps we first retry according to some retry.Options? If we're still stuck past that point, I guess the Watcher as a whole needs to be bounced by the user. That could happen through either the error channel they could block on, or through a specific error callback. I think we'd get a lot of clarity by writing a test that exercises these hard error conditions and seeing what the resulting usage looks like. Perhaps we could state as part of the API that the handler may be invoked with descriptor IDs despite there being no change (that's already true with the full reconciliation).

Something to consider is an explicit .Start method on the SQLWatcher type (not the interface) that establishes the rangefeed, and having the WatchForUpdates method simply install the callback that's later invoked. This way only the callsite around .Start would have to deal with bouncing the SQLWatcher (basically lifecycle management); the callsite around WatchForUpdates can be blissfully unaware of things being bounced elsewhere.

Ended up discussing this offline with @nvanbenschoten and @irfansharif (separately), so I'll try my best to summarize the discussions that lead to the current iteration of this:

The onEvent and onFrontierAdvance callbacks that are passed to both the system.zones and system.descriptors rangefeeds are as lightweight as possible -- they simply return errors/notifications that the frontier has advanced on channels. Reacting to these errors (bubbling up to the caller) and frontier advance notifications (calling handle) happens on the WatchForSQLUpdates main thread. This ends up being a clean way to provide some important guarantees to the callers of this interface (I updated my comment in spanconfig.go to enumerate these).

By making the onFrontierAdvance callback lightweight, and not calling the handler in it with some different serialization scheme, also has the added benefit of not blocking the rangefeed which is neat. @irfansharif and I talked about how this ends up being quite useful for the spanconfig.Reconciler. In particular, this allows the Reconciler to process a
batch of IDs using the spanconfig.Translator (which is RPC expensive), find diffs, issue updates using the spanconfig.KVAccessor and if all this succeeds checkpoint the job using the passed in timestamp all using the handle callback.


@irfansharif we never discussed this explicitly, but considering we're mostly aligned on the above, that precludes the suggestion about internally re-establishing rangefeeds from your last comment, right?


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 157 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

It's possible to write a KV tombstone on top of another KV tombstone. In those cases, the new and old value will be empty.

Done.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher_test.go, line 104 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Oh, ok, worth adding as a comment.

Done.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher_test.go, line 124 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Should we define prevCheckpointTS within the goroutine using it?

Done. This was slightly subtle though, because I was using the prevCheckpointTS as the start timestamp in the call to WatchForSQLUpdates. That must be defined in this goroutine so we don't miss the call to tc.stmt. I renamed the thing to say startTS to make it clearer.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher_test.go, line 153 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Is it possible for receivedIDs to have duplicate IDs? The de-duplication is guaranteed on a per-handle-invocation basis, this might be flakey as a result.

That's fair, switched this to use a map so we can de-duplicate in the test as well.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher_test.go, line 167 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

If WatchForUpdates wasn't a blocking call, we could wire up the handlers and simply defer sqlWatcher.Close() after sqlWatcher.WatchForSQLUpdates. We wouldn't need to invoke the latter in a separate goroutine either.

We discussed this offline, I think we're aligned on having WatchForSQLUpdates being a blocking call. I've summarized the discussion elsewhere.


pkg/spanconfig/spanconfigsqlwatcher/zones_decoder.go, line 26 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Does this need to be exported? Ditto for methods below.

Yeah, the zonesdecoder_test is in a test package because of import cycles. I could wrap the decoding in a testing method instead if you prefer.


pkg/spanconfig/spanconfigsqlwatcher/zones_decoder.go, line 50 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

AssertionFailedf? Here and below.

Done.


pkg/spanconfig/spanconfigsqlwatcher/zones_decoder_test.go, line 79 at r1 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Oops, I missed this comment. Will address when I push it out.

This cleans up the "initial count" business really well. One thing to note is that the dummy table doesn't have two column families like system.zones (We apparently don't copy over column families when doing the whole LIKE system.zones INCLUDING ALL thing). I don't think this is a big deal for our test, considering column families don't matter for the primary key (and the zone config decoder only ever decodes the primary key). I've left a comment and changed the name to this effect though.

Copy link
Contributor

@irfansharif irfansharif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty great, nice work! I like the concurrency model we ended up with by having the main watcher thread invoke the (potentially expensive) callbacks serially without blocking the underlying rangefeeds. All my comments are minor, though I do think we may be unintentionally blocking the rangefeeds on the main watcher thread (easy enough to fix).

type SQLWatcher interface {
// WatchForSQLUpdates watches for changes to zones and descriptors starting at
// the given timestamp (exclusive) by establishing rangefeeds over
// system.zones and system.descriptors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid talking about implementation specific details (rangefeeds, system tables) in the interface, this stuff is better suited to the concrete impl itself. How about something to the effect of:

WatchForSQLUpdates watches for possible zone config changes, informing callers of which descriptor IDs/named zones may have changed."

// the given timestamp (exclusive) by establishing rangefeeds over
// system.zones and system.descriptors.
//
// It periodically calls the handle function with a list of SQLWatcherUpdates
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not periodic. s/handle function/handler. Instead of just saying "checkpointTS" in the sentence here, say instead that it's the timestamp at which the change was observed. Later we can specify that this timestamp can be used as a checkpoint to re-start the watcher process from. TL;DR drop the passive voice. How about the following instead for here and below?

The callback is invoked serially, with a monotonically increasing timestamp, and with updates from the last provided timestamp (exclusive) to the current one (inclusive). If the callback errors out, the Watcher as a whole is wound down. Callers are free to persist the update timestamps as checkpoints and use it to re-establish the watcher from a given point in time.


// SQLWatcherHandleFunc is the type of the handler function periodically invoked
// by the SQLWatcher.
type SQLWatcherHandleFunc func(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[mega nit] I think the commentary above would be a lot more readable if we inlined this definition. Readers are forced to jump around to identify the references you're making. I think it improves readability by a lot, at the cost of being just a little more verbose with code.


// SQLWatcherUpdate captures the ID and type of a descriptor or zone that the
// SQLWatcher has observed change.
type SQLWatcherUpdate struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of tying this naming to the SQLWatcher intimately, you could make it similar to spanconfig.Update below and call it spanconfig.DescriptorUpdate.

// SQLWatcherUpdate captures the ID and type of a descriptor or zone that the
// SQLWatcher has observed change.
type SQLWatcherUpdate struct {
// ID of the descriptor/zone that has changed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/has/may have. Ditto below.

&spanconfig.TestingKnobs{},
)

stopTraffic := make(chan struct{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we achieve the same thing with a thread concurrently that just updated the same table's zone config over and over again? Instead of using a boolean stoppedTraffic (and the supporting mutex), lets use a sync.WaitGroup instead. Instead of using a stopTraffic channel, we could use a cancellabe context and forever spin attempting to execute zone config changes. To stop the thread, we'd simply cancel the context (and discard context.Canceled errors).

}
return s.buffer.add(event)
}()
log.Warningf(ctx, "error adding event %v: %v", event, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we log even if there isn't an error?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be very noisy. Would you consider a log.EveryN?

return errors.Wrapf(err, "error establishing rangefeed over system.zones")
}
// We always tear down rangefeeds before returning to the caller.
defer s.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one of the rangefeeds is established and not the other, are we going to forget to close one of them?

for {
select {
case <-ctx.Done():
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return ctx.Err(). For the happy path, you want to be informed of a shutdown through the stopper instead.

return err
case <-frontierAdvanced:
events, combinedFrontierTS := s.buffer.flush(ctx)
if len(events) != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] Outdent a bit by if len(events) == 0 { continue } instead.

Copy link
Contributor

@irfansharif irfansharif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 7 of 13 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, @miretskiy, @nvanbenschoten, and @otan)


pkg/spanconfig/spanconfig.go, line 85 at r1 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Sorry for not closing this out earlier, let's move the discussion here instead. I'm not sure if the code snippet you linked there applies to us, as we're not doing an initial scan using the rangfeed client library in the SQLWatcher.

Without an initial scan the timestamp eventually makes its way down here (from what I can tell):

// If the RangeFeed is performing a catch-up scan then it will observe all
// values above args.Timestamp. If the RangeFeed is requesting previous
// values for every update then it will also need to look for the version
// proceeding each value observed during the catch-up scan timestamp. This
// means that the earliest value observed by the catch-up scan will be
// args.Timestamp.Next and the earliest timestamp used to retrieve the
// previous version of a value will be args.Timestamp, so this is the
// timestamp we must check against the GCThreshold.

My read from this is that the timestamp is exclusive, but I'd appreciate if someone more familiar with this stuff could confirm or correct me if I'm wrong.

Hm yea if we're not doing an initial scan, looks like it's an exclusive timestamp. Good catch.


pkg/spanconfig/spanconfigsqlwatcher/zones_decoder.go, line 26 at r1 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Yeah, the zonesdecoder_test is in a test package because of import cycles. I could wrap the decoding in a testing method instead if you prefer.

Yes, please.


pkg/spanconfig/spanconfigsqlwatcher/zones_decoder_test.go, line 79 at r1 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

This cleans up the "initial count" business really well. One thing to note is that the dummy table doesn't have two column families like system.zones (We apparently don't copy over column families when doing the whole LIKE system.zones INCLUDING ALL thing). I don't think this is a big deal for our test, considering column families don't matter for the primary key (and the zone config decoder only ever decodes the primary key). I've left a comment and changed the name to this effect though.

Much better, thanks!

}

rangefeedEvent := event{
timestamp: timestamp,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tiny bug I was running into in #71994:

Suggested change
timestamp: timestamp,
timestamp: ev.Value.Timestamp,

If you can think up the kind of testing that would've caught this, I'll encourage you to add it.

Comment on lines 102 to 120
seenIDs := make(map[descpb.ID]struct{})
// First we determine the checkpoint timestamp, which is the minimum
// checkpoint timestamp of all event types.
combinedFrontierTS = hlc.MaxTimestamp
for _, ts := range b.mu.rangefeedFrontiers {
combinedFrontierTS.Backward(ts)
}

events := b.mu.buffer.Flush(ctx, combinedFrontierTS)

for _, ev := range events {
update := ev.(event).update
// De-duplicate IDs from the returned result.
if _, seen := seenIDs[update.ID]; !seen {
seenIDs[update.ID] = struct{}{}
updates = append(updates, update)
}
}
return updates, combinedFrontierTS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't tell you why I care about this, because it's not really reasonable. It's totally fine to allocate the memory you're allocating here and what not. Nevertheless, I feel like typing these nitpicks, so here you go:

  1. Let's only hold the mutex on the buffer while we find the timestamp:
// flush computes the combined frontier timestamp of the buffer and returns a
// the relevant events which were buffered up to that timestamp.
func  flushEvents(
	ctx context.Context,
) (updates []rangefeedbuffer.Event, combinedFrontierTS hlc.Timestamp) {
         b.mu.Lock()
	defer b.mu.Unlock()
	// First we determine the checkpoint timestamp, which is the minimum
	// checkpoint timestamp of all event types.
	combinedFrontierTS = hlc.MaxTimestamp
	for _, ts := range b.mu.rangefeedFrontiers {
		combinedFrontierTS.Backward(ts)
	}

	return b.mu.buffer.Flush(ctx, combinedFrontierTS), combinedFrontierTS
}
  1. more nit-picky, we don't need the map
// flush computes the combined frontier timestamp of the buffer and returns a
// list of unique spanconfig.SQLWatcherUpdates below this timestamp. The
// combined frontier timestamp is also returned.
func (b *buffer) flush(
	ctx context.Context,
) (updates []spanconfig.SQLWatcherUpdate, combinedFrontierTS hlc.Timestamp) {
        var events []rangefeedbuffer.Event
        events, combinedFrontierTS = b.flushEvents(ctx)
        sort.Slice(events, func(i, j int) bool {
           ei, ej := events[i].(event), events[j].(event)
           if ei.update.ID == ej.update.ID {
               return ei.timestamp.Less(ej.timestamp)
           }
           return ei.update.ID < ej.update.ID
        })
        for i, ev := range events {
           if i == 0 || events[i-1].(event).update.ID != ev.(event).update.ID {
              updates = append(updates, ev.(event).update)
           }
        }
	return updates, combinedFrontierTS
}

// De-duplicate IDs from the returned result.
if _, seen := seenIDs[update.ID]; !seen {
seenIDs[update.ID] = struct{}{}
updates = append(updates, update)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be combining the event types in some way?


// DescriptorType of the descriptor/zone that has changed. Could be either the
// specific type or catalog.Any.
DescriptorType catalog.DescriptorType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be a bitmask so that the events corresponding to a single ID can be combined?

Copy link
Collaborator Author

@arulajmani arulajmani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TFTRs, addressed all comments, PTAL

Dismissed @irfansharif from 30 discussions.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @irfansharif, @miretskiy, @nvanbenschoten, and @otan)


pkg/server/server_sql.go, line 845 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Do this within spanconfigsqlwatcher.New. That way for tests that don't care about installing specific knobs, passing in nil is perfectly allowable. Ditto below for the manager.

Sure, done.


pkg/spanconfig/spanconfig.go, line 85 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Avoid talking about implementation specific details (rangefeeds, system tables) in the interface, this stuff is better suited to the concrete impl itself. How about something to the effect of:

WatchForSQLUpdates watches for possible zone config changes, informing callers of which descriptor IDs/named zones may have changed."

Done. I don't think the possible/may have phrasing here is correct -- we inform callers of definite changes.


pkg/spanconfig/spanconfig.go, line 87 at r2 (raw file):
~Done. Dropped the passive voice. I find laying out the semantics in bulleted form much more readable.

Instead of just saying "checkpointTS" in the sentence here, say instead that it's the timestamp at which the change was observed.

That's not really true, the change could have been observed at a timestamp < checkpointTS.


pkg/spanconfig/spanconfig.go, line 107 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

[mega nit] I think the commentary above would be a lot more readable if we inlined this definition. Readers are forced to jump around to identify the references you're making. I think it improves readability by a lot, at the cost of being just a little more verbose with code.

Done.


pkg/spanconfig/spanconfig.go, line 113 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Instead of tying this naming to the SQLWatcher intimately, you could make it similar to spanconfig.Update below and call it spanconfig.DescriptorUpdate.

Done.


pkg/spanconfig/spanconfig.go, line 114 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

s/has/may have. Ditto below.

See comments elsewhere about the phrasing here.


pkg/spanconfig/spanconfig.go, line 118 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

"or catalog.Any if the specific information is unavailable."

Done.


pkg/spanconfig/spanconfig.go, line 119 at r2 (raw file):

Previously, ajwerner wrote…

should this be a bitmask so that the events corresponding to a single ID can be combined?

You're right, we might want to combine these when de-duplicating in the buffer. I don't think we need a bitmask, because it can only ever be "Any" or one of the concrete descriptor types. Using a regular old enum seems easier.


pkg/spanconfig/testing_knobs.go, line 37 at r2 (raw file):

The way this interceptor is wired up, you'd have to trigger another event in your test for it to run into an error.

Checkout TestSQLWatcherOnEventError, which uses this thing -- triggering an event that makes the Watcher fail on an event is intentional here. The test ensures that the handler doesn't run if there is an error in the Watcher. If you don't trigger an event then the handler has no reason to run.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 25 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

s/All methods ... concurrently/It's safe for concurrent use.

Done.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 117 at r2 (raw file):

Previously, ajwerner wrote…

should we be combining the event types in some way?

Done.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 120 at r2 (raw file):

Previously, ajwerner wrote…

I can't tell you why I care about this, because it's not really reasonable. It's totally fine to allocate the memory you're allocating here and what not. Nevertheless, I feel like typing these nitpicks, so here you go:

  1. Let's only hold the mutex on the buffer while we find the timestamp:
// flush computes the combined frontier timestamp of the buffer and returns a
// the relevant events which were buffered up to that timestamp.
func  flushEvents(
	ctx context.Context,
) (updates []rangefeedbuffer.Event, combinedFrontierTS hlc.Timestamp) {
         b.mu.Lock()
	defer b.mu.Unlock()
	// First we determine the checkpoint timestamp, which is the minimum
	// checkpoint timestamp of all event types.
	combinedFrontierTS = hlc.MaxTimestamp
	for _, ts := range b.mu.rangefeedFrontiers {
		combinedFrontierTS.Backward(ts)
	}

	return b.mu.buffer.Flush(ctx, combinedFrontierTS), combinedFrontierTS
}
  1. more nit-picky, we don't need the map
// flush computes the combined frontier timestamp of the buffer and returns a
// list of unique spanconfig.SQLWatcherUpdates below this timestamp. The
// combined frontier timestamp is also returned.
func (b *buffer) flush(
	ctx context.Context,
) (updates []spanconfig.SQLWatcherUpdate, combinedFrontierTS hlc.Timestamp) {
        var events []rangefeedbuffer.Event
        events, combinedFrontierTS = b.flushEvents(ctx)
        sort.Slice(events, func(i, j int) bool {
           ei, ej := events[i].(event), events[j].(event)
           if ei.update.ID == ej.update.ID {
               return ei.timestamp.Less(ej.timestamp)
           }
           return ei.update.ID < ej.update.ID
        })
        for i, ev := range events {
           if i == 0 || events[i-1].(event).update.ID != ev.(event).update.ID {
              updates = append(updates, ev.(event).update)
           }
        }
	return updates, combinedFrontierTS
}

Cool, done!


pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go, line 27 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

This test is a pretty great candidate for a datadriven one given the very many mutations that are happening. Want to do it as part of this PR? (Also happy to take a TODO if we can land sooner.)

We talked about this offline, using a datadriven test here doesn't get us too much.


pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go, line 43 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Take in a variadic set of integers instead to avoid needing to cast to an integer slice at all callsites.

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go, line 91 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

[nit] Capitalize "One".

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go, line 128 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Heh, did you mean to add a comment?

Heh, yeah, wanted to add some words about why we need to save the startTS in this thread. Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go, line 132 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

[nit] Anonymous structs can be your friend:

mu struct {
  synctutil.Mutex
  receivedIDs map[descpb.ID]struct{}
}

Then you'd have:

mu.Lock()
mu.receivedIDs = ...

Good call!


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go, line 135 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Maybe call this "watcherCtx, watcherCancel". When tearing down the watcher at the end, use a sync.WaitGroup to wait on for this goroutine to actually finish. I think in this current form, though unlikely, it's possible to trip up the leaktest detector.

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go, line 217 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Should this be a t.Fatal/Error instead?

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go, line 253 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Would we achieve the same thing with a thread concurrently that just updated the same table's zone config over and over again? Instead of using a boolean stoppedTraffic (and the supporting mutex), lets use a sync.WaitGroup instead. Instead of using a stopTraffic channel, we could use a cancellabe context and forever spin attempting to execute zone config changes. To stop the thread, we'd simply cancel the context (and discard context.Canceled errors).

Switched to a WaitGroup, I find a channel to stop the background thread slightly more readable than using two separate contexts in the test.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 50 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Another thing to do is to pick a memory size (X << 20 == X MB), guesstimate a row size (say, 5 KB), and combine the two to figure out how many entries your buffer can take.

We're only ever storing a spanconfig.DescriptorUpdate in the buffer, which is 2 integers (8 bytes?)


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 97 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Are we blocking the rangefeed from advancing if the main thread that's consuming from these channels (and invoking expensive RPC-issuing handlers) are busy elsewhere? Ditto for our handling of errCh below.

Yeah, I think you're right. I'm adding an arbitrary buffer to these channels for now, but Im curious if there's a better way here.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 109 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Will we log even if there isn't an error?

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 127 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

If one of the rangefeeds is established and not the other, are we going to forget to close one of them?

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 132 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Return ctx.Err(). For the happy path, you want to be informed of a shutdown through the stopper instead.

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 137 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

[nit] Outdent a bit by if len(events) == 0 { continue } instead.

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 215 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Tiny bug I was running into in #71994:

			timestamp: ev.Value.Timestamp,

If you can think up the kind of testing that would've caught this, I'll encourage you to add it.

Done.


pkg/spanconfig/spanconfigsqlwatcher/zones_decoder.go, line 26 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Yes, please.

Done.

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is getting close. Did I miss testing regarding the overflow of the buffer?

}
return s.buffer.add(event)
}()
log.Warningf(ctx, "error adding event %v: %v", event, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be very noisy. Would you consider a log.EveryN?

WatchForSQLUpdates(
ctx context.Context,
startTS hlc.Timestamp,
handler func(ctx context.Context, updates []DescriptorUpdate, checkpointTS hlc.Timestamp) error,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: give this a type

descriptorsRangefeed

// numRangefeeds should be listed last.
numRangefeeds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uber-nit: numRangefeeds int = iota so you don't use it as a rangefeedKind

// sqlWatcher's buffer. We store spanconfig.DescriptorUpdate's inside the buffer
// which is comprised of 2 integers, so we estimate 8 bytes as the size here.
// We use this value to calculate the cap for the buffer.
const sqlWatcherBufferEntrySize = 8 // 8 bytes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't each integer 8 bytes? what about unsafe.Sizeof(spanconfig.DescriptorUpdate{})? Also, is there boxing?

}
}

// WatchForSQLUpdates is part of the spanconfig.SQLWatcher interface.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface smells a bit. It's not obvious that you cannot call this more than once, but you definitely cannot. Maybe we should have New return something called like a Factory or, perhaps, just stash these dependencies in a struct and that have WatchForSQLUpdates construct a new struct which is currently SQLWatcher.

// These guarantees mean that users of this interface are free to persist the
// checkpointTS and later use it to re-establish the SQLWatcher without
// missing any updates.
WatchForSQLUpdates(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should do more to explain the semantics of the memory errors. The caller is going to need to deal with them and restart. That's now part of the contract.

Copy link
Contributor

@irfansharif irfansharif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 7 of 16 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, @miretskiy, @nvanbenschoten, and @otan)


pkg/spanconfig/spanconfig.go, line 85 at r2 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Done. I don't think the possible/may have phrasing here is correct -- we inform callers of definite changes.

If alter table configure zone using the same parameters are already applied (either through inheritance, or set explicitly), that's what I was getting with "may have changed"; it's worth being pedantic IMO.


pkg/spanconfig/spanconfig.go, line 87 at r2 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

~Done. Dropped the passive voice. I find laying out the semantics in bulleted form much more readable.

Instead of just saying "checkpointTS" in the sentence here, say instead that it's the timestamp at which the change was observed.

That's not really true, the change could have been observed at a timestamp < checkpointTS.

For (4), instead of "invocation" say handler?


pkg/spanconfig/spanconfig.go, line 142 at r3 (raw file):

Previously, ajwerner wrote…

nit: give this a type

It was my suggestion to skip the intermediate type to help with the readability of surrounding comment referring to the symbols/args in the interface comment. I think indenting would help here though.

  handler func(
    ctx context.Context,
    updates []DescriptorUpdate,
    checkpointTS hlc.Timestamp,
  ) error

pkg/spanconfig/testing_knobs.go, line 37 at r2 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

The way this interceptor is wired up, you'd have to trigger another event in your test for it to run into an error.

Checkout TestSQLWatcherOnEventError, which uses this thing -- triggering an event that makes the Watcher fail on an event is intentional here. The test ensures that the handler doesn't run if there is an error in the Watcher. If you don't trigger an event then the handler has no reason to run.

I understand what TestSQLWatcherOnEventError is doing. I'm saying this testing knob construction does not allow you to insert an event part way through the run (which is possible if the buffer overflows). You're simply creating a watcher that will error out before any handler event is called.

See what we did for s.knobs.KVSubscriberErrorInjectionCh, that's what I think we should do here too.


pkg/spanconfig/types.go, line 49 at r3 (raw file):

	// DescriptorType of the descriptor/zone that has changed. Could be either the
	// specific type or Any if no information is available.

"catalog.Any" would be clearer IMO.


pkg/spanconfig/spanconfigsqlwatcher/sql_watcher.go, line 87 at r1 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Ended up discussing this offline with @nvanbenschoten and @irfansharif (separately), so I'll try my best to summarize the discussions that lead to the current iteration of this:

The onEvent and onFrontierAdvance callbacks that are passed to both the system.zones and system.descriptors rangefeeds are as lightweight as possible -- they simply return errors/notifications that the frontier has advanced on channels. Reacting to these errors (bubbling up to the caller) and frontier advance notifications (calling handle) happens on the WatchForSQLUpdates main thread. This ends up being a clean way to provide some important guarantees to the callers of this interface (I updated my comment in spanconfig.go to enumerate these).

By making the onFrontierAdvance callback lightweight, and not calling the handler in it with some different serialization scheme, also has the added benefit of not blocking the rangefeed which is neat. @irfansharif and I talked about how this ends up being quite useful for the spanconfig.Reconciler. In particular, this allows the Reconciler to process a
batch of IDs using the spanconfig.Translator (which is RPC expensive), find diffs, issue updates using the spanconfig.KVAccessor and if all this succeeds checkpoint the job using the passed in timestamp all using the handle callback.


@irfansharif we never discussed this explicitly, but considering we're mostly aligned on the above, that precludes the suggestion about internally re-establishing rangefeeds from your last comment, right?

Yea, not automagically re-establishing rangefeeds internally seems sound. Closing this comment thread in favor of another above to discuss the behaviour of the main thread supporting a blocking/long-running handler. I don't think the latter is necessary, if the spanconfig.Reconciler wants to do something expensive, it seems fair game to have that component worry about spinning off another thread for that purpose.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 50 at r2 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

We're only ever storing a spanconfig.DescriptorUpdate in the buffer, which is 2 integers (8 bytes?)

Aren't we also buffering the timestamp each event is tagged with?


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 97 at r2 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Yeah, I think you're right. I'm adding an arbitrary buffer to these channels for now, but Im curious if there's a better way here.

Instead of having the main thread invoke a handler that's possibly long running, it's fine to have the users (spanconfig.Reconciler) deal with complexity of wanting to do long running tasks in a separate thread -- forcing a short-running handler thread to collect whatever updates it needs to to power the former. Using an arbitrary buffer size is unsatisfying -- we're effectively limiting how long running the handler can be before the rangefeeds get blocked.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 127 at r2 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Done.

It would be a bit more readable to return the rangefeed type from these watchFor... methods, and defer a close right here.

zonesRF, err := s.watchFor(...)
if err != nil {
  ...
}
defer zonesRF.Close()

Right now you'd have to jump across three method bodies to convince yourself teardown is happening correctly. Also, do you need to attach the rangefeed closure to the embedded stopper if we're closing it ourselves before returning from the blocking call?

Copy link
Contributor

@irfansharif irfansharif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, @miretskiy, @nvanbenschoten, and @otan)


pkg/spanconfig/types.go, line 49 at r3 (raw file):

Previously, irfansharif (irfan sharif) wrote…

"catalog.Any" would be clearer IMO.

Oh, I didn't see we added new symbols here. I don't think we should, let's please re-use the authoritative catalog symbols. These symbols don't feel like they belong in this package either (spanconfig.{Table,Schema,Database,Type,Any sounds misplaced). If the intent is just to combine things, we could just use the catalog symbols, right?

Please move the DescriptorUpdate and Update symbols back into spanconfig.go.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 127 at r3 (raw file):

	for i, ev := range events {
		if i == 0 || events[i-1].(event).update.ID != ev.(event).update.ID {
			updates = append(updates, ev.(event).update)

[nit] Use continue in this block to outdent whatever's happening below. This Combine thing does not feel like it belongs as a base package exported symbol -- it could be inlined here and just use the catalog symbols instead.

Copy link
Collaborator Author

@arulajmani arulajmani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did I miss testing regarding the overflow of the buffer?

I have this TestSQLWatcherOnEventError which exercises the same codepath without actually making the buffer overflow. Were you looking for something more?

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @irfansharif, @miretskiy, @nvanbenschoten, and @otan)


pkg/spanconfig/spanconfig.go, line 85 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

If alter table configure zone using the same parameters are already applied (either through inheritance, or set explicitly), that's what I was getting with "may have changed"; it's worth being pedantic IMO.

Switched "change" to "updated" in that case.


pkg/spanconfig/spanconfig.go, line 139 at r3 (raw file):

Previously, ajwerner wrote…

I think you should do more to explain the semantics of the memory errors. The caller is going to need to deal with them and restart. That's now part of the contract.

Done.


pkg/spanconfig/spanconfig.go, line 142 at r3 (raw file):

Previously, irfansharif (irfan sharif) wrote…

It was my suggestion to skip the intermediate type to help with the readability of surrounding comment referring to the symbols/args in the interface comment. I think indenting would help here though.

  handler func(
    ctx context.Context,
    updates []DescriptorUpdate,
    checkpointTS hlc.Timestamp,
  ) error

Mild preference for not indenting if we're keeping things inline.


pkg/spanconfig/testing_knobs.go, line 37 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

I understand what TestSQLWatcherOnEventError is doing. I'm saying this testing knob construction does not allow you to insert an event part way through the run (which is possible if the buffer overflows). You're simply creating a watcher that will error out before any handler event is called.

See what we did for s.knobs.KVSubscriberErrorInjectionCh, that's what I think we should do here too.

I gave this a shot after our discussion yesterday, but I don't see a good reason to make the switch. It's much easier to work with a static error being returned (possibly with some sort of filter/condition in the future) than trying to do things by injecting a channel here. Injecting a function allows me to do things without needing a separate gorotuine, as the call to WatchForSQLUpdates is blocking, as opposed to what you had in the KVSubscriber tests.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 69 at r3 (raw file):

Previously, ajwerner wrote…

uber-nit: numRangefeeds int = iota so you don't use it as a rangefeedKind

Done.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 127 at r3 (raw file):

Previously, irfansharif (irfan sharif) wrote…

[nit] Use continue in this block to outdent whatever's happening below. This Combine thing does not feel like it belongs as a base package exported symbol -- it could be inlined here and just use the catalog symbols instead.

Done.


pkg/spanconfig/types.go, line 49 at r3 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Oh, I didn't see we added new symbols here. I don't think we should, let's please re-use the authoritative catalog symbols. These symbols don't feel like they belong in this package either (spanconfig.{Table,Schema,Database,Type,Any sounds misplaced). If the intent is just to combine things, we could just use the catalog symbols, right?

Please move the DescriptorUpdate and Update symbols back into spanconfig.go.

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 127 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

It would be a bit more readable to return the rangefeed type from these watchFor... methods, and defer a close right here.

zonesRF, err := s.watchFor(...)
if err != nil {
  ...
}
defer zonesRF.Close()

Right now you'd have to jump across three method bodies to convince yourself teardown is happening correctly. Also, do you need to attach the rangefeed closure to the embedded stopper if we're closing it ourselves before returning from the blocking call?

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 52 at r3 (raw file):

Previously, ajwerner wrote…

isn't each integer 8 bytes? what about unsafe.Sizeof(spanconfig.DescriptorUpdate{})? Also, is there boxing?

Done.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 76 at r3 (raw file):

Previously, ajwerner wrote…

This interface smells a bit. It's not obvious that you cannot call this more than once, but you definitely cannot. Maybe we should have New return something called like a Factory or, perhaps, just stash these dependencies in a struct and that have WatchForSQLUpdates construct a new struct which is currently SQLWatcher.

Switched to using a Factory here, good catch!

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 8 of 13 files at r4, 1 of 1 files at r5, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, @nvanbenschoten, and @otan)


pkg/spanconfig/spanconfig.go, line 135 at r5 (raw file):

Quoted 6 lines of code…
	// 1. Calls to the handler are serial.
	// 2. The timestamp supplied to the handler is monotonically increasing.
	// 3. The list of DescriptorUpdates supplied to handler includes all events
	// in the window (prevInvocationCheckpointTS, checkpointTS].
	// 4. No further calls to the handler are made if a call to the handler
	// returns an error.

nit: indent these by two spaces so they appear better in the godoc.


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 91 at r5 (raw file):

// sqlWatcherBufferEntrySize is the size of an entry stored in the sqlWatcher's
// buffer. We use this value to calculate the buffer capacity.
const sqlWatcherBufferEntrySize = int64(unsafe.Sizeof(event{}))

nit int64(unsafe.Sizeof(event{}) + unsafe.SizeOf(rangefeedbuffer.Event(nil)) to pay for the interface box. It'll be 16, as it is for all interface headers.

Copy link
Contributor

@irfansharif irfansharif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! :lgtm_strong:

Reviewed 7 of 13 files at r4, all commit messages.
Reviewable status: :shipit: complete! 2 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, @nvanbenschoten, and @otan)


pkg/spanconfig/spanconfig.go, line 166 at r5 (raw file):

// SQLWatcherFactory is used to construct SQLWatchers.
type SQLWatcherFactory interface {

Have a slight preference to have this interface avoid the intermediate step of returning a SQLWatcher that then needs to be invoked, in favor of making the SQLWatcher itself a factory that encloses all the dependencies and keeps an internal stateful struct (what today is *SQLWatcher). Something like:

// physicalFeedFactory constructs a physical feed which writes into sink and
// runs until the group's context expires.
type physicalFeedFactory interface {
Run(ctx context.Context, sink kvevent.Writer, cfg physicalConfig) error
}

Would reduce the symbols in this package. Not worth doing in this PR.


pkg/spanconfig/testing_knobs.go, line 37 at r2 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

I gave this a shot after our discussion yesterday, but I don't see a good reason to make the switch. It's much easier to work with a static error being returned (possibly with some sort of filter/condition in the future) than trying to do things by injecting a channel here. Injecting a function allows me to do things without needing a separate gorotuine, as the call to WatchForSQLUpdates is blocking, as opposed to what you had in the KVSubscriber tests.

Don't want to hold up this PR anymore, but perhaps a better test would just be setting up the watcher with a low buffer size (it seems to be parametrized with the size) and seeing it fail, rather than injecting an error.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 127 at r3 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Done.

Did you mean to continue + indent?


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 141 at r5 (raw file):

}

// Combine takes two catalog.DescriptorTypes and combines them according to the

Lowercase "combine".


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 127 at r2 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Done.

Why even attach descriptorsRF to the surrounding struct? Do we expect it to use it elsewhere? Seems not -- returning it from the watchFor... methods seems simpler.

This patch introduces the SQLWatcher, which is intended to incrementally
watch for updates to system.zones and system.descriptors. It does so by
establishing rangefeeds at a given timestamp.

The SQLWatcher invokes a callback from time to time  with a list of
updates that have been observed in the window
(previous checkpointTS, checkpointTS]. The checkpointTS is also
provided to the callback.

Internally, the SQLWatcher uses a buffer to keep track of events
generated by the SQLWatcher's rangefeeds. It also tracks the individual
frontier timestamps of both the rangefeeds. This helps to maintain the
notion of the combined frontier timestamp, which is computed as the
minimum of the two. This combined frontier timestamp serves as the
checkpoint to the SQLWatcher's callback function.

This interface isn't hooked up to anything yet. It'll be used by the
sponconfig.Reconciler soon to perform partial reconciliation once
full reconciliation is done. It is intended that the IDs from the
updates produced by the SQLWatcher will be fed into the SQLTranslator.

References cockroachdb#67679
Carved from cockroachdb#69661

Release note: None
Copy link
Collaborator Author

@arulajmani arulajmani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TFTRs!

bors r=irfansharif,ajwerner

Dismissed @irfansharif from 4 discussions.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @irfansharif, @nvanbenschoten, and @otan)


pkg/spanconfig/spanconfig.go, line 166 at r5 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Have a slight preference to have this interface avoid the intermediate step of returning a SQLWatcher that then needs to be invoked, in favor of making the SQLWatcher itself a factory that encloses all the dependencies and keeps an internal stateful struct (what today is *SQLWatcher). Something like:

// physicalFeedFactory constructs a physical feed which writes into sink and
// runs until the group's context expires.
type physicalFeedFactory interface {
Run(ctx context.Context, sink kvevent.Writer, cfg physicalConfig) error
}

Would reduce the symbols in this package. Not worth doing in this PR.

I had a slight preference to make these steps explicit, but yeah, it does add another interface in here. We can see how things look once the Reconciler is hooked up.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 127 at r3 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Did you mean to continue + indent?

Done.


pkg/spanconfig/spanconfigsqlwatcher/buffer.go, line 141 at r5 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Lowercase "combine".

copy pasta


pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go, line 127 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Why even attach descriptorsRF to the surrounding struct? Do we expect it to use it elsewhere? Seems not -- returning it from the watchFor... methods seems simpler.

Done

@craig
Copy link
Contributor

craig bot commented Nov 11, 2021

Build failed (retrying...):

@craig
Copy link
Contributor

craig bot commented Nov 11, 2021

Build succeeded:

@craig craig bot merged commit 81519ab into cockroachdb:master Nov 11, 2021
@irfansharif
Copy link
Contributor

🎉🎉

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants