Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spanconfig: introduce spanconfig.StoreWriter (and its impl) #70287

Merged
merged 1 commit into from
Oct 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ ALL_TESTS = [
"//pkg/settings:settings_test",
"//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig/spanconfigstore:spanconfigstore_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catalogkv:catalogkv_test",
"//pkg/sql/catalog/catformat:catformat_test",
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/zonepb/zone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,3 +1429,8 @@ func TestZoneConfigToSpanConfigConversion(t *testing.T) {
require.Equal(t, tc.expectSpanConfig, spanConfig)
}
}

func TestDefaultZoneAndSpanConfigs(t *testing.T) {
converted := DefaultZoneConfigRef().AsSpanConfig()
require.True(t, converted.Equal(roachpb.TestingDefaultSpanConfig()))
}
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestRangeSplitsWithWritePressure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// Override default span config.
cfg := kvserver.TestingDefaultSpanConfig()
cfg := roachpb.TestingDefaultSpanConfig()
cfg.RangeMaxBytes = 1 << 18

// Manually create the local test cluster so that the split queue
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/queue_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestBaseQueueConcurrent(t *testing.T) {
cfg: StoreConfig{
Clock: hlc.NewClock(hlc.UnixNano, time.Second),
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
DefaultSpanConfig: TestingDefaultSpanConfig(),
DefaultSpanConfig: roachpb.TestingDefaultSpanConfig(),
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/split_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestSplitQueueShouldQueue(t *testing.T) {
repl.mu.Lock()
repl.mu.state.Stats = &enginepb.MVCCStats{KeyBytes: test.bytes}
repl.mu.Unlock()
conf := TestingDefaultSpanConfig()
conf := roachpb.TestingDefaultSpanConfig()
conf.RangeMaxBytes = test.maxBytes
repl.SetSpanConfig(conf)

Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2972,8 +2972,3 @@ func min(a, b int) int {
}
return b
}

// TestingDefaultSpanConfig exposes the default span config for testing purposes.
func TestingDefaultSpanConfig() roachpb.SpanConfig {
return zonepb.DefaultZoneConfigRef().AsSpanConfig()
}
20 changes: 20 additions & 0 deletions pkg/roachpb/span_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,23 @@ func (c ConstraintsConjunction) String() string {
}
return sb.String()
}

// TestingDefaultSpanConfig exports the default span config for testing purposes.
func TestingDefaultSpanConfig() SpanConfig {
return SpanConfig{
RangeMinBytes: 128 << 20, // 128 MB
RangeMaxBytes: 512 << 20, // 512 MB
// Use 25 hours instead of the previous 24 to make users successful by
// default. Users desiring to take incremental backups every 24h may
// incorrectly assume that the previous default 24h was sufficient to do
// that. But the equation for incremental backups is:
// GC TTLSeconds >= (desired backup interval) (time to perform incremental backup)
// We think most new users' incremental backups will complete within an
// hour, and larger clusters will have more experienced operators and will
// understand how to change these settings if needed.
GCPolicy: GCPolicy{
TTLSeconds: 25 * 60 * 60,
},
NumReplicas: 3,
}
}
108 changes: 97 additions & 11 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,110 @@ type ReconciliationDependencies interface {
// through the KVAccessor.
}

// Store is a data structure used to store span configs.
// Store is a data structure used to store spans and their corresponding
// configs.
type Store interface {
StoreWriter
StoreReader

// TODO(irfansharif): We'll want to add a StoreWriter interface here once we
// implement a data structure to store span configs. We expect this data
// structure to be used in KV to eventually replace the use of the
// gossip-backed system config span.
}

// Silence the unused linter.
var _ Store = nil
// StoreWriter is the write-only portion of the Store interface.
type StoreWriter interface {
// Apply applies the given update[1]. It also returns the existing spans that
// were deleted and entries that were newly added to make room for the
// update. The deleted list can double as a list of overlapping spans in the
// Store, provided the update is not a no-op[2].
//
// Span configs are stored in non-overlapping fashion. When an update
// overlaps with existing configs, the existing configs are deleted. If the
// overlap is only partial, the non-overlapping components of the existing
// configs are re-added. If the update itself is adding an entry, that too
// is added. This is best illustrated with the following example:
//
// [--- X --) is a span with config X
//
// Store | [--- A ----)[------------- B -----------)[---------- C -----)
// Update | [------------------ D -------------)
// |
// Deleted | [------------- B -----------)[---------- C -----)
// Added | [------------------ D -------------)[--- C -----)
// Store* | [--- A ----)[------------------ D -------------)[--- C -----)
//
// TODO(irfansharif): We'll make use of the dryrun option in a future PR
// when wiring up the reconciliation job to use the KVAccessor. Since the
// KVAccessor is a "targeted" API (the spans being deleted/upserted
// have to already be present with the exact same bounds), we'll dryrun an
// update against a StoreWriter (pre-populated with the entries present in
// KV) to generate the targeted deletes and upserts we'd need to issue.
// After successfully installing them in KV, we can keep our StoreWriter
// up-to-date by actually applying the update.
//
// There's also the question of a "full reconciliation pass". We'll be
// generating updates reactively listening in on changes to
// system.{descriptor,zones} (see SQLWatcher). It's possible then for a
// suspended tenant's table history to be GC-ed away and for its SQLWatcher
// to never detect that a certain table/index/partition has been deleted.
// Left as is, this results in us never issuing a corresponding span config
// deletion request. We'd be leaving a bunch of delete-able span configs
// lying around, and a bunch of empty ranges as a result of those. A "full
// reconciliation pass" is our attempt to find all these extraneous entries
// in KV and to delete them.
//
// We can use a StoreWriter here too (one that's pre-populated with the
// contents of KVAccessor, as before). We'd iterate through all descriptors,
// find all overlapping spans, issue KVAccessor deletes for them, and upsert
// the descriptor's span config[3]. As for the StoreWriter itself, we'd
// simply delete the overlapping entries. After iterating through all the
// descriptors, we'd finally issue KVAccessor deletes for all span configs
// still remaining in the Store.
//
// TODO(irfansharif): The descriptions above presume holding the entire set
// of span configs in memory, but we could break away from that by adding
// pagination + retrieval limit to the GetSpanConfigEntriesFor API. We'd
// then paginate through chunks of the keyspace at a time, do a "full
// reconciliation pass" over just that chunk, and continue.
//
// [1]: Unless dryrun is true. We'll still generate the same {deleted,added}
// lists.
// [2]: We could alternatively expose a GetAllOverlapping() API to make
// things clearer.
// [3]: We could skip the delete + upsert dance if the descriptor's exact
// span config entry already exists in KV. Using Apply (dryrun=true)
// against a StoreWriter (populated using KVAccessor contents) using
// the descriptor's span config entry would return empty lists,
// indicating a no-op.
Apply(ctx context.Context, update Update, dryrun bool) (
deleted []roachpb.Span, added []roachpb.SpanConfigEntry,
)
}

// StoreReader is the read-only portion of the Store interface. It's an adaptor
// interface implemented by config.SystemConfig to let us later swap out the
// source with one backed by a view of `system.span_configurations`.
// StoreReader is the read-only portion of the Store interface. It doubles as an
// adaptor interface for config.SystemConfig.
type StoreReader interface {
NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool
ComputeSplitKey(ctx context.Context, start, end roachpb.RKey) roachpb.RKey
GetSpanConfigForKey(ctx context.Context, key roachpb.RKey) (roachpb.SpanConfig, error)
}

// Update captures what span has seen a config change. It will be the unit of
// what a {SQL,KV}Watcher emits, and what can be applied to a StoreWriter.
type Update struct {
// Span captures the key span being updated.
Span roachpb.Span

// Config captures the span config the key span was updated to. An empty
// config indicates the span config being deleted.
Config roachpb.SpanConfig
}

// Deletion returns true if the update corresponds to a span config being
// deleted.
func (u Update) Deletion() bool {
return u.Config.IsEmpty()
}

// Addition returns true if the update corresponds to a span config being
// added.
func (u Update) Addition() bool {
return !u.Deletion()
}
35 changes: 35 additions & 0 deletions pkg/spanconfig/spanconfigstore/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "spanconfigstore",
srcs = [
"shadow.go",
"store.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/spanconfig",
"//pkg/util/interval",
"//pkg/util/log",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "spanconfigstore_test",
srcs = ["store_test.go"],
data = glob(["testdata/**"]),
embed = [":spanconfigstore"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/spanconfig",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
)
91 changes: 91 additions & 0 deletions pkg/spanconfig/spanconfigstore/shadow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfigstore

import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// ShadowReader wraps around two spanconfig.StoreReaders and logs warnings (if
// expensive logging is enabled) when there are divergent results from the two.
type ShadowReader struct {
new, old spanconfig.StoreReader
}

// NewShadowReader instantiates a new shadow reader.
func NewShadowReader(new, old spanconfig.StoreReader) *ShadowReader {
return &ShadowReader{
new: new,
old: old,
}
}

var _ = NewShadowReader // defeat the unused linter.

var _ spanconfig.StoreReader = &ShadowReader{}

// NeedsSplit is part of the spanconfig.StoreReader interface.
func (s *ShadowReader) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool {
newResult := s.new.NeedsSplit(ctx, start, end)
if log.ExpensiveLogEnabled(ctx, 1) {
oldResult := s.old.NeedsSplit(ctx, start, end)
if newResult != oldResult {
log.Warningf(ctx, "needs split: mismatched responses between old result (%t) and new (%t) for start=%s end=%s",
oldResult, newResult, start.String(), end.String())
}
}

return newResult
}

// ComputeSplitKey is part of the spanconfig.StoreReader interface.
func (s *ShadowReader) ComputeSplitKey(ctx context.Context, start, end roachpb.RKey) roachpb.RKey {
newResult := s.new.ComputeSplitKey(ctx, start, end)
if log.ExpensiveLogEnabled(ctx, 1) {
oldResult := s.old.ComputeSplitKey(ctx, start, end)
if !newResult.Equal(oldResult) {
str := func(k roachpb.RKey) string {
if len(k) == 0 {
return ""
}
return k.String()
}

log.Warningf(ctx, "compute split key: mismatched responses between old result (%s) and new (%s) for start=%s end=%s",
str(oldResult), str(newResult), str(start), str(end))
}
}
return newResult
}

// GetSpanConfigForKey is part of the spanconfig.StoreReader interface.
func (s *ShadowReader) GetSpanConfigForKey(
ctx context.Context, key roachpb.RKey,
) (roachpb.SpanConfig, error) {
newResult, errNew := s.new.GetSpanConfigForKey(ctx, key)
if log.ExpensiveLogEnabled(ctx, 1) {
oldResult, errOld := s.old.GetSpanConfigForKey(ctx, key)
if !newResult.Equal(oldResult) {
log.Warningf(ctx, "get span config for key: mismatched responses between old result (%s) and new(%s) for key=%s",
oldResult.String(), newResult.String(), key.String())
}
if !errors.Is(errNew, errOld) {
log.Warningf(ctx, "get span config for key: mismatched errors between old result (%s) and new (%s) for key=%s",
errOld, errNew, key.String())
}
}
return newResult, errNew
}
Loading