From bed252f76456debba149683cc22ec58bb58458df Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Thu, 21 Apr 2022 09:06:03 +0800 Subject: [PATCH] This is an automated cherry-pick of #5196 Signed-off-by: ti-chi-bot --- cdc/processor/pipeline/sink_test.go | 12 + cdc/processor/pipeline/sorter.go | 25 ++ cdc/processor/pipeline/table_actor.go | 505 ++++++++++++++++++++++++++ cdc/processor/processor.go | 36 ++ cdc/sink/black_hole.go | 16 + cdc/sink/buffer_sink.go | 223 ++++++++++++ cdc/sink/buffer_sink_test.go | 78 ++++ cdc/sink/common/common.go | 7 + cdc/sink/manager_test.go | 64 +++- cdc/sink/mq/mq.go | 501 +++++++++++++++++++++++++ cdc/sink/mysql.go | 30 ++ cdc/sink/mysql_test.go | 29 ++ cdc/sink/simple_mysql_tester.go | 12 + cdc/sink/sink.go | 5 + cdc/sink/sink_manager.go | 110 ++++++ cdc/sink/table_sink.go | 130 +++++++ 16 files changed, 1781 insertions(+), 2 deletions(-) create mode 100644 cdc/processor/pipeline/table_actor.go create mode 100644 cdc/sink/buffer_sink.go create mode 100644 cdc/sink/mq/mq.go create mode 100644 cdc/sink/sink_manager.go create mode 100644 cdc/sink/table_sink.go diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 1a3a41614ea..7a8df2d4874 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -58,6 +58,18 @@ func (c *mockFlowController) GetConsumption() uint64 { return 0 } +<<<<<<< HEAD +======= +func (s *mockSink) Init(tableID model.TableID) error { + return nil +} + +func (s *mockSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) { + _ = s.EmitRowChangedEvents(ctx, rows...) + return true, nil +} + +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)) func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { for _, row := range rows { s.received = append(s.received, struct { diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index e1d7461c0e4..08259e67b29 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -194,11 +194,36 @@ func (n *sorterNode) Receive(ctx pipeline.NodeContext) error { default: ctx.SendToNextNode(msg) } +<<<<<<< HEAD return nil +======= +} + +func (n *sorterNode) releaseResource(changefeedID string) { + defer tableMemoryHistogram.DeleteLabelValues(changefeedID) + // Since the flowController is implemented by `Cond`, it is not cancelable by a context + // the flowController will be blocked in a background goroutine, + // We need to abort the flowController manually in the nodeRunner + n.flowController.Abort() +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)) } func (n *sorterNode) Destroy(ctx pipeline.NodeContext) error { defer tableMemoryHistogram.DeleteLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr) n.cancel() +<<<<<<< HEAD return n.wg.Wait() +======= + n.releaseResource(ctx.ChangefeedVars().ID) + return n.eg.Wait() +} + +func (n *sorterNode) ResolvedTs() model.Ts { + return atomic.LoadUint64(&n.resolvedTs) +} + +// BarrierTs returns the sorter barrierTs +func (n *sorterNode) BarrierTs() model.Ts { + return atomic.LoadUint64(&n.barrierTs) +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)) } diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go new file mode 100644 index 00000000000..66ca210fbb0 --- /dev/null +++ 
b/cdc/processor/pipeline/table_actor.go @@ -0,0 +1,505 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/redo" + "github.com/pingcap/tiflow/cdc/sink" + "github.com/pingcap/tiflow/cdc/sink/flowcontrol" + "github.com/pingcap/tiflow/pkg/actor" + "github.com/pingcap/tiflow/pkg/actor/message" + serverConfig "github.com/pingcap/tiflow/pkg/config" + cdcContext "github.com/pingcap/tiflow/pkg/context" + cerror "github.com/pingcap/tiflow/pkg/errors" + pmessage "github.com/pingcap/tiflow/pkg/pipeline/message" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +var ( + _ TablePipeline = (*tableActor)(nil) + _ actor.Actor[pmessage.Message] = (*tableActor)(nil) + stopped = uint32(1) +) + +const sinkFlushInterval = 500 * time.Millisecond + +type tableActor struct { + actorID actor.ID + mb actor.Mailbox[pmessage.Message] + router *actor.Router[pmessage.Message] + // all goroutines in tableActor should be spawned from this wg + wg *errgroup.Group + // backend mounter + mounter entry.Mounter + // backend tableSink + tableSink sink.Sink + + pullerNode *pullerNode + sortNode *sorterNode + sinkNode *sinkNode + // contains all nodes except pullerNode + nodes []*ActorNode + + // states of table actor + started bool + stopped uint32 + stopLock sync.Mutex + // TODO: try to reduce these config fields below in the future + tableID int64 + markTableID int64 + cyclicEnabled bool + targetTs model.Ts + memoryQuota uint64 + replicaInfo *model.TableReplicaInfo + replicaConfig *serverConfig.ReplicaConfig + changefeedVars *cdcContext.ChangefeedVars + globalVars *cdcContext.GlobalVars + // these fields below are used in logs and metrics only + changefeedID string + tableName string + + // use to report error to processor + reportErr func(error) + // stopCtx use to stop table actor + stopCtx context.Context + // cancel use to cancel all goroutines spawned from table actor + cancel context.CancelFunc + + lastFlushSinkTime time.Time +} + +// NewTableActor creates a table actor and starts it. +func NewTableActor(cdcCtx cdcContext.Context, + mounter entry.Mounter, + tableID model.TableID, + tableName string, + replicaInfo *model.TableReplicaInfo, + sink sink.Sink, + targetTs model.Ts, +) (TablePipeline, error) { + config := cdcCtx.ChangefeedVars().Info.Config + cyclicEnabled := config.Cyclic != nil && config.Cyclic.IsEnabled() + changefeedVars := cdcCtx.ChangefeedVars() + globalVars := cdcCtx.GlobalVars() + + actorID := globalVars.TableActorSystem.ActorID() + mb := actor.NewMailbox[pmessage.Message](actorID, defaultOutputChannelSize) + // Cancel should be able to release all sub-goroutines in this actor. + ctx, cancel := context.WithCancel(cdcCtx) + // All sub-goroutines should be spawn in this wait group. 
+ wg, cctx := errgroup.WithContext(ctx) + + table := &tableActor{ + // all errors in table actor will be reported to processor + reportErr: cdcCtx.Throw, + mb: mb, + wg: wg, + cancel: cancel, + + tableID: tableID, + markTableID: replicaInfo.MarkTableID, + tableName: tableName, + cyclicEnabled: cyclicEnabled, + memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota, + mounter: mounter, + replicaInfo: replicaInfo, + replicaConfig: config, + tableSink: sink, + targetTs: targetTs, + started: false, + + changefeedID: changefeedVars.ID, + changefeedVars: changefeedVars, + globalVars: globalVars, + router: globalVars.TableActorSystem.Router(), + actorID: actorID, + + stopCtx: cctx, + } + + startTime := time.Now() + log.Info("table actor starting", + zap.String("changefeed", table.changefeedID), + zap.String("tableName", tableName), + zap.Int64("tableID", tableID)) + if err := table.start(cctx); err != nil { + table.stop(err) + return nil, errors.Trace(err) + } + err := globalVars.TableActorSystem.System().Spawn(mb, table) + if err != nil { + return nil, errors.Trace(err) + } + log.Info("table actor started", + zap.String("changefeed", table.changefeedID), + zap.String("tableName", tableName), + zap.Int64("tableID", tableID), + zap.Duration("duration", time.Since(startTime))) + return table, nil +} + +// OnClose implements Actor interface. +// TODO: implements table actor stop here. +func (t *tableActor) OnClose() { +} + +func (t *tableActor) Poll(ctx context.Context, msgs []message.Message[pmessage.Message]) bool { + for i := range msgs { + if atomic.LoadUint32(&t.stopped) == stopped { + // No need to handle remaining messages. + return false + } + + var err error + switch msgs[i].Tp { + case message.TypeValue: + switch msgs[i].Value.Tp { + case pmessage.MessageTypeBarrier: + err = t.handleBarrierMsg(ctx, msgs[i].Value.BarrierTs) + case pmessage.MessageTypeTick: + err = t.handleTickMsg(ctx) + } + case message.TypeStop: + t.handleStopMsg(ctx) + } + if err != nil { + log.Error("failed to process message, stop table actor ", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Any("message", msgs[i]), + zap.Error(err)) + t.handleError(err) + break + } + + // process message for each node, pull message from parent node and then send it to next node + if err := t.handleDataMsg(ctx); err != nil { + log.Error("failed to process message, stop table actor ", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), zap.Error(err)) + t.handleError(err) + break + } + } + if atomic.LoadUint32(&t.stopped) == stopped { + log.Error("table actor removed", + zap.String("changefeed", t.changefeedID), + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID)) + return false + } + return true +} + +func (t *tableActor) handleDataMsg(ctx context.Context) error { + for _, n := range t.nodes { + if err := n.TryRun(ctx); err != nil { + return err + } + } + return nil +} + +func (t *tableActor) handleBarrierMsg(ctx context.Context, barrierTs model.Ts) error { + t.sortNode.updateBarrierTs(barrierTs) + return t.sinkNode.updateBarrierTs(ctx, barrierTs) +} + +func (t *tableActor) handleTickMsg(ctx context.Context) error { + // tick message flush the raw event to sink, follow the old pipeline implementation, batch flush the events every 500ms + if time.Since(t.lastFlushSinkTime) > sinkFlushInterval { + _, err := t.sinkNode.HandleMessage(ctx, pmessage.TickMessage()) + if err != nil { + return err + } + t.lastFlushSinkTime = time.Now() + } + return nil 
+} + +func (t *tableActor) handleStopMsg(ctx context.Context) { + // async stops sinkNode and tableSink + go func() { + _, err := t.sinkNode.HandleMessage(ctx, + pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}), + ) + if err != nil { + t.handleError(err) + } + }() +} + +func (t *tableActor) start(sdtTableContext context.Context) error { + if t.started { + log.Panic("start an already started table", + zap.String("changefeedID", t.changefeedID), + zap.Int64("tableID", t.tableID), + zap.String("tableName", t.tableName)) + } + log.Debug("creating table flow controller", + zap.String("changefeedID", t.changefeedID), + zap.Int64("tableID", t.tableID), + zap.String("tableName", t.tableName), + zap.Uint64("quota", t.memoryQuota)) + + flowController := flowcontrol.NewTableFlowController(t.memoryQuota) + sorterNode := newSorterNode(t.tableName, t.tableID, + t.replicaInfo.StartTs, flowController, + t.mounter, t.replicaConfig, + ) + t.sortNode = sorterNode + sortActorNodeContext := newContext(sdtTableContext, t.tableName, + t.globalVars.TableActorSystem.Router(), + t.actorID, t.changefeedVars, t.globalVars, t.reportErr) + if err := startSorter(t, sortActorNodeContext); err != nil { + log.Error("sorter fails to start", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + return err + } + + pullerNode := newPullerNode(t.tableID, t.replicaInfo, t.tableName, t.changefeedVars.ID) + pullerActorNodeContext := newContext(sdtTableContext, + t.tableName, + t.globalVars.TableActorSystem.Router(), + t.actorID, t.changefeedVars, t.globalVars, t.reportErr) + t.pullerNode = pullerNode + if err := startPuller(t, pullerActorNodeContext); err != nil { + log.Error("puller fails to start", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + return err + } + + messageFetchFunc, err := t.getSinkAsyncMessageHolder(sdtTableContext, sortActorNodeContext) + if err != nil { + return errors.Trace(err) + } + + actorSinkNode := newSinkNode(t.tableID, t.tableSink, + t.replicaInfo.StartTs, + t.targetTs, flowController) + actorSinkNode.initWithReplicaConfig(true, t.replicaConfig) + t.sinkNode = actorSinkNode + + // construct sink actor node, it gets message from sortNode or cyclicNode + var messageProcessFunc asyncMessageProcessorFunc = func( + ctx context.Context, msg pmessage.Message, + ) (bool, error) { + return actorSinkNode.HandleMessage(sdtTableContext, msg) + } + t.nodes = append(t.nodes, NewActorNode(messageFetchFunc, messageProcessFunc)) + + t.started = true + log.Info("table actor is started", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID)) + return nil +} + +func (t *tableActor) getSinkAsyncMessageHolder( + sdtTableContext context.Context, + sortActorNodeContext *actorNodeContext) (AsyncMessageHolder, error, +) { + var messageFetchFunc asyncMessageHolderFunc = func() *pmessage.Message { + return sortActorNodeContext.tryGetProcessedMessage() + } + // check if cyclic feature is enabled + if t.cyclicEnabled { + cyclicNode := newCyclicMarkNode(t.markTableID) + cyclicActorNodeContext := newCyclicNodeContext( + newContext(sdtTableContext, t.tableName, + t.globalVars.TableActorSystem.Router(), + t.actorID, t.changefeedVars, + t.globalVars, t.reportErr)) + if err := cyclicNode.Init(cyclicActorNodeContext); err != nil { + log.Error("failed to start cyclic node", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + return nil, err + } + + // construct cyclic 
actor node if it's enabled, it gets message from sortNode + var messageProcessFunc asyncMessageProcessorFunc = func( + ctx context.Context, msg pmessage.Message, + ) (bool, error) { + return cyclicNode.TryHandleDataMessage(cyclicActorNodeContext, msg) + } + t.nodes = append(t.nodes, NewActorNode(messageFetchFunc, messageProcessFunc)) + messageFetchFunc = func() *pmessage.Message { + return cyclicActorNodeContext.tryGetProcessedMessage() + } + } + return messageFetchFunc, nil +} + +// stop will set this table actor state to stopped and releases all goroutines spawned +// from this table actor +func (t *tableActor) stop(err error) { + log.Info("table actor begin to stop....", + zap.String("changefeed", t.changefeedID), + zap.String("tableName", t.tableName)) + t.stopLock.Lock() + defer t.stopLock.Unlock() + if atomic.LoadUint32(&t.stopped) == stopped { + log.Info("table actor is already stopped", + zap.String("changefeed", t.changefeedID), + zap.String("tableName", t.tableName)) + return + } + atomic.StoreUint32(&t.stopped, stopped) + if t.sortNode != nil { + // releaseResource will send a message to sorter router + t.sortNode.releaseResource(t.changefeedID) + } + t.cancel() + if t.sinkNode != nil { + if err := t.sinkNode.releaseResource(t.stopCtx); err != nil { + log.Warn("close sink failed", + zap.String("changefeed", t.changefeedID), + zap.String("tableName", t.tableName), + zap.Error(err)) + } + } + log.Info("table actor stopped", + zap.String("changefeed", t.changefeedID), + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) +} + +// handleError stops the table actor at first and then reports the error to processor +func (t *tableActor) handleError(err error) { + t.stop(err) + if !cerror.ErrTableProcessorStoppedSafely.Equal(err) { + t.reportErr(err) + } +} + +// ============ Implement TablePipline, must be threadsafe ============ + +// ResolvedTs returns the resolved ts in this table pipeline +func (t *tableActor) ResolvedTs() model.Ts { + // TODO: after TiCDC introduces p2p based resolved ts mechanism, TiCDC nodes + // will be able to cooperate replication status directly. Then we will add + // another replication barrier for consistent replication instead of reusing + // the global resolved-ts. + if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) { + return t.sinkNode.ResolvedTs() + } + return t.sortNode.ResolvedTs() +} + +// CheckpointTs returns the checkpoint ts in this table pipeline +func (t *tableActor) CheckpointTs() model.Ts { + return t.sinkNode.CheckpointTs() +} + +// UpdateBarrierTs updates the barrier ts in this table pipeline +func (t *tableActor) UpdateBarrierTs(ts model.Ts) { + msg := pmessage.BarrierMessage(ts) + err := t.router.Send(t.actorID, message.ValueMessage(msg)) + if err != nil { + log.Warn("send fails", + zap.Reflect("msg", msg), + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + } +} + +// AsyncStop tells the pipeline to stop, and returns true if the pipeline is already stopped. 
+func (t *tableActor) AsyncStop(targetTs model.Ts) bool { + // TypeStop stop the sinkNode only ,the processor stop the sink to release some resource + // and then stop the whole table pipeline by call Cancel + msg := message.StopMessage[pmessage.Message]() + err := t.router.Send(t.actorID, msg) + log.Info("send async stop signal to table", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Uint64("targetTs", targetTs)) + if err != nil { + if cerror.ErrMailboxFull.Equal(err) { + return false + } + if cerror.ErrActorNotFound.Equal(err) || cerror.ErrActorStopped.Equal(err) { + return true + } + log.Panic("send fails", zap.Reflect("msg", msg), zap.Error(err)) + } + return true +} + +// Workload returns the workload of this table pipeline +func (t *tableActor) Workload() model.WorkloadInfo { + // We temporarily set the value to constant 1 + return workload +} + +// Status returns the status of this table pipeline +func (t *tableActor) Status() TableStatus { + return t.sinkNode.Status() +} + +// ID returns the ID of source table and mark table +func (t *tableActor) ID() (tableID, markTableID int64) { + return t.tableID, t.markTableID +} + +// Name returns the quoted schema and table name +func (t *tableActor) Name() string { + return t.tableName +} + +// Cancel stops this table pipeline immediately and destroy all resources +// created by this table pipeline +func (t *tableActor) Cancel() { + // cancel wait group, release resource and mark the status as stopped + t.stop(nil) + // actor is closed, tick actor to remove this actor router + msg := pmessage.TickMessage() + if err := t.router.Send(t.mb.ID(), message.ValueMessage(msg)); err != nil { + log.Warn("fails to send Stop message", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + } +} + +// Wait waits for table pipeline destroyed +func (t *tableActor) Wait() { + _ = t.wg.Wait() +} + +// for ut +var startPuller = func(t *tableActor, ctx *actorNodeContext) error { + return t.pullerNode.start(ctx, t.wg, true, t.sortNode) +} + +var startSorter = func(t *tableActor, ctx *actorNodeContext) error { + return t.sortNode.start(ctx, true, t.wg, t.actorID, t.router) +} diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 1bc9e96aa7d..33b7174fa14 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -735,6 +735,7 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode tableNameStr = tableName.QuoteString() } +<<<<<<< HEAD sink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs) table := tablepipeline.NewTablePipeline( ctx, @@ -756,6 +757,41 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode zap.String("name", table.Name()), zap.Any("replicaInfo", replicaInfo)) }() +======= + sink, err := p.sinkManager.CreateTableSink(tableID, p.redoManager) + if err != nil { + return nil, errors.Trace(err) + } + var table tablepipeline.TablePipeline + if config.GetGlobalServerConfig().Debug.EnableTableActor { + var err error + table, err = tablepipeline.NewTableActor( + ctx, + p.mounter, + tableID, + tableNameStr, + replicaInfo, + sink, + p.changefeed.Info.GetTargetTs()) + if err != nil { + return nil, errors.Trace(err) + } + } else { + table = tablepipeline.NewTablePipeline( + ctx, + p.mounter, + tableID, + tableNameStr, + replicaInfo, + sink, + p.changefeed.Info.GetTargetTs(), + ) + } + + if p.redoManager.Enabled() { + p.redoManager.AddTable(tableID, replicaInfo.StartTs) + } +>>>>>>> 
c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)) log.Info("Add table pipeline", zap.Int64("tableID", tableID), cdcContext.ZapFieldChangefeed(ctx), diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 25806baaee1..b2571a35e1a 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -35,6 +35,22 @@ type blackHoleSink struct { lastAccumulated uint64 } +<<<<<<< HEAD +======= +// Init table sink resources +func (b *blackHoleSink) Init(tableID model.TableID) error { + return nil +} + +func (b *blackHoleSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) { + err := b.EmitRowChangedEvents(ctx, rows...) + if err != nil { + return false, err + } + return true, nil +} + +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)) func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { for _, row := range rows { log.Debug("BlockHoleSink: EmitRowChangedEvents", zap.Any("row", row)) diff --git a/cdc/sink/buffer_sink.go b/cdc/sink/buffer_sink.go new file mode 100644 index 00000000000..c830bfa3588 --- /dev/null +++ b/cdc/sink/buffer_sink.go @@ -0,0 +1,223 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "context" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/metrics" + "github.com/pingcap/tiflow/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +const maxFlushBatchSize = 512 + +// bufferSink buffers emitted events and checkpoints and flush asynchronously. +// Note that it is a thread-safe Sink implementation. 
+type bufferSink struct { + Sink + changeFeedCheckpointTs uint64 + tableCheckpointTsMap sync.Map + buffer map[model.TableID][]*model.RowChangedEvent + bufferMu sync.Mutex + flushTsChan chan flushMsg +} + +var _ Sink = (*bufferSink)(nil) + +func newBufferSink( + backendSink Sink, checkpointTs model.Ts, +) *bufferSink { + sink := &bufferSink{ + Sink: backendSink, + // buffer shares the same flow control with table sink + buffer: make(map[model.TableID][]*model.RowChangedEvent), + changeFeedCheckpointTs: checkpointTs, + flushTsChan: make(chan flushMsg, maxFlushBatchSize), + } + return sink +} + +type runState struct { + batch [maxFlushBatchSize]flushMsg + + metricTotalRows prometheus.Counter +} + +func (b *bufferSink) run(ctx context.Context, changefeedID string, errCh chan error) { + state := runState{ + metricTotalRows: metrics.BufferSinkTotalRowsCountCounter.WithLabelValues(changefeedID), + } + defer func() { + metrics.BufferSinkTotalRowsCountCounter.DeleteLabelValues(changefeedID) + }() + + for { + keepRun, err := b.runOnce(ctx, &state) + if err != nil && errors.Cause(err) != context.Canceled { + errCh <- err + return + } + if !keepRun { + return + } + } +} + +func (b *bufferSink) runOnce(ctx context.Context, state *runState) (bool, error) { + batchSize, batch := 0, state.batch + push := func(event flushMsg) { + batch[batchSize] = event + batchSize++ + } + select { + case <-ctx.Done(): + return false, ctx.Err() + case event := <-b.flushTsChan: + push(event) + RecvBatch: + for batchSize < maxFlushBatchSize { + select { + case event := <-b.flushTsChan: + push(event) + default: + break RecvBatch + } + } + } + + start := time.Now() + b.bufferMu.Lock() + // find all rows before resolvedTs and emit to backend sink + for i := 0; i < batchSize; i++ { + tableID, resolvedTs := batch[i].tableID, batch[i].resolvedTs + rows := b.buffer[tableID] + i := sort.Search(len(rows), func(i int) bool { + return rows[i].CommitTs > resolvedTs + }) + if i == 0 { + continue + } + state.metricTotalRows.Add(float64(i)) + + err := b.Sink.EmitRowChangedEvents(ctx, rows[:i]...) + if err != nil { + b.bufferMu.Unlock() + return false, errors.Trace(err) + } + + // put remaining rows back to buffer + // append to a new, fixed slice to avoid lazy GC + b.buffer[tableID] = append(make([]*model.RowChangedEvent, 0, len(rows[i:])), rows[i:]...) 
+ } + b.bufferMu.Unlock() + + for i := 0; i < batchSize; i++ { + tableID, resolvedTs := batch[i].tableID, batch[i].resolvedTs + checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, tableID, resolvedTs) + if err != nil { + return false, errors.Trace(err) + } + b.tableCheckpointTsMap.Store(tableID, checkpointTs) + } + elapsed := time.Since(start) + if elapsed > time.Second { + log.Warn("flush row changed events too slow", + zap.Int("batchSize", batchSize), + zap.Duration("duration", elapsed), + util.ZapFieldChangefeed(ctx)) + } + + return true, nil +} + +// Init table sink resources +func (b *bufferSink) Init(tableID model.TableID) error { + b.clearBufferedTableData(tableID) + return b.Sink.Init(tableID) +} + +// Barrier delete buffer +func (b *bufferSink) Barrier(ctx context.Context, tableID model.TableID) error { + b.clearBufferedTableData(tableID) + return b.Sink.Barrier(ctx, tableID) +} + +func (b *bufferSink) clearBufferedTableData(tableID model.TableID) { + b.bufferMu.Lock() + defer b.bufferMu.Unlock() + delete(b.buffer, tableID) +} + +func (b *bufferSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) { + err := b.EmitRowChangedEvents(ctx, rows...) + if err != nil { + return false, err + } + return true, nil +} + +func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if len(rows) == 0 { + return nil + } + tableID := rows[0].Table.TableID + b.bufferMu.Lock() + b.buffer[tableID] = append(b.buffer[tableID], rows...) + b.bufferMu.Unlock() + } + return nil +} + +func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { + select { + case <-ctx.Done(): + return b.getTableCheckpointTs(tableID), ctx.Err() + case b.flushTsChan <- flushMsg{ + tableID: tableID, + resolvedTs: resolvedTs, + }: + } + return b.getTableCheckpointTs(tableID), nil +} + +type flushMsg struct { + tableID model.TableID + resolvedTs uint64 +} + +func (b *bufferSink) getTableCheckpointTs(tableID model.TableID) uint64 { + checkPoints, ok := b.tableCheckpointTsMap.Load(tableID) + if ok { + return checkPoints.(uint64) + } + return atomic.LoadUint64(&b.changeFeedCheckpointTs) +} + +// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick +func (b *bufferSink) UpdateChangeFeedCheckpointTs(checkpointTs uint64) { + atomic.StoreUint64(&b.changeFeedCheckpointTs, checkpointTs) +} diff --git a/cdc/sink/buffer_sink_test.go b/cdc/sink/buffer_sink_test.go index e1fe467a0a5..1137c1aebcc 100644 --- a/cdc/sink/buffer_sink_test.go +++ b/cdc/sink/buffer_sink_test.go @@ -31,10 +31,17 @@ func TestTableIsNotFlushed(t *testing.T) { func TestFlushTable(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) +<<<<<<< HEAD defer func() { cancel() }() b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg)) +======= + defer cancel() + b := newBufferSink(newBlackHoleSink(ctx), 5) + go b.run(ctx, "", make(chan error)) + +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)) require.Equal(t, uint64(5), b.getTableCheckpointTs(2)) require.Nil(t, b.EmitRowChangedEvents(ctx)) tbl1 := &model.TableName{TableID: 1} @@ -74,7 +81,13 @@ func TestFlushTable(t *testing.T) { func TestFlushFailed(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) +<<<<<<< HEAD b := newBufferSink(ctx, newBlackHoleSink(ctx, 
make(map[string]string)), make(chan error), 5, make(chan drawbackMsg)) +======= + b := newBufferSink(newBlackHoleSink(ctx), 5) + go b.run(ctx, "", make(chan error)) + +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)) checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8) require.True(t, checkpoint <= 8) require.Nil(t, err) @@ -89,3 +102,68 @@ func TestFlushFailed(t *testing.T) { require.Equal(t, uint64(8), b.getTableCheckpointTs(3)) require.Equal(t, uint64(5), b.getTableCheckpointTs(1)) } +<<<<<<< HEAD +======= + +func TestCleanBufferedData(t *testing.T) { + t.Parallel() + + tblID := model.TableID(1) + b := newBufferSink(newBlackHoleSink(context.TODO()), 5) + b.buffer[tblID] = []*model.RowChangedEvent{} + _, ok := b.buffer[tblID] + require.True(t, ok) + require.Nil(t, b.Init(tblID)) + _, ok = b.buffer[tblID] + require.False(t, ok) +} + +type benchSink struct { + Sink +} + +func (b *benchSink) EmitRowChangedEvents( + ctx context.Context, rows ...*model.RowChangedEvent, +) error { + return nil +} + +func (b *benchSink) FlushRowChangedEvents( + ctx context.Context, tableID model.TableID, resolvedTs uint64, +) (uint64, error) { + return 0, nil +} + +func BenchmarkRun(b *testing.B) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + state := runState{ + metricTotalRows: metrics.BufferSinkTotalRowsCountCounter.WithLabelValues(b.Name()), + } + + for exp := 0; exp < 9; exp++ { + count := int(math.Pow(4, float64(exp))) + s := newBufferSink(&benchSink{}, 5) + s.flushTsChan = make(chan flushMsg, count) + for i := 0; i < count; i++ { + s.buffer[int64(i)] = []*model.RowChangedEvent{{CommitTs: 5}} + } + b.ResetTimer() + + b.Run(fmt.Sprintf("%d table(s)", count), func(b *testing.B) { + for i := 0; i < b.N; i++ { + for j := 0; j < count; j++ { + s.flushTsChan <- flushMsg{tableID: int64(0)} + } + for len(s.flushTsChan) != 0 { + keepRun, err := s.runOnce(ctx, &state) + if err != nil || !keepRun { + b.Fatal(keepRun, err) + } + } + } + }) + } +} +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)) diff --git a/cdc/sink/common/common.go b/cdc/sink/common/common.go index 7fff1ec0082..ff87632b19d 100644 --- a/cdc/sink/common/common.go +++ b/cdc/sink/common/common.go @@ -63,6 +63,13 @@ func NewUnresolvedTxnCache() *UnresolvedTxnCache { } } +// RemoveTableTxn removes unresolved rows from cache +func (c *unresolvedTxnCache) RemoveTableTxn(tableID model.TableID) { + c.unresolvedTxnsMu.Lock() + defer c.unresolvedTxnsMu.Unlock() + delete(c.unresolvedTxns, tableID) +} + // Append adds unresolved rows to cache // the rows inputed into this function will go through the following handling logic // 1. 
group by tableID from one input stream diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go index efc18c0f398..7263f7f7cd7 100644 --- a/cdc/sink/manager_test.go +++ b/cdc/sink/manager_test.go @@ -48,6 +48,18 @@ func newCheckSink(c *check.C) *checkSink { } } +<<<<<<< HEAD:cdc/sink/manager_test.go +======= +// Init table sink resources +func (c *checkSink) Init(tableID model.TableID) error { + return nil +} + +func (c *checkSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) { + return true, nil +} + +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)):cdc/sink/sink_manager_test.go func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { c.rowsMu.Lock() defer c.rowsMu.Unlock() @@ -103,14 +115,23 @@ func (s *managerSuite) TestManagerRandom(c *check.C) { defer manager.Close(ctx) goroutineNum := 10 rowNum := 100 - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + err error + ) tableSinks := make([]Sink, goroutineNum) for i := 0; i < goroutineNum; i++ { i := i wg.Add(1) go func() { defer wg.Done() +<<<<<<< HEAD:cdc/sink/manager_test.go tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0) +======= + tableSinks[i], err = manager.CreateTableSink(model.TableID(i), + redo.NewDisabledManager()) + c.Assert(err, check.IsNil) +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)):cdc/sink/sink_manager_test.go }() } wg.Wait() @@ -202,7 +223,12 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) { for i := 0; i < goroutineNum; i++ { if i%4 != 3 { // add table +<<<<<<< HEAD:cdc/sink/manager_test.go table := manager.CreateTableSink(model.TableID(i), maxResolvedTs) +======= + table, err := manager.CreateTableSink(model.TableID(i), redoManager) + c.Assert(err, check.IsNil) +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)):cdc/sink/sink_manager_test.go ctx, cancel := context.WithCancel(ctx) tableCancels = append(tableCancels, cancel) tableSinks = append(tableSinks, table) @@ -242,10 +268,18 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) { manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "") defer manager.Close(ctx) +<<<<<<< HEAD:cdc/sink/manager_test.go tableID := int64(49) tableSink := manager.CreateTableSink(tableID, 100) err := tableSink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{ Table: &model.TableName{TableID: tableID}, +======= + table := &model.TableName{TableID: int64(49)} + tableSink, err := manager.CreateTableSink(table.TableID, redo.NewDisabledManager()) + c.Assert(err, check.IsNil) + err = tableSink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{ + Table: table, +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)):cdc/sink/sink_manager_test.go CommitTs: uint64(110), }) c.Assert(err, check.IsNil) @@ -265,14 +299,23 @@ func BenchmarkManagerFlushing(b *testing.B) { // Init table sinks. 
goroutineNum := 2000 rowNum := 2000 - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + err error + ) tableSinks := make([]Sink, goroutineNum) for i := 0; i < goroutineNum; i++ { i := i wg.Add(1) go func() { defer wg.Done() +<<<<<<< HEAD:cdc/sink/manager_test.go tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0) +======= + tableSinks[i], err = manager.CreateTableSink(model.TableID(i), + redo.NewDisabledManager()) + panic(err) +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)):cdc/sink/sink_manager_test.go }() } wg.Wait() @@ -338,6 +381,17 @@ type errorSink struct { *check.C } +<<<<<<< HEAD:cdc/sink/manager_test.go +======= +func (e *errorSink) Init(_ model.TableID) error { + return nil +} + +func (e *errorSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) { + return true, nil +} + +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)):cdc/sink/sink_manager_test.go func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { return errors.New("error in emit row changed events") } @@ -369,8 +423,14 @@ func (s *managerSuite) TestManagerError(c *check.C) { errCh := make(chan error, 16) manager := NewManager(ctx, &errorSink{C: c}, errCh, 0, "", "") defer manager.Close(ctx) +<<<<<<< HEAD:cdc/sink/manager_test.go sink := manager.CreateTableSink(1, 0) err := sink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{ +======= + sink, err := manager.CreateTableSink(1, redo.NewDisabledManager()) + c.Assert(err, check.IsNil) + err = sink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{ +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)):cdc/sink/sink_manager_test.go CommitTs: 1, Table: &model.TableName{TableID: 1}, }) diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go new file mode 100644 index 00000000000..c4a7456eeda --- /dev/null +++ b/cdc/sink/mq/mq.go @@ -0,0 +1,501 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mq + +import ( + "context" + "net/url" + "strings" + "sync" + + "github.com/Shopify/sarama" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/codec" + "github.com/pingcap/tiflow/cdc/sink/metrics" + "github.com/pingcap/tiflow/cdc/sink/mq/dispatcher" + "github.com/pingcap/tiflow/cdc/sink/mq/manager" + kafkamanager "github.com/pingcap/tiflow/cdc/sink/mq/manager/kafka" + pulsarmanager "github.com/pingcap/tiflow/cdc/sink/mq/manager/pulsar" + "github.com/pingcap/tiflow/cdc/sink/mq/producer" + "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" + "github.com/pingcap/tiflow/cdc/sink/mq/producer/pulsar" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +type resolvedTsEvent struct { + tableID model.TableID + resolvedTs model.Ts +} + +const ( + // Depend on this size, `resolvedBuffer` will take + // approximately 2 KiB memory. + defaultResolvedTsEventBufferSize = 128 +) + +type mqSink struct { + mqProducer producer.Producer + eventRouter *dispatcher.EventRouter + encoderBuilder codec.EncoderBuilder + filter *filter.Filter + protocol config.Protocol + + topicManager manager.TopicManager + flushWorker *flushWorker + tableCheckpointTsMap sync.Map + resolvedBuffer chan resolvedTsEvent + + statistics *metrics.Statistics + + role util.Role + id model.ChangeFeedID +} + +func newMqSink( + ctx context.Context, + credential *security.Credential, + topicManager manager.TopicManager, + mqProducer producer.Producer, + filter *filter.Filter, + defaultTopic string, + replicaConfig *config.ReplicaConfig, encoderConfig *codec.Config, + errCh chan error, +) (*mqSink, error) { + encoderBuilder, err := codec.NewEventBatchEncoderBuilder(encoderConfig, credential) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic) + if err != nil { + return nil, errors.Trace(err) + } + + changefeedID := util.ChangefeedIDFromCtx(ctx) + role := util.RoleFromCtx(ctx) + + encoder := encoderBuilder.Build() + statistics := metrics.NewStatistics(ctx, metrics.SinkTypeMQ) + flushWorker := newFlushWorker(encoder, mqProducer, statistics) + + s := &mqSink{ + mqProducer: mqProducer, + eventRouter: eventRouter, + encoderBuilder: encoderBuilder, + filter: filter, + protocol: encoderConfig.Protocol(), + topicManager: topicManager, + flushWorker: flushWorker, + resolvedBuffer: make(chan resolvedTsEvent, defaultResolvedTsEventBufferSize), + statistics: statistics, + role: role, + id: changefeedID, + } + + go func() { + if err := s.run(ctx); err != nil && errors.Cause(err) != context.Canceled { + select { + case <-ctx.Done(): + return + case errCh <- err: + default: + log.Error("error channel is full", zap.Error(err), + zap.String("changefeed", changefeedID), zap.Any("role", s.role)) + } + } + }() + return s, nil +} + +// TryEmitRowChangedEvents just calls EmitRowChangedEvents internally, +// it still blocking in current implementation. +// TODO(dongmen): We should make this method truly non-blocking after we remove buffer sink +func (k *mqSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) { + err := k.EmitRowChangedEvents(ctx, rows...) 
+ if err != nil { + return false, err + } + return true, nil +} + +// Init table sink resources +func (k *mqSink) Init(tableID model.TableID) error { + return nil +} + +func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + rowsCount := 0 + for _, row := range rows { + if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) { + log.Info("Row changed event ignored", + zap.Uint64("start-ts", row.StartTs), + zap.String("changefeed", k.id), + zap.Any("role", k.role)) + continue + } + topic := k.eventRouter.GetTopicForRowChange(row) + partitionNum, err := k.topicManager.Partitions(topic) + if err != nil { + return errors.Trace(err) + } + partition := k.eventRouter.GetPartitionForRowChange(row, partitionNum) + err = k.flushWorker.addEvent(ctx, mqEvent{ + row: row, + key: topicPartitionKey{ + topic: topic, partition: partition, + }, + }) + if err != nil { + return err + } + rowsCount++ + } + k.statistics.AddRowsCount(rowsCount) + return nil +} + +// FlushRowChangedEvents is thread-safety +func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { + var checkpointTs uint64 + v, ok := k.tableCheckpointTsMap.Load(tableID) + if ok { + checkpointTs = v.(uint64) + } + if resolvedTs <= checkpointTs { + return checkpointTs, nil + } + select { + case <-ctx.Done(): + return 0, ctx.Err() + case k.resolvedBuffer <- resolvedTsEvent{ + tableID: tableID, + resolvedTs: resolvedTs, + }: + } + k.statistics.PrintStatus(ctx) + return checkpointTs, nil +} + +// bgFlushTs flush resolvedTs to workers and flush the mqProducer +func (k *mqSink) bgFlushTs(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case msg := <-k.resolvedBuffer: + resolvedTs := msg.resolvedTs + err := k.flushTsToWorker(ctx, resolvedTs) + if err != nil { + return errors.Trace(err) + } + // Since CDC does not guarantee exactly once semantic, it won't cause any problem + // here even if the table was moved or removed. + // ref: https://github.com/pingcap/tiflow/pull/4356#discussion_r787405134 + k.tableCheckpointTsMap.Store(msg.tableID, resolvedTs) + } + } +} + +func (k *mqSink) flushTsToWorker(ctx context.Context, resolvedTs model.Ts) error { + if err := k.flushWorker.addEvent(ctx, mqEvent{resolvedTs: resolvedTs}); err != nil { + if errors.Cause(err) != context.Canceled { + log.Warn("failed to flush TS to worker", zap.Error(err)) + } else { + log.Debug("flushing TS to worker has been canceled", zap.Error(err)) + } + return err + } + return nil +} + +func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error { + encoder := k.encoderBuilder.Build() + msg, err := encoder.EncodeCheckpointEvent(ts) + if err != nil { + return errors.Trace(err) + } + if msg == nil { + return nil + } + // NOTICE: When there is no table sync, + // we need to send checkpoint ts to the default topic. T + // This will be compatible with the old behavior. 
+ if len(tables) == 0 { + topic := k.eventRouter.GetDefaultTopic() + partitionNum, err := k.topicManager.Partitions(topic) + if err != nil { + return errors.Trace(err) + } + log.Debug("emit checkpointTs to default topic", + zap.String("topic", topic), zap.Uint64("checkpointTs", ts)) + err = k.mqProducer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) + return errors.Trace(err) + } + topics := k.eventRouter.GetActiveTopics(tables) + log.Debug("MQ sink current active topics", zap.Any("topics", topics)) + for _, topic := range topics { + partitionNum, err := k.topicManager.Partitions(topic) + if err != nil { + return errors.Trace(err) + } + log.Debug("emit checkpointTs to active topic", + zap.String("topic", topic), zap.Uint64("checkpointTs", ts)) + err = k.mqProducer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + if k.filter.ShouldIgnoreDDLEvent(ddl.StartTs, ddl.Type, ddl.TableInfo.Schema, ddl.TableInfo.Table) { + log.Info( + "DDL event ignored", + zap.String("query", ddl.Query), + zap.Uint64("startTs", ddl.StartTs), + zap.Uint64("commitTs", ddl.CommitTs), + zap.String("changefeed", k.id), + zap.Any("role", k.role), + ) + return cerror.ErrDDLEventIgnored.GenWithStackByArgs() + } + + encoder := k.encoderBuilder.Build() + msg, err := encoder.EncodeDDLEvent(ddl) + if err != nil { + return errors.Trace(err) + } + if msg == nil { + return nil + } + + topic := k.eventRouter.GetTopicForDDL(ddl) + partitionRule := k.eventRouter.GetDLLDispatchRuleByProtocol(k.protocol) + k.statistics.AddDDLCount() + log.Debug("emit ddl event", + zap.Uint64("commitTs", ddl.CommitTs), zap.String("query", ddl.Query), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + if partitionRule == dispatcher.PartitionAll { + partitionNum, err := k.topicManager.Partitions(topic) + if err != nil { + return errors.Trace(err) + } + err = k.mqProducer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) + return errors.Trace(err) + } + // Notice: We must call Partitions here, + // which will be responsible for automatically creating topics when they don't exist. + // If it is not called here and kafka has `auto.create.topics.enable` turned on, + // then the auto-created topic will not be created as configured by ticdc. + _, err = k.topicManager.Partitions(topic) + if err != nil { + return errors.Trace(err) + } + err = k.asyncFlushToPartitionZero(ctx, topic, msg) + return errors.Trace(err) +} + +// Close the producer asynchronously, does not care closed successfully or not. +func (k *mqSink) Close(ctx context.Context) error { + go k.mqProducer.Close() + return nil +} + +func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error { + // Barrier does nothing because FlushRowChangedEvents in mq sink has flushed + // all buffered events by force. + return nil +} + +func (k *mqSink) run(ctx context.Context) error { + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + return k.bgFlushTs(ctx) + }) + wg.Go(func() error { + return k.flushWorker.run(ctx) + }) + return wg.Wait() +} + +// asyncFlushToPartitionZero writes message to +// partition zero asynchronously and flush it immediately. 
+func (k *mqSink) asyncFlushToPartitionZero( + ctx context.Context, topic string, message *codec.MQMessage, +) error { + err := k.mqProducer.AsyncSendMessage(ctx, topic, dispatcher.PartitionZero, message) + if err != nil { + return err + } + return k.mqProducer.Flush(ctx) +} + +// NewKafkaSaramaSink creates a new Kafka mqSink. +func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, + filter *filter.Filter, replicaConfig *config.ReplicaConfig, + opts map[string]string, errCh chan error, +) (*mqSink, error) { + topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool { + return r == '/' + }) + if topic == "" { + return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri") + } + + baseConfig := kafka.NewConfig() + if err := baseConfig.Apply(sinkURI); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + if err := replicaConfig.ApplyProtocol(sinkURI).Validate(); err != nil { + return nil, errors.Trace(err) + } + + saramaConfig, err := kafka.NewSaramaConfig(ctx, baseConfig) + if err != nil { + return nil, errors.Trace(err) + } + + adminClient, err := kafka.NewAdminClientImpl(baseConfig.BrokerEndpoints, saramaConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + if err := kafka.AdjustConfig(adminClient, baseConfig, saramaConfig, topic); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + var protocol config.Protocol + if err := protocol.FromString(replicaConfig.Sink.Protocol); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + encoderConfig := codec.NewConfig(protocol, util.TimezoneFromCtx(ctx)) + if err := encoderConfig.Apply(sinkURI, opts); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + // always set encoder's `MaxMessageBytes` equal to producer's `MaxMessageBytes` + // to prevent that the encoder generate batched message too large then cause producer meet `message too large` + encoderConfig = encoderConfig.WithMaxMessageBytes(saramaConfig.Producer.MaxMessageBytes) + + if err := encoderConfig.Validate(); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + client, err := sarama.NewClient(baseConfig.BrokerEndpoints, saramaConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + topicManager := kafkamanager.NewTopicManager( + client, + adminClient, + baseConfig.DeriveTopicConfig(), + ) + if _, err := topicManager.CreateTopic(topic); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err) + } + + sProducer, err := kafka.NewKafkaSaramaProducer( + ctx, + client, + adminClient, + baseConfig, + saramaConfig, + errCh, + ) + if err != nil { + return nil, errors.Trace(err) + } + + sink, err := newMqSink( + ctx, + baseConfig.Credential, + topicManager, + sProducer, + filter, + topic, + replicaConfig, + encoderConfig, + errCh, + ) + if err != nil { + return nil, errors.Trace(err) + } + return sink, nil +} + +// NewPulsarSink creates a new Pulsar mqSink. 
+func NewPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, + replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error, +) (*mqSink, error) { + s := sinkURI.Query().Get(config.ProtocolKey) + if s != "" { + replicaConfig.Sink.Protocol = s + } + err := replicaConfig.Validate() + if err != nil { + return nil, err + } + + var protocol config.Protocol + if err := protocol.FromString(replicaConfig.Sink.Protocol); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + encoderConfig := codec.NewConfig(protocol, util.TimezoneFromCtx(ctx)) + if err := encoderConfig.Apply(sinkURI, opts); err != nil { + return nil, errors.Trace(err) + } + // todo: set by pulsar producer's `max.message.bytes` + // encoderConfig = encoderConfig.WithMaxMessageBytes() + if err := encoderConfig.Validate(); err != nil { + return nil, errors.Trace(err) + } + + producer, err := pulsar.NewProducer(sinkURI, errCh) + if err != nil { + return nil, errors.Trace(err) + } + // For now, it's a placeholder. Avro format have to make connection to Schema Registry, + // and it may need credential. + credential := &security.Credential{} + fakeTopicManager := pulsarmanager.NewTopicManager( + producer.GetPartitionNum(), + ) + sink, err := newMqSink( + ctx, + credential, + fakeTopicManager, + producer, + filter, + "", + replicaConfig, + encoderConfig, + errCh, + ) + if err != nil { + return nil, errors.Trace(err) + } + return sink, nil +} diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index f6551edaec0..8bc54992384 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -699,6 +699,7 @@ func (s *mysqlSink) dispatchAndExecTxns(ctx context.Context, txnsGroup map[model s.notifyAndWaitExec(ctx) } +<<<<<<< HEAD:cdc/sink/mysql.go type mysqlSinkWorker struct { txnCh chan *model.SingleTableTxn maxTxnRow int @@ -843,6 +844,30 @@ func (w *mysqlSinkWorker) cleanup() { return } } +======= +func (s *mysqlSink) Init(tableID model.TableID) error { + s.cleanTableResource(tableID) + return nil +} + +func (s *mysqlSink) cleanTableResource(tableID model.TableID) { + // We need to clean up the old values of the table, + // otherwise when the table is dispatched back again, + // it may read the old values. + // See: https://github.com/pingcap/tiflow/issues/4464#issuecomment-1085385382. 
+ if resolvedTs, loaded := s.tableMaxResolvedTs.LoadAndDelete(tableID); loaded { + log.Info("clean up table max resolved ts", + zap.Int64("tableID", tableID), + zap.Uint64("resolvedTs", resolvedTs.(uint64))) + } + if checkpointTs, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded { + log.Info("clean up table checkpoint ts", + zap.Int64("tableID", tableID), + zap.Uint64("checkpointTs", checkpointTs.(uint64))) + } + // try to remove table txn cache + s.txnCache.RemoveTableTxn(tableID) +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)):cdc/sink/mysql/mysql.go } func (s *mysqlSink) Close(ctx context.Context) error { @@ -853,6 +878,11 @@ func (s *mysqlSink) Close(ctx context.Context) error { } func (s *mysqlSink) Barrier(ctx context.Context, tableID model.TableID) error { +<<<<<<< HEAD:cdc/sink/mysql.go +======= + defer s.cleanTableResource(tableID) + +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)):cdc/sink/mysql/mysql.go warnDuration := 3 * time.Minute ticker := time.NewTicker(warnDuration) defer ticker.Stop() diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index 24463682b27..4acd20f9b73 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -1305,3 +1305,32 @@ func TestMySQLSinkFlushResovledTs(t *testing.T) { err = sink.Close(ctx) require.Nil(t, err) } + +func TestCleanTableResource(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + tblID := model.TableID(1) + f, err := filter.NewFilter(config.GetDefaultReplicaConfig()) + require.Nil(t, err) + s := &mysqlSink{ + txnCache: newUnresolvedTxnCache(), + filter: f, + statistics: metrics.NewStatistics(ctx, metrics.SinkTypeDB), + } + require.Nil(t, s.EmitRowChangedEvents(ctx, &model.RowChangedEvent{ + Table: &model.TableName{TableID: tblID, Schema: "test", Table: "t1"}, + })) + s.tableCheckpointTs.Store(tblID, uint64(1)) + s.tableMaxResolvedTs.Store(tblID, uint64(2)) + _, ok := s.txnCache.unresolvedTxns[tblID] + require.True(t, ok) + require.Nil(t, s.Init(tblID)) + _, ok = s.txnCache.unresolvedTxns[tblID] + require.False(t, ok) + _, ok = s.tableCheckpointTs.Load(tblID) + require.False(t, ok) + _, ok = s.tableMaxResolvedTs.Load(tblID) + require.False(t, ok) +} diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index adbac7ba09c..4d8aead6f17 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -104,6 +104,18 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re return sink, nil } +<<<<<<< HEAD:cdc/sink/simple_mysql_tester.go +======= +// Init table sink resources +func (s *simpleMySQLSink) Init(tableID model.TableID) error { + return nil +} + +func (s *simpleMySQLSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) { + return true, nil +} + +>>>>>>> c6966a492 (sink(ticdc): refine sink interface and add init method (#5196)):cdc/sink/mysql/simple_mysql_tester.go // EmitRowChangedEvents sends Row Changed Event to Sink // EmitRowChangedEvents may write rows to downstream directly; func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index b48dd2fa4e1..96cb4562518 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -33,6 +33,11 @@ const ( // Sink is an abstraction for anything that a changefeed may emit into. 
type Sink interface { + // Init initializes the sink resource + // when the sink is added, this function will be called + // init resource or clean up the old values in this function + Init(tableID model.TableID) error + // EmitRowChangedEvents sends Row Changed Event to Sink // EmitRowChangedEvents may write rows to downstream directly; EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error diff --git a/cdc/sink/sink_manager.go b/cdc/sink/sink_manager.go new file mode 100644 index 00000000000..6d63f89675c --- /dev/null +++ b/cdc/sink/sink_manager.go @@ -0,0 +1,110 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "context" + "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/redo" + "github.com/pingcap/tiflow/cdc/sink/metrics" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +// Manager manages table sinks, maintains the relationship between table sinks +// and backendSink. +// Manager is thread-safe. +type Manager struct { + bufSink *bufferSink + tableSinks map[model.TableID]*tableSink + tableSinksMu sync.Mutex + + changefeedID model.ChangeFeedID + metricsTableSinkTotalRows prometheus.Counter +} + +// NewManager creates a new Sink manager +func NewManager( + ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts, + captureAddr string, changefeedID model.ChangeFeedID, +) *Manager { + bufSink := newBufferSink(backendSink, checkpointTs) + go bufSink.run(ctx, changefeedID, errCh) + counter := metrics.TableSinkTotalRowsCountCounter.WithLabelValues(changefeedID) + return &Manager{ + bufSink: bufSink, + tableSinks: make(map[model.TableID]*tableSink), + changefeedID: changefeedID, + metricsTableSinkTotalRows: counter, + } +} + +// CreateTableSink creates a table sink +func (m *Manager) CreateTableSink( + tableID model.TableID, + redoManager redo.LogManager, +) (Sink, error) { + sink := &tableSink{ + tableID: tableID, + manager: m, + buffer: make([]*model.RowChangedEvent, 0, 128), + redoManager: redoManager, + } + + m.tableSinksMu.Lock() + defer m.tableSinksMu.Unlock() + if _, exist := m.tableSinks[tableID]; exist { + log.Panic("the table sink already exists", zap.Uint64("tableID", uint64(tableID))) + } + if err := sink.Init(tableID); err != nil { + return nil, errors.Trace(err) + } + m.tableSinks[tableID] = sink + return sink, nil +} + +// Close closes the Sink manager and backend Sink, this method can be reentrantly called +func (m *Manager) Close(ctx context.Context) error { + m.tableSinksMu.Lock() + defer m.tableSinksMu.Unlock() + metrics.TableSinkTotalRowsCountCounter.DeleteLabelValues(m.changefeedID) + if m.bufSink != nil { + if err := m.bufSink.Close(ctx); err != nil && errors.Cause(err) != context.Canceled { + log.Warn("close bufSink failed", + zap.String("changefeed", m.changefeedID), + zap.Error(err)) + return err + } + } + return nil +} + +func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) error { + 
+	m.tableSinksMu.Lock()
+	delete(m.tableSinks, tableID)
+	m.tableSinksMu.Unlock()
+	return m.bufSink.Barrier(ctx, tableID)
+}
+
+// UpdateChangeFeedCheckpointTs updates changefeed level checkpointTs,
+// this value is used in getCheckpointTs func
+func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
+	if m.bufSink != nil {
+		m.bufSink.UpdateChangeFeedCheckpointTs(checkpointTs)
+	}
+}
diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go
new file mode 100644
index 00000000000..d9b0bf40c67
--- /dev/null
+++ b/cdc/sink/table_sink.go
@@ -0,0 +1,130 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sink
+
+import (
+	"context"
+	"sort"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/redo"
+	"go.uber.org/zap"
+)
+
+type tableSink struct {
+	tableID     model.TableID
+	manager     *Manager
+	buffer      []*model.RowChangedEvent
+	redoManager redo.LogManager
+}
+
+var _ Sink = (*tableSink)(nil)
+
+func (t *tableSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) {
+	t.buffer = append(t.buffer, rows...)
+	t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))
+	if t.redoManager.Enabled() {
+		return t.redoManager.TryEmitRowChangedEvents(ctx, t.tableID, rows...)
+	}
+	return true, nil
+}
+
+func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
+	t.buffer = append(t.buffer, rows...)
+	t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))
+	if t.redoManager.Enabled() {
+		return t.redoManager.EmitRowChangedEvents(ctx, t.tableID, rows...)
+	}
+	return nil
+}
+
+func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
+	// the table sink doesn't receive the DDL event
+	return nil
+}
+
+// FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs
+// is required to be no more than global resolvedTs, table barrierTs and table
+// redo log watermarkTs.
+func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
+	if tableID != t.tableID {
+		log.Panic("inconsistent table sink",
+			zap.Int64("tableID", tableID), zap.Int64("sinkTableID", t.tableID))
+	}
+	i := sort.Search(len(t.buffer), func(i int) bool {
+		return t.buffer[i].CommitTs > resolvedTs
+	})
+	if i == 0 {
+		return t.flushResolvedTs(ctx, resolvedTs)
+	}
+	resolvedRows := t.buffer[:i]
+	t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...)
+
+	err := t.manager.bufSink.EmitRowChangedEvents(ctx, resolvedRows...)
+	if err != nil {
+		return 0, errors.Trace(err)
+	}
+	return t.flushResolvedTs(ctx, resolvedTs)
+}
+
+func (t *tableSink) flushResolvedTs(ctx context.Context, resolvedTs uint64) (uint64, error) {
+	redoTs, err := t.flushRedoLogs(ctx, resolvedTs)
+	if err != nil {
+		return 0, errors.Trace(err)
+	}
+	if redoTs < resolvedTs {
+		resolvedTs = redoTs
+	}
+
+	checkpointTs, err := t.manager.bufSink.FlushRowChangedEvents(ctx, t.tableID, resolvedTs)
+	if err != nil {
+		return 0, errors.Trace(err)
+	}
+	return checkpointTs, nil
+}
+
+// flushRedoLogs flushes redo logs and returns the redo log resolved ts, which means
+// all events before the ts have been persisted to redo log storage.
+func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) {
+	if t.redoManager.Enabled() {
+		err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs)
+		if err != nil {
+			return 0, err
+		}
+		return t.redoManager.GetMinResolvedTs(), nil
+	}
+	return resolvedTs, nil
+}
+
+func (t *tableSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error {
+	// the table sink doesn't receive the checkpoint event
+	return nil
+}
+
+// Init table sink resources
+func (t *tableSink) Init(tableID model.TableID) error {
+	return t.manager.bufSink.Init(tableID)
+}
+
+// Close once the method is called, no more events can be written to this table sink
+func (t *tableSink) Close(ctx context.Context) error {
+	return t.manager.destroyTableSink(ctx, t.tableID)
+}
+
+// Barrier is not used in table sink
+func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error {
+	return nil
+}
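
Reviewer note, not part of the patch: the refined Sink interface above adds two methods, Init and TryEmitRowChangedEvents. The sketch below shows how an out-of-tree implementation might satisfy just those two methods; the noopSink type and its trivial behavior are illustrative assumptions (the remaining Sink methods are omitted), not code from this change.

package sink

import (
	"context"

	"github.com/pingcap/tiflow/cdc/model"
)

// noopSink is a hypothetical sink used only to illustrate the two methods
// added to the Sink interface by this patch.
type noopSink struct{}

// Init is called once when a table sink is added; an implementation is
// expected to set up, or clean up stale, per-table state here. This is the
// behavior TestCleanTableResource above verifies for mysqlSink.
func (s *noopSink) Init(tableID model.TableID) error {
	return nil
}

// TryEmitRowChangedEvents is the non-blocking counterpart of
// EmitRowChangedEvents; returning false is read here as "not accepted yet,
// retry later", matching how tableSink forwards the redo manager's result.
func (s *noopSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) {
	return true, nil
}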
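
Similarly, a rough sketch of how the new sink.Manager and tableSink are expected to be wired together, based only on the functions shown above. The exampleFlushLoop helper, the "capture-addr" and "changefeed-id" strings, and the caller-supplied backendSink, redoManager, errCh, and rows values are placeholders; in the real code this wiring is done by the processor and the sink node, not by a helper like this.

package sink

import (
	"context"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/redo"
)

// exampleFlushLoop illustrates the intended call sequence: one table sink per
// table is created through the Manager (which calls Init on it), sorted rows
// are emitted into the table sink's buffer, and FlushRowChangedEvents then
// hands rows up to resolvedTs over to the shared buffer sink and returns the
// checkpoint ts that was actually flushed downstream.
func exampleFlushLoop(
	ctx context.Context, backendSink Sink, redoManager redo.LogManager, errCh chan error,
	rows []*model.RowChangedEvent, resolvedTs uint64,
) (uint64, error) {
	manager := NewManager(ctx, backendSink, errCh, 0, "capture-addr", "changefeed-id")
	defer manager.Close(ctx)

	tableID := model.TableID(1)
	tblSink, err := manager.CreateTableSink(tableID, redoManager)
	if err != nil {
		return 0, err
	}
	// Close removes the table sink from the Manager and barriers the buffer sink.
	defer tblSink.Close(ctx)

	if err := tblSink.EmitRowChangedEvents(ctx, rows...); err != nil {
		return 0, err
	}
	// resolvedTs must not exceed the global resolved ts, the table barrier ts,
	// or the redo log watermark (see the FlushRowChangedEvents comment above).
	return tblSink.FlushRowChangedEvents(ctx, tableID, resolvedTs)
}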