From 182474ffe396b58264b61277d4b28b2276153ec6 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 1 Dec 2021 18:23:54 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #3682 Signed-off-by: ti-chi-bot --- cdc/owner/async_sink.go | 5 -- cdc/owner/async_sink_test.go | 25 +----- cdc/owner/changefeed.go | 5 +- cdc/processor/pipeline/sink_test.go | 4 - cdc/sink/black_hole.go | 5 -- cdc/sink/manager_test.go | 8 -- cdc/sink/mq.go | 6 -- cdc/sink/mysql.go | 5 -- cdc/sink/simple_mysql_tester.go | 5 -- cdc/sink/sink.go | 2 - cdc/sink/table_sink.go | 116 ++++++++++++++++++++++++++++ 11 files changed, 121 insertions(+), 65 deletions(-) create mode 100644 cdc/sink/table_sink.go diff --git a/cdc/owner/async_sink.go b/cdc/owner/async_sink.go index 4e33b21f289..9c0ae2cf1be 100644 --- a/cdc/owner/async_sink.go +++ b/cdc/owner/async_sink.go @@ -38,7 +38,6 @@ const ( // The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now // Other functions are still synchronization type AsyncSink interface { - Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error // EmitCheckpointTs emits the checkpoint Ts to downstream data source // this function will return after recording the checkpointTs specified in memory immediately // and the recorded checkpointTs will be sent and updated to downstream data source every second @@ -101,10 +100,6 @@ func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) { return asyncSink, nil } -func (s *asyncSinkImpl) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error { - return s.sink.Initialize(ctx, tableInfo) -} - func (s *asyncSinkImpl) run(ctx cdcContext.Context) { defer s.wg.Done() // TODO make the tick duration configurable diff --git a/cdc/owner/async_sink_test.go b/cdc/owner/async_sink_test.go index fb1e1aadd01..7a868604e13 100644 --- a/cdc/owner/async_sink_test.go +++ b/cdc/owner/async_sink_test.go @@ -37,16 +37,10 @@ type asyncSinkSuite struct { type mockSink struct { sink.Sink - initTableInfo []*model.SimpleTableInfo - checkpointTs model.Ts - ddl *model.DDLEvent - ddlMu sync.Mutex - ddlError error -} - -func (m *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - m.initTableInfo = tableInfo - return nil + checkpointTs model.Ts + ddl *model.DDLEvent + ddlMu sync.Mutex + ddlError error } func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { @@ -88,17 +82,6 @@ func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context, return ctx, sink, mockSink } -func (s *asyncSinkSuite) TestInitialize(c *check.C) { - defer testleak.AfterTest(c)() - ctx := cdcContext.NewBackendContext4Test(false) - ctx, sink, mockSink := newAsyncSink4Test(ctx, c) - defer sink.Close(ctx) - tableInfos := []*model.SimpleTableInfo{{Schema: "test"}} - err := sink.Initialize(ctx, tableInfos) - c.Assert(err, check.IsNil) - c.Assert(tableInfos, check.DeepEquals, mockSink.initTableInfo) -} - func (s *asyncSinkSuite) TestCheckpoint(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index c6670f5098c..7f7b82577af 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -249,10 +249,7 @@ LOOP: if err != nil { return errors.Trace(err) } - err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos()) - if err != nil { - return errors.Trace(err) - } + // Refer to the previous comment on why we use (checkpointTs-1). c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1) if err != nil { diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 12ba3bb1fa5..cf84d48902f 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -58,10 +58,6 @@ func (c *mockFlowController) GetConsumption() uint64 { return 0 } -func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - return nil -} - func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { for _, row := range rows { s.received = append(s.received, struct { diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 3eca14a0119..da2f407df37 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -70,11 +70,6 @@ func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) e return nil } -// Initialize is no-op for blackhole -func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - return nil -} - func (b *blackHoleSink) Close(ctx context.Context) error { return nil } diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go index 354b06563ae..81d9a6afec8 100644 --- a/cdc/sink/manager_test.go +++ b/cdc/sink/manager_test.go @@ -40,10 +40,6 @@ type checkSink struct { lastResolvedTs uint64 } -func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - panic("unreachable") -} - func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { c.rowsMu.Lock() defer c.rowsMu.Unlock() @@ -331,10 +327,6 @@ type errorSink struct { *check.C } -func (e *errorSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - panic("unreachable") -} - func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { return errors.New("error in emit row changed events") } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index a2a47f7fa66..e260ce71113 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -262,12 +262,6 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { return errors.Trace(err) } -// Initialize registers Avro schemas for all tables -func (k *mqSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - // No longer need it for now - return nil -} - func (k *mqSink) Close(ctx context.Context) error { err := k.mqProducer.Close() return errors.Trace(err) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 7a1946f3824..bf24a758d5b 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -203,11 +203,6 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error return errors.Trace(err) } -// Initialize is no-op for Mysql sink -func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - return nil -} - func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error { return retry.Do(ctx, func() error { err := s.execDDL(ctx, ddl) diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index 3b3d7edc885..d24460f53aa 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -104,11 +104,6 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re return sink, nil } -func (s *simpleMySQLSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - // do nothing - return nil -} - // EmitRowChangedEvents sends Row Changed Event to Sink // EmitRowChangedEvents may write rows to downstream directly; func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index aa77e2059df..febb147f81a 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -33,8 +33,6 @@ const ( // Sink is an abstraction for anything that a changefeed may emit into. type Sink interface { - Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error - // EmitRowChangedEvents sends Row Changed Event to Sink // EmitRowChangedEvents may write rows to downstream directly; EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go new file mode 100644 index 00000000000..a57cc0123b6 --- /dev/null +++ b/cdc/sink/table_sink.go @@ -0,0 +1,116 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "context" + "sort" + "sync/atomic" + + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/redo" +) + +type tableSink struct { + tableID model.TableID + manager *Manager + buffer []*model.RowChangedEvent + // emittedTs means all of events which of commitTs less than or equal to emittedTs is sent to backendSink + emittedTs model.Ts + redoManager redo.LogManager +} + +func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + t.buffer = append(t.buffer, rows...) + t.manager.metricsTableSinkTotalRows.Add(float64(len(rows))) + if t.redoManager.Enabled() { + return t.redoManager.EmitRowChangedEvents(ctx, t.tableID, rows...) + } + return nil +} + +func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + // the table sink doesn't receive the DDL event + return nil +} + +// FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs +// is required to be no more than global resolvedTs, table barrierTs and table +// redo log watermarkTs. +func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { + i := sort.Search(len(t.buffer), func(i int) bool { + return t.buffer[i].CommitTs > resolvedTs + }) + if i == 0 { + atomic.StoreUint64(&t.emittedTs, resolvedTs) + ckpt, err := t.flushRedoLogs(ctx, resolvedTs) + if err != nil { + return ckpt, err + } + return t.manager.flushBackendSink(ctx, tableID) + } + resolvedRows := t.buffer[:i] + t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...) + + err := t.manager.backendSink.EmitRowChangedEvents(ctx, resolvedRows...) + if err != nil { + return t.manager.getCheckpointTs(tableID), errors.Trace(err) + } + atomic.StoreUint64(&t.emittedTs, resolvedTs) + ckpt, err := t.flushRedoLogs(ctx, resolvedTs) + if err != nil { + return ckpt, err + } + return t.manager.flushBackendSink(ctx, tableID) +} + +func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) { + if t.redoManager.Enabled() { + err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs) + if err != nil { + return t.manager.getCheckpointTs(t.tableID), err + } + } + return 0, nil +} + +// getResolvedTs returns resolved ts, which means all events before resolved ts +// have been sent to sink manager, if redo log is enabled, it also means all events +// before resolved ts have been persisted to redo log storage. +func (t *tableSink) getResolvedTs() uint64 { + ts := atomic.LoadUint64(&t.emittedTs) + if t.redoManager.Enabled() { + redoResolvedTs := t.redoManager.GetMinResolvedTs() + if redoResolvedTs < ts { + ts = redoResolvedTs + } + } + return ts +} + +func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { + // the table sink doesn't receive the checkpoint event + return nil +} + +// Note once the Close is called, no more events can be written to this table sink +func (t *tableSink) Close(ctx context.Context) error { + return t.manager.destroyTableSink(ctx, t.tableID) +} + +// Barrier is not used in table sink +func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error { + return nil +} From 081c2216528cd1bfd382b60e04e0829847cc98b5 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 11 Jan 2022 13:20:39 +0800 Subject: [PATCH 2/3] remove table_sink. --- cdc/sink/table_sink.go | 116 ----------------------------------------- 1 file changed, 116 deletions(-) delete mode 100644 cdc/sink/table_sink.go diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go deleted file mode 100644 index a57cc0123b6..00000000000 --- a/cdc/sink/table_sink.go +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package sink - -import ( - "context" - "sort" - "sync/atomic" - - "github.com/pingcap/errors" - "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/redo" -) - -type tableSink struct { - tableID model.TableID - manager *Manager - buffer []*model.RowChangedEvent - // emittedTs means all of events which of commitTs less than or equal to emittedTs is sent to backendSink - emittedTs model.Ts - redoManager redo.LogManager -} - -func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { - t.buffer = append(t.buffer, rows...) - t.manager.metricsTableSinkTotalRows.Add(float64(len(rows))) - if t.redoManager.Enabled() { - return t.redoManager.EmitRowChangedEvents(ctx, t.tableID, rows...) - } - return nil -} - -func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - // the table sink doesn't receive the DDL event - return nil -} - -// FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs -// is required to be no more than global resolvedTs, table barrierTs and table -// redo log watermarkTs. -func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { - i := sort.Search(len(t.buffer), func(i int) bool { - return t.buffer[i].CommitTs > resolvedTs - }) - if i == 0 { - atomic.StoreUint64(&t.emittedTs, resolvedTs) - ckpt, err := t.flushRedoLogs(ctx, resolvedTs) - if err != nil { - return ckpt, err - } - return t.manager.flushBackendSink(ctx, tableID) - } - resolvedRows := t.buffer[:i] - t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...) - - err := t.manager.backendSink.EmitRowChangedEvents(ctx, resolvedRows...) - if err != nil { - return t.manager.getCheckpointTs(tableID), errors.Trace(err) - } - atomic.StoreUint64(&t.emittedTs, resolvedTs) - ckpt, err := t.flushRedoLogs(ctx, resolvedTs) - if err != nil { - return ckpt, err - } - return t.manager.flushBackendSink(ctx, tableID) -} - -func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) { - if t.redoManager.Enabled() { - err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs) - if err != nil { - return t.manager.getCheckpointTs(t.tableID), err - } - } - return 0, nil -} - -// getResolvedTs returns resolved ts, which means all events before resolved ts -// have been sent to sink manager, if redo log is enabled, it also means all events -// before resolved ts have been persisted to redo log storage. -func (t *tableSink) getResolvedTs() uint64 { - ts := atomic.LoadUint64(&t.emittedTs) - if t.redoManager.Enabled() { - redoResolvedTs := t.redoManager.GetMinResolvedTs() - if redoResolvedTs < ts { - ts = redoResolvedTs - } - } - return ts -} - -func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { - // the table sink doesn't receive the checkpoint event - return nil -} - -// Note once the Close is called, no more events can be written to this table sink -func (t *tableSink) Close(ctx context.Context) error { - return t.manager.destroyTableSink(ctx, t.tableID) -} - -// Barrier is not used in table sink -func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error { - return nil -} From f410a4a9b2c52f2be253967f0fcef411134eb64e Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 11 Jan 2022 18:46:15 +0800 Subject: [PATCH 3/3] remove initialization. --- cdc/owner.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index 5d713631fbb..b55910716c4 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -458,11 +458,6 @@ func (o *Owner) newChangeFeed( } }() - err = primarySink.Initialize(ctx, sinkTableInfo) - if err != nil { - log.Error("error on running owner", zap.Error(err)) - } - var syncpointStore sink.SyncpointStore if info.SyncPointEnabled { syncpointStore, err = sink.NewSyncpointStore(ctx, id, info.SinkURI)