Skip to content

Commit

Permalink
workload: support loading initial data using IMPORT in run and init
Browse files Browse the repository at this point in the history
The --data-loader flag allows using IMPORTs instead of INSERTs to load
the initial table data. IMPORT is much faster but requires a CCL binary.

A few responsibilities changed in this PR. Before:
- Setup runs PostLoad
- ImportFixture conditionally runs PostLoad/splits
- `fixtures import` runs PostLoad/splits via ImportFixture
- MakeFixture doesn't include PostLoad/splits in BACKUP
- RestoreFixture always runs PostLoad/splits
- `fixtures load` runs PostLoad/splits via RestoreFixture

After:
- Setup runs PostLoad
- ImportFixture/RestoreFixture don't run PostLoad or splits
- `fixtures import` runs splits directly and PostLoad via Setup
- MakeFixture still doesn't include PostLoad/splits in BACKUP
- `fixtures load` runs splits directly and PostLoad via Setup

Example output:

    $ ./cockroach workload init tpcc
    I190417 19:03:15.614640 1 workload/dataload.go:146  imported warehouse (0s, 1 rows)
    I190417 19:03:15.619253 1 workload/dataload.go:146  imported district (0s, 10 rows)
    I190417 19:03:16.851827 1 workload/dataload.go:146  imported customer (1s, 29835 rows)
    I190417 19:03:17.658953 1 workload/dataload.go:146  imported history (1s, 29816 rows)
    I190417 19:03:18.318497 1 workload/dataload.go:146  imported order (1s, 29613 rows)
    I190417 19:03:18.380737 1 workload/dataload.go:146  imported new_order (0s, 7771 rows)
    I190417 19:03:19.607526 1 workload/dataload.go:146  imported item (1s, 99021 rows)
    I190417 19:03:22.630372 1 workload/dataload.go:146  imported stock (3s, 99719 rows)
    I190417 19:03:28.703662 1 workload/dataload.go:146  imported order_line (6s, 299668 rows)
    $ ./cockroach workload init tpcc --data-loader=IMPORT
    I190417 19:03:59.048167 1 ccl/workloadccl/fixture.go:453  starting import of 9 tables
    I190417 19:03:59.472322 71 ccl/workloadccl/fixture.go:561  imported district (0s, 10 rows, 0 index entries, 1003 B)
    I190417 19:03:59.617672 75 ccl/workloadccl/fixture.go:561  imported new_order (1s, 9000 rows, 0 index entries, 114 KiB)
    I190417 19:03:59.850071 70 ccl/workloadccl/fixture.go:561  imported warehouse (1s, 1 rows, 0 index entries, 52 B)
    I190417 19:04:01.585920 74 ccl/workloadccl/fixture.go:561  imported order (3s, 30000 rows, 60000 index entries, 1.7 MiB)
    I190417 19:04:02.567965 73 ccl/workloadccl/fixture.go:561  imported history (4s, 30000 rows, 60000 index entries, 3.8 MiB)
    I190417 19:04:02.621967 76 ccl/workloadccl/fixture.go:561  imported item (4s, 100000 rows, 0 index entries, 7.8 MiB)
    I190417 19:04:02.703264 72 ccl/workloadccl/fixture.go:561  imported customer (4s, 30000 rows, 30000 index entries, 18 MiB)
    I190417 19:04:04.139393 77 ccl/workloadccl/fixture.go:561  imported stock (5s, 100000 rows, 100000 index entries, 32 MiB)
    I190417 19:04:05.380937 78 ccl/workloadccl/fixture.go:561  imported order_line (6s, 300343 rows, 300343 index entries, 22 MiB)
    I190417 19:04:05.489742 1 ccl/workloadccl/fixture.go:461  imported 84 MiB bytes in 9 tables (took 6.441337s, 13.10 MiB/s)
    $ ./cockroach workload fixtures import tpcc
    I190417 19:09:35.084985 1 ccl/workloadccl/fixture.go:453  starting import of 9 tables
    I190417 19:09:35.582722 24 ccl/workloadccl/fixture.go:561  imported district (0s, 10 rows, 0 index entries, 1008 B)
    I190417 19:09:35.657582 28 ccl/workloadccl/fixture.go:561  imported new_order (1s, 9000 rows, 0 index entries, 114 KiB)
    I190417 19:09:35.941624 23 ccl/workloadccl/fixture.go:561  imported warehouse (1s, 1 rows, 0 index entries, 53 B)
    I190417 19:09:37.821479 27 ccl/workloadccl/fixture.go:561  imported order (3s, 30000 rows, 60000 index entries, 1.7 MiB)
    I190417 19:09:38.509619 26 ccl/workloadccl/fixture.go:561  imported history (3s, 30000 rows, 60000 index entries, 3.8 MiB)
    I190417 19:09:38.598668 25 ccl/workloadccl/fixture.go:561  imported customer (4s, 30000 rows, 30000 index entries, 18 MiB)
    I190417 19:09:38.677361 29 ccl/workloadccl/fixture.go:561  imported item (4s, 100000 rows, 0 index entries, 7.8 MiB)
    I190417 19:09:40.245621 30 ccl/workloadccl/fixture.go:561  imported stock (5s, 100000 rows, 100000 index entries, 32 MiB)
    I190417 19:09:41.406895 31 ccl/workloadccl/fixture.go:561  imported order_line (6s, 300343 rows, 300343 index entries, 22 MiB)
    I190417 19:09:41.532481 1 ccl/workloadccl/fixture.go:461  imported 84 MiB bytes in 9 tables (took 6.447149s, 13.09 MiB/s)
    I190417 19:09:41.545086 1 workload/workload.go:325  data is loaded; now running consistency checks (ctrl-c to abort)
    I190417 19:09:41.552516 1 workload/tpcc/tpcc.go:322  check 3.3.2.1 took 7.364ms
    I190417 19:09:41.593546 1 workload/tpcc/tpcc.go:322  check 3.3.2.2 took 40.99ms
    I190417 19:09:41.601818 1 workload/tpcc/tpcc.go:322  check 3.3.2.3 took 8.229ms
    I190417 19:09:41.868776 1 workload/tpcc/tpcc.go:322  check 3.3.2.4 took 266.907ms
    I190417 19:09:41.958044 1 workload/tpcc/tpcc.go:322  check 3.3.2.5 took 89.214ms
    I190417 19:09:42.322464 1 workload/tpcc/tpcc.go:322  check 3.3.2.7 took 364.368ms
    I190417 19:09:42.365025 1 workload/tpcc/tpcc.go:322  check 3.3.2.8 took 42.53ms
    I190417 19:09:42.412558 1 workload/tpcc/tpcc.go:322  check 3.3.2.9 took 47.501ms
    $ ./cockroach workload fixtures load tpcc
    I190417 19:18:42.645587 1 ccl/workloadccl/cliccl/fixtures.go:286  starting restore of 9 tables
    I190417 19:18:56.447737 97 ccl/workloadccl/fixture.go:647  loaded new_order (14s, 9000 rows, 0 index entries, 114 KiB)
    I190417 19:18:56.447870 116 ccl/workloadccl/fixture.go:647  loaded order_line (14s, 299890 rows, 299890 index entries, 20 MiB)
    I190417 19:18:57.365546 96 ccl/workloadccl/fixture.go:647  loaded order (15s, 30000 rows, 60000 index entries, 1.7 MiB)
    I190417 19:18:57.365630 114 ccl/workloadccl/fixture.go:647  loaded item (15s, 100000 rows, 0 index entries, 7.8 MiB)
    I190417 19:18:58.836369 92 ccl/workloadccl/fixture.go:647  loaded warehouse (16s, 1 rows, 0 index entries, 52 B)
    I190417 19:18:58.836384 94 ccl/workloadccl/fixture.go:647  loaded customer (16s, 30000 rows, 30000 index entries, 18 MiB)
    I190417 19:18:59.315224 93 ccl/workloadccl/fixture.go:647  loaded district (17s, 10 rows, 0 index entries, 1011 B)
    I190417 19:18:59.923138 95 ccl/workloadccl/fixture.go:647  loaded history (17s, 30000 rows, 60000 index entries, 3.8 MiB)
    I190417 19:19:02.510851 115 ccl/workloadccl/fixture.go:647  loaded stock (20s, 100000 rows, 100000 index entries, 32 MiB)
    I190417 19:19:02.632022 1 ccl/workloadccl/cliccl/fixtures.go:293  imported 83 MiB bytes in 9 tables (took 19.986216s, 4.16 MiB/s)
    I190417 19:19:02.644449 1 workload/workload.go:325  data is loaded; now running consistency checks (ctrl-c to abort)
    I190417 19:19:02.651055 1 workload/tpcc/tpcc.go:322  check 3.3.2.1 took 6.562ms
    I190417 19:19:02.691053 1 workload/tpcc/tpcc.go:322  check 3.3.2.2 took 39.967ms
    I190417 19:19:02.699790 1 workload/tpcc/tpcc.go:322  check 3.3.2.3 took 8.697ms
    I190417 19:19:02.977256 1 workload/tpcc/tpcc.go:322  check 3.3.2.4 took 277.415ms
    I190417 19:19:03.070779 1 workload/tpcc/tpcc.go:322  check 3.3.2.5 took 93.462ms
    I190417 19:19:03.450045 1 workload/tpcc/tpcc.go:322  check 3.3.2.7 took 379.226ms
    I190417 19:19:03.499275 1 workload/tpcc/tpcc.go:322  check 3.3.2.8 took 49.183ms
    I190417 19:19:03.552005 1 workload/tpcc/tpcc.go:322  check 3.3.2.9 took 52.678ms

Release note (cli change): `workload run` and `workload init` now
support loading initial table data using IMPORT via the
`--data-loader=IMPORT` flag.
  • Loading branch information
danhhz committed Jul 23, 2019
1 parent 4233a87 commit f491ba3
Show file tree
Hide file tree
Showing 15 changed files with 330 additions and 197 deletions.
5 changes: 2 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ func backupRestoreTestSetupWithParams(

sqlDB = sqlutils.MakeSQLRunner(tc.Conns[0])
sqlDB.Exec(t, `CREATE DATABASE data`)
const insertBatchSize = 1000
const concurrency = 4
if _, err := workloadsql.Setup(ctx, sqlDB.DB.(*gosql.DB), bankData, insertBatchSize, concurrency); err != nil {
l := workloadsql.InsertsDataLoader{BatchSize: 1000, Concurrency: 4}
if _, err := workloadsql.Setup(ctx, sqlDB.DB.(*gosql.DB), bankData, l); err != nil {
t.Fatalf("%+v", err)
}
if err := workloadsql.Split(ctx, sqlDB.DB.(*gosql.DB), bankData.Tables()[0], 1 /* concurrency */); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ func TestAvroLedger(t *testing.T) {

ctx := context.Background()
gen := ledger.FromFlags(`--customers=1`)
_, err := workloadsql.Setup(ctx, db, gen, 0, 0)
var l workloadsql.InsertsDataLoader
_, err := workloadsql.Setup(ctx, db, gen, l)
require.NoError(t, err)

ledger := feed(t, f, `CREATE CHANGEFEED FOR customer, transaction, entry, session
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/validations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func TestCatchupScanOrdering(t *testing.T) {
ctx := context.Background()
const numRows, numRanges, payloadBytes, maxTransfer = 10, 10, 10, 999
gen := bank.FromConfig(numRows, payloadBytes, numRanges)
if _, err := workloadsql.Setup(ctx, db, gen, 0, 0); err != nil {
var l workloadsql.InsertsDataLoader
if _, err := workloadsql.Setup(ctx, db, gen, l); err != nil {
t.Fatal(err)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/exportcsv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func setupExportableBank(t *testing.T, nodes, rows int) (*sqlutils.SQLRunner, st
db.Exec(t, "CREATE DATABASE test")

wk := bank.FromRows(rows)
if _, err := workloadsql.Setup(ctx, conn, wk, 100, 3); err != nil {
l := workloadsql.InsertsDataLoader{BatchSize: 100, Concurrency: 3}
if _, err := workloadsql.Setup(ctx, conn, wk, l); err != nil {
t.Fatal(err)
}

Expand Down
22 changes: 7 additions & 15 deletions pkg/ccl/workloadccl/allccl/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ import (
"github.com/stretchr/testify/require"
)

const (
directIngestion = true
oneFilePerNode = 1
noInjectStats = false
noSkipPostLoad = false
skipCSVRoundtrip = ``
)

func bigInitialData(meta workload.Meta) bool {
switch meta.Name {
case `tpcc`, `tpch`, `tpcds`:
Expand Down Expand Up @@ -94,11 +86,11 @@ func TestAllRegisteredImportFixture(t *testing.T) {
defer s.Stopper().Stop(ctx)
sqlutils.MakeSQLRunner(db).Exec(t, `CREATE DATABASE d`)

if _, err := workloadccl.ImportFixture(
ctx, db, gen, `d`, directIngestion, oneFilePerNode, noInjectStats, noSkipPostLoad,
skipCSVRoundtrip,
); err != nil {
t.Fatal(err)
l := workloadccl.ImportDataLoader{
DirectIngestion: true,
}
if _, err := workloadsql.Setup(ctx, db, gen, l); err != nil {
t.Fatalf(`%+v`, err)
}

// Run the consistency check if this workload has one.
Expand Down Expand Up @@ -152,8 +144,8 @@ func TestAllRegisteredSetup(t *testing.T) {
sqlutils.MakeSQLRunner(db).Exec(t, `CREATE DATABASE d`)
sqlutils.MakeSQLRunner(db).Exec(t, `SET CLUSTER SETTING kv.range_merge.queue_enabled = false`)

const batchSize, concurrency = 0, 0
if _, err := workloadsql.Setup(ctx, db, gen, batchSize, concurrency); err != nil {
var l workloadsql.InsertsDataLoader
if _, err := workloadsql.Setup(ctx, db, gen, l); err != nil {
t.Fatalf(`%+v`, err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/workloadccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func benchmarkImportFixture(b *testing.B, gen workload.Generator) {

b.StartTimer()
const filesPerNode = 1
const directIngest, noInjectStats, skipPostLoad, csvServer = true, false, true, ``
const directIngest, noInjectStats, csvServer = true, false, ``
importBytes, err := ImportFixture(
ctx, db, gen, `d`, directIngest, filesPerNode, noInjectStats, skipPostLoad, csvServer,
ctx, db, gen, `d`, directIngest, filesPerNode, noInjectStats, csvServer,
)
require.NoError(b, err)
bytes += importBytes
Expand Down
76 changes: 51 additions & 25 deletions pkg/ccl/workloadccl/cliccl/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
workloadcli "github.com/cockroachdb/cockroach/pkg/workload/cli"
"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -268,6 +269,29 @@ func fixturesMake(gen workload.Generator, urls []string, _ string) error {
return nil
}

// restoreDataLoader is an InitialDataLoader implementation that loads data with
// RESTORE.
type restoreDataLoader struct {
// fixture is the fixture handed to workloadccl.RestoreFixture.
fixture workloadccl.Fixture
// database is the database name handed to workloadccl.RestoreFixture.
// NOTE(review): presumably the database the tables are restored into —
// confirm against RestoreFixture.
database string
}

// InitialDataLoad implements the InitialDataLoader interface.
//
// It calls workloadccl.RestoreFixture with l.fixture and l.database, logging
// the number of tables before starting and the restored size, elapsed time and
// data rate once it finishes. It returns the byte count reported by
// RestoreFixture, or 0 and a wrapped error if the restore fails.
func (l restoreDataLoader) InitialDataLoad(
	ctx context.Context, db *gosql.DB, gen workload.Generator,
) (int64, error) {
	log.Infof(ctx, "starting restore of %d tables", len(gen.Tables()))
	start := timeutil.Now()
	bytes, err := workloadccl.RestoreFixture(ctx, db, l.fixture, l.database)
	if err != nil {
		return 0, errors.Wrap(err, `restoring fixture`)
	}
	elapsed := timeutil.Since(start)
	// humanizeutil.IBytes already renders the unit (e.g. "83 MiB"), so the
	// format string must not append a literal "bytes" after it.
	log.Infof(ctx, "restored %s in %d tables (took %s, %s)",
		humanizeutil.IBytes(bytes), len(gen.Tables()), elapsed, humanizeutil.DataRate(bytes, elapsed))
	return bytes, nil
}

func fixturesLoad(gen workload.Generator, urls []string, dbName string) error {
ctx := context.Background()
gcs, err := getStorage(ctx)
Expand All @@ -289,19 +313,21 @@ func fixturesLoad(gen workload.Generator, urls []string, dbName string) error {
return errors.Wrap(err, `finding fixture`)
}

start := timeutil.Now()
log.Infof(ctx, "starting load of %d tables", len(gen.Tables()))
bytes, err := workloadccl.RestoreFixture(ctx, sqlDB, fixture, dbName)
if err != nil {
return errors.Wrap(err, `restoring fixture`)
l := restoreDataLoader{fixture: fixture, database: dbName}
if _, err := workloadsql.Setup(ctx, sqlDB, gen, l); err != nil {
return err
}

const splitConcurrency = 384 // TODO(dan): Don't hardcode this.
for _, table := range gen.Tables() {
if err := workloadsql.Split(ctx, sqlDB, table, splitConcurrency); err != nil {
return errors.Wrapf(err, `splitting %s`, table.Name)
}
}
elapsed := timeutil.Since(start)
log.Infof(ctx, "loaded %s in %d tables (took %s, %s)",
humanizeutil.IBytes(bytes), len(gen.Tables()), elapsed, humanizeutil.DataRate(bytes, elapsed))

if hooks, ok := gen.(workload.Hookser); *fixturesRunChecks && ok {
if consistencyCheckFn := hooks.Hooks().CheckConsistency; consistencyCheckFn != nil {
log.Info(ctx, "fixture is restored; now running consistency checks (ctrl-c to abort)")
log.Info(ctx, "fixture is imported; now running consistency checks (ctrl-c to abort)")
if err := consistencyCheckFn(ctx, sqlDB); err != nil {
return err
}
Expand All @@ -321,26 +347,26 @@ func fixturesImport(gen workload.Generator, urls []string, dbName string) error
return err
}

log.Infof(ctx, "starting import of %d tables", len(gen.Tables()))
start := timeutil.Now()
directIngestion := *fixturesImportDirectIngestionTable
filesPerNode := *fixturesImportFilesPerNode
injectStats := *fixturesImportInjectStats
noSkipPostLoad := false
csvServer := *fixturesMakeImportCSVServerURL
bytes, err := workloadccl.ImportFixture(
ctx, sqlDB, gen, dbName, directIngestion, filesPerNode, injectStats, noSkipPostLoad, csvServer,
)
if err != nil {
return errors.Wrap(err, `importing fixture`)
l := workloadccl.ImportDataLoader{
DirectIngestion: *fixturesImportDirectIngestionTable,
FilesPerNode: *fixturesImportFilesPerNode,
InjectStats: *fixturesImportInjectStats,
CSVServer: *fixturesMakeImportCSVServerURL,
}
if _, err := workloadsql.Setup(ctx, sqlDB, gen, l); err != nil {
return err
}

const splitConcurrency = 384 // TODO(dan): Don't hardcode this.
for _, table := range gen.Tables() {
if err := workloadsql.Split(ctx, sqlDB, table, splitConcurrency); err != nil {
return errors.Wrapf(err, `splitting %s`, table.Name)
}
}
elapsed := timeutil.Since(start)
log.Infof(ctx, "imported %s in %d tables (took %s, %s)",
humanizeutil.IBytes(bytes), len(gen.Tables()), elapsed, humanizeutil.DataRate(bytes, elapsed))

if hooks, ok := gen.(workload.Hookser); *fixturesRunChecks && ok {
if consistencyCheckFn := hooks.Hooks().CheckConsistency; consistencyCheckFn != nil {
log.Info(ctx, "fixture is imported; now running consistency checks (ctrl-c to abort)")
log.Info(ctx, "fixture is restored; now running consistency checks (ctrl-c to abort)")
if err := consistencyCheckFn(ctx, sqlDB); err != nil {
return err
}
Expand Down
101 changes: 62 additions & 39 deletions pkg/ccl/workloadccl/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"google.golang.org/api/iterator"
Expand All @@ -37,6 +36,10 @@ const (
fixtureGCSURIScheme = `gs`
)

// init installs the zero-value ImportDataLoader as workload.ImportDataLoader
// whenever this package is linked into the binary.
// NOTE(review): presumably this is how the workload package gains IMPORT-based
// loading without importing CCL code directly — confirm against the workload
// package.
func init() {
workload.ImportDataLoader = ImportDataLoader{}
}

// FixtureConfig describes a storage place for fixtures.
type FixtureConfig struct {
// GCSBucket is a Google Cloud Storage bucket.
Expand Down Expand Up @@ -262,14 +265,20 @@ func MakeFixture(
if _, err := sqlDB.Exec(`CREATE DATABASE IF NOT EXISTS ` + dbName); err != nil {
return Fixture{}, err
}
const direct, stats, skipPostLoad, csvServer = false, false, true, ""
if _, err := ImportFixture(
ctx, sqlDB, gen, dbName, direct, filesPerNode, stats, skipPostLoad, csvServer,
); err != nil {
l := ImportDataLoader{
FilesPerNode: filesPerNode,
}
// NB: Intentionally don't use workloadsql.Setup because it runs the PostLoad
// hooks (adding foreign keys, etc), but historically the BACKUPs created by
// `fixtures make` didn't have them. Instead they're added by `fixtures load`.
// Ideally, the PostLoad hooks would be idempotent and we could include them
// here (but still run them on load for old fixtures without them), but that
// yak will remain unshaved.
if _, err := l.InitialDataLoad(ctx, sqlDB, gen); err != nil {
return Fixture{}, err
}
g := ctxgroup.WithContext(ctx)

g := ctxgroup.WithContext(ctx)
for _, t := range gen.Tables() {
t := t
g.Go(func() error {
Expand All @@ -280,13 +289,45 @@ func MakeFixture(
return err
})
}

if err := g.Wait(); err != nil {
return Fixture{}, err
}

return GetFixture(ctx, gcs, config, gen)
}

// ImportDataLoader is an InitialDataLoader implementation that loads data with
// IMPORT. The zero-value gets some sane defaults for the tunable settings.
type ImportDataLoader struct {
// DirectIngestion is forwarded to ImportFixture as its directIngestion
// argument.
DirectIngestion bool
// FilesPerNode is forwarded to ImportFixture as its filesPerNode argument.
// A zero value is replaced with 1 by InitialDataLoad.
FilesPerNode int
// InjectStats is forwarded to ImportFixture as its injectStats argument;
// when set, pre-calculated table statistics are injected for tables that
// have them.
InjectStats bool
// CSVServer is forwarded to ImportFixture as its csvServer argument.
// NOTE(review): presumably the URL of a server providing the CSV data —
// confirm against ImportFixture.
CSVServer string
}

// InitialDataLoad implements the InitialDataLoader interface.
//
// It runs ImportFixture for every table of gen with an empty database name
// (so table names are left unqualified), logging the number of tables before
// starting and the imported size, elapsed time and data rate once it finishes.
// A zero FilesPerNode is treated as 1. It returns the byte count reported by
// ImportFixture, or 0 and a wrapped error if the import fails.
func (l ImportDataLoader) InitialDataLoad(
	ctx context.Context, db *gosql.DB, gen workload.Generator,
) (int64, error) {
	// l is a value receiver, so this default only affects the local copy.
	if l.FilesPerNode == 0 {
		l.FilesPerNode = 1
	}

	log.Infof(ctx, "starting import of %d tables", len(gen.Tables()))
	start := timeutil.Now()
	const useConnectionDB = ``
	bytes, err := ImportFixture(
		ctx, db, gen, useConnectionDB, l.DirectIngestion, l.FilesPerNode, l.InjectStats, l.CSVServer)
	if err != nil {
		return 0, errors.Wrap(err, `importing fixture`)
	}
	elapsed := timeutil.Since(start)
	// humanizeutil.IBytes already renders the unit (e.g. "84 MiB"), so the
	// format string must not append a literal "bytes" after it; doing so
	// produced output such as "imported 84 MiB bytes in 9 tables".
	log.Infof(ctx, "imported %s in %d tables (took %s, %s)",
		humanizeutil.IBytes(bytes), len(gen.Tables()), elapsed, humanizeutil.DataRate(bytes, elapsed))

	return bytes, nil
}

// ImportFixture works like MakeFixture, but instead of stopping halfway or
// writing a backup to cloud storage, it finishes ingesting the data.
// It also includes the option to inject pre-calculated table statistics if
Expand All @@ -299,7 +340,6 @@ func ImportFixture(
directIngestion bool,
filesPerNode int,
injectStats bool,
skipPostLoad bool,
csvServer string,
) (int64, error) {
for _, t := range gen.Tables() {
Expand Down Expand Up @@ -346,11 +386,6 @@ func ImportFixture(
if err := g.Wait(); err != nil {
return 0, err
}
if !skipPostLoad {
if err := runPostLoadSteps(ctx, sqlDB, gen); err != nil {
return 0, err
}
}
return atomic.LoadInt64(&bytesAtomic), nil
}

Expand All @@ -367,7 +402,13 @@ func importFixtureTable(
start := timeutil.Now()
var buf bytes.Buffer
var params []interface{}
fmt.Fprintf(&buf, `IMPORT TABLE "%s"."%s" %s CSV DATA (`, dbName, table.Name, table.Schema)
var qualifiedTableName string
if dbName != `` {
qualifiedTableName = fmt.Sprintf(`"%s"."%s"`, dbName, table.Name)
} else {
qualifiedTableName = fmt.Sprintf(`"%s"`, table.Name)
}
fmt.Fprintf(&buf, `IMPORT TABLE %s %s CSV DATA (`, qualifiedTableName, table.Schema)
// Generate $1,...,$N-1, where N is the number of csv paths.
for _, path := range paths {
params = append(params, path)
Expand Down Expand Up @@ -399,10 +440,12 @@ func importFixtureTable(

// Inject pre-calculated stats.
if injectStats && len(table.Stats) > 0 {
err = injectStatistics(dbName, &table, sqlDB)
if err := injectStatistics(qualifiedTableName, &table, sqlDB); err != nil {
return 0, err
}
}

return tableBytes, err
return tableBytes, nil
}

// disableAutoStats disables automatic stats if they are enabled and returns
Expand Down Expand Up @@ -440,14 +483,14 @@ func disableAutoStats(ctx context.Context, sqlDB *gosql.DB) func() {
}

// injectStatistics injects pre-calculated statistics for the given table.
func injectStatistics(dbName string, table *workload.Table, sqlDB *gosql.DB) error {
func injectStatistics(qualifiedTableName string, table *workload.Table, sqlDB *gosql.DB) error {
var encoded []byte
encoded, err := json.Marshal(table.Stats)
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf(`ALTER TABLE "%s"."%s" INJECT STATISTICS '%s'`,
dbName, table.Name, encoded))
_, err = sqlDB.Exec(fmt.Sprintf(
`ALTER TABLE %s INJECT STATISTICS '%s'`, qualifiedTableName, encoded))
return err
}

Expand Down Expand Up @@ -482,29 +525,9 @@ func RestoreFixture(
if err := g.Wait(); err != nil {
return 0, err
}
if err := runPostLoadSteps(ctx, sqlDB, fixture.Generator); err != nil {
return 0, err
}
return atomic.LoadInt64(&bytesAtomic), nil
}

// runPostLoadSteps finishes a freshly loaded dataset: it invokes the
// generator's PostLoad hook when one is defined, then applies the splits of
// every table in gen. The first failure is returned wrapped with the step that
// produced it.
func runPostLoadSteps(ctx context.Context, sqlDB *gosql.DB, gen workload.Generator) error {
	// Hooks are optional: only generators implementing workload.Hookser have
	// them, and PostLoad itself may be unset.
	if hookser, ok := gen.(workload.Hookser); ok {
		postLoad := hookser.Hooks().PostLoad
		if postLoad != nil {
			if err := postLoad(sqlDB); err != nil {
				return errors.Wrap(err, `PostLoad hook`)
			}
		}
	}

	const splitConcurrency = 384 // TODO(dan): Don't hardcode this.
	tables := gen.Tables()
	for i := range tables {
		err := workloadsql.Split(ctx, sqlDB, tables[i], splitConcurrency)
		if err != nil {
			return errors.Wrapf(err, `splitting %s`, tables[i].Name)
		}
	}
	return nil
}

// ListFixtures returns the object paths to all fixtures stored in a FixtureConfig.
func ListFixtures(
ctx context.Context, gcs *storage.Client, config FixtureConfig,
Expand Down
Loading

0 comments on commit f491ba3

Please sign in to comment.