Skip to content

Commit

Permalink
rangedesciter: carve out library for range desc iteration
Browse files Browse the repository at this point in the history
Informs cockroachdb#87503; pure code-movement. Going to use it in future commits as
part of multi-tenant replication reports (cockroachdb#89987) where we'll need to
iterate over the set of range descriptors.

Release note: None
  • Loading branch information
irfansharif committed Oct 14, 2022
1 parent 708b242 commit c69227a
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 95 deletions.
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ go_library(
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/rangedesciter",
"//pkg/util/retry",
"//pkg/util/schedulerlatency",
"//pkg/util/stop",
Expand Down
8 changes: 5 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
"github.com/cockroachdb/cockroach/pkg/util/rangedesciter"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -1004,9 +1005,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
var systemDeps upgrade.SystemDeps
if codec.ForSystemTenant() {
c = upgradecluster.New(upgradecluster.ClusterConfig{
NodeLiveness: nodeLiveness,
Dialer: cfg.nodeDialer,
DB: cfg.db,
NodeLiveness: nodeLiveness,
Dialer: cfg.nodeDialer,
RangeDescIterator: rangedesciter.New(cfg.db),
DB: cfg.db,
})
systemDeps = upgrade.SystemDeps{
Cluster: c,
Expand Down
24 changes: 0 additions & 24 deletions pkg/upgrade/system_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,30 +94,6 @@ type Cluster interface {
// just be the `Migrate` request, with code added within [1] to do the
// specific things intended for the specified version.
//
// It's important to note that the closure is being executed in the context of
// a distributed transaction that may be automatically retried. So something
// like the following is an anti-pattern:
//
// processed := 0
// _ = h.IterateRangeDescriptors(...,
// func(descriptors ...roachpb.RangeDescriptor) error {
// processed += len(descriptors) // we'll over count if retried
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
// Instead we allow callers to pass in a callback to signal on every attempt
// (including the first). This lets us salvage the example above:
//
// var processed int
// init := func() { processed = 0 }
// _ = h.IterateRangeDescriptors(..., init,
// func(descriptors ...roachpb.RangeDescriptor) error {
// processed += len(descriptors)
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
// [1]: pkg/kv/kvserver/batch_eval/cmd_migrate.go
IterateRangeDescriptors(
ctx context.Context,
Expand Down
6 changes: 1 addition & 5 deletions pkg/upgrade/upgradecluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgradecluster",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
Expand All @@ -20,6 +19,7 @@ go_library(
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/quotapool",
"//pkg/util/rangedesciter",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@org_golang_google_grpc//:go_default_library",
Expand All @@ -30,23 +30,19 @@ go_test(
name = "upgradecluster_test",
size = "small",
srcs = [
"client_test.go",
"helper_test.go",
"main_test.go",
"nodes_test.go",
],
args = ["-test.timeout=55s"],
embed = [":upgradecluster"],
deps = [
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
Expand Down
53 changes: 5 additions & 48 deletions pkg/upgrade/upgradecluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ package upgradecluster
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -23,7 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/cockroach/pkg/util/rangedesciter"
"github.com/cockroachdb/redact"
"google.golang.org/grpc"
)
Expand All @@ -42,6 +41,9 @@ type ClusterConfig struct {
// Dialer constructs connections to other nodes.
Dialer NodeDialer

// RangeDescIterator iterates through all range descriptors.
RangeDescIterator rangedesciter.Iterator

// DB provides access the kv.DB instance backing the cluster.
//
// TODO(irfansharif): We could hide the kv.DB instance behind an interface
Expand Down Expand Up @@ -143,50 +145,5 @@ func (c *Cluster) ForEveryNode(
func (c *Cluster) IterateRangeDescriptors(
ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error,
) error {
return c.c.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Inform the caller that we're starting a fresh attempt to page in
// range descriptors.
init()

// Iterate through meta{1,2} to pull out all the range descriptors.
var lastRangeIDInMeta1 roachpb.RangeID
return txn.Iterate(ctx, keys.MetaMin, keys.MetaMax, blockSize,
func(rows []kv.KeyValue) error {
descriptors := make([]roachpb.RangeDescriptor, 0, len(rows))
var desc roachpb.RangeDescriptor
for _, row := range rows {
err := row.ValueProto(&desc)
if err != nil {
return errors.Wrapf(err, "unable to unmarshal range descriptor from %s", row.Key)
}

// In small enough clusters it's possible for the same range
// descriptor to be stored in both meta1 and meta2. This
// happens when some range spans both the meta and the user
// keyspace. Consider when r1 is [/Min,
// /System/NodeLiveness); we'll store the range descriptor
// in both /Meta2/<r1.EndKey> and in /Meta1/KeyMax[1].
//
// As part of iterator we'll de-duplicate this descriptor
// away by checking whether we've seen it before in meta1.
// Since we're scanning over the meta range in sorted
// order, it's enough to check against the last range
// descriptor we've seen in meta1.
//
// [1]: See kvserver.rangeAddressing.
if desc.RangeID == lastRangeIDInMeta1 {
continue
}

descriptors = append(descriptors, desc)
if keys.InMeta1(keys.RangeMetaKey(desc.StartKey)) {
lastRangeIDInMeta1 = desc.RangeID
}
}

// Invoke fn with the current chunk (of size ~blockSize) of
// range descriptors.
return fn(descriptors...)
})
})
return c.c.RangeDescIterator.Iterate(ctx, blockSize, init, fn)
}
39 changes: 39 additions & 0 deletions pkg/util/rangedesciter/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "rangedesciter",
srcs = ["rangedesciter.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/rangedesciter",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "rangedesciter_test",
srcs = [
"main_test.go",
"rangedesciter_test.go",
],
args = ["-test.timeout=295s"],
deps = [
":rangedesciter",
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/tests",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
],
)

get_x_data(name = "get_x_data")
31 changes: 31 additions & 0 deletions pkg/util/rangedesciter/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangedesciter_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security/securityassets"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestMain(m *testing.M) {
securityassets.SetLoader(securitytest.EmbeddedAssets)
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
125 changes: 125 additions & 0 deletions pkg/util/rangedesciter/rangedesciter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangedesciter

import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/errors"
)

// Iterator paginates through every range descriptor in the system.
type Iterator interface {
// Iterate paginates through range descriptors in the system using the given
// page size. It's important to note that the closure is being executed in
// the context of a distributed transaction that may be automatically
// retried. So something like the following is an anti-pattern:
//
// processed := 0
// _ = rdi.Iterate(...,
// func(descriptors ...roachpb.RangeDescriptor) error {
// processed += len(descriptors) // we'll over count if retried
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
// Instead we allow callers to pass in a callback to signal on every attempt
// (including the first). This lets us salvage the example above:
//
// var processed int
// init := func() { processed = 0 }
// _ = rdi.Iterate(..., init,
// func(descriptors ...roachpb.RangeDescriptor) error {
// processed += len(descriptors)
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
Iterate(
ctx context.Context, pageSize int, init func(),
fn func(descriptors ...roachpb.RangeDescriptor) error,
) error
}

// DB is a database handle to a CRDB cluster.
type DB interface {
Txn(ctx context.Context, retryable func(context.Context, *kv.Txn) error) error
}

// iteratorImpl is a concrete (private) implementation of the Iterator
// interface.
type iteratorImpl struct {
db DB
}

// New returns an Iterator.
func New(db DB) Iterator {
return &iteratorImpl{db: db}
}

var _ Iterator = &iteratorImpl{}

// Iterate implements the Iterator interface.
func (i *iteratorImpl) Iterate(
ctx context.Context,
pageSize int,
init func(),
fn func(descriptors ...roachpb.RangeDescriptor) error,
) error {
return i.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Inform the caller that we're starting a fresh attempt to page in
// range descriptors.
init()

// Iterate through meta{1,2} to pull out all the range descriptors.
var lastRangeIDInMeta1 roachpb.RangeID
return txn.Iterate(ctx, keys.MetaMin, keys.MetaMax, pageSize,
func(rows []kv.KeyValue) error {
descriptors := make([]roachpb.RangeDescriptor, 0, len(rows))
var desc roachpb.RangeDescriptor
for _, row := range rows {
err := row.ValueProto(&desc)
if err != nil {
return errors.Wrapf(err, "unable to unmarshal range descriptor from %s", row.Key)
}

// In small enough clusters it's possible for the same range
// descriptor to be stored in both meta1 and meta2. This
// happens when some range spans both the meta and the user
// keyspace. Consider when r1 is [/Min,
// /System/NodeLiveness); we'll store the range descriptor
// in both /Meta2/<r1.EndKey> and in /Meta1/KeyMax[1].
//
// As part of iterator we'll de-duplicate this descriptor
// away by checking whether we've seen it before in meta1.
// Since we're scanning over the meta range in sorted
// order, it's enough to check against the last range
// descriptor we've seen in meta1.
//
// [1]: See kvserver.rangeAddressing.
if desc.RangeID == lastRangeIDInMeta1 {
continue
}

descriptors = append(descriptors, desc)
if keys.InMeta1(keys.RangeMetaKey(desc.StartKey)) {
lastRangeIDInMeta1 = desc.RangeID
}
}

// Invoke fn with the current chunk (of size ~blockSize) of
// range descriptors.
return fn(descriptors...)
})
})
}
Loading

0 comments on commit c69227a

Please sign in to comment.