Skip to content

Commit

Permalink
Merge #104234
Browse files Browse the repository at this point in the history
104234: importer: use new parquet writer for EXPORT INTO PARQUET r=miretskiy a=jayshrivastava

### roachtest: add test for EXPORT INTO PARQUET
This change adds a rochtest which sets up a 3-node CRDB cluster
on 8vCPU machines initialized with the TPC-C database containing
250 warehouses. Then, it executes 30 `EXPORT INTO PARQUET` statements
concurrently, repeatedly for 10 minutes. The main purpose of this
test is to use it for benchmarking. It sets up grafana as well
so metrics can be observed during the test.

This change also adds a daily roachtest which runs exports
on a 100 warehouse TPCC database, without setting up grafana.

Informs: #103317
Release note: None

---

### importer: use new parquet writer for EXPORT INTO PARQUET

Previously, `EXPORT INTO PARQUET` used a library which had
very poor memory usage. This change updates this command to
use the new parquet writer in `util/parquet`, which is newer
and does not use excessive memory.

To ensure backwards compatability, as much testing code as
possible is left untouched to ensure the old `EXPORT INTO PARQUET`
behavior is the same when using the new library. Some of the the
old production code and dependencies are also left untouched
because they are used in the test code.

This commit leaves some TODOs to clean up the tests and production
code once we have confidence in the new library.

Informs: #104329
Informs: #104278
Informs: #104279
Closes: #103317

Release note (general change): `EXPORT INTO PARQUET` will now
use a new internal implementation for writing parquet files
using parquet spec version 2.6. There should be no significant
impact to the structure of files being written. There is one
minor change:
- all columns written to parquet files will be nullable
  (ie. the parquet repetition type is `OPTIONAL`)

Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
  • Loading branch information
craig[bot] and jayshrivastava committed Jun 5, 2023
2 parents 41d3583 + c1e6a64 commit e2f691d
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 202 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_library(
"drt.go",
"encryption.go",
"event_log.go",
"export_parquet.go",
"failover.go",
"fixtures.go",
"flowable.go",
Expand Down
171 changes: 171 additions & 0 deletions pkg/cmd/roachtest/tests/export_parquet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/grafana"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
)

func registerExportParquet(r registry.Registry) {
// This test sets up a 3-node CRDB cluster on 8vCPU machines initialized with
// the TPC-C database containing 250 warehouses. Then, it executes 30 `EXPORT
// INTO PARQUET` statements concurrently, repeatedly for 10 minutes.
r.Add(registry.TestSpec{
Name: "export/parquet/bench",
Owner: registry.OwnerCDC,
Tags: registry.Tags("manual"),
Cluster: r.MakeClusterSpec(4, spec.CPU(8)),
RequiresLicense: false,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.Spec().NodeCount < 4 {
t.Fatalf("expected at least 4 nodes, found %d", c.Spec().NodeCount)
}

numWarehouses, numConcurrentExports, exportDuration, pauseDuration := 250, 30, 10*time.Minute, 2*time.Minute
if c.IsLocal() {
numWarehouses, numConcurrentExports, exportDuration, pauseDuration = 10, 2, 1*time.Minute, 10*time.Second
}

// Set up grafana.
crdbNodes := c.Spec().NodeCount - 1
workloadNode := crdbNodes + 1
cfg := (&prometheus.Config{}).
WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]).
WithCluster(c.Range(1, crdbNodes).InstallNodes()).
WithNodeExporter(c.Range(1, crdbNodes).InstallNodes()).
WithGrafanaDashboardJSON(grafana.ChangefeedRoachtestGrafanaDashboardJSON)
cfg.Grafana.Enabled = true
if !t.SkipInit() {
err := c.StartGrafana(ctx, t.L(), cfg)
if err != nil {
t.Errorf("error starting prometheus/grafana: %s", err)
}
nodeURLs, err := c.ExternalIP(ctx, t.L(), c.Node(workloadNode))
if err != nil {
t.Errorf("error getting grafana node external ip: %s", err)
}
t.Status(fmt.Sprintf("started grafana at http://%s:3000/d/928XNlN4k/basic?from=now-15m&to=now", nodeURLs[0]))
} else {
t.Status("skipping grafana installation")
}

t.Status(fmt.Sprintf("initializing tpcc database with %d warehouses", numWarehouses))
tpccOpts := tpccOptions{
Warehouses: numWarehouses,
SkipPostRunCheck: true,
ExtraSetupArgs: "--checks=false",
DisableDefaultScheduledBackup: true,
}
setupTPCC(ctx, t, c, tpccOpts)
t.Status("finished initializing tpcc database")

// Add padding to let the cluster metrics settle after initializing tpcc.
t.Status(fmt.Sprintf("waiting for %s", pauseDuration))
time.Sleep(pauseDuration)

t.Status(fmt.Sprintf("running exports for %s", exportDuration))
// Signal workers to stop after the export duration.
cancelWorkers := atomic.Int64{}
_ = time.AfterFunc(exportDuration, func() {
t.Status("terminating workers...")
cancelWorkers.Store(1)
})

wg := sync.WaitGroup{}
for i := 0; i < numConcurrentExports; i++ {
wg.Add(1)
go func(i int, target string) {
t.Status(fmt.Sprintf("worker %d/%d starting export of target %s", i+1, numConcurrentExports, target))
fileNum := 0
db := c.Conn(ctx, t.L(), 1)
for cancelWorkers.Load() == 0 {
_, err := db.Exec(
fmt.Sprintf("EXPORT INTO PARQUET 'nodelocal://1/outputfile%d' FROM SELECT * FROM %s", fileNum, target))
fileNum += 1
if err != nil {
t.Fatalf(err.Error())
}
}
t.Status(fmt.Sprintf("worker %d/%d terminated", i+1, numConcurrentExports))
wg.Done()
}(i, allTpccTargets[i%len(allTpccTargets)])
}
wg.Wait()

// Uncomment when using --debug to inspect metrics, gather profiles, etc.
// t.FailNow()
},
})

// This test sets up a 3-node CRDB cluster on 8vCPU machines initialized with
// the TPC-C database containing 100 warehouses. Then, it executes concurrent
// exports until the entire database is exported.
r.Add(registry.TestSpec{
Name: "export/parquet/tpcc-100",
Owner: registry.OwnerCDC,
Tags: registry.Tags("daily"),
Cluster: r.MakeClusterSpec(4, spec.CPU(8)),
RequiresLicense: false,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.Spec().NodeCount < 4 {
t.Fatalf("expected at least 4 nodes, found %d", c.Spec().NodeCount)
}

numWarehouses, pauseDuration := 100, 2*time.Minute
if c.IsLocal() {
numWarehouses, pauseDuration = 10, 10*time.Second
}

t.Status(fmt.Sprintf("initializing tpcc database with %d warehouses", numWarehouses))
tpccOpts := tpccOptions{
Warehouses: numWarehouses,
SkipPostRunCheck: true,
ExtraSetupArgs: "--checks=false",
DisableDefaultScheduledBackup: true,
}
setupTPCC(ctx, t, c, tpccOpts)
t.Status("finished initializing tpcc database")

// Add padding to let the cluster metrics settle after initializing tpcc.
t.Status(fmt.Sprintf("waiting for %s", pauseDuration))
time.Sleep(pauseDuration)

numWorkers := len(allTpccTargets)
wg := sync.WaitGroup{}
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(i int, target string) {
t.Status(fmt.Sprintf("worker %d/%d starting export of target %s", i+1, numWorkers, target))
db := c.Conn(ctx, t.L(), 1)
_, err := db.Exec(
fmt.Sprintf("EXPORT INTO PARQUET 'nodelocal://1/outputfile%d' FROM SELECT * FROM %s", i, target))
if err != nil {
t.Fatalf(err.Error())
}
t.Status(fmt.Sprintf("worker %d/%d terminated", i+1, numWorkers))
wg.Done()
}(i, allTpccTargets[i])
}
wg.Wait()
},
})
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func RegisterTests(r registry.Registry) {
registerBackupNodeShutdown(r)
registerCDC(r)
registerCDCMixedVersions(r)
registerExportParquet(r)
registerCancel(r)
registerChangeReplicasMixedVersion(r)
registerClearRange(r)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ type TestingKnobs struct {
// Changefeed contains testing knobs specific to the changefeed system.
Changefeed base.ModuleTestingKnobs

// Export contains testing knobs for `EXPORT INTO ...`.
Export base.ModuleTestingKnobs

// Flowinfra contains testing knobs specific to the flowinfra system
Flowinfra base.ModuleTestingKnobs

Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ filegroup(
go_library(
name = "importer",
srcs = [
"export_base.go",
"exportcsv.go",
"exportparquet.go",
"import_job.go",
Expand Down Expand Up @@ -110,6 +111,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/log/logutil",
"//pkg/util/parquet",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
Expand All @@ -123,7 +125,6 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_fraugster_parquet_go//:parquet-go",
"@com_github_fraugster_parquet_go//parquet",
"@com_github_fraugster_parquet_go//parquetschema",
"@com_github_lib_pq//oid",
Expand Down Expand Up @@ -238,6 +239,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/parquet",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
Expand All @@ -250,7 +252,6 @@ go_test(
"//pkg/workload/workloadsql",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_fraugster_parquet_go//:parquet-go",
"@com_github_fraugster_parquet_go//parquet",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_gogo_protobuf//proto",
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/importer/export_base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package importer

// ExportTestingKnobs contains testing knobs for Export.
type ExportTestingKnobs struct {
// EnableParquetTestMetadata makes `EXPORT INTO ` with the
// parquet format write CRDB-specific metadata that is required
// for tests to read raw data in parquet files into CRDB datums.
EnableParquetTestMetadata bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
func (*ExportTestingKnobs) ModuleTestingKnobs() {}
Loading

0 comments on commit e2f691d

Please sign in to comment.