changefeedccl: Add timeout to testfeed library. #84007

Merged (1 commit) on Jul 8, 2022

2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -229,6 +229,8 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -1115,7 +1115,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
expectedValues[j] = fmt.Sprintf(`foo: [%d]->{"after": {"val": %d}}`, j, j)
expectedValues[j+numRowsPerTable] = fmt.Sprintf(`bar: [%d]->{"after": {"val": %d}}`, j, j)
}
return assertPayloadsBaseErr(testFeed, expectedValues, false, false)
return assertPayloadsBaseErr(context.Background(), testFeed, expectedValues, false, false)
})

defer func() {
33 changes: 32 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
@@ -5241,7 +5241,7 @@ func TestChangefeedCheckpointSchemaChange(t *testing.T) {
`foo: [1]->{"after": {"a": 1}}`,
`foo: [2]->{"after": {"a": 2}}`,
}
msgs, err := readNextMessages(foo, len(expected))
msgs, err := readNextMessages(context.Background(), foo, len(expected))
require.NoError(t, err)

var msgsFormatted []string
@@ -6782,3 +6782,34 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) {
require.Equal(t, failLogs[0].NumTables, int32(1))
}, feedTestForceSink("pubsub"))
}

func TestChangefeedTestTimesOut(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
nada := feed(t, f, "CREATE CHANGEFEED FOR foo WITH resolved='100ms'")
defer closeFeed(t, nada)

expectResolvedTimestamp(t, nada) // Make sure feed is running.

const expectTimeout = 500 * time.Millisecond
var observedError error
require.NoError(t,
testutils.SucceedsWithinError(func() error {
observedError = withTimeout(
nada, expectTimeout,
func(ctx context.Context) error {
return assertPayloadsBaseErr(
ctx, nada, []string{`nada: [2]->{"after": {}}`}, false, false)
})
return nil
}, 20*expectTimeout))

require.Error(t, observedError)
}

cdcTest(t, testFn)
}
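The test above exercises the timeout plumbing added below: assertPayloadsBaseErr now takes a context, readNextMessages checks that context between blocking reads, and withTimeout bounds the whole assertion. A rough, self-contained sketch of that shape — messageSource, readUpTo, and slowSource are illustrative stand-ins, not code from this change:

package main

import (
	"context"
	"fmt"
	"time"
)

// messageSource stands in for cdctest.TestFeed; Next blocks until the
// next message is available.
type messageSource interface {
	Next() (string, error)
}

// readUpTo mirrors the shape of readNextMessages below: it checks the
// context between blocking reads so a surrounding deadline can stop it.
func readUpTo(ctx context.Context, src messageSource, n int) ([]string, error) {
	var out []string
	for len(out) < n {
		if err := ctx.Err(); err != nil {
			return nil, err // deadline exceeded or canceled
		}
		m, err := src.Next()
		if err != nil {
			return nil, err
		}
		out = append(out, m)
	}
	return out, nil
}

// slowSource emits one message every 50ms, so asking for many messages
// under a short deadline is guaranteed to time out.
type slowSource struct{}

func (slowSource) Next() (string, error) {
	time.Sleep(50 * time.Millisecond)
	return "row", nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	_, err := readUpTo(ctx, slowSource{}, 100)
	fmt.Println(err) // context deadline exceeded
}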
35 changes: 31 additions & 4 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -82,9 +83,14 @@ func waitForSchemaChange(
})
}

func readNextMessages(f cdctest.TestFeed, numMessages int) ([]cdctest.TestFeedMessage, error) {
func readNextMessages(
ctx context.Context, f cdctest.TestFeed, numMessages int,
) ([]cdctest.TestFeedMessage, error) {
var actual []cdctest.TestFeedMessage
for len(actual) < numMessages {
if ctx.Err() != nil {
return nil, ctx.Err()
}
m, err := f.Next()
if log.V(1) {
if m != nil {
@@ -171,13 +177,18 @@ func assertPayloadsBase(
t testing.TB, f cdctest.TestFeed, expected []string, stripTs bool, perKeyOrdered bool,
) {
t.Helper()
require.NoError(t, assertPayloadsBaseErr(f, expected, stripTs, perKeyOrdered))
require.NoError(t,
withTimeout(f, assertPayloadsTimeout(),
func(ctx context.Context) error {
return assertPayloadsBaseErr(ctx, f, expected, stripTs, perKeyOrdered)
},
))
}

func assertPayloadsBaseErr(
f cdctest.TestFeed, expected []string, stripTs bool, perKeyOrdered bool,
ctx context.Context, f cdctest.TestFeed, expected []string, stripTs bool, perKeyOrdered bool,
) error {
actual, err := readNextMessages(f, len(expected))
actual, err := readNextMessages(ctx, f, len(expected))
if err != nil {
return err
}
@@ -216,6 +227,22 @@ func assertPayloadsBaseErr(
return nil
}

func assertPayloadsTimeout() time.Duration {
if util.RaceEnabled {
return 5 * time.Minute
}
return 30 * time.Second
}

func withTimeout(
f cdctest.TestFeed, timeout time.Duration, fn func(ctx context.Context) error,
) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
defer stopFeedWhenDone(ctx, f)()
return fn(ctx)
}

func assertPayloads(t testing.TB, f cdctest.TestFeed, expected []string) {
t.Helper()
assertPayloadsBase(t, f, expected, false, false)
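Callers that want the assertion error back, instead of an immediate test failure, compose the two new helpers the same way assertPayloadsBase does above. A hedged usage sketch, assuming a test in this package with t and a testFeed already in scope (the expected row is a placeholder):

err := withTimeout(testFeed, assertPayloadsTimeout(), func(ctx context.Context) error {
	return assertPayloadsBaseErr(ctx, testFeed, []string{
		`foo: [0]->{"after": {"val": 0}}`,
	}, false /* stripTs */, false /* perKeyOrdered */)
})
require.NoError(t, err)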
56 changes: 20 additions & 36 deletions pkg/ccl/changefeedccl/sink_test.go
@@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -236,47 +235,36 @@ func TestKafkaSink(t *testing.T) {
defer cleanup()

// No inflight
if err := sink.Flush(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.Flush(ctx))

// Timeout
if err := sink.EmitRow(ctx, topic(`t`), []byte(`1`), nil, zeroTS, zeroTS, zeroAlloc); err != nil {
t.Fatal(err)
}
require.NoError(t,
sink.EmitRow(ctx, topic(`t`), []byte(`1`), nil, zeroTS, zeroTS, zeroAlloc))

m1 := <-p.inputCh
for i := 0; i < 2; i++ {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Millisecond)
defer cancel()
if err := sink.Flush(timeoutCtx); !testutils.IsError(
err, `context deadline exceeded`,
) {
t.Fatalf(`expected "context deadline exceeded" error got: %+v`, err)
}
require.True(t, errors.Is(sink.Flush(timeoutCtx), context.DeadlineExceeded))
}
go func() { p.successesCh <- m1 }()
if err := sink.Flush(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.Flush(ctx))

// Check no inflight again now that we've sent something
if err := sink.Flush(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.Flush(ctx))

// Mixed success and error.
var pool testAllocPool
if err := sink.EmitRow(ctx, topic(`t`), []byte(`2`), nil, zeroTS, zeroTS, pool.alloc()); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.EmitRow(ctx,
topic(`t`), []byte(`2`), nil, zeroTS, zeroTS, pool.alloc()))
m2 := <-p.inputCh
if err := sink.EmitRow(ctx, topic(`t`), []byte(`3`), nil, zeroTS, zeroTS, pool.alloc()); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.EmitRow(
ctx, topic(`t`), []byte(`3`), nil, zeroTS, zeroTS, pool.alloc()))

m3 := <-p.inputCh
if err := sink.EmitRow(ctx, topic(`t`), []byte(`4`), nil, zeroTS, zeroTS, pool.alloc()); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.EmitRow(
ctx, topic(`t`), []byte(`4`), nil, zeroTS, zeroTS, pool.alloc()))

m4 := <-p.inputCh
go func() { p.successesCh <- m2 }()
go func() {
@@ -286,19 +274,15 @@
}
}()
go func() { p.successesCh <- m4 }()
if err := sink.Flush(ctx); !testutils.IsError(err, `m3`) {
t.Fatalf(`expected "m3" error got: %+v`, err)
}
require.Regexp(t, "m3", sink.Flush(ctx))

// Check simple success again after error
if err := sink.EmitRow(ctx, topic(`t`), []byte(`5`), nil, zeroTS, zeroTS, pool.alloc()); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.EmitRow(
ctx, topic(`t`), []byte(`5`), nil, zeroTS, zeroTS, pool.alloc()))

m5 := <-p.inputCh
go func() { p.successesCh <- m5 }()
if err := sink.Flush(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.Flush(ctx))
// At the end, all of the resources have been released.
require.EqualValues(t, 0, pool.used())
}
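One note on the require-based rewrite above: errors.Is takes the observed error first and the reference error second. A hedged sketch of the deadline assertion in that order:

// err is whatever Flush returned under the short per-iteration deadline.
err := sink.Flush(timeoutCtx)
require.True(t, errors.Is(err, context.DeadlineExceeded))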
79 changes: 71 additions & 8 deletions pkg/ccl/changefeedccl/testfeed_test.go
@@ -42,6 +42,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
@@ -132,11 +134,22 @@ type sinklessFeed struct {

var _ cdctest.TestFeed = (*sinklessFeed)(nil)

func timeout() time.Duration {
if util.RaceEnabled {
return 5 * time.Minute
}
return 30 * time.Second
}

// Partitions implements the TestFeed interface.
func (c *sinklessFeed) Partitions() []string { return []string{`sinkless`} }

// Next implements the TestFeed interface.
func (c *sinklessFeed) Next() (*cdctest.TestFeedMessage, error) {
defer time.AfterFunc(timeout(), func() {
_ = c.conn.Close(context.Background())
}).Stop()

m := &cdctest.TestFeedMessage{Partition: `sinkless`}
for {
if !c.rows.Next() {
@@ -167,10 +180,8 @@ func (c *sinklessFeed) Next() (*cdctest.TestFeedMessage, error) {
}

// Resume implements the TestFeed interface.
func (c *sinklessFeed) start() error {
ctx := context.Background()
var err error
c.conn, err = pgx.ConnectConfig(ctx, c.connCfg)
func (c *sinklessFeed) start() (err error) {
c.conn, err = pgx.ConnectConfig(context.Background(), c.connCfg)
if err != nil {
return err
}
@@ -185,7 +196,7 @@ func (c *sinklessFeed) start() error {
create += fmt.Sprintf(` WITH cursor='%s'`, c.latestResolved.AsOfSystemTime())
}
}
c.rows, err = c.conn.Query(ctx, create, c.args...)
c.rows, err = c.conn.Query(context.Background(), create, c.args...)
return err
}

@@ -248,8 +259,12 @@ func newJobFeed(db *gosql.DB, wrapper wrapSinkFn) *jobFeed {
}
}

type jobFailedMarker interface {
jobFailed(err error)
}

// jobFailed marks this job as failed.
func (f *jobFeed) jobFailed() {
func (f *jobFeed) jobFailed(err error) {
// protect against almost concurrent terminations of the same job.
// this could happen if the caller invokes `cancel job` just as we're
// trying to close this feed. Part of jobFailed handling involves
@@ -261,7 +276,7 @@
// Already failed/done.
return
}
f.mu.terminalErr = f.FetchTerminalJobErr()
f.mu.terminalErr = err
close(f.shutdown)
}

@@ -527,7 +542,12 @@ func newDepInjector(srvs ...feedInjectable) *depInjector {
map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeChangefeed: func(raw jobs.Resumer) jobs.Resumer {
f := di.getJobFeed(raw.(*changefeedResumer).job.ID())
return &reportErrorResumer{wrapped: raw, jobFailed: f.jobFailed}
return &reportErrorResumer{
wrapped: raw,
jobFailed: func() {
f.jobFailed(f.FetchTerminalJobErr())
},
}
},
}
}
@@ -707,6 +727,8 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) {
}

select {
case <-time.After(timeout()):
return nil, &contextutil.TimeoutError{}
case <-c.ss.eventReady():
case <-c.shutdown:
return nil, c.terminalJobError()
@@ -960,6 +982,8 @@ func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) {
}

select {
case <-time.After(timeout()):
return nil, &contextutil.TimeoutError{}
case <-c.ss.eventReady():
case <-c.shutdown:
return nil, c.terminalJobError()
@@ -1277,6 +1301,8 @@ func (k *kafkaFeed) Next() (*cdctest.TestFeedMessage, error) {
for {
var msg *sarama.ProducerMessage
select {
case <-time.After(timeout()):
return nil, &contextutil.TimeoutError{}
case <-k.shutdown:
return nil, k.terminalJobError()
case msg = <-k.source:
@@ -1541,6 +1567,8 @@ func (f *webhookFeed) Next() (*cdctest.TestFeedMessage, error) {
}

select {
case <-time.After(timeout()):
return nil, &contextutil.TimeoutError{}
case <-f.ss.eventReady():
case <-f.shutdown:
return nil, f.terminalJobError()
@@ -1784,6 +1812,8 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) {
return m, nil
}
select {
case <-time.After(timeout()):
return nil, &contextutil.TimeoutError{}
case <-p.ss.eventReady():
case <-p.shutdown:
return nil, p.terminalJobError()
@@ -1799,3 +1829,36 @@ func (p *pubsubFeed) Close() error {
}
return nil
}

// stopFeedWhenDone arranges for the feed to stop when the passed-in context
// is done. It returns a cleanup function.
func stopFeedWhenDone(ctx context.Context, f cdctest.TestFeed) func() {
done := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)

whenDone := func(fn func()) {
defer wg.Done()
select {
case <-done:
case <-ctx.Done():
fn()
}
}

switch t := f.(type) {
case *sinklessFeed:
go whenDone(func() {
_ = t.conn.Close(context.Background())
})
case jobFailedMarker:
go whenDone(func() {
t.jobFailed(context.Canceled)
})
}

return func() {
close(done)
wg.Wait()
}
}
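The job-backed feeds above can time out inside a select (the new time.After case), but the sinkless feed blocks inside the pgx driver, so its Next arms a timer that force-closes the connection and disarms it on return. A minimal, self-contained sketch of that defer time.AfterFunc(...).Stop() idiom — nextWithDeadline and blockingConn are illustrative names, not part of this change:

package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// nextWithDeadline runs read(), but if it has not returned within d the
// timer fires and force-closes conn, which unblocks a read stuck inside
// read(). If read() returns in time, the deferred Stop() disarms the timer.
func nextWithDeadline(conn io.Closer, d time.Duration, read func() (string, error)) (string, error) {
	defer time.AfterFunc(d, func() {
		_ = conn.Close()
	}).Stop()
	return read()
}

// blockingConn pretends to be a connection whose pending read only
// returns once the connection is closed.
type blockingConn struct{ closed chan struct{} }

func (c *blockingConn) Close() error { close(c.closed); return nil }

func (c *blockingConn) read() (string, error) {
	<-c.closed
	return "", errors.New("connection closed")
}

func main() {
	c := &blockingConn{closed: make(chan struct{})}
	_, err := nextWithDeadline(c, 100*time.Millisecond, c.read)
	fmt.Println(err) // connection closed (after roughly 100ms)
}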