Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workload: support loading initial data using IMPORT in run and init #35312

Merged
merged 1 commit into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ func backupRestoreTestSetupWithParams(

sqlDB = sqlutils.MakeSQLRunner(tc.Conns[0])
sqlDB.Exec(t, `CREATE DATABASE data`)
const insertBatchSize = 1000
const concurrency = 4
if _, err := workloadsql.Setup(ctx, sqlDB.DB.(*gosql.DB), bankData, insertBatchSize, concurrency); err != nil {
l := workloadsql.InsertsDataLoader{BatchSize: 1000, Concurrency: 4}
if _, err := workloadsql.Setup(ctx, sqlDB.DB.(*gosql.DB), bankData, l); err != nil {
t.Fatalf("%+v", err)
}
if err := workloadsql.Split(ctx, sqlDB.DB.(*gosql.DB), bankData.Tables()[0], 1 /* concurrency */); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ func TestAvroLedger(t *testing.T) {

ctx := context.Background()
gen := ledger.FromFlags(`--customers=1`)
_, err := workloadsql.Setup(ctx, db, gen, 0, 0)
var l workloadsql.InsertsDataLoader
_, err := workloadsql.Setup(ctx, db, gen, l)
require.NoError(t, err)

ledger := feed(t, f, `CREATE CHANGEFEED FOR customer, transaction, entry, session
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/validations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func TestCatchupScanOrdering(t *testing.T) {
ctx := context.Background()
const numRows, numRanges, payloadBytes, maxTransfer = 10, 10, 10, 999
gen := bank.FromConfig(numRows, payloadBytes, numRanges)
if _, err := workloadsql.Setup(ctx, db, gen, 0, 0); err != nil {
var l workloadsql.InsertsDataLoader
if _, err := workloadsql.Setup(ctx, db, gen, l); err != nil {
t.Fatal(err)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/exportcsv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func setupExportableBank(t *testing.T, nodes, rows int) (*sqlutils.SQLRunner, st
db.Exec(t, "CREATE DATABASE test")

wk := bank.FromRows(rows)
if _, err := workloadsql.Setup(ctx, conn, wk, 100, 3); err != nil {
l := workloadsql.InsertsDataLoader{BatchSize: 100, Concurrency: 3}
if _, err := workloadsql.Setup(ctx, conn, wk, l); err != nil {
t.Fatal(err)
}

Expand Down
22 changes: 7 additions & 15 deletions pkg/ccl/workloadccl/allccl/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ import (
"github.com/stretchr/testify/require"
)

const (
directIngestion = true
oneFilePerNode = 1
noInjectStats = false
noSkipPostLoad = false
skipCSVRoundtrip = ``
)

func bigInitialData(meta workload.Meta) bool {
switch meta.Name {
case `tpcc`, `tpch`, `tpcds`:
Expand Down Expand Up @@ -94,11 +86,11 @@ func TestAllRegisteredImportFixture(t *testing.T) {
defer s.Stopper().Stop(ctx)
sqlutils.MakeSQLRunner(db).Exec(t, `CREATE DATABASE d`)

if _, err := workloadccl.ImportFixture(
ctx, db, gen, `d`, directIngestion, oneFilePerNode, noInjectStats, noSkipPostLoad,
skipCSVRoundtrip,
); err != nil {
t.Fatal(err)
l := workloadccl.ImportDataLoader{
DirectIngestion: true,
}
if _, err := workloadsql.Setup(ctx, db, gen, l); err != nil {
t.Fatalf(`%+v`, err)
}

// Run the consistency check if this workload has one.
Expand Down Expand Up @@ -152,8 +144,8 @@ func TestAllRegisteredSetup(t *testing.T) {
sqlutils.MakeSQLRunner(db).Exec(t, `CREATE DATABASE d`)
sqlutils.MakeSQLRunner(db).Exec(t, `SET CLUSTER SETTING kv.range_merge.queue_enabled = false`)

const batchSize, concurrency = 0, 0
if _, err := workloadsql.Setup(ctx, db, gen, batchSize, concurrency); err != nil {
var l workloadsql.InsertsDataLoader
if _, err := workloadsql.Setup(ctx, db, gen, l); err != nil {
t.Fatalf(`%+v`, err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/workloadccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func benchmarkImportFixture(b *testing.B, gen workload.Generator) {

b.StartTimer()
const filesPerNode = 1
const directIngest, noInjectStats, skipPostLoad, csvServer = true, false, true, ``
const directIngest, noInjectStats, csvServer = true, false, ``
importBytes, err := ImportFixture(
ctx, db, gen, `d`, directIngest, filesPerNode, noInjectStats, skipPostLoad, csvServer,
ctx, db, gen, `d`, directIngest, filesPerNode, noInjectStats, csvServer,
)
require.NoError(b, err)
bytes += importBytes
Expand Down
76 changes: 51 additions & 25 deletions pkg/ccl/workloadccl/cliccl/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
workloadcli "github.com/cockroachdb/cockroach/pkg/workload/cli"
"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -268,6 +269,29 @@ func fixturesMake(gen workload.Generator, urls []string, _ string) error {
return nil
}

// restoreDataLoader is an InitialDataLoader implementation that loads data
// with RESTORE from a pre-made fixture.
type restoreDataLoader struct {
	fixture  workloadccl.Fixture
	database string
}

// InitialDataLoad implements the InitialDataLoader interface. It restores
// l.fixture into l.database and returns the number of bytes restored.
func (l restoreDataLoader) InitialDataLoad(
	ctx context.Context, db *gosql.DB, gen workload.Generator,
) (int64, error) {
	log.Infof(ctx, "starting restore of %d tables", len(gen.Tables()))
	start := timeutil.Now()
	bytes, err := workloadccl.RestoreFixture(ctx, db, l.fixture, l.database)
	if err != nil {
		return 0, errors.Wrap(err, `restoring fixture`)
	}
	elapsed := timeutil.Since(start)
	// NB: humanizeutil.IBytes already renders a unit suffix (e.g. "1.0 GiB"),
	// so the message must not append the word "bytes" after it.
	log.Infof(ctx, "restored %s in %d tables (took %s, %s)",
		humanizeutil.IBytes(bytes), len(gen.Tables()), elapsed, humanizeutil.DataRate(bytes, elapsed))
	return bytes, nil
}

func fixturesLoad(gen workload.Generator, urls []string, dbName string) error {
ctx := context.Background()
gcs, err := getStorage(ctx)
Expand All @@ -289,19 +313,21 @@ func fixturesLoad(gen workload.Generator, urls []string, dbName string) error {
return errors.Wrap(err, `finding fixture`)
}

start := timeutil.Now()
log.Infof(ctx, "starting load of %d tables", len(gen.Tables()))
bytes, err := workloadccl.RestoreFixture(ctx, sqlDB, fixture, dbName)
if err != nil {
return errors.Wrap(err, `restoring fixture`)
l := restoreDataLoader{fixture: fixture, database: dbName}
if _, err := workloadsql.Setup(ctx, sqlDB, gen, l); err != nil {
return err
}

const splitConcurrency = 384 // TODO(dan): Don't hardcode this.
for _, table := range gen.Tables() {
if err := workloadsql.Split(ctx, sqlDB, table, splitConcurrency); err != nil {
return errors.Wrapf(err, `splitting %s`, table.Name)
}
}
elapsed := timeutil.Since(start)
log.Infof(ctx, "loaded %s in %d tables (took %s, %s)",
humanizeutil.IBytes(bytes), len(gen.Tables()), elapsed, humanizeutil.DataRate(bytes, elapsed))

if hooks, ok := gen.(workload.Hookser); *fixturesRunChecks && ok {
if consistencyCheckFn := hooks.Hooks().CheckConsistency; consistencyCheckFn != nil {
log.Info(ctx, "fixture is restored; now running consistency checks (ctrl-c to abort)")
log.Info(ctx, "fixture is imported; now running consistency checks (ctrl-c to abort)")
if err := consistencyCheckFn(ctx, sqlDB); err != nil {
return err
}
Expand All @@ -321,26 +347,26 @@ func fixturesImport(gen workload.Generator, urls []string, dbName string) error
return err
}

log.Infof(ctx, "starting import of %d tables", len(gen.Tables()))
start := timeutil.Now()
directIngestion := *fixturesImportDirectIngestionTable
filesPerNode := *fixturesImportFilesPerNode
injectStats := *fixturesImportInjectStats
noSkipPostLoad := false
csvServer := *fixturesMakeImportCSVServerURL
bytes, err := workloadccl.ImportFixture(
ctx, sqlDB, gen, dbName, directIngestion, filesPerNode, injectStats, noSkipPostLoad, csvServer,
)
if err != nil {
return errors.Wrap(err, `importing fixture`)
l := workloadccl.ImportDataLoader{
DirectIngestion: *fixturesImportDirectIngestionTable,
FilesPerNode: *fixturesImportFilesPerNode,
InjectStats: *fixturesImportInjectStats,
CSVServer: *fixturesMakeImportCSVServerURL,
}
if _, err := workloadsql.Setup(ctx, sqlDB, gen, l); err != nil {
return err
}

const splitConcurrency = 384 // TODO(dan): Don't hardcode this.
for _, table := range gen.Tables() {
if err := workloadsql.Split(ctx, sqlDB, table, splitConcurrency); err != nil {
return errors.Wrapf(err, `splitting %s`, table.Name)
}
}
elapsed := timeutil.Since(start)
log.Infof(ctx, "imported %s in %d tables (took %s, %s)",
humanizeutil.IBytes(bytes), len(gen.Tables()), elapsed, humanizeutil.DataRate(bytes, elapsed))

if hooks, ok := gen.(workload.Hookser); *fixturesRunChecks && ok {
if consistencyCheckFn := hooks.Hooks().CheckConsistency; consistencyCheckFn != nil {
log.Info(ctx, "fixture is imported; now running consistency checks (ctrl-c to abort)")
log.Info(ctx, "fixture is restored; now running consistency checks (ctrl-c to abort)")
if err := consistencyCheckFn(ctx, sqlDB); err != nil {
return err
}
Expand Down
101 changes: 62 additions & 39 deletions pkg/ccl/workloadccl/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"google.golang.org/api/iterator"
Expand All @@ -37,6 +36,10 @@ const (
fixtureGCSURIScheme = `gs`
)

// init registers this package's IMPORT-based loader so callers outside the
// ccl tree can load initial data with IMPORT when this package is linked in.
// NOTE(review): assumes workload.ImportDataLoader is a package-level hook
// variable consumed by pkg/workload — confirm at the declaration site.
func init() {
	workload.ImportDataLoader = ImportDataLoader{}
}

// FixtureConfig describes a storage place for fixtures.
type FixtureConfig struct {
// GCSBucket is a Google Cloud Storage bucket.
Expand Down Expand Up @@ -262,14 +265,20 @@ func MakeFixture(
if _, err := sqlDB.Exec(`CREATE DATABASE IF NOT EXISTS ` + dbName); err != nil {
return Fixture{}, err
}
const direct, stats, skipPostLoad, csvServer = false, false, true, ""
if _, err := ImportFixture(
ctx, sqlDB, gen, dbName, direct, filesPerNode, stats, skipPostLoad, csvServer,
); err != nil {
l := ImportDataLoader{
FilesPerNode: filesPerNode,
}
// NB: Intentionally don't use workloadsql.Setup because it runs the PostLoad
// hooks (adding foreign keys, etc), but historically the BACKUPs created by
// `fixtures make` didn't have them. Instead they're added by `fixtures load`.
// Ideally, the PostLoad hooks would be idempotent and we could include them
// here (but still run them on load for old fixtures without them), but that
// yak will remain unshaved.
if _, err := l.InitialDataLoad(ctx, sqlDB, gen); err != nil {
return Fixture{}, err
}
g := ctxgroup.WithContext(ctx)

g := ctxgroup.WithContext(ctx)
for _, t := range gen.Tables() {
t := t
g.Go(func() error {
Expand All @@ -280,13 +289,45 @@ func MakeFixture(
return err
})
}

if err := g.Wait(); err != nil {
return Fixture{}, err
}

return GetFixture(ctx, gcs, config, gen)
}

// ImportDataLoader is an InitialDataLoader implementation that loads data with
// IMPORT. The zero-value gets some sane defaults for the tunable settings
// (currently only FilesPerNode, which defaults to 1).
type ImportDataLoader struct {
	DirectIngestion bool
	FilesPerNode    int
	InjectStats     bool
	CSVServer       string
}

// InitialDataLoad implements the InitialDataLoader interface. It runs
// ImportFixture with this loader's settings and returns the number of bytes
// imported.
func (l ImportDataLoader) InitialDataLoad(
	ctx context.Context, db *gosql.DB, gen workload.Generator,
) (int64, error) {
	// l is a value receiver, so defaulting FilesPerNode here does not mutate
	// the caller's copy.
	if l.FilesPerNode == 0 {
		l.FilesPerNode = 1
	}

	log.Infof(ctx, "starting import of %d tables", len(gen.Tables()))
	start := timeutil.Now()
	// An empty db name makes ImportFixture leave table names unqualified, so
	// they resolve against the connection's current database.
	const useConnectionDB = ``
	bytes, err := ImportFixture(
		ctx, db, gen, useConnectionDB, l.DirectIngestion, l.FilesPerNode, l.InjectStats, l.CSVServer)
	if err != nil {
		return 0, errors.Wrap(err, `importing fixture`)
	}
	elapsed := timeutil.Since(start)
	// NB: humanizeutil.IBytes already renders a unit suffix (e.g. "1.0 GiB"),
	// so the message must not append the word "bytes" after it.
	log.Infof(ctx, "imported %s in %d tables (took %s, %s)",
		humanizeutil.IBytes(bytes), len(gen.Tables()), elapsed, humanizeutil.DataRate(bytes, elapsed))

	return bytes, nil
}

// ImportFixture works like MakeFixture, but instead of stopping halfway or
// writing a backup to cloud storage, it finishes ingesting the data.
// It also includes the option to inject pre-calculated table statistics if
Expand All @@ -299,7 +340,6 @@ func ImportFixture(
directIngestion bool,
filesPerNode int,
injectStats bool,
skipPostLoad bool,
csvServer string,
) (int64, error) {
for _, t := range gen.Tables() {
Expand Down Expand Up @@ -346,11 +386,6 @@ func ImportFixture(
if err := g.Wait(); err != nil {
return 0, err
}
if !skipPostLoad {
if err := runPostLoadSteps(ctx, sqlDB, gen); err != nil {
return 0, err
}
}
return atomic.LoadInt64(&bytesAtomic), nil
}

Expand All @@ -367,7 +402,13 @@ func importFixtureTable(
start := timeutil.Now()
var buf bytes.Buffer
var params []interface{}
fmt.Fprintf(&buf, `IMPORT TABLE "%s"."%s" %s CSV DATA (`, dbName, table.Name, table.Schema)
var qualifiedTableName string
if dbName != `` {
qualifiedTableName = fmt.Sprintf(`"%s"."%s"`, dbName, table.Name)
} else {
qualifiedTableName = fmt.Sprintf(`"%s"`, table.Name)
}
fmt.Fprintf(&buf, `IMPORT TABLE %s %s CSV DATA (`, qualifiedTableName, table.Schema)
// Generate $1,...,$N-1, where N is the number of csv paths.
for _, path := range paths {
params = append(params, path)
Expand Down Expand Up @@ -399,10 +440,12 @@ func importFixtureTable(

// Inject pre-calculated stats.
if injectStats && len(table.Stats) > 0 {
err = injectStatistics(dbName, &table, sqlDB)
if err := injectStatistics(qualifiedTableName, &table, sqlDB); err != nil {
return 0, err
}
}

return tableBytes, err
return tableBytes, nil
}

// disableAutoStats disables automatic stats if they are enabled and returns
Expand Down Expand Up @@ -440,14 +483,14 @@ func disableAutoStats(ctx context.Context, sqlDB *gosql.DB) func() {
}

// injectStatistics injects pre-calculated statistics for the given table.
func injectStatistics(dbName string, table *workload.Table, sqlDB *gosql.DB) error {
func injectStatistics(qualifiedTableName string, table *workload.Table, sqlDB *gosql.DB) error {
var encoded []byte
encoded, err := json.Marshal(table.Stats)
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf(`ALTER TABLE "%s"."%s" INJECT STATISTICS '%s'`,
dbName, table.Name, encoded))
_, err = sqlDB.Exec(fmt.Sprintf(
`ALTER TABLE %s INJECT STATISTICS '%s'`, qualifiedTableName, encoded))
return err
}

Expand Down Expand Up @@ -482,29 +525,9 @@ func RestoreFixture(
if err := g.Wait(); err != nil {
return 0, err
}
if err := runPostLoadSteps(ctx, sqlDB, fixture.Generator); err != nil {
return 0, err
}
return atomic.LoadInt64(&bytesAtomic), nil
}

// runPostLoadSteps runs a generator's optional PostLoad hook (per the
// surrounding diff's commentary: adding foreign keys, etc.) and then
// pre-splits every table's ranges.
func runPostLoadSteps(ctx context.Context, sqlDB *gosql.DB, gen workload.Generator) error {
	// Only generators that opt into hooks (workload.Hookser) and actually set
	// a PostLoad hook get this step; everything else skips straight to splits.
	if h, ok := gen.(workload.Hookser); ok {
		if hooks := h.Hooks(); hooks.PostLoad != nil {
			if err := hooks.PostLoad(sqlDB); err != nil {
				return errors.Wrap(err, `PostLoad hook`)
			}
		}
	}
	const splitConcurrency = 384 // TODO(dan): Don't hardcode this.
	// Split each table's ranges so subsequent load is spread across nodes.
	for _, table := range gen.Tables() {
		if err := workloadsql.Split(ctx, sqlDB, table, splitConcurrency); err != nil {
			return errors.Wrapf(err, `splitting %s`, table.Name)
		}
	}
	return nil
}

// ListFixtures returns the object paths to all fixtures stored in a FixtureConfig.
func ListFixtures(
ctx context.Context, gcs *storage.Client, config FixtureConfig,
Expand Down
Loading