From d49d5353ed9432307b806deb95acfdc9ce43c191 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Wed, 14 Jun 2017 17:09:00 -0400 Subject: [PATCH] sampledataccl: add an abstraction for ccl testdata Data is a representation of a sql table, used for creating test data in various forms (sql rows, kvs, enterprise backup). We only have one for the moment (a bank table with an id, balance, and payload for padding row-size), but the indirection of the interface didn't really add any complexity. --- pkg/ccl/sqlccl/load.go | 4 + pkg/ccl/utilccl/sampledataccl/bankdata.go | 389 ++++++++++++++++++ .../utilccl/sampledataccl/bankdata_test.go | 198 +++++++++ pkg/ccl/utilccl/sampledataccl/main_test.go | 34 ++ 4 files changed, 625 insertions(+) create mode 100644 pkg/ccl/utilccl/sampledataccl/bankdata.go create mode 100644 pkg/ccl/utilccl/sampledataccl/bankdata_test.go create mode 100644 pkg/ccl/utilccl/sampledataccl/main_test.go diff --git a/pkg/ccl/sqlccl/load.go b/pkg/ccl/sqlccl/load.go index 62307910e23e..63c481a2e4f9 100644 --- a/pkg/ccl/sqlccl/load.go +++ b/pkg/ccl/sqlccl/load.go @@ -326,6 +326,10 @@ func writeSST( kvs []engine.MVCCKeyValue, ts hlc.Timestamp, ) error { + if len(kvs) == 0 { + return nil + } + filename := fmt.Sprintf("load-%d.sst", rand.Int63()) log.Info(ctx, "writesst ", filename) diff --git a/pkg/ccl/utilccl/sampledataccl/bankdata.go b/pkg/ccl/utilccl/sampledataccl/bankdata.go new file mode 100644 index 000000000000..c0efbc51d6db --- /dev/null +++ b/pkg/ccl/utilccl/sampledataccl/bankdata.go @@ -0,0 +1,389 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/LICENSE + +package sampledataccl + +import ( + "bytes" + gosql "database/sql" + "fmt" + "io" + "io/ioutil" + "math/rand" + "path/filepath" + "strconv" + "strings" + "testing" + + "github.com/pkg/errors" + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlccl" + "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +// TODO(dan): Once the interface for this has settled a bit, move it out of ccl. + +// Data is a representation of a sql table, used for creating test data in +// various forms (sql rows, kvs, enterprise backup). All data tables live in the +// `data` database. +type Data interface { + // Name returns the fully-qualified name of the represented table. + Name() string + + // Schema returns the schema for the represented table, such that it can be + // used in fmt.Sprintf(`CREATE TABLE %s %s`, data.Name(), data.Schema`())`. + Schema() string + + // NextRow iterates through the rows in the represented table, returning one + // per call. The row is represented as a slice of strings, each string one + // of the columns in the row. When no more rows are available, the bool + // returned is false. + NextRow() ([]string, bool) + + // NextRow iterates through the split points in the represented table, + // returning one per call. The split is represented as a slice of strings, + // each string one of the columns in the split. When no more splits are + // available, the bool returned is false. + NextSplit() ([]string, bool) +} + +// InsertBatched inserts all rows represented by `data` into `db` in batches of +// `batchSize` rows. The table must exist. +func InsertBatched(db *gosql.DB, data Data, batchSize int) error { + var stmt bytes.Buffer + for { + stmt.Reset() + fmt.Fprintf(&stmt, `INSERT INTO %s VALUES `, data.Name()) + i := 0 + done := false + for ; i < batchSize; i++ { + row, ok := data.NextRow() + if !ok { + done = true + break + } + if i != 0 { + stmt.WriteRune(',') + } + fmt.Fprintf(&stmt, `(%s)`, strings.Join(row, `,`)) + } + if i > 0 { + if _, err := db.Exec(stmt.String()); err != nil { + return err + } + } + if done { + return nil + } + continue + } +} + +// Split creates the configured number of ranges in an already created created +// version of the table represented by `data`. +func Split(db *gosql.DB, data Data) error { + var stmt bytes.Buffer + fmt.Fprintf(&stmt, `ALTER TABLE %s SPLIT AT VALUES `, data.Name()) + i := 0 + for ; ; i++ { + split, ok := data.NextSplit() + if !ok { + break + } + if i != 0 { + stmt.WriteRune(',') + } + fmt.Fprintf(&stmt, `(%s)`, strings.Join(split, `,`)) + } + if i > 0 { + if _, err := db.Exec(stmt.String()); err != nil { + return err + } + } + return nil +} + +// Setup creates a table in `db` with all the rows and splits of `data`. +func Setup(db *gosql.DB, data Data) error { + if _, err := db.Exec(`CREATE DATABASE IF NOT EXISTS data`); err != nil { + return err + } + if _, err := db.Exec(fmt.Sprintf(`CREATE TABLE %s %s`, data.Name(), data.Schema())); err != nil { + return err + } + const batchSize = 1000 + if err := InsertBatched(db, data, batchSize); err != nil { + return err + } + // This occasionally flakes, so ignore errors. + _ = Split(db, data) + return nil +} + +// ToBackup creates an enterprise backup in `dir`. +func ToBackup(t testing.TB, data Data, dir string) (*Backup, error) { + return toBackup(t, data, dir, 0) +} + +func toBackup(t testing.TB, data Data, dir string, chunkBytes int64) (*Backup, error) { + tempDir, dirCleanupFn := testutils.TempDir(t) + defer dirCleanupFn() + + // TODO(dan): Get rid of the `t testing.TB` parameter and this `TestServer`. + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + if _, err := db.Exec(`CREATE DATABASE data`); err != nil { + return nil, err + } + + var stmts bytes.Buffer + fmt.Fprintf(&stmts, "CREATE TABLE %s %s;\n", data.Name(), data.Schema()) + for { + row, ok := data.NextRow() + if !ok { + break + } + fmt.Fprintf(&stmts, "INSERT INTO %s VALUES (%s);\n", data.Name(), strings.Join(row, `,`)) + } + + // TODO(dan): The csv load will be less overhead, use it when we have it. + ts := hlc.Timestamp{WallTime: hlc.UnixNano()} + desc, err := sqlccl.Load(ctx, db, &stmts, `data`, `nodelocal://`+dir, ts, chunkBytes, tempDir) + if err != nil { + return nil, err + } + return &Backup{BaseDir: dir, Desc: desc}, nil +} + +// Backup is a representation of an enterprise BACKUP. +type Backup struct { + // BaseDir can be used for a RESTORE. All paths in the descriptor are + // relative to this. + BaseDir string + Desc sqlccl.BackupDescriptor + + fileIdx int + iterIdx int +} + +// ResetKeyValueIteration resets the NextKeyValues iteration to the first kv. +func (b *Backup) ResetKeyValueIteration() { + b.fileIdx = 0 + b.iterIdx = 0 +} + +// NextKeyValues iterates and returns every *user table data* key-value in the +// backup. At least `count` kvs will be returned, but rows are not broken up, so +// slightly more than `count` may come back. If fewer than `count` are +// available, err will be `io.EOF` and kvs may be partially filled with the +// remainer. +func (b *Backup) NextKeyValues( + count int, newTableID sqlbase.ID, +) ([]engine.MVCCKeyValue, roachpb.Span, error) { + var userTables []*sqlbase.TableDescriptor + for _, d := range b.Desc.Descriptors { + if t := d.GetTable(); t != nil && t.ParentID != keys.SystemDatabaseID { + userTables = append(userTables, t) + } + } + if len(userTables) != 1 { + return nil, roachpb.Span{}, errors.Errorf( + "only backups of one table are currently supported, got %d", len(userTables)) + } + tableDesc := userTables[0] + + newDesc := *tableDesc + newDesc.ID = newTableID + newDescBytes, err := sqlbase.WrapDescriptor(&newDesc).Marshal() + if err != nil { + return nil, roachpb.Span{}, err + } + kr, err := storageccl.MakeKeyRewriter([]roachpb.ImportRequest_TableRekey{{ + OldID: uint32(tableDesc.ID), NewDesc: newDescBytes, + }}) + if err != nil { + return nil, roachpb.Span{}, err + } + + var kvs []engine.MVCCKeyValue + span := roachpb.Span{Key: keys.MaxKey} + for ; b.fileIdx < len(b.Desc.Files); b.fileIdx++ { + file := b.Desc.Files[b.fileIdx] + + sst := engine.MakeRocksDBSstFileReader() + defer sst.Close() + fileContents, err := ioutil.ReadFile(filepath.Join(b.BaseDir, file.Path)) + if err != nil { + return nil, roachpb.Span{}, err + } + if err := sst.IngestExternalFile(fileContents); err != nil { + return nil, roachpb.Span{}, err + } + + it := sst.NewIterator(false) + defer it.Close() + it.Seek(engine.MVCCKey{Key: file.Span.Key}) + + iterIdx := 0 + for ; ; it.Next() { + if len(kvs) >= count { + break + } + if iterIdx < b.iterIdx { + iterIdx++ + continue + } + + ok, err := it.Valid() + if err != nil { + return nil, roachpb.Span{}, err + } + if !ok || it.UnsafeKey().Key.Compare(file.Span.EndKey) >= 0 { + break + } + + if iterIdx < b.iterIdx { + break + } + b.iterIdx = iterIdx + + key := it.Key() + key.Key, ok, err = kr.RewriteKey(key.Key) + if err != nil { + return nil, roachpb.Span{}, err + } + if !ok { + return nil, roachpb.Span{}, errors.Errorf("rewriter did not match key: %s", key.Key) + } + v := roachpb.Value{RawBytes: it.Value()} + v.ClearChecksum() + v.InitChecksum(key.Key) + kvs = append(kvs, engine.MVCCKeyValue{Key: key, Value: v.RawBytes}) + + if key.Key.Compare(span.Key) < 0 { + span.Key = append(span.Key[:0], key.Key...) + } + if key.Key.Compare(span.EndKey) > 0 { + span.EndKey = append(span.EndKey[:0], key.Key...) + } + iterIdx++ + } + b.iterIdx = iterIdx + + if len(kvs) >= count { + break + } + if ok, _ := it.Valid(); !ok { + b.iterIdx = 0 + } + } + + span.EndKey = span.EndKey.Next() + if len(kvs) < count { + return kvs, span, io.EOF + } + return kvs, span, nil +} + +type bankData struct { + Rows int + PayloadBytes int + Ranges int + Rng *rand.Rand + + rowIdx int + splitIdx int +} + +var _ Data = &bankData{} + +// bankConfigDefault will trigger the default for any of the parameters for +// `Bank`. +const bankConfigDefault = -1 + +// BankRows returns Bank testdata with the given number of rows and default +// payload size and range count. +func BankRows(rows int) Data { + return Bank(rows, bankConfigDefault, bankConfigDefault) +} + +// Bank returns a bank table with three columns: an `id INT PRIMARY KEY` +// representing an account number, a `balance` INT, and a `payload` BYTES to pad +// the size of the rows for various tests. +func Bank(rows int, payloadBytes int, ranges int) Data { + // TODO(dan): This interface is a little wonky, but it's a bit annoying to + // replace it with a struct because that would make most of the callsites + // wrap. Complicating things is that there needs to be a distinction between + // the default value and an explicit 0 for ranges and payloadBytes. + if rows == bankConfigDefault { + rows = 1000 + } + if payloadBytes == bankConfigDefault { + payloadBytes = 100 + } + if ranges == bankConfigDefault { + ranges = 10 + } + if ranges > rows { + ranges = rows + } + rng, _ := randutil.NewPseudoRand() + return &bankData{Rows: rows, PayloadBytes: payloadBytes, Ranges: ranges, Rng: rng} +} + +// Name implements the Data interface. +func (d *bankData) Name() string { + return `data.bank` +} + +// Schema implements the Data interface. +func (d *bankData) Schema() string { + return `( + id INT PRIMARY KEY, + balance INT, + payload STRING, + FAMILY (id, balance, payload) + )` +} + +// NextRow implements the Data interface. +func (d *bankData) NextRow() ([]string, bool) { + if d.rowIdx >= d.Rows { + return nil, false + } + payload := fmt.Sprintf(`'initial-%x'`, randutil.RandBytes(d.Rng, d.PayloadBytes)) + row := []string{ + strconv.Itoa(d.rowIdx), // id + `0`, // balance + payload, // payload + } + d.rowIdx++ + return row, true +} + +// NextSplit implements the Data interface. +func (d *bankData) NextSplit() ([]string, bool) { + if d.splitIdx+1 >= d.Ranges { + return nil, false + } + // TODO(dan): This doesn't make evenly sized splits. + split := []string{strconv.Itoa(d.splitIdx)} + d.splitIdx++ + return split, true +} diff --git a/pkg/ccl/utilccl/sampledataccl/bankdata_test.go b/pkg/ccl/utilccl/sampledataccl/bankdata_test.go new file mode 100644 index 000000000000..7e8d71caf5ff --- /dev/null +++ b/pkg/ccl/utilccl/sampledataccl/bankdata_test.go @@ -0,0 +1,198 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/LICENSE + +package sampledataccl + +import ( + "bytes" + "fmt" + "io" + "testing" + + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestInsertBatched(t *testing.T) { + defer leaktest.AfterTest(t)() + + tests := []struct { + rows int + batchSize int + }{ + {10, 1}, + {10, 9}, + {10, 10}, + {10, 100}, + } + + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + for _, test := range tests { + t.Run(fmt.Sprintf("rows=%d/batch=%d", test.rows, test.batchSize), func(t *testing.T) { + sqlDB := sqlutils.MakeSQLRunner(t, db) + + data := Bank(test.rows, 0, bankConfigDefault) + sqlDB.Exec(`CREATE DATABASE IF NOT EXISTS data`) + sqlDB.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS %s`, data.Name())) + sqlDB.Exec(fmt.Sprintf(`CREATE TABLE %s %s`, data.Name(), data.Schema())) + + if err := InsertBatched(sqlDB.DB, data, test.batchSize); err != nil { + t.Fatalf("%+v", err) + } + + var rowCount int + sqlDB.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM %s`, data.Name())).Scan(&rowCount) + if rowCount != test.rows { + t.Errorf("got %d rows expected %d", rowCount, test.rows) + } + }) + } +} + +func TestSplit(t *testing.T) { + defer leaktest.AfterTest(t)() + + tests := []struct { + rows int + ranges int + expectedRanges int + }{ + {10, 0, 1}, // We always have at least one range. + {10, 1, 1}, + {10, 9, 9}, + {10, 10, 10}, + {10, 100, 10}, // Don't make more ranges than rows. + } + + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + for _, test := range tests { + t.Run(fmt.Sprintf("rows=%d/ranges=%d", test.rows, test.ranges), func(t *testing.T) { + sqlDB := sqlutils.MakeSQLRunner(t, db) + + data := Bank(test.rows, bankConfigDefault, test.ranges) + sqlDB.Exec(`CREATE DATABASE IF NOT EXISTS data`) + sqlDB.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS %s`, data.Name())) + sqlDB.Exec(fmt.Sprintf(`CREATE TABLE %s %s`, data.Name(), data.Schema())) + + if err := Split(sqlDB.DB, data); err != nil { + t.Fatalf("%+v", err) + } + + var rangeCount int + sqlDB.QueryRow( + fmt.Sprintf(`SELECT COUNT(*) FROM [SHOW TESTING_RANGES FROM TABLE %s]`, data.Name()), + ).Scan(&rangeCount) + if rangeCount != test.expectedRanges { + t.Errorf("got %d ranges expected %d", rangeCount, test.expectedRanges) + } + }) + } +} + +func TestToBackup(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + const payloadBytes = 100 + chunkBytesSizes := []int64{ + 0, // 0 means default ~32MB + 3 * payloadBytes, // a number that will not evently divide the number of rows + } + + for _, rows := range []int{1, 10} { + for _, chunkBytes := range chunkBytesSizes { + t.Run(fmt.Sprintf("rows=%d/chunk=%d", rows, chunkBytes), func(t *testing.T) { + dir, dirCleanupFn := testutils.TempDir(t) + defer dirCleanupFn() + + data := Bank(rows, payloadBytes, bankConfigDefault) + backup, err := toBackup(t, data, dir, chunkBytes) + if err != nil { + t.Fatalf("%+v", err) + } + + t.Run("Restore", func(t *testing.T) { + sqlDB := sqlutils.MakeSQLRunner(t, db) + sqlDB.Exec(`DROP DATABASE IF EXISTS data`) + sqlDB.Exec(`CREATE DATABASE data`) + sqlDB.Exec(`RESTORE data.* FROM $1`, `nodelocal://`+dir) + + var rowCount int + sqlDB.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM %s`, data.Name())).Scan(&rowCount) + if rowCount != rows { + t.Errorf("got %d rows expected %d", rowCount, rows) + } + }) + + t.Run("NextKeyValues", func(t *testing.T) { + for _, requestedKVs := range []int{2, 3} { + newTableID := sqlbase.ID(keys.MaxReservedDescID + requestedKVs) + newTablePrefix := roachpb.Key(keys.MakeTablePrefix(uint32(newTableID))) + + keys := make(map[string]struct{}, rows) + for { + kvs, span, err := backup.NextKeyValues(requestedKVs, newTableID) + if err != nil && err != io.EOF { + t.Fatalf("%+v", err) + } + if len(kvs) < requestedKVs && err != io.EOF { + t.Errorf("got %d kvs requested at least %d", len(kvs), requestedKVs) + } + for k := range keys { + key := roachpb.Key(k) + if key.Compare(span.Key) >= 0 && key.Compare(span.EndKey) < 0 { + t.Errorf("previous key %s overlaps this span %s", key, span) + } + } + for _, kv := range kvs { + key := kv.Key.Key + if key.Compare(span.Key) < 0 || key.Compare(span.EndKey) >= 0 { + t.Errorf("key %s is not in %s", kv.Key, span) + } + if _, ok := keys[string(key)]; ok { + t.Errorf("key %s was output twice", key) + } + keys[string(key)] = struct{}{} + if !bytes.HasPrefix(key, newTablePrefix) { + t.Errorf("key %s is not for table %d", key, newTableID) + } + } + if err == io.EOF { + break + } + } + + if len(keys) != rows { + t.Errorf("got %d kvs expected %d", len(keys), rows) + } + // Don't reconstruct the backup between runs so we can + // test the reset. + backup.ResetKeyValueIteration() + } + }) + }) + } + } +} diff --git a/pkg/ccl/utilccl/sampledataccl/main_test.go b/pkg/ccl/utilccl/sampledataccl/main_test.go new file mode 100644 index 000000000000..8ff662c629f1 --- /dev/null +++ b/pkg/ccl/utilccl/sampledataccl/main_test.go @@ -0,0 +1,34 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/LICENSE + +package sampledataccl + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer settings.TestingSetBool(&utilccl.EnterpriseEnabled, true)() + security.SetAssetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go