From 8abea21ef4e94e9fe5af10b4079e6858c964956b Mon Sep 17 00:00:00 2001
From: Tobias Schottdorf
Date: Mon, 23 Sep 2019 18:38:37 +0200
Subject: [PATCH] roachtest: deflake tpccbench chaos tests

These tests were running a chaos agent across cluster restarts. Whenever
a cluster restart would overlap with the chaos agent restarting a node,
one of the two operations would fail and jam the test.

Fixes #39005.

Release justification: de-flakes a roachtest without changes to the
release binary.

Release note: None
---
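Notes (after the --- separator, so not part of the commit message): the
deflaking hinges on scoping. Previously a single monitor owned both the
dataset load and every chaos Runner, so a chaos-induced node restart
could race with the c.Stop/c.Start pair at the top of the next search
iteration. Now each iteration creates its own monitor, registers the
chaos Runner on it, and drains it before returning, so no chaos agent
survives into the next restart. The stand-alone sketch below
illustrates that shutdown-before-restart pattern; it is a
simplification, not the roachtest monitor API (the real Chaos Runner is
stopped through its Stopper channel, loadDone, rather than a context):

// chaos_scope_sketch.go: illustrative only, not the roachtest API.
// One monitor per iteration; Wait() guarantees the chaos goroutine has
// exited before the caller restarts the cluster again.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// monitor is a trimmed-down stand-in for roachtest's newMonitor.
type monitor struct {
	wg     sync.WaitGroup
	ctx    context.Context
	cancel context.CancelFunc
}

func newMonitor(ctx context.Context) *monitor {
	ctx, cancel := context.WithCancel(ctx)
	return &monitor{ctx: ctx, cancel: cancel}
}

func (m *monitor) Go(f func(context.Context)) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		f(m.ctx)
	}()
}

// Wait stops all registered goroutines and blocks until they exit.
func (m *monitor) Wait() {
	m.cancel()
	m.wg.Wait()
}

func main() {
	for iter := 0; iter < 3; iter++ {
		fmt.Printf("iter %d: cluster restarts with no chaos running\n", iter)
		m := newMonitor(context.Background()) // fresh monitor per iteration
		m.Go(func(ctx context.Context) {
			// Chaos agent: restarts a node periodically until stopped.
			t := time.NewTicker(50 * time.Millisecond)
			defer t.Stop()
			for {
				select {
				case <-ctx.Done():
					return // exits before the next cluster restart
				case <-t.C:
					fmt.Printf("iter %d: chaos kills a node\n", iter)
				}
			}
		})
		time.Sleep(120 * time.Millisecond) // benchmark load runs here
		m.Wait()                           // drain chaos before looping
	}
}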
 pkg/cmd/roachtest/tpcc.go | 295 +++++++++++++++++++-------------------
 1 file changed, 145 insertions(+), 150 deletions(-)

diff --git a/pkg/cmd/roachtest/tpcc.go b/pkg/cmd/roachtest/tpcc.go
index b261531519b3..a947405c3cb7 100644
--- a/pkg/cmd/roachtest/tpcc.go
+++ b/pkg/cmd/roachtest/tpcc.go
@@ -32,7 +32,6 @@ import (
 	"github.com/cockroachdb/ttycolor"
 	"github.com/lib/pq"
 	"github.com/pkg/errors"
-	"golang.org/x/sync/errgroup"
 )
 
 type tpccOptions struct {
@@ -714,168 +713,164 @@ func runTPCCBench(ctx context.Context, t *test, c *cluster, b tpccBenchSpec) {
 	c.Put(ctx, workload, "./workload", loadNodes)
 	c.Start(ctx, t, append(b.startOpts(), roachNodes)...)
 
-	// Wait after restarting nodes before applying load. This lets
-	// things settle down to avoid unexpected cluster states.
-	const restartWait = 15 * time.Second
-	time.Sleep(restartWait)
-
 	useHAProxy := b.Chaos
-	if useHAProxy {
-		if len(loadNodes) > 1 {
-			t.Fatal("distributed chaos benchmarking not supported")
-		}
-		t.Status("installing haproxy")
-		if err := c.Install(ctx, t.l, loadNodes, "haproxy"); err != nil {
-			t.Fatal(err)
+	const restartWait = 15 * time.Second
+	{
+		// Wait after restarting nodes before applying load. This lets
+		// things settle down to avoid unexpected cluster states.
+		time.Sleep(restartWait)
+		if useHAProxy {
+			if len(loadNodes) > 1 {
+				t.Fatal("distributed chaos benchmarking not supported")
+			}
+			t.Status("installing haproxy")
+			if err := c.Install(ctx, t.l, loadNodes, "haproxy"); err != nil {
+				t.Fatal(err)
+			}
+			c.Run(ctx, loadNodes, "./cockroach gen haproxy --insecure --url {pgurl:1}")
+			c.Run(ctx, loadNodes, "haproxy -f haproxy.cfg -D")
 		}
-		c.Run(ctx, loadNodes, "./cockroach gen haproxy --insecure --url {pgurl:1}")
-		c.Run(ctx, loadNodes, "haproxy -f haproxy.cfg -D")
-	}
 
-	m := newMonitor(ctx, c, roachNodes)
-	m.Go(func(ctx context.Context) error {
-		t.Status("setting up dataset")
-		err := loadTPCCBench(ctx, t, c, b, roachNodes, c.Node(loadNodes[0]))
-		if err != nil {
-			return err
-		}
+		m := newMonitor(ctx, c, roachNodes)
+		m.Go(func(ctx context.Context) error {
+			t.Status("setting up dataset")
+			return loadTPCCBench(ctx, t, c, b, roachNodes, c.Node(loadNodes[0]))
+		})
+		m.Wait()
+	}
 
-		// Search between 1 and b.LoadWarehouses for the largest number of
-		// warehouses that can be operated on while sustaining a throughput
-		// threshold, set to a fraction of max tpmC.
-		precision := int(math.Max(1.0, float64(b.LoadWarehouses/200)))
-		initStepSize := precision
+	// Search between 1 and b.LoadWarehouses for the largest number of
+	// warehouses that can be operated on while sustaining a throughput
+	// threshold, set to a fraction of max tpmC.
+	precision := int(math.Max(1.0, float64(b.LoadWarehouses/200)))
+	initStepSize := precision
 
-		// Create a temp directory to store the local copy of results from the
-		// workloads.
-		resultsDir, err := ioutil.TempDir("", "roachtest-tpcc")
-		if err != nil {
-			return errors.Wrap(err, "failed to create temp dir")
-		}
-		defer func() { _ = os.RemoveAll(resultsDir) }()
-		s := search.NewLineSearcher(1, b.LoadWarehouses, b.EstimatedMax, initStepSize, precision)
-		res, err := s.Search(func(warehouses int) (bool, error) {
-			// Restart the cluster before each iteration to help eliminate
-			// inter-trial interactions.
-			m.ExpectDeaths(int32(len(roachNodes)))
-			c.Stop(ctx, roachNodes)
-			c.Start(ctx, t, append(b.startOpts(), roachNodes)...)
-			time.Sleep(restartWait)
-
-			// Set up the load generation configuration.
-			rampDur := 5 * time.Minute
-			loadDur := 10 * time.Minute
-			loadDone := make(chan time.Time, numLoadGroups)
-
-			// If we're running chaos in this configuration, modify this config.
-			if b.Chaos {
-				// Increase the load generation duration.
-				loadDur = 10 * time.Minute
-
-				// Kill one node at a time.
-				ch := Chaos{
-					Timer:   Periodic{Period: 90 * time.Second, DownTime: 5 * time.Second},
-					Target:  roachNodes.randNode,
-					Stopper: loadDone,
-				}
-				m.Go(ch.Runner(c, m))
-			}
-			if b.Distribution == multiRegion {
-				rampDur = 3 * time.Minute
-				loadDur = 15 * time.Minute
-			}
+	// Create a temp directory to store the local copy of results from the
+	// workloads.
+	resultsDir, err := ioutil.TempDir("", "roachtest-tpcc")
+	if err != nil {
+		t.Fatal(errors.Wrap(err, "failed to create temp dir"))
+	}
+	defer func() { _ = os.RemoveAll(resultsDir) }()
+	s := search.NewLineSearcher(1, b.LoadWarehouses, b.EstimatedMax, initStepSize, precision)
+	if res, err := s.Search(func(warehouses int) (bool, error) {
+		m := newMonitor(ctx, c, roachNodes)
+		// Restart the cluster before each iteration to help eliminate
+		// inter-trial interactions.
+		m.ExpectDeaths(int32(len(roachNodes)))
+		c.Stop(ctx, roachNodes)
+		c.Start(ctx, t, append(b.startOpts(), roachNodes)...)
+		time.Sleep(restartWait)
+
+		// Set up the load generation configuration.
+		rampDur := 5 * time.Minute
+		loadDur := 10 * time.Minute
+		loadDone := make(chan time.Time, numLoadGroups)
+
+		// If we're running chaos in this configuration, modify this config.
+		if b.Chaos {
+			// Increase the load generation duration.
+			loadDur = 10 * time.Minute
+
+			// Kill one node at a time.
+			ch := Chaos{
+				Timer:   Periodic{Period: 90 * time.Second, DownTime: 5 * time.Second},
+				Target:  roachNodes.randNode,
+				Stopper: loadDone,
+			}
+			m.Go(ch.Runner(c, m))
+		}
+		if b.Distribution == multiRegion {
+			rampDur = 3 * time.Minute
+			loadDur = 15 * time.Minute
+		}
 
-			// If we're running multiple load generators, run them in parallel and then
-			// aggregate resultChan. In order to process the results we need to copy
-			// over the histograms. Create a temp dir which will contain the fetched
-			// data.
-			var eg errgroup.Group
-			resultChan := make(chan *tpcc.Result, numLoadGroups)
-			for groupIdx, group := range loadGroups {
-				// Copy for goroutine
-				groupIdx := groupIdx
-				group := group
-				eg.Go(func() error {
-					sqlGateways := group.roachNodes
-					if useHAProxy {
-						sqlGateways = group.loadNodes
-					}
+		// If we're running multiple load generators, run them in parallel and then
+		// aggregate resultChan. In order to process the results we need to copy
+		// over the histograms. Create a temp dir which will contain the fetched
+		// data.
+		resultChan := make(chan *tpcc.Result, numLoadGroups)
+		for groupIdx, group := range loadGroups {
+			// Copy for goroutine
+			groupIdx := groupIdx
+			group := group
+			m.Go(func(ctx context.Context) error {
+				sqlGateways := group.roachNodes
+				if useHAProxy {
+					sqlGateways = group.loadNodes
+				}
 
-					extraFlags := ""
-					activeWarehouses := warehouses
-					switch b.LoadConfig {
-					case singleLoadgen:
-						// Nothing.
-					case singlePartitionedLoadgen:
-						extraFlags = fmt.Sprintf(` --partitions=%d`, b.partitions())
-					case multiLoadgen:
-						extraFlags = fmt.Sprintf(` --partitions=%d --partition-affinity=%d`,
-							b.partitions(), groupIdx)
-						activeWarehouses = warehouses / numLoadGroups
-					default:
-						panic("unexpected")
-					}
+				extraFlags := ""
+				activeWarehouses := warehouses
+				switch b.LoadConfig {
+				case singleLoadgen:
+					// Nothing.
+				case singlePartitionedLoadgen:
+					extraFlags = fmt.Sprintf(` --partitions=%d`, b.partitions())
+				case multiLoadgen:
+					extraFlags = fmt.Sprintf(` --partitions=%d --partition-affinity=%d`,
+						b.partitions(), groupIdx)
+					activeWarehouses = warehouses / numLoadGroups
+				default:
+					panic("unexpected")
+				}
 
-					t.Status(fmt.Sprintf("running benchmark, warehouses=%d", warehouses))
-					histogramsPath := fmt.Sprintf("%s/warehouses=%d/stats.json", perfArtifactsDir, activeWarehouses)
-					cmd := fmt.Sprintf("./workload run tpcc --warehouses=%d --active-warehouses=%d "+
-						"--tolerate-errors --scatter --ramp=%s --duration=%s%s {pgurl%s} "+
-						"--histograms=%s",
-						b.LoadWarehouses, activeWarehouses, rampDur,
-						loadDur, extraFlags, sqlGateways, histogramsPath)
-					err := c.RunE(ctx, group.loadNodes, cmd)
-					loadDone <- timeutil.Now()
-					if err != nil {
-						return errors.Wrapf(err, "error running tpcc load generator")
-					}
-					roachtestHistogramsPath := filepath.Join(resultsDir, fmt.Sprintf("%d.%d-stats.json", warehouses, groupIdx))
-					if err := c.Get(
-						ctx, t.l, histogramsPath, roachtestHistogramsPath, group.loadNodes,
-					); err != nil {
-						t.Fatal(err)
-					}
-					snapshots, err := histogram.DecodeSnapshots(roachtestHistogramsPath)
-					if err != nil {
-						return errors.Wrapf(err, "failed to decode histogram snapshots")
-					}
-					result := tpcc.NewResultWithSnapshots(activeWarehouses, 0, snapshots)
-					resultChan <- result
-					return nil
-				})
-			}
-			if err = eg.Wait(); err != nil {
-				return false, err
-			}
-			close(resultChan)
-			var results []*tpcc.Result
-			for partial := range resultChan {
-				results = append(results, partial)
-			}
-			res := tpcc.MergeResults(results...)
-			failErr := res.FailureError()
-			// Print the result.
-			if failErr == nil {
-				ttycolor.Stdout(ttycolor.Green)
-				t.l.Printf("--- PASS: tpcc %d resulted in %.1f tpmC (%.1f%% of max tpmC)\n\n",
-					warehouses, res.TpmC(), res.Efficiency())
-			} else {
-				ttycolor.Stdout(ttycolor.Red)
-				t.l.Printf("--- FAIL: tpcc %d resulted in %.1f tpmC and failed due to %v",
-					warehouses, res.TpmC(), failErr)
-			}
-			ttycolor.Stdout(ttycolor.Reset)
-			return failErr == nil, nil
-		})
-		if err != nil {
-			return err
-		}
+				t.Status(fmt.Sprintf("running benchmark, warehouses=%d", warehouses))
+				histogramsPath := fmt.Sprintf("%s/warehouses=%d/stats.json", perfArtifactsDir, activeWarehouses)
+				cmd := fmt.Sprintf("./workload run tpcc --warehouses=%d --active-warehouses=%d "+
+					"--tolerate-errors --scatter --ramp=%s --duration=%s%s {pgurl%s} "+
+					"--histograms=%s",
+					b.LoadWarehouses, activeWarehouses, rampDur,
+					loadDur, extraFlags, sqlGateways, histogramsPath)
+				err := c.RunE(ctx, group.loadNodes, cmd)
+				loadDone <- timeutil.Now()
+				if err != nil {
+					return errors.Wrapf(err, "error running tpcc load generator")
+				}
+				roachtestHistogramsPath := filepath.Join(resultsDir, fmt.Sprintf("%d.%d-stats.json", warehouses, groupIdx))
+				if err := c.Get(
+					ctx, t.l, histogramsPath, roachtestHistogramsPath, group.loadNodes,
+				); err != nil {
+					t.Fatal(err)
+				}
+				snapshots, err := histogram.DecodeSnapshots(roachtestHistogramsPath)
+				if err != nil {
+					return errors.Wrapf(err, "failed to decode histogram snapshots")
+				}
+				result := tpcc.NewResultWithSnapshots(activeWarehouses, 0, snapshots)
+				resultChan <- result
+				return nil
+			})
+		}
+		if err = m.WaitE(); err != nil {
+			return false, err
+		}
+		close(resultChan)
+		var results []*tpcc.Result
+		for partial := range resultChan {
+			results = append(results, partial)
+		}
+		res := tpcc.MergeResults(results...)
+		failErr := res.FailureError()
+		// Print the result.
+		if failErr == nil {
+			ttycolor.Stdout(ttycolor.Green)
+			t.l.Printf("--- PASS: tpcc %d resulted in %.1f tpmC (%.1f%% of max tpmC)\n\n",
+				warehouses, res.TpmC(), res.Efficiency())
+		} else {
+			ttycolor.Stdout(ttycolor.Red)
+			t.l.Printf("--- FAIL: tpcc %d resulted in %.1f tpmC and failed due to %v",
+				warehouses, res.TpmC(), failErr)
+		}
+		ttycolor.Stdout(ttycolor.Reset)
+		return failErr == nil, nil
+	}); err != nil {
+		t.Fatal(err)
+	} else {
 		ttycolor.Stdout(ttycolor.Green)
 		t.l.Printf("------\nMAX WAREHOUSES = %d\n------\n\n", res)
 		ttycolor.Stdout(ttycolor.Reset)
-		return nil
-	})
-	m.Wait()
+	}
 }
 
 func registerTPCCBench(r *testRegistry) {
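Appendix, for reviewers unfamiliar with the search package used above:
NewLineSearcher(1, b.LoadWarehouses, b.EstimatedMax, initStepSize,
precision) probes warehouse counts between 1 and LoadWarehouses,
starting from the estimated max, and homes in on the largest count
whose run still meets the tpmC threshold, to within `precision`
warehouses. The sketch below reduces the idea to a plain binary search
over a monotone pass/fail predicate; it is a simplification, not the
real searcher, which also line-searches from the starting estimate and
whose predicate (the benchmark run above) can additionally return an
error:

// search_sketch.go: simplified stand-in for the line search.
package main

import "fmt"

// maxPassing returns the largest value in [lo, hi] for which pred
// holds, assuming pred is monotone (passes below some point, fails
// above it).
func maxPassing(lo, hi int, pred func(int) bool) int {
	best := lo
	for lo <= hi {
		mid := lo + (hi-lo)/2
		if pred(mid) {
			best = mid // mid sustains the threshold; search higher
			lo = mid + 1
		} else {
			hi = mid - 1 // mid fails; search lower
		}
	}
	return best
}

func main() {
	// Toy predicate: pretend the cluster sustains up to 1423 warehouses.
	fmt.Println(maxPassing(1, 2000, func(w int) bool { return w <= 1423 }))
}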