Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: Remove stale test #106433

Merged
merged 1 commit into from
Jul 10, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 0 additions & 121 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5831,127 +5831,6 @@ func TestChangefeedContinuousTelemetryDifferentJobs(t *testing.T) {
cdcTest(t, testFn, feedTestOmitSinks("sinkless", "pubsub"))
}

// Regression test for #41694.
func TestChangefeedRestartDuringBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 75080, "flaky test")
defer log.Scope(t).Close(t)

// TODO(yevgeniy): Rework this test. It's too brittle.

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
beforeEmitRowCh := make(chan error, 20)
knobs.BeforeEmitRow = func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-beforeEmitRowCh:
return err
}
}

unblockMessages := func(n int) {
for i := 0; i < n; i++ {
beforeEmitRowCh <- nil
}
}

sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0), (1), (2), (3)`)

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff`)
defer closeFeed(t, foo)

// TODO(dan): At a high level, all we're doing is trying to restart a
// changefeed in the middle of changefeed backfill after a schema change
// finishes. It turns out this is pretty hard to do with our current testing
// knobs and this test ends up being pretty brittle. I'd love it if anyone
// thought of a better way to do this.

// Read the initial data in the rows.
unblockMessages(4)
assertPayloads(t, foo, []string{
`foo: [0]->{"after": {"a": 0}, "before": null}`,
`foo: [1]->{"after": {"a": 1}, "before": null}`,
`foo: [2]->{"after": {"a": 2}, "before": null}`,
`foo: [3]->{"after": {"a": 3}, "before": null}`,
})

// Run a schema change that backfills kvs.
sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN b STRING DEFAULT 'backfill'`)

// Unblock emit for each kv written by the schema change's backfill. The
// changefeed actually emits these, but we lose it to overaggressive
// duplicate detection in tableFeed.
// TODO(dan): Track duplicates more precisely in tableFeed.
unblockMessages(4)

// Unblock the emit for *all but one* of the rows emitted by the changefeed
// backfill (run after the schema change completes and the final table
// descriptor is written). The reason this test has 4 rows is because the
// `sqlSink` that powers `tableFeed` only flushes after it has 3 rows, so we
// need 1 more than that to guarantee that this first one gets flushed.
for i := 0; i < 3; i++ {
beforeEmitRowCh <- nil
}
assertPayloads(t, foo, []string{
`foo: [0]->{"after": {"a": 0}, "before": {"a": 0}}`,
`foo: [1]->{"after": {"a": 1}, "before": {"a": 1}}`,
`foo: [2]->{"after": {"a": 2}, "before": {"a": 2}}`,
`foo: [3]->{"after": {"a": 3}, "before": {"a": 3}}`,
`foo: [0]->{"after": {"a": 0, "b": "backfill"}, "before": {"a": 0}}`,
})

// `foo: [0]->{"after": {"a": 0, "b": "backfill"}, "before": {"a": 0}}`,
feedJob := foo.(cdctest.EnterpriseTestFeed)
require.NoError(t, feedJob.Pause())

// Make extra sure that the zombie changefeed can't write any more data.
beforeEmitRowCh <- errors.New(`nope don't write it`)

// Insert some data that we should only see out of the changefeed after it
// re-runs the backfill.
sqlDB.Exec(t, `INSERT INTO foo VALUES (6, 'bar')`)

// Unblock all later emits, we don't need this control anymore.
close(beforeEmitRowCh)

// Resume the changefeed and the backfill should start up again. Currently
// this does the entire backfill again, you could imagine in the future that
// we do some sort of backfill checkpointing and start the backfill up from
// the last checkpoint.
require.NoError(t, feedJob.Resume())
assertPayloads(t, foo, []string{
// The changefeed actually emits this row, but we lose it to
// overaggressive duplicate detection in tableFeed.
// TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed.
// `foo: [0]->{"after": {"a": 0, "b": "backfill"}}`,
`foo: [1]->{"after": {"a": 1, "b": "backfill"}, "before": {"a": 1}}`,
`foo: [2]->{"after": {"a": 2, "b": "backfill"}, "before": {"a": 2}}`,
`foo: [3]->{"after": {"a": 3, "b": "backfill"}, "before": {"a": 3}}`,
})

assertPayloads(t, foo, []string{
`foo: [6]->{"after": {"a": 6, "b": "bar"}, "before": null}`,
})
}

useSysCfgInKV := withKnobsFn(func(knobs *base.TestingKnobs) {
// TODO(irfansharif): This test is "skipped" under span configs;
// #75080.
if knobs.Store == nil {
knobs.Store = &kvserver.StoreTestingKnobs{}
}
knobs.Store.(*kvserver.StoreTestingKnobs).UseSystemConfigSpanForQueues = true
})

cdcTest(t, testFn, feedTestForceSink("kafka"), useSysCfgInKV)
}

func TestChangefeedHandlesDrainingNodes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down