roachtest/tests: introduce metamorphic testing to cdc
Prior to this commit, roachtest/cdc relied solely on periodic checks of changefeed status and latency. This patch
takes the first step toward introducing a metamorphic testing framework.

Since there is not yet a way to evaluate the correctness of the output files directly, the new approach runs two
changefeeds with different configurations, retrieves their output files from the roachtest cluster, and compares their
data outputs.

Because the changefeed output may contain duplicates, the test follows these steps (a minimal sketch follows the list):
1. create two empty tables with the same schema as the workload tables
2. convert the parquet data to datums
3. execute `UPSERT` statements on the tables with the datums to eliminate duplicates
4. confirm that the two tables have identical content by comparing their fingerprints
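
The core of the comparison (steps 1 and 4) boils down to a handful of SQL statements. Below is a minimal sketch in Go
of that part, assuming the usual roachtest imports (fmt, reflect, the test and sqlutils packages); the table names
check_a/check_b are illustrative, and this is not the exact code in cdc_helper.go:

// Sketch only: create two empty copies of the target table's schema and
// compare their fingerprints once the decoded parquet rows have been upserted.
func compareFeedContents(t test.Test, sqlRunner *sqlutils.SQLRunner, target string) {
	// Step 1: two empty tables with the same schema as the workload table.
	sqlRunner.Exec(t, fmt.Sprintf("CREATE TABLE check_a (LIKE %s INCLUDING ALL)", target))
	sqlRunner.Exec(t, fmt.Sprintf("CREATE TABLE check_b (LIKE %s INCLUDING ALL)", target))

	// Steps 2-3 (not shown): decode each changefeed's parquet output into datums
	// and UPSERT the rows into check_a and check_b respectively, so that
	// duplicate emissions collapse onto their primary keys.

	// Step 4: the two feeds exported the same data iff the fingerprints match.
	fpA := sqlRunner.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE check_a")
	fpB := sqlRunner.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE check_b")
	if !reflect.DeepEqual(fpA, fpB) {
		t.Fatalf("changefeed outputs differ: %v vs %v", fpA, fpB)
	}
}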

Limitations of this approach include:
- It currently works only for parquet files. (A round-trip conversion is guaranteed between the parquet data format
   and datums; other data formats are more complicated.)
- INSERT is the only operation involved.
- Due to the large output size, the test randomly selects a single target table for the changefeeds.
- Currently, the two changefeeds use the same configuration. We plan to change this soon, following a discussion
   to determine the specific settings that will be randomized.

Part of: cockroachdb#111066

Release note: None
wenyihu6 committed Feb 15, 2024
1 parent ea9fb4e commit c8f84bf
Showing 8 changed files with 445 additions and 5 deletions.
26 changes: 23 additions & 3 deletions pkg/ccl/changefeedccl/parquet.go
@@ -22,15 +22,18 @@ import (
	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
	"github.com/cockroachdb/cockroach/pkg/util/envutil"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/json"
	"github.com/cockroachdb/cockroach/pkg/util/parquet"
	"github.com/cockroachdb/errors"
)

// includeParquestTestMetadata configures the parquet writer to write
// metadata required for reading parquet files in tests.
var includeParquestTestMetadata = buildutil.CrdbTestBuild
// includeParquestTestMetadata configures the parquet writer to write extra
// column metadata for parquet files in tests.
var includeParquestTestMetadata = buildutil.CrdbTestBuild ||
	envutil.EnvOrDefaultBool("COCKROACH_CHANGEFEED_TESTING_INCLUDE_PARQUET_TEST_METADATA",
		false)

type parquetWriter struct {
	inner *parquet.Writer
@@ -115,6 +118,7 @@ func newParquetWriterFromRow(
		if opts, err = addParquetTestMetadata(row, encodingOpts, opts); err != nil {
			return nil, err
		}
		opts = append(opts, parquet.WithMetadata(parquet.MakeReaderMetadata(schemaDef)))
	}
	writer, err := parquet.NewWriter(schemaDef, sink, opts...)

@@ -336,3 +340,19 @@ func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error
	}
	return orderedKeys, m, nil
}

// TestingGetEventTypeColIdx returns the index of the extra column added to
// every parquet file which indicates the type of event that generated a
// particular row. See parquetCrdbEventTypeColName and addParquetTestMetadata
// for more details.
func TestingGetEventTypeColIdx(rd parquet.ReadDatumsMetadata) (int, error) {
	columnsNamesString, ok := rd.MetaFields["allCols"]
	if !ok {
		return -1, errors.Errorf("could not find column names in parquet metadata")
	}
	_, columnNameSet, err := deserializeMap(columnsNamesString)
	if err != nil {
		return -1, err
	}
	return columnNameSet[parquetCrdbEventTypeColName], nil
}
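
For context, a test that has already read a parquet file into datums could use this helper to drop the synthetic
event-type column before comparing rows with table contents. A rough usage sketch, not part of this commit; the rows
and metadata are assumed to come from the util/parquet test reader, and the slicing is purely illustrative:

// stripEventTypeCol removes the crdb event-type column from decoded rows so
// that the remaining datums line up with the table's own columns.
func stripEventTypeCol(
	meta parquet.ReadDatumsMetadata, rows [][]tree.Datum,
) ([][]tree.Datum, error) {
	idx, err := changefeedccl.TestingGetEventTypeColIdx(meta)
	if err != nil {
		return nil, err
	}
	out := make([][]tree.Datum, len(rows))
	for i, row := range rows {
		out[i] = append(append([]tree.Datum{}, row[:idx]...), row[idx+1:]...)
	}
	return out, nil
}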
7 changes: 7 additions & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -38,6 +38,7 @@ go_library(
"cdc.go",
"cdc_bench.go",
"cdc_filtering.go",
"cdc_helper.go",
"cdc_stats.go",
"chaos.go",
"clearrange.go",
@@ -195,6 +196,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/blobs",
"//pkg/ccl/changefeedccl",
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/changefeedccl/changefeedbase",
@@ -232,8 +234,10 @@ go_library(
"//pkg/roachprod/logger",
"//pkg/roachprod/prometheus",
"//pkg/roachprod/vm",
"//pkg/security/username",
"//pkg/server/authserver",
"//pkg/server/serverpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
@@ -254,7 +258,9 @@ go_library(
"//pkg/util/httputil",
"//pkg/util/humanizeutil",
"//pkg/util/intsets",
"//pkg/util/ioctx",
"//pkg/util/log",
"//pkg/util/parquet",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
@@ -306,6 +312,7 @@ go_test(
name = "tests_test",
srcs = [
"blocklist_test.go",
"cdc_helper_test.go",
"drt_test.go",
"query_comparison_util_test.go",
"restore_test.go",
43 changes: 43 additions & 0 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -84,6 +84,8 @@ var envVars = []string{
	// NB: This is crucial for chaos tests as we expect changefeeds to see
	// many retries.
	"COCKROACH_CHANGEFEED_TESTING_FAST_RETRY=true",
	"COCKROACH_CHANGEFEED_TESTING_INCLUDE_PARQUET_TEST_METADATA=true",
	"COCKROACH_CHANGEFEED_TESTING_INCLUDE_PARQUET_READER_METADATA=true",
}

type cdcTester struct {
@@ -1163,6 +1165,47 @@ func registerCDC(r registry.Registry) {

		},
	})
	r.Add(registry.TestSpec{
		Name: "cdc/initial-scan-only/parquet/metamorphic",
		Owner: registry.OwnerCDC,
		Benchmark: true,
		Cluster: r.MakeClusterSpec(4, spec.CPU(16), spec.Arch(vm.ArchAMD64)),
		RequiresLicense: true,
		CompatibleClouds: registry.AllExceptAWS,
		Suites: registry.Suites(registry.Nightly),
		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
			ct := newCDCTester(ctx, t, c)
			defer ct.Close()

			// Metamorphic testing runs two changefeeds over the same data and
			// compares their output. Run the workload with 1 warehouse only to
			// speed up the test.
			ct.runTPCCWorkload(tpccArgs{warehouses: 1})

			// Randomly select one table as changefeed target and skip other tables to
			// speed up the test.
			randomlySelectedIndex := getRandomIndex(len(allTpccTargets))
			selectedTargetTable := allTpccTargets[randomlySelectedIndex]
			trimmedTargetTable := strings.TrimPrefix(selectedTargetTable, `tpcc.`)

			firstFeed := ct.newChangefeed(feedArgs{
				sinkType: cloudStorageSink,
				targets: []string{selectedTargetTable},
				opts: map[string]string{"initial_scan": "'only'", "format": "'parquet'"},
			})
			firstFeed.waitForCompletion()

			secFeed := ct.newChangefeed(feedArgs{
				sinkType: cloudStorageSink,
				targets: []string{selectedTargetTable},
				opts: map[string]string{"initial_scan": "'only'", "format": "'parquet'"},
			})
			secFeed.waitForCompletion()

			db := c.Conn(context.Background(), t.L(), 1)
			sqlRunner := sqlutils.MakeSQLRunner(db)
			checkTwoChangeFeedExportContent(ctx, t, sqlRunner, firstFeed.sinkURI, secFeed.sinkURI, trimmedTargetTable)
		},
	})
	r.Add(registry.TestSpec{
		Name: "cdc/tpcc-1000/sink=null",
		Owner: registry.OwnerCDC,
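
checkTwoChangeFeedExportContent itself lives in the new cdc_helper.go, which is not shown in the hunks above.
Conceptually, its UPSERT step (step 3 of the commit message) amounts to something like the sketch below; the helper
name, its parameters, and the use of string-valued rows are illustrative assumptions rather than the actual
implementation:

// upsertDecodedRows writes every decoded row into the given check table so
// that duplicate rows emitted by the changefeed collapse onto the same
// primary key. The real helper works with datums; passing strings here relies
// on implicit casts and is for illustration only.
func upsertDecodedRows(t test.Test, sqlRunner *sqlutils.SQLRunner, table string, rows [][]string) {
	for _, row := range rows {
		placeholders := make([]string, len(row))
		args := make([]interface{}, len(row))
		for i, v := range row {
			placeholders[i] = fmt.Sprintf("$%d", i+1)
			args[i] = v
		}
		stmt := fmt.Sprintf("UPSERT INTO %s VALUES (%s)", table, strings.Join(placeholders, ", "))
		sqlRunner.Exec(t, stmt, args...)
	}
}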