From 063f5b966246cff1dcd425d5ef444ddea2e9dd44 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 7 Jul 2023 15:51:15 -0400 Subject: [PATCH] changefeedccl: Remove stale test Fixes #75080 Removed `TestChangefeedRestartDuringBackfill` test which was previously skipped, and remaing skipped for very long time. The reason for removal is that this test is exceedingly brittle and very stale. Furthermore, the functionality of restart during backfill already test extensively by non-flaky tests that verify restart and checkpoint functionality (`TestChangefeedCheckpointSchemaChange`, `TestChangefeedBackfillCheckpoint`, `TestCoreChangefeedBackfillScanCheckpoint`). Release note: None --- pkg/ccl/changefeedccl/changefeed_test.go | 121 ----------------------- 1 file changed, 121 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 0d0bf1698eb3..f723c2cb1295 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -5831,127 +5831,6 @@ func TestChangefeedContinuousTelemetryDifferentJobs(t *testing.T) { cdcTest(t, testFn, feedTestOmitSinks("sinkless", "pubsub")) } -// Regression test for #41694. -func TestChangefeedRestartDuringBackfill(t *testing.T) { - defer leaktest.AfterTest(t)() - skip.WithIssue(t, 75080, "flaky test") - defer log.Scope(t).Close(t) - - // TODO(yevgeniy): Rework this test. It's too brittle. - - testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - knobs := s.TestingKnobs. - DistSQL.(*execinfra.TestingKnobs). - Changefeed.(*TestingKnobs) - beforeEmitRowCh := make(chan error, 20) - knobs.BeforeEmitRow = func(ctx context.Context) error { - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-beforeEmitRowCh: - return err - } - } - - unblockMessages := func(n int) { - for i := 0; i < n; i++ { - beforeEmitRowCh <- nil - } - } - - sqlDB := sqlutils.MakeSQLRunner(s.DB) - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (0), (1), (2), (3)`) - - foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff`) - defer closeFeed(t, foo) - - // TODO(dan): At a high level, all we're doing is trying to restart a - // changefeed in the middle of changefeed backfill after a schema change - // finishes. It turns out this is pretty hard to do with our current testing - // knobs and this test ends up being pretty brittle. I'd love it if anyone - // thought of a better way to do this. - - // Read the initial data in the rows. - unblockMessages(4) - assertPayloads(t, foo, []string{ - `foo: [0]->{"after": {"a": 0}, "before": null}`, - `foo: [1]->{"after": {"a": 1}, "before": null}`, - `foo: [2]->{"after": {"a": 2}, "before": null}`, - `foo: [3]->{"after": {"a": 3}, "before": null}`, - }) - - // Run a schema change that backfills kvs. - sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN b STRING DEFAULT 'backfill'`) - - // Unblock emit for each kv written by the schema change's backfill. The - // changefeed actually emits these, but we lose it to overaggressive - // duplicate detection in tableFeed. - // TODO(dan): Track duplicates more precisely in tableFeed. - unblockMessages(4) - - // Unblock the emit for *all but one* of the rows emitted by the changefeed - // backfill (run after the schema change completes and the final table - // descriptor is written). The reason this test has 4 rows is because the - // `sqlSink` that powers `tableFeed` only flushes after it has 3 rows, so we - // need 1 more than that to guarantee that this first one gets flushed. - for i := 0; i < 3; i++ { - beforeEmitRowCh <- nil - } - assertPayloads(t, foo, []string{ - `foo: [0]->{"after": {"a": 0}, "before": {"a": 0}}`, - `foo: [1]->{"after": {"a": 1}, "before": {"a": 1}}`, - `foo: [2]->{"after": {"a": 2}, "before": {"a": 2}}`, - `foo: [3]->{"after": {"a": 3}, "before": {"a": 3}}`, - `foo: [0]->{"after": {"a": 0, "b": "backfill"}, "before": {"a": 0}}`, - }) - - // `foo: [0]->{"after": {"a": 0, "b": "backfill"}, "before": {"a": 0}}`, - feedJob := foo.(cdctest.EnterpriseTestFeed) - require.NoError(t, feedJob.Pause()) - - // Make extra sure that the zombie changefeed can't write any more data. - beforeEmitRowCh <- errors.New(`nope don't write it`) - - // Insert some data that we should only see out of the changefeed after it - // re-runs the backfill. - sqlDB.Exec(t, `INSERT INTO foo VALUES (6, 'bar')`) - - // Unblock all later emits, we don't need this control anymore. - close(beforeEmitRowCh) - - // Resume the changefeed and the backfill should start up again. Currently - // this does the entire backfill again, you could imagine in the future that - // we do some sort of backfill checkpointing and start the backfill up from - // the last checkpoint. - require.NoError(t, feedJob.Resume()) - assertPayloads(t, foo, []string{ - // The changefeed actually emits this row, but we lose it to - // overaggressive duplicate detection in tableFeed. - // TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed. - // `foo: [0]->{"after": {"a": 0, "b": "backfill"}}`, - `foo: [1]->{"after": {"a": 1, "b": "backfill"}, "before": {"a": 1}}`, - `foo: [2]->{"after": {"a": 2, "b": "backfill"}, "before": {"a": 2}}`, - `foo: [3]->{"after": {"a": 3, "b": "backfill"}, "before": {"a": 3}}`, - }) - - assertPayloads(t, foo, []string{ - `foo: [6]->{"after": {"a": 6, "b": "bar"}, "before": null}`, - }) - } - - useSysCfgInKV := withKnobsFn(func(knobs *base.TestingKnobs) { - // TODO(irfansharif): This test is "skipped" under span configs; - // #75080. - if knobs.Store == nil { - knobs.Store = &kvserver.StoreTestingKnobs{} - } - knobs.Store.(*kvserver.StoreTestingKnobs).UseSystemConfigSpanForQueues = true - }) - - cdcTest(t, testFn, feedTestForceSink("kafka"), useSysCfgInKV) -} - func TestChangefeedHandlesDrainingNodes(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)