Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc/sink: remove Initialize method from the sink interface (#3682) #3764

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,6 @@ func (o *Owner) newChangeFeed(
}
}()

err = primarySink.Initialize(ctx, sinkTableInfo)
if err != nil {
log.Error("error on running owner", zap.Error(err))
}

var syncpointStore sink.SyncpointStore
if info.SyncPointEnabled {
syncpointStore, err = sink.NewSyncpointStore(ctx, id, info.SinkURI)
Expand Down
5 changes: 0 additions & 5 deletions cdc/owner/async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (
// The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now
// Other functions are still synchronization
type AsyncSink interface {
Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error
// EmitCheckpointTs emits the checkpoint Ts to downstream data source
// this function will return after recording the checkpointTs specified in memory immediately
// and the recorded checkpointTs will be sent and updated to downstream data source every second
Expand Down Expand Up @@ -101,10 +100,6 @@ func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) {
return asyncSink, nil
}

func (s *asyncSinkImpl) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error {
return s.sink.Initialize(ctx, tableInfo)
}

func (s *asyncSinkImpl) run(ctx cdcContext.Context) {
defer s.wg.Done()
// TODO make the tick duration configurable
Expand Down
25 changes: 4 additions & 21 deletions cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,10 @@ type asyncSinkSuite struct {

type mockSink struct {
sink.Sink
initTableInfo []*model.SimpleTableInfo
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
m.initTableInfo = tableInfo
return nil
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
Expand Down Expand Up @@ -88,17 +82,6 @@ func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context,
return ctx, sink, mockSink
}

func (s *asyncSinkSuite) TestInitialize(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, sink, mockSink := newAsyncSink4Test(ctx, c)
defer sink.Close(ctx)
tableInfos := []*model.SimpleTableInfo{{Schema: "test"}}
err := sink.Initialize(ctx, tableInfos)
c.Assert(err, check.IsNil)
c.Assert(tableInfos, check.DeepEquals, mockSink.initTableInfo)
}

func (s *asyncSinkSuite) TestCheckpoint(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
Expand Down
5 changes: 1 addition & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,7 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos())
if err != nil {
return errors.Trace(err)
}

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) e
return nil
}

// Initialize is no-op for blackhole
func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}
Expand Down
8 changes: 0 additions & 8 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ type checkSink struct {
lastResolvedTs uint64
}

func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
Expand Down Expand Up @@ -331,10 +327,6 @@ type errorSink struct {
*check.C
}

func (e *errorSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
return errors.New("error in emit row changed events")
}
Expand Down
6 changes: 0 additions & 6 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,6 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
return errors.Trace(err)
}

// Initialize registers Avro schemas for all tables
func (k *mqSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// No longer need it for now
return nil
}

func (k *mqSink) Close(ctx context.Context) error {
err := k.mqProducer.Close()
return errors.Trace(err)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,6 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return errors.Trace(err)
}

// Initialize is no-op for Mysql sink
func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
return retry.Do(ctx, func() error {
err := s.execDDL(ctx, ddl)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/simple_mysql_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,6 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re
return sink, nil
}

func (s *simpleMySQLSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// do nothing
return nil
}

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
Expand Down
2 changes: 0 additions & 2 deletions cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const (

// Sink is an abstraction for anything that a changefeed may emit into.
type Sink interface {
Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error
Expand Down