partitionccl: enhance partition test to use leaseholders and more nodes
Prior to this commit the partitioning tests worked by creating a 3-node cluster
and expressing constraints over the three nodes. They then validated that the
cluster conformed to the constraints by querying data and examining the trace
to determine which node held the data.

This is problematic, for one, because it is susceptible to cockroachdb#40333. In
rare cases we'll down-replicate to the wrong single node (e.g. if the right one
is not live) and we won't ever fix it.

It also doesn't exercise leaseholder preferences.

This PR adds functionality to configure clusters with larger numbers of nodes.
Each expectation in the config can now refer to a leaseholder_preference rather
than a constraint, and the additional nodes are allocated across 3 datacenters.
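For example (shorthand hypothetical), an expectation such as `.p1:+n2` now
means "prefer the leaseholder for partition p1 on the node with attribute n2",
and the test derives a matching constraint (`+zone=dc2`) so the data is also
pinned to that node's datacenter.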

This larger test creates dramatically more data movement and has been useful
when testing cockroachdb#40892.

Release justification: Only touches testing and is useful for testing a
release blocker.

Release note: None
ajwerner committed Sep 20, 2019
1 parent f59a9cd commit 3afd9fa
Showing 1 changed file with 139 additions and 90 deletions.
229 changes: 139 additions & 90 deletions pkg/ccl/partitionccl/partition_test.go
@@ -14,6 +14,7 @@ import (
gosql "database/sql"
"fmt"
"math/rand"
"strconv"
"strings"
"testing"
"time"
@@ -56,9 +57,13 @@ type partitioningTest struct {
// table name should be.
schema string

-// configs are each a shorthand for a zone config, formatted as
-// `@index_name` or `.partition_name`. Optionally a suffix of a colon and a
-// comma-separated list of constraints may be included (`@index_name:+dc1`).
+// configs are each a shorthand for a zone config for leaseholder preferences.
+// Constraints will be added such that data is constrained to the DC of the
+// leaseholder.
+//
+// The values are formatted as `@index_name` or `.partition_name`.
+// Optionally a suffix of a colon and a comma-separated list of constraints
+// may be included (`@index_name:+dc1`).
+// These will be parsed into `parsed.subzones`.
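+//
+// For example (values illustrative): `.p1:+n2` targets partition p1; parse()
+// rewrites the `n2` shorthand into the locality constraint `zone=dc2`, so
+// p1's data is constrained to dc2 while the leaseholder is preferred on the
+// node with attribute n2.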
configs []string

@@ -102,11 +107,87 @@ type partitioningTest struct {
}
}

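// clusterConfiguration describes the shape of a test cluster; it currently
// only carries a node count (presumably a placeholder for further knobs).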
type clusterConfiguration struct {
numNodes int
}

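// repartitioningTest pairs an old and a new partitioning of the same index;
// run applies the old config, verifies scan placement, repartitions to the
// new config, and verifies placement again.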
type repartitioningTest struct {
index string
old, new partitioningTest
}

func (rt *repartitioningTest) run(
ctx context.Context, t *testing.T, db *gosql.DB, sqlDB *sqlutils.SQLRunner,
) {
t.Run(fmt.Sprintf("%s/%s", rt.old.name, rt.new.name), func(t *testing.T) {
sqlDB.Exec(t, `DROP DATABASE IF EXISTS data`)
sqlDB.Exec(t, `CREATE DATABASE data`)

{
if err := rt.old.parse(); err != nil {
t.Fatalf("%+v", err)
}
sqlDB.Exec(t, rt.old.parsed.createStmt)
sqlDB.Exec(t, rt.old.parsed.zoneConfigStmts)

testutils.SucceedsSoon(t, rt.old.verifyScansFn(ctx, t, db))
}

{
if err := rt.new.parse(); err != nil {
t.Fatalf("%+v", err)
}
sqlDB.Exec(t, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", rt.old.parsed.tableName, rt.new.parsed.tableName))

testIndex, _, err := rt.new.parsed.tableDesc.FindIndexByName(rt.index)
if err != nil {
t.Fatalf("%+v", err)
}

var repartition bytes.Buffer
if testIndex.ID == rt.new.parsed.tableDesc.PrimaryIndex.ID {
fmt.Fprintf(&repartition, `ALTER TABLE %s `, rt.new.parsed.tableName)
} else {
fmt.Fprintf(&repartition, `ALTER INDEX %s@%s `, rt.new.parsed.tableName, testIndex.Name)
}
if testIndex.Partitioning.NumColumns == 0 {
repartition.WriteString(`PARTITION BY NOTHING`)
} else {
if err := sql.ShowCreatePartitioning(
&sqlbase.DatumAlloc{}, rt.new.parsed.tableDesc, testIndex,
&testIndex.Partitioning, &repartition, 0 /* indent */, 0, /* colOffset */
); err != nil {
t.Fatalf("%+v", err)
}
}
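// At this point repartition holds, e.g. (names hypothetical),
// `ALTER TABLE t PARTITION BY NOTHING` for an unpartitioned new index, or an
// `ALTER INDEX t@idx PARTITION BY ...` clause rendered by
// ShowCreatePartitioning.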
sqlDB.Exec(t, repartition.String())

// Verify that repartitioning removes zone configs for partitions that
// have been removed.
newPartitionNames := map[string]struct{}{}
for _, name := range rt.new.parsed.tableDesc.PartitionNames() {
newPartitionNames[name] = struct{}{}
}
for _, row := range sqlDB.QueryStr(
t, "SELECT partition_name FROM crdb_internal.zones WHERE partition_name IS NOT NULL") {
partitionName := row[0]
if _, ok := newPartitionNames[partitionName]; !ok {
t.Errorf("zone config for removed partition %q exists after repartitioning", partitionName)
}
}

// NB: Not all old zone configurations are removed. This statement will
// overwrite any with the same name and the repartitioning removes any
// for partitions that no longer exist, but there could still be some
// sitting around (e.g., when a repartitioning preserves a partition but
// does not apply a new zone config). This is fine.
sqlDB.Exec(t, rt.new.parsed.zoneConfigStmts)
testutils.SucceedsSoon(t, rt.new.verifyScansFn(ctx, t, db))
}
})
}

// parse fills in the various fields of `partitioningTest.parsed`.
func (pt *partitioningTest) parse() error {
if pt.parsed.parsed {
@@ -142,13 +223,13 @@ func (pt *partitioningTest) parse() error {
var zoneConfigStmts bytes.Buffer
// TODO(dan): Can we run all the zoneConfigStmts in a txn?
for _, c := range pt.configs {
-var subzoneShort, constraints string
+var subzoneShort, leasePreferences string
configParts := strings.Split(c, `:`)
switch len(configParts) {
case 1:
subzoneShort = configParts[0]
case 2:
-subzoneShort, constraints = configParts[0], configParts[1]
+subzoneShort, leasePreferences = configParts[0], configParts[1]
default:
panic(errors.Errorf("unsupported config: %s", c))
}
@@ -177,16 +258,19 @@
return errors.Wrapf(err, "could not find index %s", indexName)
}
subzone.IndexID = uint32(idxDesc.ID)
constraints := strings.Replace(leasePreferences, "n", "zone=dc", -1)
if len(constraints) > 0 {
if subzone.PartitionName == "" {
fmt.Fprintf(&zoneConfigStmts,
-`ALTER INDEX %s@%s CONFIGURE ZONE USING constraints = '[%s]';`,
-pt.parsed.tableName, idxDesc.Name, constraints,
+`ALTER INDEX %s@%s CONFIGURE ZONE USING constraints = '[%s]', lease_preferences = '[[%q]]';
+`,
+pt.parsed.tableName, idxDesc.Name, constraints, leasePreferences,
)
} else {
fmt.Fprintf(&zoneConfigStmts,
-`ALTER PARTITION %s OF INDEX %s@%s CONFIGURE ZONE USING constraints = '[%s]';`,
-subzone.PartitionName, pt.parsed.tableName, idxDesc.Name, constraints,
+`ALTER PARTITION %s OF INDEX %s@%s CONFIGURE ZONE USING constraints = '[%s]', lease_preferences = '[[%s]]';
+`,
+subzone.PartitionName, pt.parsed.tableName, idxDesc.Name, constraints, leasePreferences,
)
}
}
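// For example (table/index/partition names hypothetical), a config of
// `.p1:+n1` on index idx of table t emits roughly:
//   ALTER PARTITION p1 OF INDEX t@idx CONFIGURE ZONE
//   USING constraints = '[+zone=dc1]', lease_preferences = '[[+n1]]';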
@@ -197,6 +281,15 @@
}
subzone.Config.Constraints = parsedConstraints.Constraints
subzone.Config.InheritedConstraints = parsedConstraints.Inherited
var parsedLeasePreferences config.ConstraintsList
if err := yaml.UnmarshalStrict([]byte("["+leasePreferences+"]"), &parsedLeasePreferences); err != nil {
return errors.Wrapf(err, "parsing lease preferences: %s", leasePreferences)
}
for _, c := range parsedLeasePreferences.Constraints {
subzone.Config.LeasePreferences = append(subzone.Config.LeasePreferences,
config.LeasePreference{Constraints: c.Constraints})
}
subzone.Config.InheritedLeasePreferences = parsedLeasePreferences.Inherited

pt.parsed.subzones = append(pt.parsed.subzones, subzone)
}
@@ -1095,7 +1188,8 @@ func verifyScansOnNode(
}
traceLines = append(traceLines, traceLine.String)
if strings.Contains(traceLine.String, "read completed") {
-if strings.Contains(traceLine.String, "SystemCon") {
+if strings.Contains(traceLine.String, "SystemCon") ||
+strings.Contains(traceLine.String, "Min-") {
// Ignore trace lines for the system config range (abbreviated as
// "SystemCon" in pretty printing of the range descriptor). A read might
// be performed to the system config range to update the table lease.
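// The "Min-" check likewise skips reads served from ranges whose span
// pretty-prints starting at /Min (the first range); presumably another
// source of incidental system reads in the trace.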
@@ -1120,12 +1214,12 @@
}

func setupPartitioningTestCluster(
-ctx context.Context, t testing.TB,
+ctx context.Context, t testing.TB, numDCs, nodesPerDC int,
) (*gosql.DB, *sqlutils.SQLRunner, func()) {
cfg := config.DefaultZoneConfig()
-cfg.NumReplicas = proto.Int32(1)
+cfg.NumReplicas = proto.Int32(3)

-tsArgs := func(attr string) base.TestServerArgs {
+tsArgs := func(attr, locality string) base.TestServerArgs {
return base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
@@ -1142,14 +1236,30 @@ func setupPartitioningTestCluster(
{InMemory: true, Attributes: roachpb.Attributes{Attrs: []string{attr}}},
},
UseDatabase: "data",
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{
Key: "zone",
Value: locality,
},
},
},
}
}
numNodes := numDCs * nodesPerDC
tcArgs := func(numNodes int) base.TestClusterArgs {
serverArgs := make(map[int]base.TestServerArgs)
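// Nodes are dealt out to datacenters round-robin: node i+1 gets the
// attribute "n<i+1>" and locality zone=dc<(i%numDCs)+1>, so with numDCs=3
// and nodesPerDC=2 the six nodes land in dc1, dc2, dc3, dc1, dc2, dc3.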
for i := 0; i < numNodes; i++ {
serverArgs[i] = tsArgs(
"n"+strconv.Itoa(i+1),
"dc"+strconv.Itoa((i%numDCs)+1),
)
}
return base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
}
}
-tcArgs := base.TestClusterArgs{ServerArgsPerNode: map[int]base.TestServerArgs{
-0: tsArgs("n1"),
-1: tsArgs("n2"),
-2: tsArgs("n3"),
-}}
-tc := testcluster.StartTestCluster(t, 3, tcArgs)
+tc := testcluster.StartTestCluster(t, numNodes, tcArgs(numNodes))

sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
sqlDB.Exec(t, `CREATE DATABASE data`)
@@ -1179,7 +1289,7 @@ func TestInitialPartitioning(t *testing.T) {
testCases := allPartitioningTests(rng)

ctx := context.Background()
-db, sqlDB, cleanup := setupPartitioningTestCluster(ctx, t)
+db, sqlDB, cleanup := setupPartitioningTestCluster(ctx, t, 3, 1)
defer cleanup()

for _, test := range testCases {
@@ -1287,76 +1397,15 @@ func TestRepartitioning(t *testing.T) {
if err != nil {
t.Fatalf("%+v", err)
}

-ctx := context.Background()
-db, sqlDB, cleanup := setupPartitioningTestCluster(ctx, t)
-defer cleanup()
-
-for _, test := range testCases {
-t.Run(fmt.Sprintf("%s/%s", test.old.name, test.new.name), func(t *testing.T) {
-sqlDB.Exec(t, `DROP DATABASE IF EXISTS data`)
-sqlDB.Exec(t, `CREATE DATABASE data`)
-
-{
-if err := test.old.parse(); err != nil {
-t.Fatalf("%+v", err)
-}
-sqlDB.Exec(t, test.old.parsed.createStmt)
-sqlDB.Exec(t, test.old.parsed.zoneConfigStmts)
-
-testutils.SucceedsSoon(t, test.old.verifyScansFn(ctx, t, db))
-}
-
-{
-if err := test.new.parse(); err != nil {
-t.Fatalf("%+v", err)
-}
-sqlDB.Exec(t, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", test.old.parsed.tableName, test.new.parsed.tableName))
-
-testIndex, _, err := test.new.parsed.tableDesc.FindIndexByName(test.index)
-if err != nil {
-t.Fatalf("%+v", err)
-}
-
-var repartition bytes.Buffer
-if testIndex.ID == test.new.parsed.tableDesc.PrimaryIndex.ID {
-fmt.Fprintf(&repartition, `ALTER TABLE %s `, test.new.parsed.tableName)
-} else {
-fmt.Fprintf(&repartition, `ALTER INDEX %s@%s `, test.new.parsed.tableName, testIndex.Name)
-}
-if testIndex.Partitioning.NumColumns == 0 {
-repartition.WriteString(`PARTITION BY NOTHING`)
-} else {
-if err := sql.ShowCreatePartitioning(
-&sqlbase.DatumAlloc{}, test.new.parsed.tableDesc, testIndex,
-&testIndex.Partitioning, &repartition, 0 /* indent */, 0, /* colOffset */
-); err != nil {
-t.Fatalf("%+v", err)
-}
-}
-sqlDB.Exec(t, repartition.String())
-
-// Verify that repartitioning removes zone configs for partitions that
-// have been removed.
-newPartitionNames := map[string]struct{}{}
-for _, name := range test.new.parsed.tableDesc.PartitionNames() {
-newPartitionNames[name] = struct{}{}
-}
-for _, row := range sqlDB.QueryStr(
-t, "SELECT partition_name FROM crdb_internal.zones WHERE partition_name IS NOT NULL") {
-partitionName := row[0]
-if _, ok := newPartitionNames[partitionName]; !ok {
-t.Errorf("zone config for removed partition %q exists after repartitioning", partitionName)
-}
-}
-
-// NB: Not all old zone configurations are removed. This statement will
-// overwrite any with the same name and the repartitioning removes any
-// for partitions that no longer exist, but there could still be some
-// sitting around (e.g., when a repartitioning preserves a partition but
-// does not apply a new zone config). This is fine.
-sqlDB.Exec(t, test.new.parsed.zoneConfigStmts)
-testutils.SucceedsSoon(t, test.new.verifyScansFn(ctx, t, db))
+// All of the partitioning tests assume 3 datacenters.
+const numDCs = 3
+for _, nodesPerDC := range []int{1, 3, 5} {
+t.Run(fmt.Sprintf("nodes=%d", nodesPerDC*numDCs), func(t *testing.T) {
+ctx := context.Background()
+db, sqlDB, cleanup := setupPartitioningTestCluster(ctx, t, 3, nodesPerDC)
+defer cleanup()
+for _, test := range testCases {
+test.run(ctx, t, db, sqlDB)
+}
+})
+}
