Skip to content

Commit

Permalink
storage: introduce a couple of replication reports
Browse files Browse the repository at this point in the history
This patch implements the "Insights into Constraint Conformance" RFC
(#38309). At the time of this patch, the RFC is still pending and also
out of date.
Developed together with Darin.

The patch introduces the following tables in the system database,
providing information about constraint conformance, replication status
and critical localities (i.e. localities that, were they to become
unavailable, would cause some ranges to lose quorum):
  CREATE TABLE replication_constraint_stats (
      zone_id INT8 NOT NULL,
      subzone_id INT8 NOT NULL,
      type STRING NOT NULL,
      config STRING NOT NULL,
      report_id INT8 NOT NULL,
      violation_start TIMESTAMP NULL,
      violating_ranges INT8 NOT NULL,
      CONSTRAINT "primary" PRIMARY KEY (zone_id ASC, subzone_id ASC, type ASC, config ASC),
      FAMILY "primary" (zone_id, subzone_id, type, config, report_id, violation_start, violating_ranges)
  );

  CREATE TABLE replication_stats (
      zone_id INT8 NOT NULL,
      subzone_id INT8 NOT NULL,
      report_id INT8 NOT NULL,
      total_ranges INT8 NOT NULL,
      unavailable_ranges INT8 NOT NULL,
      under_replicated_ranges INT8 NOT NULL,
      over_replicated_ranges INT8 NOT NULL,
      CONSTRAINT "primary" PRIMARY KEY (zone_id ASC, subzone_id ASC),
      FAMILY "primary" (zone_id, subzone_id, report_id, total_ranges, unavailable_ranges, under_replicated_ranges, over_replicated_ranges)
  );

    CREATE TABLE replication_critical_localities (
      zone_id INT8 NOT NULL,
      subzone_id INT8 NOT NULL,
      locality STRING NOT NULL,
      report_id INT8 NOT NULL,
      at_risk_ranges INT8 NOT NULL,
      CONSTRAINT "primary" PRIMARY KEY (zone_id ASC, subzone_id ASC, locality ASC),
      FAMILY "primary" (zone_id, subzone_id, locality, report_id, at_risk_ranges)
  )

And also a system.report_meta table with metadata for all these reports
(their creation time).

The reports are generated periodically (once a minute by default,
subject to the cluster setting kv.replication_reports.interval) by a job
running on the leaseholder of range 1.
The data is produced by joing range descriptor data from meta2 with zone
config information from the gossiped SystemConfig.

Release note (sql change): The following system tables containing report about
replication status, constraint conformance and critical localities are
introduced: replication_constraint_stats, replication_stats,
replication_critical_localities.
  • Loading branch information
andreimatei committed Sep 10, 2019
1 parent 1b75c93 commit 91bb588
Show file tree
Hide file tree
Showing 48 changed files with 5,076 additions and 852 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
<tr><td><code>kv.range_split.load_qps_threshold</code></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><code>kv.rangefeed.concurrent_catchup_iterators</code></td><td>integer</td><td><code>64</code></td><td>number of rangefeeds catchup iterators a store will allow concurrently before queueing</td></tr>
<tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><code>kv.replication_reports.interval</code></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.snapshot_sst.sync_size</code></td><td>byte size</td><td><code>2.0 MiB</code></td><td>threshold after which snapshot SST writes must fsync</td></tr>
Expand Down
3 changes: 2 additions & 1 deletion pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ const (
// ReplicationManual means that the split and replication queues of all
// servers are stopped, and the test must manually control splitting and
// replication through the TestServer.
// Note that the server starts with a number of system ranges.
// Note that the server starts with a number of system ranges,
// all with a single replica on node 1.
ReplicationManual
)
40 changes: 21 additions & 19 deletions pkg/ccl/logictestccl/testdata/logic_test/zone
Original file line number Diff line number Diff line change
Expand Up @@ -518,25 +518,27 @@ query TTTTTT
SELECT target, range_name, database_name, table_name, index_name, partition_name
FROM crdb_internal.zones
----
RANGE default default NULL NULL NULL NULL
DATABASE system NULL system NULL NULL NULL
TABLE system.public.jobs NULL system jobs NULL NULL
RANGE meta meta NULL NULL NULL NULL
RANGE system system NULL NULL NULL NULL
RANGE liveness liveness NULL NULL NULL NULL
TABLE test.public.t NULL test t NULL NULL
INDEX test.public.t@secondary NULL test t secondary NULL
INDEX test.public.t@tertiary NULL test t tertiary NULL
INDEX test.public.t36642@secondary NULL test t36642 secondary NULL
INDEX test.public.t36642@tertiary NULL test t36642 tertiary NULL
INDEX test.public.t36644@secondary NULL test t36644 secondary NULL
INDEX test.public.t36644@tertiary NULL test t36644 tertiary NULL
PARTITION x1_idx OF INDEX test.public.t38391@foo NULL test t38391 foo x1_idx
DATABASE "my database" NULL my database NULL NULL NULL
TABLE "my database".public."my table" NULL my database my table NULL NULL
INDEX "my database".public."my table"@"my index" NULL my database my table my index NULL
PARTITION "my partition" OF INDEX "my database".public."my table"@primary NULL my database my table primary my partition
PARTITION "my partition" OF INDEX "my database".public."my table"@"my index" NULL my database my table my index my partition
RANGE default default NULL NULL NULL NULL
DATABASE system NULL system NULL NULL NULL
TABLE system.public.jobs NULL system jobs NULL NULL
RANGE meta meta NULL NULL NULL NULL
RANGE system system NULL NULL NULL NULL
RANGE liveness liveness NULL NULL NULL NULL
TABLE system.public.replication_constraint_stats NULL system replication_constraint_stats NULL NULL
TABLE system.public.replication_stats NULL system replication_stats NULL NULL
TABLE test.public.t NULL test t NULL NULL
INDEX test.public.t@secondary NULL test t secondary NULL
INDEX test.public.t@tertiary NULL test t tertiary NULL
INDEX test.public.t36642@secondary NULL test t36642 secondary NULL
INDEX test.public.t36642@tertiary NULL test t36642 tertiary NULL
INDEX test.public.t36644@secondary NULL test t36644 secondary NULL
INDEX test.public.t36644@tertiary NULL test t36644 tertiary NULL
PARTITION x1_idx OF INDEX test.public.t38391@foo NULL test t38391 foo x1_idx
DATABASE "my database" NULL my database NULL NULL NULL
TABLE "my database".public."my table" NULL my database my table NULL NULL
INDEX "my database".public."my table"@"my index" NULL my database my table my index NULL
PARTITION "my partition" OF INDEX "my database".public."my table"@primary NULL my database my table primary my partition
PARTITION "my partition" OF INDEX "my database".public."my table"@"my index" NULL my database my table my index my partition

# Test the zone information being displayed in SHOW CREATE
statement ok
Expand Down
8 changes: 8 additions & 0 deletions pkg/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2176,6 +2176,10 @@ writing ` + os.DevNull + `
debug/nodes/1/ranges/18.json
debug/nodes/1/ranges/19.json
debug/nodes/1/ranges/20.json
debug/nodes/1/ranges/21.json
debug/nodes/1/ranges/22.json
debug/nodes/1/ranges/23.json
debug/nodes/1/ranges/24.json
debug/schema/defaultdb@details.json
debug/schema/postgres@details.json
debug/schema/system@details.json
Expand All @@ -2187,6 +2191,10 @@ writing ` + os.DevNull + `
debug/schema/system/locations.json
debug/schema/system/namespace.json
debug/schema/system/rangelog.json
debug/schema/system/replication_constraint_stats.json
debug/schema/system/replication_critical_localities.json
debug/schema/system/replication_stats.json
debug/schema/system/reports_meta.json
debug/schema/system/role_members.json
debug/schema/system/settings.json
debug/schema/system/table_statistics.json
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ func runGossipRestartNodeOne(ctx context.Context, t *test, c *cluster) {
run(`ALTER RANGE meta %[1]s CONFIGURE ZONE %[2]s 'constraints: {"-rack=0"}'`)
run(`ALTER RANGE liveness %[1]s CONFIGURE ZONE %[2]s 'constraints: {"-rack=0"}'`)
run(`ALTER TABLE system.jobs %[1]s CONFIGURE ZONE %[2]s 'constraints: {"-rack=0"}'`)
run(`ALTER TABLE system.replication_stats %[1]s CONFIGURE ZONE %[2]s 'constraints: {"-rack=0"}'`)
run(`ALTER TABLE system.replication_constraint_stats %[1]s CONFIGURE ZONE %[2]s 'constraints: {"-rack=0"}'`)

var lastReplCount int
if err := retry.ForDuration(2*time.Minute, func() error {
Expand Down
12 changes: 8 additions & 4 deletions pkg/config/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ func (s *SystemConfig) GetLargestObjectID(maxID uint32) (uint32, error) {
// or database, specified by key.id). It is the caller's
// responsibility to ensure that the range does not need to be split.
func (s *SystemConfig) GetZoneConfigForKey(key roachpb.RKey) (*ZoneConfig, error) {
return s.getZoneConfigForKey(DecodeKeyIntoZoneIDAndSuffix(key))
}

// DecodeKeyIntoZoneIDAndSuffix figures out the zone that the key belongs to.
func DecodeKeyIntoZoneIDAndSuffix(key roachpb.RKey) (id uint32, keySuffix []byte) {
objectID, keySuffix, ok := DecodeObjectID(key)
if !ok {
// Not in the structured data namespace.
Expand Down Expand Up @@ -278,8 +283,7 @@ func (s *SystemConfig) GetZoneConfigForKey(key roachpb.RKey) (*ZoneConfig, error
objectID = keys.SystemRangesID
}
}

return s.getZoneConfigForKey(objectID, keySuffix)
return objectID, keySuffix
}

// isPseudoTableID returns true if id is in keys.PseudoTableIDs.
Expand Down Expand Up @@ -350,14 +354,14 @@ func (s *SystemConfig) getZoneConfigForKey(id uint32, keySuffix []byte) (*ZoneCo
}
if entry.zone != nil {
if entry.placeholder != nil {
if subzone := entry.placeholder.GetSubzoneForKeySuffix(keySuffix); subzone != nil {
if subzone, _ := entry.placeholder.GetSubzoneForKeySuffix(keySuffix); subzone != nil {
if indexSubzone := entry.placeholder.GetSubzone(subzone.IndexID, ""); indexSubzone != nil {
subzone.Config.InheritFromParent(&indexSubzone.Config)
}
subzone.Config.InheritFromParent(entry.zone)
return &subzone.Config, nil
}
} else if subzone := entry.zone.GetSubzoneForKeySuffix(keySuffix); subzone != nil {
} else if subzone, _ := entry.zone.GetSubzoneForKeySuffix(keySuffix); subzone != nil {
if indexSubzone := entry.zone.GetSubzone(subzone.IndexID, ""); indexSubzone != nil {
subzone.Config.InheritFromParent(&indexSubzone.Config)
}
Expand Down
37 changes: 20 additions & 17 deletions pkg/config/zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,20 +438,23 @@ func (z *ZoneConfig) CopyFromZone(other ZoneConfig, fieldList []tree.Name) {
}
}

// StoreMatchesConstraint returns whether a store matches the given constraint.
func StoreMatchesConstraint(store roachpb.StoreDescriptor, constraint Constraint) bool {
hasConstraint := storeHasConstraint(store, constraint)
// StoreSatisfiesConstraint checks whether a store satisfies the given constraint.
// If the constraint is of the PROHIBITED type, satisfying it means the store
// not matching the constraint's spec.
func StoreSatisfiesConstraint(store roachpb.StoreDescriptor, constraint Constraint) bool {
hasConstraint := StoreMatchesConstraint(store, constraint)
if (constraint.Type == Constraint_REQUIRED && !hasConstraint) ||
(constraint.Type == Constraint_PROHIBITED && hasConstraint) {
return false
}
return true
}

// storeHasConstraint returns whether a store's attributes or node's locality
// matches the key value pair in the constraint. It notably ignores whether
// the constraint is required, prohibited, positive, or otherwise.
func storeHasConstraint(store roachpb.StoreDescriptor, c Constraint) bool {
// StoreMatchesConstraint returns whether a store's attributes or node's
// locality match the constraint's spec. It notably ignores whether the
// constraint is required, prohibited, positive, or otherwise.
// Also see StoreSatisfiesConstraint().
func StoreMatchesConstraint(store roachpb.StoreDescriptor, c Constraint) bool {
if c.Key == "" {
for _, attrs := range []roachpb.Attributes{store.Attrs, store.Node.Attrs} {
for _, attr := range attrs.Attrs {
Expand All @@ -460,19 +463,19 @@ func storeHasConstraint(store roachpb.StoreDescriptor, c Constraint) bool {
}
}
}
} else {
for _, tier := range store.Node.Locality.Tiers {
if c.Key == tier.Key && c.Value == tier.Value {
return true
}
return false
}
for _, tier := range store.Node.Locality.Tiers {
if c.Key == tier.Key && c.Value == tier.Value {
return true
}
}
return false
}

// DeleteTableConfig removes any configuration that applies to the table
// targeted by this ZoneConfig, leaving only its subzone configs, if any. After
// calling DeleteTableConfig, IsZubzonePlaceholder will return true.
// calling DeleteTableConfig, IsSubzonePlaceholder will return true.
//
// Only table zones can have subzones, so it does not make sense to call this
// method on non-table ZoneConfigs.
Expand Down Expand Up @@ -514,19 +517,19 @@ func (z *ZoneConfig) GetSubzone(indexID uint32, partition string) *Subzone {
}

// GetSubzoneForKeySuffix returns the ZoneConfig for the subzone that contains
// keySuffix, if it exists.
func (z ZoneConfig) GetSubzoneForKeySuffix(keySuffix []byte) *Subzone {
// keySuffix, if it exists and its position in the subzones slice.
func (z ZoneConfig) GetSubzoneForKeySuffix(keySuffix []byte) (*Subzone, int32) {
// TODO(benesch): Use binary search instead.
for _, s := range z.SubzoneSpans {
// The span's Key is stored with the prefix removed, so we can compare
// directly to keySuffix. An unset EndKey implies Key.PrefixEnd().
if (s.Key.Compare(keySuffix) <= 0) &&
((s.EndKey == nil && bytes.HasPrefix(keySuffix, s.Key)) || s.EndKey.Compare(keySuffix) > 0) {
copySubzone := z.Subzones[s.SubzoneIndex]
return &copySubzone
return &copySubzone, s.SubzoneIndex
}
}
return nil
return nil, -1
}

// SetSubzone installs subzone into the ZoneConfig, overwriting any existing
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/zone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,9 @@ func TestConstraintsListYAML(t *testing.T) {
{input: "{+a: 1, '+a=1,+b,+c=d': b}", expectErr: true},
{input: "[+a: 1]", expectErr: true},
{input: "[+a: 1, '+a=1,+b,+c=d': 2]", expectErr: true},
{input: "{\"+a=1,+b=2\": 1}"}, // this will work in SQL: constraints='{"+a=1,+b=2": 1}'
{input: "{\"+a=1,+b=2,+c\": 1}"}, // won't work in SQL: constraints='{"+a=1,+b=2,+c": 1}'
{input: "{'+a=1,+b=2,+c': 1}"}, // this will work in SQL: constraints=e'{\'+a=1,+b=2,+c\': 1}'
}

for _, tc := range testCases {
Expand Down
7 changes: 7 additions & 0 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,13 @@ func (db *DB) Run(ctx context.Context, b *Batch) error {
return sendAndFill(ctx, db.send, b)
}

// NewTxn creates a new RootTxn.
func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
txn := NewTxn(ctx, db, db.ctx.NodeID.Get(), RootTxn)
txn.SetDebugName(debugName)
return txn
}

// Txn executes retryable in the context of a distributed transaction. The
// transaction is automatically aborted if retryable returns any error aside
// from recoverable internal errors, and is automatically committed
Expand Down
2 changes: 2 additions & 0 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type Txn struct {
// in the batch with the current node's ID.
// If the gatewayNodeID is set and this is a root transaction, we optimize
// away any clock uncertainty for our own node, as our clock is accessible.
//
// See also db.NewTxn().
func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID, typ TxnType) *Txn {
now := db.clock.Now()
txn := roachpb.MakeTransaction(
Expand Down
32 changes: 18 additions & 14 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,20 +312,24 @@ const (
// to "Ranges" instead of a Table - these IDs are needed to store custom
// configuration for non-table ranges (e.g. Zone Configs).
// NOTE: IDs must be <= MaxReservedDescID.
LeaseTableID = 11
EventLogTableID = 12
RangeEventTableID = 13
UITableID = 14
JobsTableID = 15
MetaRangesID = 16
SystemRangesID = 17
TimeseriesRangesID = 18
WebSessionsTableID = 19
TableStatisticsTableID = 20
LocationsTableID = 21
LivenessRangesID = 22
RoleMembersTableID = 23
CommentsTableID = 24
LeaseTableID = 11
EventLogTableID = 12
RangeEventTableID = 13
UITableID = 14
JobsTableID = 15
MetaRangesID = 16
SystemRangesID = 17
TimeseriesRangesID = 18
WebSessionsTableID = 19
TableStatisticsTableID = 20
LocationsTableID = 21
LivenessRangesID = 22
RoleMembersTableID = 23
CommentsTableID = 24
ReplicationConstraintStatsTableID = 25
ReplicationCriticalLocalitiesTableID = 26
ReplicationStatsTableID = 27
ReportsMetaTableID = 28

// CommentType is type for system.comments
DatabaseCommentType = 0
Expand Down
31 changes: 18 additions & 13 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/reports"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/growstack"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
Expand All @@ -44,7 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/logtags"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -142,18 +143,19 @@ func (nm nodeMetrics) callComplete(d time.Duration, pErr *roachpb.Error) {
// IDs for bootstrapping the node itself or new stores as they're added
// on subsequent instantiations.
type Node struct {
stopper *stop.Stopper
clusterID *base.ClusterIDContainer // UUID for Cockroach cluster
Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology
storeCfg storage.StoreConfig // Config to use and pass to stores
eventLogger sql.EventLogger
stores *storage.Stores // Access to node-local stores
metrics nodeMetrics
recorder *status.MetricsRecorder
startedAt int64
lastUp int64
initialBoot bool // True if this is the first time this node has started.
txnMetrics kv.TxnMetrics
stopper *stop.Stopper
clusterID *base.ClusterIDContainer // UUID for Cockroach cluster
Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology
storeCfg storage.StoreConfig // Config to use and pass to stores
eventLogger sql.EventLogger
stores *storage.Stores // Access to node-local stores
metrics nodeMetrics
recorder *status.MetricsRecorder
constraintStatsCollector *reports.Reporter
startedAt int64
lastUp int64
initialBoot bool // True if this is the first time this node has started.
txnMetrics kv.TxnMetrics

perReplicaServer storage.Server
}
Expand Down Expand Up @@ -271,6 +273,7 @@ func NewNode(
eventLogger: eventLogger,
clusterID: clusterID,
}
n.constraintStatsCollector = reports.NewReporter(n.stores, &n.storeCfg)
n.perReplicaServer = storage.MakeServer(&n.Descriptor, n.stores)
return n
}
Expand Down Expand Up @@ -498,6 +501,8 @@ func (n *Node) start(
// bumped immediately, which would be possible if gossip got started earlier).
n.startGossip(ctx, n.stopper)

n.constraintStatsCollector.Start(ctx, n.stopper)

allEngines := append([]engine.Engine(nil), initializedEngines...)
allEngines = append(allEngines, emptyEngines...)
log.Infof(ctx, "%s: started with %v engine(s) and attributes %v", n, allEngines, attrs.Attrs)
Expand Down
Loading

0 comments on commit 91bb588

Please sign in to comment.