Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
37080: workloadccl: hook --csv-server flag up to `fixtures import` too r=tbg a=danhhz

Previously it was only hooked up to `fixtures make`. This allows a cockroach 2.1
binary to do the following (with a recent worklod binary):

    roachprod run cluster -- "./workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &"
    roachprod run cluster:1 -- "./workload fixtures import tpcc --csv-server=http://localhost:8081"

Release note: None

Co-authored-by: Daniel Harrison <daniel.harrison@gmail.com>
  • Loading branch information
craig[bot] and danhhz committed Apr 24, 2019
2 parents c66f29f + 2480f4b commit df024f7
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/workloadccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func benchmarkImportFixture(b *testing.B, gen workload.Generator) {
const filesPerNode = 1
importBytes, err := ImportFixture(
ctx, db, gen, `d`, true /* directIngestion */, filesPerNode, false, /* injectStats */
``, /* csvServer */
)
require.NoError(b, err)
bytes += importBytes
Expand Down
14 changes: 9 additions & 5 deletions pkg/ccl/workloadccl/cliccl/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func config() workloadccl.FixtureConfig {
if len(*gcsBillingProjectOverride) > 0 {
config.BillingProject = *gcsBillingProjectOverride
}
config.CSVServerURL = *fixturesMakeCSVServerURL
config.CSVServerURL = *fixturesMakeImportCSVServerURL
return config
}

Expand Down Expand Up @@ -82,7 +82,10 @@ var fixturesURLCmd = workloadcli.SetCmdDefaults(&cobra.Command{
Short: `generate the GCS URL for a fixture`,
})

var fixturesMakeCSVServerURL = fixturesMakeCmd.PersistentFlags().String(
var fixturesLoadImportShared = pflag.NewFlagSet(`load/import`, pflag.ContinueOnError)
var fixturesMakeImportShared = pflag.NewFlagSet(`load/import`, pflag.ContinueOnError)

var fixturesMakeImportCSVServerURL = fixturesMakeImportShared.String(
`csv-server`, ``,
`Skip saving CSVs to cloud storage, instead get them from a 'csv-server' running at this url`)

Expand All @@ -94,8 +97,6 @@ var fixturesMakeFilesPerNode = fixturesMakeCmd.PersistentFlags().Int(
`files-per-node`, 1,
`number of file URLs to generate per node when using csv-server`)

var fixturesLoadImportShared = pflag.NewFlagSet(`load/import`, pflag.ContinueOnError)

var fixturesImportDirectIngestionTable = fixturesImportCmd.PersistentFlags().Bool(
`experimental-direct-ingestion`, false,
`Use the faster, but limited and still quite experimental, IMPORT without a distributed sort`)
Expand Down Expand Up @@ -161,6 +162,7 @@ func init() {
Args: cobra.RangeArgs(0, 1),
})
genMakeCmd.Flags().AddFlagSet(genFlags)
genMakeCmd.Flags().AddFlagSet(fixturesMakeImportShared)
genMakeCmd.Run = workloadcli.CmdHelper(gen, fixturesMake)
fixturesMakeCmd.AddCommand(genMakeCmd)

Expand All @@ -179,6 +181,7 @@ func init() {
})
genImportCmd.Flags().AddFlagSet(genFlags)
genImportCmd.Flags().AddFlagSet(fixturesLoadImportShared)
genImportCmd.Flags().AddFlagSet(fixturesMakeImportShared)
genImportCmd.Run = workloadcli.CmdHelper(gen, fixturesImport)
fixturesImportCmd.AddCommand(genImportCmd)

Expand Down Expand Up @@ -327,8 +330,9 @@ func fixturesImport(gen workload.Generator, urls []string, dbName string) error
directIngestion := *fixturesImportDirectIngestionTable
filesPerNode := *fixturesImportFilesPerNode
injectStats := *fixturesImportInjectStats
csvServer := *fixturesMakeImportCSVServerURL
bytes, err := workloadccl.ImportFixture(
ctx, sqlDB, gen, dbName, directIngestion, filesPerNode, injectStats,
ctx, sqlDB, gen, dbName, directIngestion, filesPerNode, injectStats, csvServer,
)
if err != nil {
return errors.Wrap(err, `importing fixture`)
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/workloadccl/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ func ImportFixture(
directIngestion bool,
filesPerNode int,
injectStats bool,
csvServer string,
) (int64, error) {
var numNodes int
if err := sqlDB.QueryRow(numNodesQuery).Scan(&numNodes); err != nil {
Expand All @@ -460,9 +461,14 @@ func ImportFixture(
defer enableFn()
}

pathPrefix := csvServer
if pathPrefix == `` {
pathPrefix = `experimental-workload://`
}

for _, t := range tables {
table := t
paths := csvServerPaths(`experimental-workload://`, gen, table, numNodes*filesPerNode)
paths := csvServerPaths(pathPrefix, gen, table, numNodes*filesPerNode)
g.GoCtx(func(ctx context.Context) error {
tableBytes, err := importFixtureTable(
ctx, sqlDB, dbName, table, paths, directIngestion, `` /* output */, injectStats)
Expand Down
28 changes: 28 additions & 0 deletions pkg/ccl/workloadccl/fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package workloadccl
import (
"context"
"fmt"
"net/http/httptest"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -185,6 +186,7 @@ func TestImportFixture(t *testing.T) {
sqlDB.Exec(t, `CREATE DATABASE distsort`)
_, err := ImportFixture(
ctx, db, gen, `distsort`, false /* directIngestion */, filesPerNode, true, /* injectStats */
``, /* csvServer */
)
require.NoError(t, err)
sqlDB.CheckQueryResults(t,
Expand All @@ -201,6 +203,7 @@ func TestImportFixture(t *testing.T) {
sqlDB.Exec(t, `CREATE DATABASE direct`)
_, err = ImportFixture(
ctx, db, gen, `direct`, true /* directIngestion */, filesPerNode, false, /* injectStats */
``, /* csvServer */
)
require.NoError(t, err)
sqlDB.CheckQueryResults(t,
Expand All @@ -218,5 +221,30 @@ func TestImportFixture(t *testing.T) {
{"__auto__", "{key}", "10", "10", "0"},
{"__auto__", "{value}", "10", "1", "0"},
})
}

func TestImportFixtureCSVServer(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
ts := httptest.NewServer(workload.CSVMux(workload.Registered()))
defer ts.Close()

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{UseDatabase: `d`})
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)

gen := makeTestWorkload()
flag := fmt.Sprintf(`val=%d`, timeutil.Now().UnixNano())
if err := gen.Flags().Parse([]string{"--" + flag}); err != nil {
t.Fatalf(`%+v`, err)
}

const filesPerNode = 1
sqlDB.Exec(t, `CREATE DATABASE d`)
_, err := ImportFixture(
ctx, db, gen, `d`, false /* directIngestion */, filesPerNode, false /* injectStats */, ts.URL,
)
require.NoError(t, err)
sqlDB.CheckQueryResults(t,
`SELECT count(*) FROM d.fx`, [][]string{{strconv.Itoa(fixtureTestGenRows)}})
}

0 comments on commit df024f7

Please sign in to comment.