spanconfig/store: support applying a batch of updates
We first introduced spanconfig.StoreWriter in cockroachdb#70287. Here we extend the
interface to accept a batch of updates instead of just one.

    type StoreWriter interface {
        Apply(ctx context.Context, dryrun bool, updates ...Update) (
            deleted []roachpb.Span, added []roachpb.SpanConfigEntry,
        )
    }

The implementation is subtle -- we're not processing one update at a
time. The semantics we're after are those of applying a batch of updates
atomically on a consistent snapshot of the underlying store. This comes
up in the upcoming spanconfig.Reconciler (cockroachdb#71994) -- there, following a
zone config/descriptor change, we want to update KV state in a single
request/RTT instead of an RTT per descendant table. The intended
usage is better captured in the aforementioned PR; let's now talk
about what this entails for the data structure.

To apply a single update, we want to find all overlapping spans and
clear out just the intersections. If the update is adding a new span
config, we'll also want to add the corresponding store entry after. We
do this by deleting all overlapping spans in their entirety and
re-adding the non-overlapping segments. Pseudo-code:

  for entry in store.overlapping(update.span):
      union, intersection = union(update.span, entry), intersection(update.span, entry)
      pre  = span{union.start_key, intersection.start_key}
      post = span{intersection.end_key, union.end_key}

      delete {span=entry.span, conf=entry.conf}
      if entry.contains(update.span.start_key):
          # First entry overlapping with update.
          add {span=pre, conf=entry.conf} if non-empty
      if entry.contains(update.span.end_key):
          # Last entry overlapping with update.
          add {span=post, conf=entry.conf} if non-empty

  if adding:
      add {span=update.span, conf=update.conf} # add ourselves
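
The single-update pseudo-code above can be sketched in Go roughly as
follows. This is a minimal illustration, not the actual store code: Span,
Entry, and applyOne are hypothetical names, string keys stand in for
roachpb.Key, a plain slice stands in for the interval tree, and an empty
Conf in an update is assumed to mean "delete".

```go
package main

import "fmt"

// Span is a half-open key interval [Start, End). Plain strings stand in for
// roachpb.Key; this type is hypothetical and only mirrors roachpb.Span.
type Span struct{ Start, End string }

// Empty reports whether the span covers no keys.
func (s Span) Empty() bool { return s.Start >= s.End }

// Overlaps reports whether two non-empty spans share at least one key.
func (s Span) Overlaps(o Span) bool {
	return !s.Empty() && !o.Empty() && s.Start < o.End && o.Start < s.End
}

// Entry pairs a span with its config. In an update, an empty Conf is taken
// to mean "delete" (an assumption of this sketch).
type Entry struct {
	Span Span
	Conf string
}

// applyOne computes, without mutating store, what a single update deletes
// and adds. store must hold non-overlapping entries.
func applyOne(store []Entry, update Entry) (deleted []Span, added []Entry) {
	for _, e := range store {
		if !e.Span.Overlaps(update.Span) {
			continue
		}
		// Delete the overlapping entry in its entirety...
		deleted = append(deleted, e.Span)
		// ...then re-add whatever sticks out on either side of the update.
		if pre := (Span{e.Span.Start, update.Span.Start}); !pre.Empty() {
			added = append(added, Entry{pre, e.Conf})
		}
		if post := (Span{update.Span.End, e.Span.End}); !post.Empty() {
			added = append(added, Entry{post, e.Conf})
		}
	}
	if update.Conf != "" {
		added = append(added, update) // add ourselves
	}
	return deleted, added
}

func main() {
	store := []Entry{{Span{"b", "h"}, "X"}}
	deleted, added := applyOne(store, Entry{Span{"d", "f"}, "B"})
	fmt.Println(deleted) // [{b h}]
	fmt.Println(added)   // [{{b d} X} {{f h} X} {{d f} B}]
}
```

Computing pre and post directly from the entry and update bounds is
equivalent to the union/intersection formulation: pre is non-empty only
when the entry contains the update's start key, and post only when it
contains the update's end key.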

When extending to a set of updates, things are more involved. Let's
assume that the updates are non-overlapping and sorted by start key. As
before, we want to delete overlapping entries in their entirety and
re-add the non-overlapping segments. With multiple updates, it's
possible that a segment being re-added will overlap another update.  If
processing one update at a time in sorted order, we want to only re-add
the gap between the consecutive updates.

    keyspace         a  b  c  d  e  f  g  h  i  j
    existing state      [--------X--------)
    updates          [--A--)           [--B--)

When processing [a,c):A, after deleting [b,h):X, it would be incorrect
to re-add [c,h):X since we're also looking to apply [g,i):B. Instead of
re-adding the trailing segment right away, we carry it forward and
process it when iterating over the second, possibly overlapping update.
In our example, when iterating over [g,i):B we can subtract the overlap
from [c,h):X and only re-add [c,g):X.

It's also possible for the segment to extend past the second update. In
the example below, when processing [d,f):B with [c,h):X carried
over, we want to re-add [c,d):X and carry forward [f,h):X to the update
after (i.e. [g,i):C).

    keyspace         a  b  c  d  e  f  g  h  i  j
    existing state      [--------X--------)
    updates          [--A--)  [--B--)  [--C--)
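
The carry-forward step in the two examples above comes down to splitting
the carried segment around the next update. A small sketch of that split,
assuming string keys and a hypothetical splitCarried helper (neither is
taken from the actual implementation):

```go
package main

import "fmt"

// Span is a half-open key interval [Start, End); a hypothetical stand-in
// for roachpb.Span with string keys.
type Span struct{ Start, End string }

// Empty reports whether the span covers no keys.
func (s Span) Empty() bool { return s.Start >= s.End }

// splitCarried splits a carried-over segment around the next update. gap is
// the part that can be re-added right away; rest is the part that must be
// carried forward to the update after. Either may come back empty. It
// assumes the update starts at or after the start of the carried segment,
// which holds when updates are sorted and non-overlapping.
func splitCarried(carried, update Span) (gap, rest Span) {
	if update.Start >= carried.End {
		// The update lies entirely past the segment: re-add all of it.
		return carried, Span{}
	}
	gap = Span{carried.Start, update.Start}
	if gap.Empty() {
		gap = Span{}
	}
	rest = Span{update.End, carried.End}
	if rest.Empty() {
		rest = Span{}
	}
	return gap, rest
}

func main() {
	carried := Span{"c", "h"} // trailing part of [b,h):X after applying [a,c):A

	// First example: the next update is [g,i):B, so only [c,g) is re-added.
	gap, rest := splitCarried(carried, Span{"g", "i"})
	fmt.Println(gap, rest.Empty())

	// Second example: the next update is [d,f):B, so [c,d) is re-added and
	// [f,h) is carried forward to [g,i):C.
	gap, rest = splitCarried(carried, Span{"d", "f"})
	fmt.Println(gap, rest)
}
```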

One final note: we're iterating through the updates without actually
applying any mutations. Going back to our first example, when processing
[g,i):B, retrieving the set of overlapping spans would (again) retrieve
[b,h):X -- an entry we've already encountered when processing [a,c):A.
Re-adding non-overlapping segments naively would re-add [b,g):X -- an
entry that overlaps with our last update [a,c):A. When retrieving
overlapping entries, we need to exclude any that overlap with the
segment that was carried over. Pseudo-code:

  carry-over = <empty>
  for update in updates:
      carried-over, carry-over = carry-over, <empty>
      if update.overlap(carried-over):
          # Fill in the gap between consecutive updates.
          add {span=span{carried-over.start_key, update.start_key}, conf=carried-over.conf}
          # Consider the trailing span after update; carry it forward if non-empty.
          carry-over = {span=span{update.end_key, carried-over.end_key}, conf=carried-over.conf}
      else:
          add {span=carried-over.span, conf=carried-over.conf} if non-empty

      for entry in store.overlapping(update.span):
          if entry.overlap(carried-over):
              continue # already processed

          union, intersection = union(update.span, entry), intersection(update.span, entry)
          pre  = span{union.start_key, intersection.start_key}
          post = span{intersection.end_key, union.end_key}

          delete {span=entry.span, conf=entry.conf}
          if entry.contains(update.span.start_key):
              # First entry overlapping with update.
              add {span=pre, conf=entry.conf} if non-empty
          if entry.contains(update.span.end_key):
              # Last entry overlapping with update.
              carry-over = {span=post, conf=entry.conf}

      if adding:
          add {span=update.span, conf=update.conf} # add ourselves

  add {span=carry-over.span, conf=carry-over.conf} if non-empty
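
For illustration, the batched pseudo-code above could be rendered in Go
along these lines. It is a sketch under stated assumptions rather than the
store's implementation: Span, Entry, and applyBatch are hypothetical, keys
are plain strings, the store is a slice instead of an interval tree, an
empty Conf in an update means "delete", and updates are assumed to be
sorted by start key and non-overlapping.

```go
package main

import "fmt"

// Span is a half-open key interval [Start, End); a hypothetical stand-in
// for roachpb.Span with string keys.
type Span struct{ Start, End string }

// Empty reports whether the span covers no keys.
func (s Span) Empty() bool { return s.Start >= s.End }

// Overlaps reports whether two non-empty spans share at least one key.
func (s Span) Overlaps(o Span) bool {
	return !s.Empty() && !o.Empty() && s.Start < o.End && o.Start < s.End
}

// Entry pairs a span with its config; an empty Conf in an update is taken
// to mean "delete" (an assumption of this sketch).
type Entry struct {
	Span Span
	Conf string
}

// applyBatch computes what a batch of sorted, non-overlapping updates
// deletes and adds, reading store as a fixed snapshot (no mutations).
func applyBatch(store, updates []Entry) (deleted []Span, added []Entry) {
	var carry Entry // zero value means nothing is being carried forward
	for _, u := range updates {
		carried := carry
		carry = Entry{}
		if carried.Span.Overlaps(u.Span) {
			// Fill in the gap between consecutive updates.
			if gap := (Span{carried.Span.Start, u.Span.Start}); !gap.Empty() {
				added = append(added, Entry{gap, carried.Conf})
			}
			// Carry the trailing part forward if it extends past this update.
			if rest := (Span{u.Span.End, carried.Span.End}); !rest.Empty() {
				carry = Entry{rest, carried.Conf}
			}
		} else if !carried.Span.Empty() {
			added = append(added, carried)
		}

		for _, e := range store {
			if !e.Span.Overlaps(u.Span) || e.Span.Overlaps(carried.Span) {
				continue // not affected, or already deleted by an earlier update
			}
			deleted = append(deleted, e.Span)
			if pre := (Span{e.Span.Start, u.Span.Start}); !pre.Empty() {
				added = append(added, Entry{pre, e.Conf})
			}
			if post := (Span{u.Span.End, e.Span.End}); !post.Empty() {
				carry = Entry{post, e.Conf} // defer re-adding the trailing part
			}
		}

		if u.Conf != "" {
			added = append(added, u) // add ourselves
		}
	}
	if !carry.Span.Empty() {
		added = append(added, carry)
	}
	return deleted, added
}

func main() {
	store := []Entry{{Span{"b", "h"}, "X"}}
	updates := []Entry{{Span{"a", "c"}, "A"}, {Span{"g", "i"}, "B"}}
	deleted, added := applyBatch(store, updates)
	fmt.Println(deleted) // [{b h}]
	fmt.Println(added)   // [{{a c} A} {{c g} X} {{g i} B}]
}
```

The run in main reproduces the first example: [b,h):X is deleted once, and
only [c,g):X is re-added between the two updates.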

We've extended the randomized testing suite to generate batches of
updates at a time. We've also added a few illustrated datadriven tests.

Release note: None
irfansharif committed Nov 30, 2021
1 parent 2ac3cf4 commit ed14f74
Showing 17 changed files with 1,019 additions and 147 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_spanconfigs_test.go
@@ -69,7 +69,7 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
span := repl.Desc().RSpan().AsRawSpanWithNoLocals()
conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3}

-deleted, added := spanConfigStore.Apply(ctx, spanconfig.Update{Span: span, Config: conf}, false /* dryrun */)
+deleted, added := spanConfigStore.Apply(ctx, false /* dryrun */, spanconfig.Update{Span: span, Config: conf})
require.Empty(t, deleted)
require.Len(t, added, 1)
require.True(t, added[0].Span.Equal(span))
10 changes: 10 additions & 0 deletions pkg/roachpb/span_config.go
@@ -96,6 +96,16 @@ func (c ConstraintsConjunction) String() string {
return sb.String()
}

+// Equal compares two span config entries.
+func (s *SpanConfigEntry) Equal(o SpanConfigEntry) bool {
+return s.Span.Equal(o.Span) && s.Config.Equal(o.Config)
+}
+
+// Empty returns true if the span config entry is empty.
+func (s *SpanConfigEntry) Empty() bool {
+return s.Equal(SpanConfigEntry{})
+}
+
// TestingDefaultSpanConfig exports the default span config for testing purposes.
func TestingDefaultSpanConfig() SpanConfig {
return SpanConfig{
22 changes: 16 additions & 6 deletions pkg/spanconfig/spanconfig.go
@@ -192,18 +192,19 @@ type Store interface {

// StoreWriter is the write-only portion of the Store interface.
type StoreWriter interface {
-// Apply applies the given update[1]. It also returns the existing spans that
-// were deleted and entries that were newly added to make room for the
-// update. The deleted list can double as a list of overlapping spans in the
-// Store, provided the update is not a no-op[2].
+// Apply applies a batch of non-overlapping updates atomically[1] and
+// returns (i) the existing spans that were deleted, and (ii) the entries
+// that were newly added to make room for the batch. The deleted list can
+// also double as a list of overlapping spans in the Store[2].
//
// Span configs are stored in non-overlapping fashion. When an update
// overlaps with existing configs, the existing configs are deleted. If the
// overlap is only partial, the non-overlapping components of the existing
// configs are re-added. If the update itself is adding an entry, that too
// is added. This is best illustrated with the following example:
//
-// [--- X --) is a span with config X
+// [--- X --) is a span with config X
+// [xxxxxxxx) is a span being deleted
//
// Store   | [--- A ----)[------------- B -----------)[---------- C -----)
// Update  |             [------------------ D -------------)
@@ -212,6 +213,15 @@ type StoreWriter interface {
// Added   |             [------------------ D -------------)[--- C -----)
// Store*  | [--- A ----)[------------------ D -------------)[--- C -----)
//
+// Generalizing to multiple updates:
+//
+// Store   | [--- A ----)[------------- B -----------)[---------- C -----)
+// Updates |             [--- D ----)        [xxxxxxxxx)       [--- E ---)
+//         |
+// Deleted |             [------------- B -----------)[---------- C -----)
+// Added   |             [--- D ----)[-- B --)         [-- C -)[--- E ---)
+// Store*  | [--- A ----)[--- D ----)[-- B --)         [-- C -)[--- E ---)
+//
// TODO(irfansharif): We'll make use of the dryrun option in a future PR
// when wiring up the reconciliation job to use the KVAccessor. Since the
// KVAccessor is a "targeted" API (the spans being deleted/upserted
@@ -255,7 +265,7 @@ type StoreWriter interface {
// against a StoreWriter (populated using KVAccessor contents) using
// the descriptor's span config entry would return empty lists,
// indicating a no-op.
-Apply(ctx context.Context, update Update, dryrun bool) (
+Apply(ctx context.Context, dryrun bool, updates ...Update) (
deleted []roachpb.Span, added []roachpb.SpanConfigEntry,
)
}
7 changes: 5 additions & 2 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
@@ -335,7 +335,10 @@ func (s *KVSubscriber) run(ctx context.Context) error {
events := buffer.Flush(ctx, frontierTS)
s.mu.Lock()
for _, ev := range events {
-s.mu.internal.Apply(ctx, ev.(*bufferEvent).Update, false /* dryrun */)
+// TODO(irfansharif): We can apply a batch of updates atomically
+// now that the StoreWriter interface supports it; it'll let us
+// avoid this mutex.
+s.mu.internal.Apply(ctx, false /* dryrun */, ev.(*bufferEvent).Update)
}
handlers := s.mu.handlers
s.mu.Unlock()
@@ -353,7 +356,7 @@ func (s *KVSubscriber) run(ctx context.Context) error {
events := buffer.Flush(ctx, initialScanTS)
freshStore := spanconfigstore.New(s.fallback)
for _, ev := range events {
-freshStore.Apply(ctx, ev.(*bufferEvent).Update, false /* dryrun */)
+freshStore.Apply(ctx, false /* dryrun */, ev.(*bufferEvent).Update)
}

s.mu.Lock()