Skip to content

Commit

Permalink
changefeedccl: Remove stale test
Browse files Browse the repository at this point in the history
Fixes cockroachdb#75080
Removed `TestChangefeedRestartDuringBackfill` test which was previously
skipped, and remaing skipped for very long time.

The reason for removal is that this test is exceedingly brittle
and very stale.  Furthermore, the functionality of restart during
backfill already test extensively by non-flaky tests
that verify restart and checkpoint functionality
(`TestChangefeedCheckpointSchemaChange`,
`TestChangefeedBackfillCheckpoint`,
`TestCoreChangefeedBackfillScanCheckpoint`).

Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed Jul 7, 2023
1 parent 9dda82c commit 063f5b9
Showing 1 changed file with 0 additions and 121 deletions.
121 changes: 0 additions & 121 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5831,127 +5831,6 @@ func TestChangefeedContinuousTelemetryDifferentJobs(t *testing.T) {
cdcTest(t, testFn, feedTestOmitSinks("sinkless", "pubsub"))
}

// Regression test for #41694.
func TestChangefeedRestartDuringBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 75080, "flaky test")
defer log.Scope(t).Close(t)

// TODO(yevgeniy): Rework this test. It's too brittle.

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
beforeEmitRowCh := make(chan error, 20)
knobs.BeforeEmitRow = func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-beforeEmitRowCh:
return err
}
}

unblockMessages := func(n int) {
for i := 0; i < n; i++ {
beforeEmitRowCh <- nil
}
}

sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0), (1), (2), (3)`)

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff`)
defer closeFeed(t, foo)

// TODO(dan): At a high level, all we're doing is trying to restart a
// changefeed in the middle of changefeed backfill after a schema change
// finishes. It turns out this is pretty hard to do with our current testing
// knobs and this test ends up being pretty brittle. I'd love it if anyone
// thought of a better way to do this.

// Read the initial data in the rows.
unblockMessages(4)
assertPayloads(t, foo, []string{
`foo: [0]->{"after": {"a": 0}, "before": null}`,
`foo: [1]->{"after": {"a": 1}, "before": null}`,
`foo: [2]->{"after": {"a": 2}, "before": null}`,
`foo: [3]->{"after": {"a": 3}, "before": null}`,
})

// Run a schema change that backfills kvs.
sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN b STRING DEFAULT 'backfill'`)

// Unblock emit for each kv written by the schema change's backfill. The
// changefeed actually emits these, but we lose it to overaggressive
// duplicate detection in tableFeed.
// TODO(dan): Track duplicates more precisely in tableFeed.
unblockMessages(4)

// Unblock the emit for *all but one* of the rows emitted by the changefeed
// backfill (run after the schema change completes and the final table
// descriptor is written). The reason this test has 4 rows is because the
// `sqlSink` that powers `tableFeed` only flushes after it has 3 rows, so we
// need 1 more than that to guarantee that this first one gets flushed.
for i := 0; i < 3; i++ {
beforeEmitRowCh <- nil
}
assertPayloads(t, foo, []string{
`foo: [0]->{"after": {"a": 0}, "before": {"a": 0}}`,
`foo: [1]->{"after": {"a": 1}, "before": {"a": 1}}`,
`foo: [2]->{"after": {"a": 2}, "before": {"a": 2}}`,
`foo: [3]->{"after": {"a": 3}, "before": {"a": 3}}`,
`foo: [0]->{"after": {"a": 0, "b": "backfill"}, "before": {"a": 0}}`,
})

// `foo: [0]->{"after": {"a": 0, "b": "backfill"}, "before": {"a": 0}}`,
feedJob := foo.(cdctest.EnterpriseTestFeed)
require.NoError(t, feedJob.Pause())

// Make extra sure that the zombie changefeed can't write any more data.
beforeEmitRowCh <- errors.New(`nope don't write it`)

// Insert some data that we should only see out of the changefeed after it
// re-runs the backfill.
sqlDB.Exec(t, `INSERT INTO foo VALUES (6, 'bar')`)

// Unblock all later emits, we don't need this control anymore.
close(beforeEmitRowCh)

// Resume the changefeed and the backfill should start up again. Currently
// this does the entire backfill again, you could imagine in the future that
// we do some sort of backfill checkpointing and start the backfill up from
// the last checkpoint.
require.NoError(t, feedJob.Resume())
assertPayloads(t, foo, []string{
// The changefeed actually emits this row, but we lose it to
// overaggressive duplicate detection in tableFeed.
// TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed.
// `foo: [0]->{"after": {"a": 0, "b": "backfill"}}`,
`foo: [1]->{"after": {"a": 1, "b": "backfill"}, "before": {"a": 1}}`,
`foo: [2]->{"after": {"a": 2, "b": "backfill"}, "before": {"a": 2}}`,
`foo: [3]->{"after": {"a": 3, "b": "backfill"}, "before": {"a": 3}}`,
})

assertPayloads(t, foo, []string{
`foo: [6]->{"after": {"a": 6, "b": "bar"}, "before": null}`,
})
}

useSysCfgInKV := withKnobsFn(func(knobs *base.TestingKnobs) {
// TODO(irfansharif): This test is "skipped" under span configs;
// #75080.
if knobs.Store == nil {
knobs.Store = &kvserver.StoreTestingKnobs{}
}
knobs.Store.(*kvserver.StoreTestingKnobs).UseSystemConfigSpanForQueues = true
})

cdcTest(t, testFn, feedTestForceSink("kafka"), useSysCfgInKV)
}

func TestChangefeedHandlesDrainingNodes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 063f5b9

Please sign in to comment.