Skip to content

Commit

Permalink
spanconfig: introduce spanconfig.Reporter
Browse files Browse the repository at this point in the history
This is KV-side API for multi-tenant replication reports (#89987)

Release note: None
  • Loading branch information
irfansharif committed Nov 5, 2022
1 parent b39e627 commit 6929e22
Show file tree
Hide file tree
Showing 42 changed files with 1,509 additions and 68 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ ALL_TESTS = [
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig/spanconfigptsreader:spanconfigptsreader_test",
"//pkg/spanconfig/spanconfigreconciler:spanconfigreconciler_test",
"//pkg/spanconfig/spanconfigreporter:spanconfigreporter_test",
"//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test",
"//pkg/spanconfig/spanconfigsqlwatcher:spanconfigsqlwatcher_test",
"//pkg/spanconfig/spanconfigstore:spanconfigstore_test",
Expand Down Expand Up @@ -1343,6 +1344,8 @@ GO_TARGETS = [
"//pkg/spanconfig/spanconfigptsreader:spanconfigptsreader_test",
"//pkg/spanconfig/spanconfigreconciler:spanconfigreconciler",
"//pkg/spanconfig/spanconfigreconciler:spanconfigreconciler_test",
"//pkg/spanconfig/spanconfigreporter:spanconfigreporter",
"//pkg/spanconfig/spanconfigreporter:spanconfigreporter_test",
"//pkg/spanconfig/spanconfigsplitter:spanconfigsplitter",
"//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator",
"//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test",
Expand Down Expand Up @@ -2594,6 +2597,7 @@ GET_X_DATA_TARGETS = [
"//pkg/spanconfig/spanconfigmanager:get_x_data",
"//pkg/spanconfig/spanconfigptsreader:get_x_data",
"//pkg/spanconfig/spanconfigreconciler:get_x_data",
"//pkg/spanconfig/spanconfigreporter:get_x_data",
"//pkg/spanconfig/spanconfigsplitter:get_x_data",
"//pkg/spanconfig/spanconfigsqltranslator:get_x_data",
"//pkg/spanconfig/spanconfigsqlwatcher:get_x_data",
Expand Down
25 changes: 25 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ var _ serverpb.TenantStatusServer = (*Connector)(nil)
// Connector is capable of accessing span configurations for secondary tenants.
var _ spanconfig.KVAccessor = (*Connector)(nil)

// Reporter is capable of generating span configuration conformance reports for
// secondary tenants.
var _ spanconfig.Reporter = (*Connector)(nil)

// NewConnector creates a new Connector.
// NOTE: Calling Start will set cfg.RPCContext.ClusterID.
func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector {
Expand Down Expand Up @@ -534,6 +538,27 @@ func (c *Connector) UpdateSpanConfigRecords(
})
}

// SpanConfigConformance implements the spanconfig.Reporter interface.
func (c *Connector) SpanConfigConformance(
ctx context.Context, spans []roachpb.Span,
) (roachpb.SpanConfigConformanceReport, error) {
var report roachpb.SpanConfigConformanceReport
if err := c.withClient(ctx, func(ctx context.Context, c *client) error {
resp, err := c.SpanConfigConformance(ctx, &roachpb.SpanConfigConformanceRequest{
Spans: spans,
})
if err != nil {
return err
}

report = resp.Report
return nil
}); err != nil {
return roachpb.SpanConfigConformanceReport{}, err
}
return report, nil
}

// GetAllSystemSpanConfigsThatApply implements the spanconfig.KVAccessor
// interface.
func (c *Connector) GetAllSystemSpanConfigsThatApply(
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ func (m *mockServer) UpdateSpanConfigs(
panic("unimplemented")
}

func (m *mockServer) SpanConfigConformance(
context.Context, *roachpb.SpanConfigConformanceRequest,
) (*roachpb.SpanConfigConformanceResponse, error) {
panic("unimplemented")
}

func gossipEventForClusterID(clusterID uuid.UUID) *roachpb.GossipSubscriptionEvent {
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeyClusterID,
Expand Down
Empty file.
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func (n Node) UpdateSpanConfigs(
panic("unimplemented")
}

func (n Node) SpanConfigConformance(
context.Context, *roachpb.SpanConfigConformanceRequest,
) (*roachpb.SpanConfigConformanceResponse, error) {
panic("implement me")
}

func (n Node) TenantSettings(
*roachpb.TenantSettingsRequest, roachpb.Internal_TenantSettingsServer,
) error {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ func (m *mockInternalClient) GetSpanConfigs(
return nil, fmt.Errorf("unsupported GetSpanConfigs call")
}

func (m *mockInternalClient) SpanConfigConformance(
_ context.Context, _ *roachpb.SpanConfigConformanceRequest, _ ...grpc.CallOption,
) (*roachpb.SpanConfigConformanceResponse, error) {
return nil, fmt.Errorf("unsupported SpanConfigConformance call")
}

func (m *mockInternalClient) GetAllSystemSpanConfigsThatApply(
context.Context, *roachpb.GetAllSystemSpanConfigsThatApplyRequest, ...grpc.CallOption,
) (*roachpb.GetAllSystemSpanConfigsThatApplyResponse, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvtenant/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ type Connector interface {
// applicable to secondary tenants.
spanconfig.KVAccessor

// Reporter provides access to conformance reports, i.e. whether ranges
// backing queried keyspans conform the span configs that apply to them.
spanconfig.Reporter

// OverridesMonitor provides access to tenant cluster setting overrides.
settingswatcher.OverridesMonitor

Expand Down
44 changes: 34 additions & 10 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,10 +998,18 @@ func (a *Allocator) AllocateTargetFromList(
targetType TargetReplicaType,
) (roachpb.ReplicationTarget, string) {
existingReplicas := append(existingVoters, existingNonVoters...)
analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, a.StorePool.GetStoreDescriptor,
existingReplicas, conf.NumReplicas, conf.Constraints)
analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, a.StorePool.GetStoreDescriptor,
existingVoters, conf.GetNumVoters(), conf.VoterConstraints)
analyzedOverallConstraints := constraint.AnalyzeConstraints(
a.StorePool,
existingReplicas,
conf.NumReplicas,
conf.Constraints,
)
analyzedVoterConstraints := constraint.AnalyzeConstraints(
a.StorePool,
existingVoters,
conf.GetNumVoters(),
conf.VoterConstraints,
)

var constraintsChecker constraintsCheckFn
switch t := targetType; t {
Expand Down Expand Up @@ -1140,10 +1148,18 @@ func (a Allocator) RemoveTarget(
}

existingReplicas := append(existingVoters, existingNonVoters...)
analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, a.StorePool.GetStoreDescriptor,
existingReplicas, conf.NumReplicas, conf.Constraints)
analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, a.StorePool.GetStoreDescriptor,
existingVoters, conf.GetNumVoters(), conf.VoterConstraints)
analyzedOverallConstraints := constraint.AnalyzeConstraints(
a.StorePool,
existingReplicas,
conf.NumReplicas,
conf.Constraints,
)
analyzedVoterConstraints := constraint.AnalyzeConstraints(
a.StorePool,
existingVoters,
conf.GetNumVoters(),
conf.VoterConstraints,
)

var constraintsChecker constraintsCheckFn
switch t := targetType; t {
Expand Down Expand Up @@ -1275,9 +1291,17 @@ func (a Allocator) RebalanceTarget(

zero := roachpb.ReplicationTarget{}
analyzedOverallConstraints := constraint.AnalyzeConstraints(
ctx, a.StorePool.GetStoreDescriptor, existingReplicas, conf.NumReplicas, conf.Constraints)
a.StorePool,
existingReplicas,
conf.NumReplicas,
conf.Constraints,
)
analyzedVoterConstraints := constraint.AnalyzeConstraints(
ctx, a.StorePool.GetStoreDescriptor, existingVoters, conf.GetNumVoters(), conf.VoterConstraints)
a.StorePool,
existingVoters,
conf.GetNumVoters(),
conf.VoterConstraints,
)
var removalConstraintsChecker constraintsCheckFn
var rebalanceConstraintsChecker rebalanceConstraintsCheckFn
var replicaSetToRebalance, replicasWithExcludedStores []roachpb.ReplicaDescriptor
Expand Down
15 changes: 9 additions & 6 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,13 @@ var (
}
)

func getTestStoreDesc(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) {
type mockStoreResolver struct{}

var _ constraint.StoreResolver = mockStoreResolver{}

func (m mockStoreResolver) GetStoreDescriptor(
storeID roachpb.StoreID,
) (roachpb.StoreDescriptor, bool) {
desc, ok := testStores[storeID]
return desc, ok
}
Expand Down Expand Up @@ -936,9 +942,7 @@ func TestAllocateConstraintsCheck(t *testing.T) {
Constraints: tc.constraints,
NumReplicas: tc.numReplicas,
}
analyzed := constraint.AnalyzeConstraints(
context.Background(), getTestStoreDesc, testStoreReplicas(tc.existing),
conf.NumReplicas, conf.Constraints)
analyzed := constraint.AnalyzeConstraints(mockStoreResolver{}, testStoreReplicas(tc.existing), conf.NumReplicas, conf.Constraints)
for _, s := range testStores {
valid, necessary := allocateConstraintsCheck(s, analyzed)
if e, a := tc.expectedValid[s.StoreID], valid; e != a {
Expand Down Expand Up @@ -1071,8 +1075,7 @@ func TestRemoveConstraintsCheck(t *testing.T) {
Constraints: tc.constraints,
NumReplicas: tc.numReplicas,
}
analyzed := constraint.AnalyzeConstraints(
context.Background(), getTestStoreDesc, existing, conf.NumReplicas, conf.Constraints)
analyzed := constraint.AnalyzeConstraints(mockStoreResolver{}, existing, conf.NumReplicas, conf.Constraints)
for storeID, expected := range tc.expected {
valid, necessary := removeConstraintsCheck(testStores[storeID], analyzed)
if e, a := expected.valid, valid; e != a {
Expand Down
15 changes: 4 additions & 11 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3387,9 +3387,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
}
// No constraints.
conf := roachpb.SpanConfig{}
analyzed := constraint.AnalyzeConstraints(
ctx, a.StorePool.GetStoreDescriptor, existingRepls, conf.NumReplicas,
conf.Constraints)
analyzed := constraint.AnalyzeConstraints(a.StorePool, existingRepls, conf.NumReplicas, conf.Constraints)
allocationConstraintsChecker := voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints)
removalConstraintsChecker := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints)
rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(analyzed, constraint.EmptyAnalyzedConstraints)
Expand Down Expand Up @@ -3743,9 +3741,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
}
}
conf := roachpb.SpanConfig{Constraints: tc.constraints}
analyzed := constraint.AnalyzeConstraints(
ctx, a.StorePool.GetStoreDescriptor, existingRepls, conf.NumReplicas,
conf.Constraints)
analyzed := constraint.AnalyzeConstraints(a.StorePool, existingRepls, conf.NumReplicas, conf.Constraints)
checkFn := voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints)

candidates := rankedCandidateListForAllocation(
Expand Down Expand Up @@ -3975,8 +3971,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) {
StoreID: storeID,
}
}
analyzed := constraint.AnalyzeConstraints(ctx, a.StorePool.GetStoreDescriptor, existingRepls,
0 /* numReplicas */, tc.constraints)
analyzed := constraint.AnalyzeConstraints(a.StorePool, existingRepls, 0, tc.constraints)

// Check behavior in a span config where `voter_constraints` are empty.
checkFn := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints)
Expand Down Expand Up @@ -5182,9 +5177,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
Constraints: tc.constraints,
NumReplicas: tc.numReplicas,
}
analyzed := constraint.AnalyzeConstraints(
ctx, a.StorePool.GetStoreDescriptor, existingRepls,
conf.NumReplicas, conf.Constraints)
analyzed := constraint.AnalyzeConstraints(a.StorePool, existingRepls, conf.NumReplicas, conf.Constraints)
removalConstraintsChecker := voterConstraintsCheckerForRemoval(
analyzed,
constraint.EmptyAnalyzedConstraints,
Expand Down
28 changes: 14 additions & 14 deletions pkg/kv/kvserver/constraint/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,15 @@

package constraint

import (
"context"
import "github.com/cockroachdb/cockroach/pkg/roachpb"

"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// AnalyzedConstraints represents the result or AnalyzeConstraints(). It
// combines a zone's constraints with information about which stores satisfy
// what term of the constraints disjunction.
// AnalyzedConstraints represents the result of AnalyzeConstraints(). It
// combines a span config's constraints with information about which stores
// satisfy what term of the constraints disjunction.
type AnalyzedConstraints struct {
Constraints []roachpb.ConstraintsConjunction
// True if the per-replica constraints don't fully cover all the desired
// replicas in the range (sum(constraints.NumReplicas) < zone.NumReplicas).
// replicas in the range (sum(constraints.NumReplicas) < config.NumReplicas).
// In such cases, we allow replicas that don't match any of the per-replica
// constraints, but never mark them as necessary.
UnconstrainedReplicas bool
Expand All @@ -39,13 +35,17 @@ type AnalyzedConstraints struct {
// satisfied by any given configuration of replicas.
var EmptyAnalyzedConstraints = AnalyzedConstraints{}

// AnalyzeConstraints processes the zone config constraints that apply to a
// StoreResolver resolves a store descriptor by a given ID.
type StoreResolver interface {
GetStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool)
}

// AnalyzeConstraints processes the span config constraints that apply to a
// range along with the current replicas for a range, spitting back out
// information about which constraints are satisfied by which replicas and
// which replicas satisfy which constraints, aiding in allocation decisions.
func AnalyzeConstraints(
ctx context.Context,
getStoreDescFn func(roachpb.StoreID) (roachpb.StoreDescriptor, bool),
storeResolver StoreResolver,
existing []roachpb.ReplicaDescriptor,
numReplicas int32,
constraints []roachpb.ConstraintsConjunction,
Expand All @@ -67,7 +67,7 @@ func AnalyzeConstraints(
// happen once a node is hooked into gossip), trust that it's valid. This
// is a much more stable failure state than frantically moving everything
// off such a node.
store, ok := getStoreDescFn(repl.StoreID)
store, ok := storeResolver.GetStoreDescriptor(repl.StoreID)
if !ok || ConjunctionsCheck(store, subConstraints.Constraints) {
result.SatisfiedBy[i] = append(result.SatisfiedBy[i], store.StoreID)
result.Satisfies[store.StoreID] = append(result.Satisfies[store.StoreID], i)
Expand All @@ -82,7 +82,7 @@ func AnalyzeConstraints(

// ConjunctionsCheck checks a store against a single set of constraints (out of
// the possibly numerous sets that apply to a range), returning true iff the
// store matches the constraints. The contraints are AND'ed together; a store
// store matches the constraints. The constraints are AND'ed together; a store
// matches the conjunction if it matches all of them.
func ConjunctionsCheck(store roachpb.StoreDescriptor, constraints []roachpb.Constraint) bool {
for _, constraint := range constraints {
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ func calcRangeCounter(
status := desc.Replicas().ReplicationStatus(func(rDesc roachpb.ReplicaDescriptor) bool {
return livenessMap[rDesc.NodeID].IsLive
},
// neededVoters - we don't care about the under/over-replication
// determinations from the report because it's too magic. We'll do our own
// determination below.
0)
// needed{Voters,NonVoters} - we don't care about the
// under/over-replication determinations from the report because
// it's too magic. We'll do our own determination below.
0, 0)
unavailable = !status.Available
liveVoters := calcLiveVoterReplicas(desc, livenessMap)
liveNonVoters := calcLiveNonVoterReplicas(desc, livenessMap)
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/reports/replication_stats_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,10 @@ func (v *replicationStatsVisitor) countRange(
) {
status := r.Replicas().ReplicationStatus(func(rDesc roachpb.ReplicaDescriptor) bool {
return v.nodeChecker(rDesc.NodeID)
}, replicationFactor)
// NB: this reporting code was written before ReplicationStatus reported
// on non-voting replicas. This code will also soon be removed in favor
// of something that works with multi-tenancy (#89987).
}, replicationFactor, 0)
// Note that a range can be under-replicated and over-replicated at the same
// time if it has many replicas, but sufficiently many of them are on dead
// nodes.
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,8 @@ type StoreConfig struct {

// SpanConfigsDisabled determines whether we're able to use the span configs
// infrastructure or not.
// TODO(richardjcai): We can likely remove this.
//
// TODO(irfansharif): We can remove this.
SpanConfigsDisabled bool
// Used to subscribe to span configuration changes, keeping up-to-date a
// data structure useful for retrieving span configs. Only available if
Expand Down
4 changes: 4 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3025,6 +3025,10 @@ service Internal {
// keyspans.
rpc UpdateSpanConfigs (UpdateSpanConfigsRequest) returns (UpdateSpanConfigsResponse) { }

// SpanConfigConformance is used to determine whether ranges backing the given
// keyspans conform to span configs that apply over them.
rpc SpanConfigConformance (SpanConfigConformanceRequest) returns (SpanConfigConformanceResponse) { }

// TenantSettings is used by tenants to obtain and stay up to date with tenant
// setting overrides.
rpc TenantSettings (TenantSettingsRequest) returns (stream TenantSettingsEvent) { }
Expand Down
11 changes: 11 additions & 0 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,17 @@ func (r ReplicaDescriptor) IsAnyVoter() bool {
}
}

// IsNonVoter returns true if the replica is a non-voter. Can be used as a
// filter for ReplicaDescriptors.Filter.
func (r ReplicaDescriptor) IsNonVoter() bool {
switch r.Type {
case NON_VOTER:
return true
default:
return false
}
}

// PercentilesFromData derives percentiles from a slice of data points.
// Sorts the input data if it isn't already sorted.
func PercentilesFromData(data []float64) Percentiles {
Expand Down
Loading

0 comments on commit 6929e22

Please sign in to comment.