diff --git a/cdc/metrics.go b/cdc/metrics.go index d2408ce6b91..9a07ee1ea6c 100644 --- a/cdc/metrics.go +++ b/cdc/metrics.go @@ -19,7 +19,7 @@ import ( "github.com/pingcap/tiflow/cdc/owner" "github.com/pingcap/tiflow/cdc/processor" "github.com/pingcap/tiflow/cdc/puller" - redowriter "github.com/pingcap/tiflow/cdc/redo/writer" + redo "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/scheduler" sink "github.com/pingcap/tiflow/cdc/sink/metrics" "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" @@ -59,7 +59,7 @@ func init() { memory.InitMetrics(registry) unified.InitMetrics(registry) leveldb.InitMetrics(registry) - redowriter.InitMetrics(registry) + redo.InitMetrics(registry) db.InitMetrics(registry) kafka.InitMetrics(registry) scheduler.InitMetrics(registry) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 368a82f64a1..f19b20754c8 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -29,18 +29,18 @@ import ( //go:generate msgp -// MqMessageType is the type of message -type MqMessageType int +// MessageType is the type of message, which is used by MqSink and RedoLog. +type MessageType int const ( - // MqMessageTypeUnknown is unknown type of message key - MqMessageTypeUnknown MqMessageType = iota - // MqMessageTypeRow is row type of message key - MqMessageTypeRow - // MqMessageTypeDDL is ddl type of message key - MqMessageTypeDDL - // MqMessageTypeResolved is resolved type of message key - MqMessageTypeResolved + // MessageTypeUnknown is unknown type of message key + MessageTypeUnknown MessageType = iota + // MessageTypeRow is row type of message key + MessageTypeRow + // MessageTypeDDL is ddl type of message key + MessageTypeDDL + // MessageTypeResolved is resolved type of message key + MessageTypeResolved ) // ColumnFlagType is for encapsulating the flag operations for different flags. 
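The hunk above renames the MQ-specific MqMessageType enum to the shared MessageType so that both the MQ sink and the redo log can use it. A minimal sketch of a call site after the rename, assuming only the renamed constants from cdc/model shown in this hunk (the describe helper and the main function are hypothetical, for illustration only):

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"
)

// describe maps the renamed message-type constants to human-readable names.
func describe(t model.MessageType) string {
	switch t {
	case model.MessageTypeRow:
		return "row"
	case model.MessageTypeDDL:
		return "ddl"
	case model.MessageTypeResolved:
		return "resolved"
	default: // model.MessageTypeUnknown
		return "unknown"
	}
}

func main() {
	// Before this change the constant was model.MqMessageTypeRow.
	fmt.Println(describe(model.MessageTypeRow))
}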
diff --git a/cdc/model/sink_gen.go b/cdc/model/sink_gen.go index c0cc799f209..153285792a7 100644 --- a/cdc/model/sink_gen.go +++ b/cdc/model/sink_gen.go @@ -633,7 +633,7 @@ func (z *DDLEvent) Msgsize() (s int) { } // DecodeMsg implements msgp.Decodable -func (z *MqMessageType) DecodeMsg(dc *msgp.Reader) (err error) { +func (z *MessageType) DecodeMsg(dc *msgp.Reader) (err error) { { var zb0001 int zb0001, err = dc.ReadInt() @@ -641,13 +641,13 @@ func (z *MqMessageType) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err) return } - (*z) = MqMessageType(zb0001) + (*z) = MessageType(zb0001) } return } // EncodeMsg implements msgp.Encodable -func (z MqMessageType) EncodeMsg(en *msgp.Writer) (err error) { +func (z MessageType) EncodeMsg(en *msgp.Writer) (err error) { err = en.WriteInt(int(z)) if err != nil { err = msgp.WrapError(err) @@ -657,14 +657,14 @@ func (z MqMessageType) EncodeMsg(en *msgp.Writer) (err error) { } // MarshalMsg implements msgp.Marshaler -func (z MqMessageType) MarshalMsg(b []byte) (o []byte, err error) { +func (z MessageType) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) o = msgp.AppendInt(o, int(z)) return } // UnmarshalMsg implements msgp.Unmarshaler -func (z *MqMessageType) UnmarshalMsg(bts []byte) (o []byte, err error) { +func (z *MessageType) UnmarshalMsg(bts []byte) (o []byte, err error) { { var zb0001 int zb0001, bts, err = msgp.ReadIntBytes(bts) @@ -672,14 +672,14 @@ func (z *MqMessageType) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err) return } - (*z) = MqMessageType(zb0001) + (*z) = MessageType(zb0001) } o = bts return } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z MqMessageType) Msgsize() (s int) { +func (z MessageType) Msgsize() (s int) { s = msgp.IntSize return } diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index dbb97d68323..60965e27876 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -124,17 +124,20 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er } }() - currentBarrierTs := atomic.LoadUint64(&n.barrierTs) if resolved.Ts > n.targetTs { resolved = model.NewResolvedTs(n.targetTs) } + currentBarrierTs := atomic.LoadUint64(&n.barrierTs) if n.redoManager != nil && n.redoManager.Enabled() { // redo log do not support batch resolve mode, hence we // use `ResolvedMark` to restore a normal resolved ts resolved = model.NewResolvedTs(resolved.ResolvedMark()) - err = n.redoManager.FlushLog(ctx, n.tableID, resolved.Ts) + err = n.redoManager.UpdateResolvedTs(ctx, n.tableID, resolved.Ts) + // fail fast check, the happens before relationship is: + // 1. sorter resolvedTs >= sink resolvedTs >= table redoTs == tableActor resolvedTs + // 2. tableActor resolvedTs >= processor resolvedTs >= global resolvedTs >= barrierTs redoTs := n.redoManager.GetMinResolvedTs() if redoTs < currentBarrierTs { log.Debug("redoTs should not less than current barrierTs", @@ -143,7 +146,7 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er zap.Uint64("barrierTs", currentBarrierTs)) } - // Fixme(CharlesCheung): remove this check after refactoring redoManager + // TODO: remove this check after SchedulerV3 become the first choice. 
if resolved.Ts > redoTs { resolved = model.NewResolvedTs(redoTs) } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 05fb1c2de98..2670691e043 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -15,6 +15,7 @@ package pipeline import ( "context" + "math/rand" "sync" "testing" "time" @@ -138,8 +139,9 @@ func TestState(t *testing.T) { state := TableStatePrepared // test stop at targetTs - node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID, false) + targetTs := model.Ts(10) + node := newSinkNode(1, &mockSink{}, 0, targetTs, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, true) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil). ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) @@ -177,6 +179,23 @@ func TestState(t *testing.T) { require.True(t, ok) require.Equal(t, TableStateReplicating, node.State()) + batchResolved := model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: targetTs, + BatchID: rand.Uint64() % 10, + } + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: targetTs, + Resolved: &batchResolved, + RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, + Row: &model.RowChangedEvent{}, + }) + ok, err = node.HandleMessage(ctx, msg) + require.Nil(t, err) + require.True(t, ok) + require.Equal(t, batchResolved, node.getResolvedTs()) + require.Equal(t, batchResolved, node.getCheckpointTs()) + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}, @@ -185,7 +204,7 @@ func TestState(t *testing.T) { require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) require.Equal(t, TableStateStopped, node.State()) - require.Equal(t, model.Ts(10), node.CheckpointTs()) + require.Equal(t, targetTs, node.CheckpointTs()) // test the stop at ts command state = TableStatePrepared diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index f9d88a5b52c..4c4df27636d 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -168,9 +168,9 @@ func (n *sorterNode) start( if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs { lastSentResolvedTs = lastCRTs lastSendResolvedTsTime = time.Now() + msg := model.NewResolvedPolymorphicEvent(0, lastSentResolvedTs) + ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg)) } - msg := model.NewResolvedPolymorphicEvent(0, lastSentResolvedTs) - ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg)) } // once receive startTs, which means sink should start replicating data to downstream. diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index 57d16653a57..63196eb5061 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -396,7 +396,7 @@ func (t *tableActor) ResolvedTs() model.Ts { // another replication barrier for consistent replication instead of reusing // the global resolved-ts. 
if t.redoManager.Enabled() { - return t.sinkNode.ResolvedTs() + return t.redoManager.GetResolvedTs(t.tableID) } return t.sortNode.ResolvedTs() } diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index c4be1425c1b..900fd7355ad 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -95,12 +95,15 @@ func TestTableActorInterface(t *testing.T) { require.Equal(t, model.Ts(3), table.CheckpointTs()) require.Equal(t, model.Ts(5), table.ResolvedTs()) - table.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual) ctx, cancel := context.WithCancel(context.Background()) defer cancel() table.redoManager, _ = redo.NewMockManager(ctx) - table.sinkNode.resolvedTs.Store(model.NewResolvedTs(6)) - require.Equal(t, model.Ts(6), table.ResolvedTs()) + table.redoManager.AddTable(table.tableID, 0) + require.Equal(t, model.Ts(0), table.ResolvedTs()) + table.redoManager.UpdateResolvedTs(ctx, table.tableID, model.Ts(6)) + require.Eventually(t, func() bool { return table.ResolvedTs() == model.Ts(6) }, + time.Second*5, time.Millisecond*500) + table.redoManager.Cleanup(ctx) table.sinkNode.state.Store(TableStateStopped) require.Equal(t, TableStateStopped, table.State()) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 83158765eae..bfdc65f44c2 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -891,6 +891,10 @@ func (p *processor) createTablePipelineImpl( return nil }) + if p.redoManager.Enabled() { + p.redoManager.AddTable(tableID, replicaInfo.StartTs) + } + tableName := p.getTableName(ctx, tableID) s, err := sink.NewTableSink(p.sink, tableID, p.metricsTableSinkTotalRows) @@ -911,10 +915,6 @@ func (p *processor) createTablePipelineImpl( return nil, errors.Trace(err) } - if p.redoManager.Enabled() { - p.redoManager.AddTable(tableID, replicaInfo.StartTs) - } - log.Info("Add table pipeline", zap.Int64("tableID", tableID), cdcContext.ZapFieldChangefeed(ctx), zap.String("name", table.Name()), diff --git a/cdc/redo/common/metric.go b/cdc/redo/common/metric.go new file mode 100644 index 00000000000..b2e4cd8e61c --- /dev/null +++ b/cdc/redo/common/metric.go @@ -0,0 +1,88 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "ticdc" + subsystem = "redo" +) + +var ( + // RedoWriteBytesGauge records the total number of bytes written to redo log. + RedoWriteBytesGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "write_bytes_total", + Help: "Total number of bytes redo log written", + }, []string{"namespace", "changefeed"}) + + // RedoFsyncDurationHistogram records the latency distributions of fsync called by redo writer. 
+ RedoFsyncDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "fsync_duration_seconds", + Help: "The latency distributions of fsync called by redo writer", + Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13), + }, []string{"namespace", "changefeed"}) + + // RedoFlushAllDurationHistogram records the latency distributions of flushAll + // called by redo writer. + RedoFlushAllDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "flushall_duration_seconds", + Help: "The latency distributions of flushall called by redo writer", + Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13), + }, []string{"namespace", "changefeed"}) + + // RedoTotalRowsCountGauge records the total number of rows written to redo log. + RedoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "total_rows_count", + Help: "The total count of rows that are processed by redo writer", + }, []string{"namespace", "changefeed"}) + + // RedoWriteLogDurationHistogram records the latency distributions of writeLog. + RedoWriteLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "write_log_duration_seconds", + Help: "The latency distributions of writeLog called by redoManager", + Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13), + }, []string{"namespace", "changefeed"}) + + // RedoFlushLogDurationHistogram records the latency distributions of flushLog. + RedoFlushLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "flush_log_duration_seconds", + Help: "The latency distributions of flushLog called by redoManager", + Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13), + }, []string{"namespace", "changefeed"}) +) + +// InitMetrics registers all metrics in this file +func InitMetrics(registry *prometheus.Registry) { + registry.MustRegister(RedoFsyncDurationHistogram) + registry.MustRegister(RedoTotalRowsCountGauge) + registry.MustRegister(RedoWriteBytesGauge) + registry.MustRegister(RedoFlushAllDurationHistogram) + registry.MustRegister(RedoWriteLogDurationHistogram) + registry.MustRegister(RedoFlushLogDurationHistogram) +} diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 38229444609..80412e19c3f 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -17,7 +17,6 @@ import ( "context" "math" "path/filepath" - "sort" "sync" "sync/atomic" "time" @@ -27,13 +26,19 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/writer" + "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) -var updateRtsInterval = time.Second +var ( + flushIntervalInMs int64 = 2000 // 2 seconds + flushTimeout = time.Second * 20 +) // ConsistentLevelType is the level of redo log consistent level. type ConsistentLevelType string @@ -54,12 +59,6 @@ const ( consistentStorageBlackhole consistentStorage = "blackhole" ) -const ( - // supposing to replicate 10k tables, each table has one cached changce averagely. 
- logBufferChanSize = 10000 - logBufferTimeout = time.Minute * 10 -) - // IsValidConsistentLevel checks whether a give consistent level is valid func IsValidConsistentLevel(level string) bool { switch ConsistentLevelType(level) { @@ -99,9 +98,10 @@ type LogManager interface { // The following 6 APIs are called from processor only AddTable(tableID model.TableID, startTs uint64) RemoveTable(tableID model.TableID) + GetResolvedTs(tableID model.TableID) model.Ts GetMinResolvedTs() uint64 EmitRowChangedEvents(ctx context.Context, tableID model.TableID, rows ...*model.RowChangedEvent) error - FlushLog(ctx context.Context, tableID model.TableID, resolvedTs uint64) error + UpdateResolvedTs(ctx context.Context, tableID model.TableID, resolvedTs uint64) error FlushResolvedAndCheckpointTs(ctx context.Context, resolvedTs, checkpointTs uint64) (err error) // EmitDDLEvent are called from owner only @@ -118,33 +118,32 @@ type ManagerOptions struct { ErrCh chan<- error } -type cacheRows struct { - tableID model.TableID - rows []*model.RowChangedEvent - // When calling FlushLog for a table, we must ensure that all data of this - // table has been written to underlying writer. Since the EmitRowChangedEvents - // and FlushLog of the same table can't be executed concurrently, we can - // insert a simple barrier data into data stream to achieve this goal. - flushCallback chan struct{} +type cacheEvents struct { + tableID model.TableID + rows []*model.RowChangedEvent + resolvedTs model.Ts + eventType model.MessageType } // ManagerImpl manages redo log writer, buffers un-persistent redo logs, calculates // redo log resolved ts. It implements LogManager interface. type ManagerImpl struct { - enabled bool - level ConsistentLevelType - storageType consistentStorage + changeFeedID model.ChangeFeedID + enabled bool + level ConsistentLevelType + storageType consistentStorage - logBuffer chan cacheRows - writer writer.RedoLogWriter + rtsMap map[model.TableID]model.Ts + rtsMapMu sync.RWMutex + writer writer.RedoLogWriter + logBuffer *chann.Chann[cacheEvents] minResolvedTs uint64 - tableIDs []model.TableID - rtsMap map[model.TableID]uint64 - rtsMapMu sync.RWMutex + flushing int64 + lastFlushTime time.Time - // record whether there exists a table being flushing resolved ts - flushing int64 + metricWriteLogDuration prometheus.Observer + metricFlushLogDuration prometheus.Observer } // NewManager creates a new Manager @@ -153,16 +152,28 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager if cfg == nil || ConsistentLevelType(cfg.Level) == ConsistentLevelNone { return &ManagerImpl{enabled: false}, nil } + if cfg.FlushIntervalInMs > flushIntervalInMs { + flushIntervalInMs = cfg.FlushIntervalInMs + } + uri, err := storage.ParseRawURL(cfg.Storage) if err != nil { return nil, err } + + changeFeedID := contextutil.ChangefeedIDFromCtx(ctx) m := &ManagerImpl{ - enabled: true, - level: ConsistentLevelType(cfg.Level), - storageType: consistentStorage(uri.Scheme), - rtsMap: make(map[model.TableID]uint64), - logBuffer: make(chan cacheRows, logBufferChanSize), + changeFeedID: changeFeedID, + enabled: true, + level: ConsistentLevelType(cfg.Level), + storageType: consistentStorage(uri.Scheme), + rtsMap: make(map[model.TableID]uint64), + logBuffer: chann.New[cacheEvents](), + minResolvedTs: math.MaxInt64, + metricWriteLogDuration: common.RedoWriteLogDurationHistogram. + WithLabelValues(changeFeedID.Namespace, changeFeedID.ID), + metricFlushLogDuration: common.RedoFlushLogDurationHistogram. 
+ WithLabelValues(changeFeedID.Namespace, changeFeedID.ID), } switch m.storageType { @@ -170,7 +181,6 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager m.writer = writer.NewBlackHoleWriter() case consistentStorageLocal, consistentStorageNFS, consistentStorageS3: globalConf := config.GetGlobalServerConfig() - changeFeedID := contextutil.ChangefeedIDFromCtx(ctx) // We use a temporary dir to storage redo logs before flushing to other backends, such as S3 var redoDir string if changeFeedID.Namespace == model.DefaultNamespace { @@ -208,8 +218,7 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager } if opts.EnableBgRunner { - go m.bgUpdateResolvedTs(ctx, opts.ErrCh) - go m.bgWriteLog(ctx, opts.ErrCh) + go m.bgUpdateLog(ctx, opts.ErrCh) } return m, nil } @@ -252,39 +261,9 @@ func (m *ManagerImpl) Enabled() bool { return m.enabled } -// TryEmitRowChangedEvents sends row changed events to a log buffer, the log buffer -// will be consumed by a background goroutine, which converts row changed events -// to redo logs and sends to log writer. Note this function is non-blocking -func (m *ManagerImpl) TryEmitRowChangedEvents( - ctx context.Context, - tableID model.TableID, - rows ...*model.RowChangedEvent, -) (bool, error) { - timer := time.NewTimer(logBufferTimeout) - defer timer.Stop() - select { - case <-ctx.Done(): - return false, ctx.Err() - case m.logBuffer <- cacheRows{ - tableID: tableID, - // Because the pipeline sink doesn't hold slice memory after calling - // EmitRowChangedEvents, we copy to a new slice to manage memory - // in redo manager itself, which is the same behavior as sink manager. - rows: append(make([]*model.RowChangedEvent, 0, len(rows)), rows...), - }: - return true, nil - default: - return false, nil - } -} - // EmitRowChangedEvents sends row changed events to a log buffer, the log buffer // will be consumed by a background goroutine, which converts row changed events -// to redo logs and sends to log writer. Note this function is non-blocking if -// the channel is not full, otherwise if the channel is always full after timeout, -// error ErrBufferLogTimeout will be returned. -// TODO: if the API is truly non-blocking, we should return an error immediately -// when the log buffer channel is full. +// to redo logs and sends to log writer. // TODO: After buffer sink in sink node is removed, there is no batch mechanism // before sending row changed events to redo manager, the original log buffer // design may have performance issue. @@ -293,50 +272,35 @@ func (m *ManagerImpl) EmitRowChangedEvents( tableID model.TableID, rows ...*model.RowChangedEvent, ) error { - timer := time.NewTimer(logBufferTimeout) - defer timer.Stop() select { case <-ctx.Done(): - return nil - case <-timer.C: - return cerror.ErrBufferLogTimeout.GenWithStackByArgs() - case m.logBuffer <- cacheRows{ - tableID: tableID, - // Because the pipeline sink doesn't hold slice memory after calling - // EmitRowChangedEvents, we copy to a new slice to manage memory - // in redo manager itself, which is the same behavior as sink manager. - rows: append(make([]*model.RowChangedEvent, 0, len(rows)), rows...), + return errors.Trace(ctx.Err()) + case m.logBuffer.In() <- cacheEvents{ + tableID: tableID, + rows: rows, + eventType: model.MessageTypeRow, }: } return nil } -// FlushLog emits resolved ts of a single table -func (m *ManagerImpl) FlushLog( +// UpdateResolvedTs asynchronously updates resolved ts of a single table. 
+func (m *ManagerImpl) UpdateResolvedTs( ctx context.Context, tableID model.TableID, resolvedTs uint64, ) error { - // Use flushing as a lightweight lock to reduce log contention in log writer. - if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) { - return nil - } - defer atomic.StoreInt64(&m.flushing, 0) - - // Adding a barrier to data stream, to ensure all logs of this table has been - // written to underlying writer. - flushCallbackCh := make(chan struct{}) - m.logBuffer <- cacheRows{ - tableID: tableID, - flushCallback: flushCallbackCh, - } select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case <-flushCallbackCh: + case m.logBuffer.In() <- cacheEvents{ + tableID: tableID, + resolvedTs: resolvedTs, + eventType: model.MessageTypeResolved, + }: } - return m.writer.FlushLog(ctx, tableID, resolvedTs) + return nil } // EmitDDLEvent sends DDL event to redo log writer @@ -344,8 +308,15 @@ func (m *ManagerImpl) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) err return m.writer.SendDDL(ctx, DDLToRedo(ddl)) } +// GetResolvedTs returns the resolved ts of a table +func (m *ManagerImpl) GetResolvedTs(tableID model.TableID) model.Ts { + m.rtsMapMu.Lock() + defer m.rtsMapMu.Unlock() + return m.rtsMap[tableID] +} + // GetMinResolvedTs returns the minimum resolved ts of all tables in this redo log manager -func (m *ManagerImpl) GetMinResolvedTs() uint64 { +func (m *ManagerImpl) GetMinResolvedTs() model.Ts { return atomic.LoadUint64(&m.minResolvedTs) } @@ -363,32 +334,22 @@ func (m *ManagerImpl) FlushResolvedAndCheckpointTs(ctx context.Context, resolved func (m *ManagerImpl) AddTable(tableID model.TableID, startTs uint64) { m.rtsMapMu.Lock() defer m.rtsMapMu.Unlock() - i := sort.Search(len(m.tableIDs), func(i int) bool { - return m.tableIDs[i] >= tableID - }) - if i < len(m.tableIDs) && m.tableIDs[i] == tableID { + if _, ok := m.rtsMap[tableID]; ok { log.Warn("add duplicated table in redo log manager", zap.Int64("tableID", tableID)) return } - if i == len(m.tableIDs) { - m.tableIDs = append(m.tableIDs, tableID) - } else { - m.tableIDs = append(m.tableIDs[:i+1], m.tableIDs[i:]...) - m.tableIDs[i] = tableID - } m.rtsMap[tableID] = startTs + + if startTs < m.GetMinResolvedTs() { + atomic.StoreUint64(&m.minResolvedTs, startTs) + } } // RemoveTable removes a table from redo log manager func (m *ManagerImpl) RemoveTable(tableID model.TableID) { m.rtsMapMu.Lock() defer m.rtsMapMu.Unlock() - i := sort.Search(len(m.tableIDs), func(i int) bool { - return m.tableIDs[i] >= tableID - }) - if i < len(m.tableIDs) && m.tableIDs[i] == tableID { - copy(m.tableIDs[i:], m.tableIDs[i+1:]) - m.tableIDs = m.tableIDs[:len(m.tableIDs)-1] + if _, ok := m.rtsMap[tableID]; ok { delete(m.rtsMap, tableID) } else { log.Warn("remove a table not maintained in redo log manager", zap.Int64("tableID", tableID)) @@ -397,75 +358,132 @@ func (m *ManagerImpl) RemoveTable(tableID model.TableID) { // Cleanup removes all redo logs of this manager, it is called when changefeed is removed func (m *ManagerImpl) Cleanup(ctx context.Context) error { + m.logBuffer.Close() + // We must finish consuming the data here, + // otherwise it will cause the channel to not close properly. + for range m.logBuffer.Out() { + // Do nothing. We do not care about the data. + } + common.RedoWriteLogDurationHistogram. + DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + common.RedoFlushLogDurationHistogram. 
+ DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) return m.writer.DeleteAllLogs(ctx) } -// updateTableResolvedTs reads rtsMap from redo log writer and calculate the minimum -// resolved ts of all maintaining tables. -func (m *ManagerImpl) updateTableResolvedTs(ctx context.Context) error { - m.rtsMapMu.Lock() - defer m.rtsMapMu.Unlock() - rtsMap, err := m.writer.GetCurrentResolvedTs(ctx, m.tableIDs) - if err != nil { - return err - } - minResolvedTs := uint64(math.MaxUint64) - for tableID := range m.rtsMap { - if rts, ok := rtsMap[tableID]; ok { - m.rtsMap[tableID] = rts - } - rts := m.rtsMap[tableID] - if rts < minResolvedTs { - minResolvedTs = rts +func (m *ManagerImpl) flushLog( + ctx context.Context, + tableRtsMap map[model.TableID]model.Ts, + handleErr func(err error), +) map[model.TableID]model.Ts { + if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) { + log.Debug("Fail to update flush flag, " + + "the previous flush operation hasn't finished yet") + if time.Since(m.lastFlushTime) > flushTimeout { + log.Warn("flushLog blocking too long, the redo manager may be stuck", + zap.Duration("duration", time.Since(m.lastFlushTime)), + zap.Any("changfeed", m.changeFeedID)) } + return tableRtsMap } - atomic.StoreUint64(&m.minResolvedTs, minResolvedTs) - return nil -} -func (m *ManagerImpl) bgUpdateResolvedTs(ctx context.Context, errCh chan<- error) { - ticker := time.NewTicker(updateRtsInterval) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): + m.lastFlushTime = time.Now() + go func() { + defer atomic.StoreInt64(&m.flushing, 0) + + err := m.writer.FlushLog(ctx, tableRtsMap) + if err != nil { + handleErr(err) return - case <-ticker.C: - err := m.updateTableResolvedTs(ctx) - if err != nil { - select { - case errCh <- err: - default: - log.Error("err channel is full", zap.Error(err)) + } + + m.rtsMapMu.Lock() + defer m.rtsMapMu.Unlock() + minResolvedTs := uint64(math.MaxUint64) + for tableID := range m.rtsMap { + if newRts, ok := tableRtsMap[tableID]; ok { + if newRts < m.rtsMap[tableID] { + log.Panic("resolvedTs in redoManager regressed, report a bug", + zap.Int64("tableID", tableID), + zap.Uint64("oldResolvedTs", m.rtsMap[tableID]), + zap.Uint64("currentReolvedTs", newRts)) } - return + m.rtsMap[tableID] = newRts + } + + rts := m.rtsMap[tableID] + if rts < minResolvedTs { + minResolvedTs = rts } } - } + + atomic.StoreUint64(&m.minResolvedTs, minResolvedTs) + m.metricFlushLogDuration.Observe(time.Since(m.lastFlushTime).Seconds()) + }() + + emptyRtsMap := make(map[model.TableID]model.Ts) + return emptyRtsMap } -func (m *ManagerImpl) bgWriteLog(ctx context.Context, errCh chan<- error) { +func (m *ManagerImpl) bgUpdateLog(ctx context.Context, errCh chan<- error) { + logErrCh := make(chan error, 1) + handleErr := func(err error) { + select { + case logErrCh <- err: + default: + } + } + + tableRtsMap := make(map[int64]uint64) + ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond) + defer ticker.Stop() + for { select { case <-ctx.Done(): return - case cache := <-m.logBuffer: - if cache.flushCallback != nil { - close(cache.flushCallback) - continue + case err := <-logErrCh: + select { + case errCh <- err: + default: + log.Error("err channel in redoManager is full", zap.Error(err)) } - logs := make([]*model.RedoRowChangedEvent, 0, len(cache.rows)) - for _, row := range cache.rows { - logs = append(logs, RowToRedo(row)) + return + case <-ticker.C: + // interpolate tick message to flush writer if needed + // TODO: add log and metrics + newTableRtsMap := 
m.flushLog(ctx, tableRtsMap, handleErr) + tableRtsMap = newTableRtsMap + case cache, ok := <-m.logBuffer.Out(): + if !ok { + return // channel closed } - _, err := m.writer.WriteLog(ctx, cache.tableID, logs) - if err != nil { - select { - case errCh <- err: - default: - log.Error("err channel is full", zap.Error(err)) + switch cache.eventType { + case model.MessageTypeRow: + start := time.Now() + logs := make([]*model.RedoRowChangedEvent, 0, len(cache.rows)) + for _, row := range cache.rows { + logs = append(logs, RowToRedo(row)) + } + _, err := m.writer.WriteLog(ctx, cache.tableID, logs) + if err != nil { + handleErr(err) + return + } + m.metricWriteLogDuration.Observe(time.Since(start).Seconds()) + case model.MessageTypeResolved: + // handle resolved ts + if oldRts, ok := tableRtsMap[cache.tableID]; ok { + if cache.resolvedTs < oldRts { + log.Panic("resolvedTs received by redoManager regressed, report a bug", + zap.Int64("tableID", cache.tableID), + zap.Uint64("oldResolvedTs", oldRts), + zap.Uint64("currentReolvedTs", cache.resolvedTs)) + } } - return + tableRtsMap[cache.tableID] = cache.resolvedTs + default: + log.Debug("handle unknown event type") } } } diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index 4f82a1e862e..0e29166b7c9 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -15,14 +15,16 @@ package redo import ( "context" + "math" + "math/rand" "sync" - "sync/atomic" "testing" "time" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestConsistentConfig(t *testing.T) { @@ -90,9 +92,10 @@ func TestLogManagerInProcessor(t *testing.T) { logMgr, err := NewMockManager(ctx) require.Nil(t, err) + defer logMgr.Cleanup(ctx) - checkResovledTs := func(mgr LogManager, expectedRts uint64) { - time.Sleep(time.Millisecond*200 + updateRtsInterval) + checkResolvedTs := func(mgr LogManager, expectedRts uint64) { + time.Sleep(time.Duration(flushIntervalInMs+200) * time.Millisecond) resolvedTs := mgr.GetMinResolvedTs() require.Equal(t, expectedRts, resolvedTs) } @@ -141,15 +144,14 @@ func TestLogManagerInProcessor(t *testing.T) { err := logMgr.EmitRowChangedEvents(ctx, tc.tableID, tc.rows...) require.Nil(t, err) } - checkResovledTs(logMgr, uint64(130)) - // check FlushLog can move forward the resolved ts when there is not row event. + // check UpdateResolvedTs can move forward the resolved ts when there is not row event. 
flushResolvedTs := uint64(150) for _, tableID := range tables { - err := logMgr.FlushLog(ctx, tableID, flushResolvedTs) + err := logMgr.UpdateResolvedTs(ctx, tableID, flushResolvedTs) require.Nil(t, err) } - checkResovledTs(logMgr, flushResolvedTs) + checkResolvedTs(logMgr, flushResolvedTs) // check remove table can work normally removeTable := tables[len(tables)-1] @@ -157,72 +159,15 @@ func TestLogManagerInProcessor(t *testing.T) { logMgr.RemoveTable(removeTable) flushResolvedTs = uint64(200) for _, tableID := range tables { - err := logMgr.FlushLog(ctx, tableID, flushResolvedTs) + err := logMgr.UpdateResolvedTs(ctx, tableID, flushResolvedTs) require.Nil(t, err) } - checkResovledTs(logMgr, flushResolvedTs) + checkResolvedTs(logMgr, flushResolvedTs) err = logMgr.FlushResolvedAndCheckpointTs(ctx, 200 /*resolvedTs*/, 120 /*CheckPointTs*/) require.Nil(t, err) } -// TestUpdateResolvedTsWithDelayedTable tests redo manager doesn't move resolved -// ts forward if one or more tables resolved ts are not returned from underlying -// writer, this secenario happens when there is no data or resolved ts of this -// table sent to redo log writer yet. -func TestUpdateResolvedTsWithDelayedTable(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - logMgr, err := NewMockManager(ctx) - require.Nil(t, err) - - var ( - table53 = int64(53) - table55 = int64(55) - table57 = int64(57) - - startTs = uint64(100) - table53Ts = uint64(125) - table55Ts = uint64(120) - table57Ts = uint64(110) - ) - tables := []model.TableID{table53, table55, table57} - for _, tableID := range tables { - logMgr.AddTable(tableID, startTs) - } - - // table 53 has new data, resolved-ts moves forward to 125 - rows := []*model.RowChangedEvent{ - {CommitTs: table53Ts, Table: &model.TableName{TableID: table53}}, - {CommitTs: table53Ts, Table: &model.TableName{TableID: table53}}, - } - err = logMgr.EmitRowChangedEvents(ctx, table53, rows...) - require.Nil(t, err) - require.Eventually(t, func() bool { - tsMap, err := logMgr.writer.GetCurrentResolvedTs(ctx, []int64{table53}) - require.Nil(t, err) - ts, ok := tsMap[table53] - return ok && ts == table53Ts - }, time.Second, time.Millisecond*10) - - // table 55 has no data, but receives resolved-ts event and moves forward to 120 - err = logMgr.FlushLog(ctx, table55, table55Ts) - require.Nil(t, err) - - // get min resolved ts should take each table into consideration - err = logMgr.updateTableResolvedTs(ctx) - require.Nil(t, err) - require.Equal(t, startTs, logMgr.GetMinResolvedTs()) - - // table 57 moves forward, update table resolved ts and check again - logMgr.FlushLog(ctx, table57, table57Ts) - err = logMgr.updateTableResolvedTs(ctx) - require.Nil(t, err) - require.Equal(t, table57Ts, logMgr.GetMinResolvedTs()) -} - // TestLogManagerInOwner tests how redo log manager is used in owner, // where the redo log manager needs to handle DDL event only. func TestLogManagerInOwner(t *testing.T) { @@ -232,6 +177,7 @@ func TestLogManagerInOwner(t *testing.T) { logMgr, err := NewMockManager(ctx) require.Nil(t, err) + defer logMgr.Cleanup(ctx) ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"} err = logMgr.EmitDDLEvent(ctx, ddl) @@ -241,72 +187,76 @@ func TestLogManagerInOwner(t *testing.T) { require.Nil(t, err) } -// TestWriteLogFlushLogSequence tests flush log must be executed after table's -// log has been written to writer. 
-func TestWriteLogFlushLogSequence(t *testing.T) { - t.Parallel() - +func BenchmarkRedoManager(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) - cfg := &config.ConsistentConfig{ - Level: string(ConsistentLevelEventual), - Storage: "blackhole://", - } - errCh := make(chan error, 1) - opts := &ManagerOptions{ - EnableBgRunner: false, - ErrCh: errCh, - } - logMgr, err := NewManager(ctx, cfg, opts) - require.Nil(t, err) - - var ( - wg sync.WaitGroup + defer cancel() + runBenchTest(ctx, b) +} - tableID = int64(53) - startTs = uint64(100) - resolvedTs = uint64(150) - ) - logMgr.AddTable(tableID, startTs) +func BenchmarkRedoManagerWaitFlush(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logMgr, maxTsMap := runBenchTest(ctx, b) - wg.Add(1) - go func() { - defer wg.Done() - select { - case <-ctx.Done(): - return - case err := <-errCh: - require.Nil(t, err) + var minResolvedTs model.Ts = math.MaxUint64 + for _, tp := range maxTsMap { + if *tp < minResolvedTs { + minResolvedTs = *tp } - }() + } - wg.Add(1) - go func() { - defer wg.Done() - // FlushLog blocks until bgWriteLog consumes data and close callback chan. - err := logMgr.FlushLog(ctx, tableID, resolvedTs) - require.Nil(t, err) - }() + for t := logMgr.GetMinResolvedTs(); t != minResolvedTs; { + time.Sleep(time.Millisecond * 200) + log.Debug("", zap.Uint64("targetTs", minResolvedTs), zap.Uint64("minResolvedTs", t)) + t = logMgr.GetMinResolvedTs() + } +} - // Sleep a short time to ensure `logMgr.FlushLog` is called - time.Sleep(time.Millisecond * 100) - // FlushLog is still ongoing - require.Equal(t, int64(1), atomic.LoadInt64(&logMgr.flushing)) - err = logMgr.updateTableResolvedTs(ctx) - require.Nil(t, err) - require.Equal(t, startTs, logMgr.GetMinResolvedTs()) +func runBenchTest(ctx context.Context, b *testing.B) (LogManager, map[model.TableID]*model.Ts) { + logMgr, err := NewMockManager(ctx) + require.Nil(b, err) - wg.Add(1) - go func() { - defer wg.Done() - logMgr.bgWriteLog(ctx, errCh) - }() + // Init tables + numOfTables := 200 + tables := make([]model.TableID, 0, numOfTables) + maxTsMap := make(map[model.TableID]*model.Ts, numOfTables) + startTs := uint64(100) + for i := 0; i < numOfTables; i++ { + tableID := model.TableID(i) + tables = append(tables, tableID) + ts := startTs + maxTsMap[tableID] = &ts + logMgr.AddTable(tableID, startTs) + } - require.Eventually(t, func() bool { - err = logMgr.updateTableResolvedTs(ctx) - require.Nil(t, err) - return logMgr.GetMinResolvedTs() == resolvedTs - }, time.Second, time.Millisecond*20) + maxRowCount := 100000 + wg := sync.WaitGroup{} + b.ResetTimer() + for _, tableID := range tables { + wg.Add(1) + go func(tableID model.TableID) { + defer wg.Done() + maxCommitTs := maxTsMap[tableID] + rows := []*model.RowChangedEvent{} + for i := 0; i < maxRowCount; i++ { + if i%100 == 0 { + logMgr.UpdateResolvedTs(ctx, tableID, *maxCommitTs) + // prepare new row change events + b.StopTimer() + *maxCommitTs += rand.Uint64() % 10 + rows = []*model.RowChangedEvent{ + {CommitTs: *maxCommitTs, Table: &model.TableName{TableID: tableID}}, + {CommitTs: *maxCommitTs, Table: &model.TableName{TableID: tableID}}, + {CommitTs: *maxCommitTs, Table: &model.TableName{TableID: tableID}}, + } + + b.StartTimer() + } + logMgr.EmitRowChangedEvents(ctx, tableID, rows...) 
+ } + }(tableID) + } - cancel() wg.Wait() + return logMgr, maxTsMap } diff --git a/cdc/redo/writer/blackhole_writer.go b/cdc/redo/writer/blackhole_writer.go index c2022be3e4f..1956e4a882d 100644 --- a/cdc/redo/writer/blackhole_writer.go +++ b/cdc/redo/writer/blackhole_writer.go @@ -56,10 +56,12 @@ func (bs *blackHoleWriter) WriteLog(_ context.Context, tableID model.TableID, lo return } -func (bs *blackHoleWriter) FlushLog(_ context.Context, tableID model.TableID, resolvedTs uint64) error { +func (bs *blackHoleWriter) FlushLog(_ context.Context, rtsMap map[model.TableID]model.Ts) error { bs.tableRtsMu.Lock() defer bs.tableRtsMu.Unlock() - bs.tableRtsMap[tableID] = resolvedTs + for tableID, rts := range rtsMap { + bs.tableRtsMap[tableID] = rts + } return nil } @@ -78,18 +80,6 @@ func (bs *blackHoleWriter) EmitCheckpointTs(_ context.Context, ts uint64) error return nil } -func (bs *blackHoleWriter) GetCurrentResolvedTs(_ context.Context, tableIDs []int64) (map[int64]uint64, error) { - bs.tableRtsMu.RLock() - defer bs.tableRtsMu.RUnlock() - rtsMap := make(map[int64]uint64, len(bs.tableRtsMap)) - for _, tableID := range tableIDs { - if rts, ok := bs.tableRtsMap[tableID]; ok { - rtsMap[tableID] = rts - } - } - return rtsMap, nil -} - func (bs *blackHoleWriter) Close() error { return nil } diff --git a/cdc/redo/writer/file.go b/cdc/redo/writer/file.go index c26cdc92741..4f68b3581a7 100644 --- a/cdc/redo/writer/file.go +++ b/cdc/redo/writer/file.go @@ -175,11 +175,11 @@ func NewWriter(ctx context.Context, cfg *FileWriterConfig, opts ...Option) (*Wri uint64buf: make([]byte, 8), storage: s3storage, - metricFsyncDuration: redoFsyncDurationHistogram. + metricFsyncDuration: common.RedoFsyncDurationHistogram. WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID), - metricFlushAllDuration: redoFlushAllDurationHistogram. + metricFlushAllDuration: common.RedoFlushAllDurationHistogram. WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID), - metricWriteBytes: redoWriteBytesGauge. + metricWriteBytes: common.RedoWriteBytesGauge. WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID), } if w.op.getUUIDGenerator != nil { @@ -189,40 +189,9 @@ func NewWriter(ctx context.Context, cfg *FileWriterConfig, opts ...Option) (*Wri } w.running.Store(true) - go w.runFlushToDisk(ctx, cfg.FlushIntervalInMs) - return w, nil } -func (w *Writer) runFlushToDisk(ctx context.Context, flushIntervalInMs int64) { - ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond) - defer ticker.Stop() - - for { - if !w.IsRunning() { - return - } - - select { - case <-ctx.Done(): - err := w.Close() - if err != nil { - log.Error("runFlushToDisk close fail", - zap.String("namespace", w.cfg.ChangeFeedID.Namespace), - zap.String("changefeed", w.cfg.ChangeFeedID.ID), - zap.Error(err)) - } - case <-ticker.C: - err := w.Flush() - if err != nil { - log.Error("redo log flush fail", - zap.String("namespace", w.cfg.ChangeFeedID.Namespace), - zap.String("changefeed", w.cfg.ChangeFeedID.ID), zap.Error(err)) - } - } - } -} - // Write implement write interface // TODO: more general api with fileName generated by caller func (w *Writer) Write(rawData []byte) (int, error) { @@ -300,11 +269,11 @@ func (w *Writer) Close() error { return nil } - redoFlushAllDurationHistogram. + common.RedoFlushAllDurationHistogram. DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID) - redoFsyncDurationHistogram. + common.RedoFsyncDurationHistogram. 
DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID) - redoWriteBytesGauge. + common.RedoWriteBytesGauge. DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID) return w.close() diff --git a/cdc/redo/writer/file_test.go b/cdc/redo/writer/file_test.go index 3b5c77dc330..73939a056f1 100644 --- a/cdc/redo/writer/file_test.go +++ b/cdc/redo/writer/file_test.go @@ -65,11 +65,11 @@ func TestWriterWrite(t *testing.T) { }, uint64buf: make([]byte, 8), running: *atomic.NewBool(true), - metricWriteBytes: redoWriteBytesGauge. + metricWriteBytes: common.RedoWriteBytesGauge. WithLabelValues("default", "test-cf"), - metricFsyncDuration: redoFsyncDurationHistogram. + metricFsyncDuration: common.RedoFsyncDurationHistogram. WithLabelValues("default", "test-cf"), - metricFlushAllDuration: redoFlushAllDurationHistogram. + metricFlushAllDuration: common.RedoFlushAllDurationHistogram. WithLabelValues("default", "test-cf"), uuidGenerator: uuidGen, } @@ -157,11 +157,11 @@ func TestWriterWrite(t *testing.T) { }, uint64buf: make([]byte, 8), running: *atomic.NewBool(true), - metricWriteBytes: redoWriteBytesGauge. + metricWriteBytes: common.RedoWriteBytesGauge. WithLabelValues("default", "test-cf11"), - metricFsyncDuration: redoFsyncDurationHistogram. + metricFsyncDuration: common.RedoFsyncDurationHistogram. WithLabelValues("default", "test-cf11"), - metricFlushAllDuration: redoFlushAllDurationHistogram. + metricFlushAllDuration: common.RedoFlushAllDurationHistogram. WithLabelValues("default", "test-cf11"), uuidGenerator: uuidGen, } @@ -240,11 +240,11 @@ func TestWriterGC(t *testing.T) { cfg: cfg, uint64buf: make([]byte, 8), storage: mockStorage, - metricWriteBytes: redoWriteBytesGauge. + metricWriteBytes: common.RedoWriteBytesGauge. WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID), - metricFsyncDuration: redoFsyncDurationHistogram. + metricFsyncDuration: common.RedoFsyncDurationHistogram. WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID), - metricFlushAllDuration: redoFlushAllDurationHistogram. + metricFlushAllDuration: common.RedoFlushAllDurationHistogram. WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID), uuidGenerator: uuidGen, } @@ -344,11 +344,11 @@ func TestNewWriter(t *testing.T) { }, uint64buf: make([]byte, 8), storage: mockStorage, - metricWriteBytes: redoWriteBytesGauge. + metricWriteBytes: common.RedoWriteBytesGauge. WithLabelValues("default", "test"), - metricFsyncDuration: redoFsyncDurationHistogram. + metricFsyncDuration: common.RedoFsyncDurationHistogram. WithLabelValues("default", "test"), - metricFlushAllDuration: redoFlushAllDurationHistogram. + metricFlushAllDuration: common.RedoFlushAllDurationHistogram. WithLabelValues("default", "test"), uuidGenerator: uuidGen, } @@ -410,11 +410,11 @@ func TestRotateFile(t *testing.T) { MaxLogSize: defaultMaxLogSize, }, uint64buf: make([]byte, 8), - metricWriteBytes: redoWriteBytesGauge. + metricWriteBytes: common.RedoWriteBytesGauge. WithLabelValues("default", "test"), - metricFsyncDuration: redoFsyncDurationHistogram. + metricFsyncDuration: common.RedoFsyncDurationHistogram. WithLabelValues("default", "test"), - metricFlushAllDuration: redoFlushAllDurationHistogram. + metricFlushAllDuration: common.RedoFlushAllDurationHistogram. 
WithLabelValues("default", "test"), storage: mockStorage, uuidGenerator: uuidGen, diff --git a/cdc/redo/writer/metric.go b/cdc/redo/writer/metric.go deleted file mode 100644 index 7d1de1331e4..00000000000 --- a/cdc/redo/writer/metric.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package writer - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -const ( - namespace = "ticdc" - subsystem = "redo" -) - -var ( - redoWriteBytesGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "write_bytes_total", - Help: "Total number of bytes redo log written", - }, []string{"namespace", "changefeed"}) - - redoFsyncDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "fsync_duration_seconds", - Help: "The latency distributions of fsync called by redo writer", - Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13), - }, []string{"namespace", "changefeed"}) - - redoFlushAllDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "flushall_duration_seconds", - Help: "The latency distributions of flushall called by redo writer", - Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13), - }, []string{"namespace", "changefeed"}) - - redoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "total_rows_count", - Help: "The total count of rows that are processed by redo writer", - }, []string{"namespace", "changefeed"}) -) - -// InitMetrics registers all metrics in this file -func InitMetrics(registry *prometheus.Registry) { - registry.MustRegister(redoFsyncDurationHistogram) - registry.MustRegister(redoTotalRowsCountGauge) - registry.MustRegister(redoWriteBytesGauge) - registry.MustRegister(redoFlushAllDurationHistogram) -} diff --git a/cdc/redo/writer/mock_RedoLogWriter.go b/cdc/redo/writer/mock_RedoLogWriter.go index 361aa6f61ab..072d739dd41 100644 --- a/cdc/redo/writer/mock_RedoLogWriter.go +++ b/cdc/redo/writer/mock_RedoLogWriter.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.13.1. DO NOT EDIT. 
package writer @@ -41,6 +41,20 @@ func (_m *MockRedoLogWriter) Close() error { return r0 } +// DeleteAllLogs provides a mock function with given fields: ctx +func (_m *MockRedoLogWriter) DeleteAllLogs(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // EmitCheckpointTs provides a mock function with given fields: ctx, ts func (_m *MockRedoLogWriter) EmitCheckpointTs(ctx context.Context, ts uint64) error { ret := _m.Called(ctx, ts) @@ -69,13 +83,13 @@ func (_m *MockRedoLogWriter) EmitResolvedTs(ctx context.Context, ts uint64) erro return r0 } -// FlushLog provides a mock function with given fields: ctx, tableID, ts -func (_m *MockRedoLogWriter) FlushLog(ctx context.Context, tableID int64, ts uint64) error { - ret := _m.Called(ctx, tableID, ts) +// FlushLog provides a mock function with given fields: ctx, rtsMap +func (_m *MockRedoLogWriter) FlushLog(ctx context.Context, rtsMap map[int64]uint64) error { + ret := _m.Called(ctx, rtsMap) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) error); ok { - r0 = rf(ctx, tableID, ts) + if rf, ok := ret.Get(0).(func(context.Context, map[int64]uint64) error); ok { + r0 = rf(ctx, rtsMap) } else { r0 = ret.Error(0) } @@ -83,29 +97,6 @@ func (_m *MockRedoLogWriter) FlushLog(ctx context.Context, tableID int64, ts uin return r0 } -// GetCurrentResolvedTs provides a mock function with given fields: ctx, tableIDs -func (_m *MockRedoLogWriter) GetCurrentResolvedTs(ctx context.Context, tableIDs []int64) (map[int64]uint64, error) { - ret := _m.Called(ctx, tableIDs) - - var r0 map[int64]uint64 - if rf, ok := ret.Get(0).(func(context.Context, []int64) map[int64]uint64); ok { - r0 = rf(ctx, tableIDs) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[int64]uint64) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, []int64) error); ok { - r1 = rf(ctx, tableIDs) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // SendDDL provides a mock function with given fields: ctx, ddl func (_m *MockRedoLogWriter) SendDDL(ctx context.Context, ddl *model.RedoDDLEvent) error { ret := _m.Called(ctx, ddl) @@ -140,3 +131,18 @@ func (_m *MockRedoLogWriter) WriteLog(ctx context.Context, tableID int64, rows [ return r0, r1 } + +type mockConstructorTestingTNewMockRedoLogWriter interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockRedoLogWriter creates a new instance of MockRedoLogWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockRedoLogWriter(t mockConstructorTestingTNewMockRedoLogWriter) *MockRedoLogWriter { + mock := &MockRedoLogWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/cdc/redo/writer/mock_fileWriter.go b/cdc/redo/writer/mock_fileWriter.go index 82a34efd1a7..71492b712fd 100644 --- a/cdc/redo/writer/mock_fileWriter.go +++ b/cdc/redo/writer/mock_fileWriter.go @@ -1,17 +1,17 @@ -// Copyright 2021 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.13.1. DO NOT EDIT. package writer @@ -103,3 +103,18 @@ func (_m *mockFileWriter) Write(p []byte) (int, error) { return r0, r1 } + +type mockConstructorTestingTnewMockFileWriter interface { + mock.TestingT + Cleanup(func()) +} + +// newMockFileWriter creates a new instance of mockFileWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func newMockFileWriter(t mockConstructorTestingTnewMockFileWriter) *mockFileWriter { + mock := &mockFileWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index 6b8ca59e97f..beea3133728 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -53,7 +53,7 @@ type RedoLogWriter interface { // FlushLog sends resolved-ts from table pipeline to log writer, it is // essential to flush when a table doesn't have any row change event for // some time, and the resolved ts of this table should be moved forward. - FlushLog(ctx context.Context, tableID int64, ts uint64) error + FlushLog(ctx context.Context, rtsMap map[model.TableID]model.Ts) error // EmitCheckpointTs write CheckpointTs to meta file EmitCheckpointTs(ctx context.Context, ts uint64) error @@ -61,9 +61,6 @@ type RedoLogWriter interface { // EmitResolvedTs write ResolvedTs to meta file EmitResolvedTs(ctx context.Context, ts uint64) error - // GetCurrentResolvedTs return all the ResolvedTs list for given tableIDs - GetCurrentResolvedTs(ctx context.Context, tableIDs []int64) (resolvedTsList map[int64]uint64, err error) - // DeleteAllLogs delete all log files related to the changefeed, called from owner only when delete changefeed DeleteAllLogs(ctx context.Context) error } @@ -191,7 +188,7 @@ func NewLogWriter( } } - logWriter.metricTotalRowsCount = redoTotalRowsCountGauge. + logWriter.metricTotalRowsCount = common.RedoTotalRowsCountGauge. 
WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID) logWriters[cfg.ChangeFeedID] = logWriter go logWriter.runGC(ctx) @@ -382,7 +379,7 @@ func (l *LogWriter) SendDDL(ctx context.Context, ddl *model.RedoDDLEvent) error } // FlushLog implement FlushLog api -func (l *LogWriter) FlushLog(ctx context.Context, tableID int64, ts uint64) error { +func (l *LogWriter) FlushLog(ctx context.Context, rtsMap map[model.TableID]model.Ts) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) @@ -396,7 +393,9 @@ func (l *LogWriter) FlushLog(ctx context.Context, tableID int64, ts uint64) erro if err := l.flush(); err != nil { return err } - l.setMaxCommitTs(tableID, ts) + for tableID, rts := range rtsMap { + l.setMaxCommitTs(tableID, rts) + } return nil } @@ -429,38 +428,6 @@ func (l *LogWriter) EmitResolvedTs(ctx context.Context, ts uint64) error { return l.flushLogMeta(0, ts) } -// GetCurrentResolvedTs implement GetCurrentResolvedTs api -func (l *LogWriter) GetCurrentResolvedTs(ctx context.Context, tableIDs []int64) (map[int64]uint64, error) { - select { - case <-ctx.Done(): - return nil, errors.Trace(ctx.Err()) - default: - } - - if len(tableIDs) == 0 { - return nil, nil - } - - l.metaLock.RLock() - defer l.metaLock.RUnlock() - - // need to make sure all data received got saved already - err := l.rowWriter.Flush() - if err != nil { - return nil, err - } - - ret := map[int64]uint64{} - for i := 0; i < len(tableIDs); i++ { - id := tableIDs[i] - if v, ok := l.meta.ResolvedTsList[id]; ok { - ret[id] = v - } - } - - return ret, nil -} - // DeleteAllLogs implement DeleteAllLogs api func (l *LogWriter) DeleteAllLogs(ctx context.Context) error { err := l.Close() @@ -557,7 +524,7 @@ var getAllFilesInS3 = func(ctx context.Context, l *LogWriter) ([]string, error) // Close implements RedoLogWriter.Close. func (l *LogWriter) Close() error { - redoTotalRowsCountGauge. + common.RedoTotalRowsCountGauge. DeleteLabelValues(l.cfg.ChangeFeedID.Namespace, l.cfg.ChangeFeedID.ID) var err error diff --git a/cdc/redo/writer/writer_test.go b/cdc/redo/writer/writer_test.go index 59a8bca4668..d3ab96c886a 100644 --- a/cdc/redo/writer/writer_test.go +++ b/cdc/redo/writer/writer_test.go @@ -131,7 +131,7 @@ func TestLogWriterWriteLog(t *testing.T) { rowWriter: mockWriter, ddlWriter: mockWriter, meta: &common.LogMeta{ResolvedTsList: map[int64]uint64{}}, - metricTotalRowsCount: redoTotalRowsCountGauge. + metricTotalRowsCount: common.RedoTotalRowsCountGauge. 
WithLabelValues("default", ""), } if tt.name == "context cancel" { @@ -338,7 +338,7 @@ func TestLogWriterFlushLog(t *testing.T) { cancel() tt.args.ctx = ctx } - err := writer.FlushLog(tt.args.ctx, tt.args.tableID, tt.args.ts) + err := writer.FlushLog(tt.args.ctx, map[int64]uint64{tt.args.tableID: tt.args.ts}) if tt.wantErr != nil { require.True(t, errors.ErrorEqual(tt.wantErr, err), err.Error()+tt.wantErr.Error()) } else { @@ -526,84 +526,6 @@ func TestLogWriterEmitResolvedTs(t *testing.T) { } } -func TestLogWriterGetCurrentResolvedTs(t *testing.T) { - type arg struct { - ctx context.Context - ts map[int64]uint64 - tableIDs []int64 - } - tests := []struct { - name string - args arg - wantTs map[int64]uint64 - wantErr error - }{ - { - name: "happy", - args: arg{ - ctx: context.Background(), - ts: map[int64]uint64{1: 1, 2: 2}, - tableIDs: []int64{1, 2, 3}, - }, - wantTs: map[int64]uint64{1: 1, 2: 2}, - }, - { - name: "len(tableIDs)==0", - args: arg{ - ctx: context.Background(), - }, - }, - { - name: "context cancel", - args: arg{ - ctx: context.Background(), - }, - wantErr: context.Canceled, - }, - } - - dir := t.TempDir() - - for _, tt := range tests { - mockWriter := &mockFileWriter{} - mockWriter.On("Flush", mock.Anything).Return(nil) - mockWriter.On("IsRunning").Return(true) - cfg := &LogWriterConfig{ - Dir: dir, - ChangeFeedID: model.DefaultChangeFeedID("test-cf"), - CaptureID: "cp", - MaxLogSize: 10, - CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}), - FlushIntervalInMs: 5, - } - writer := LogWriter{ - rowWriter: mockWriter, - ddlWriter: mockWriter, - meta: &common.LogMeta{ResolvedTsList: map[int64]uint64{}}, - cfg: cfg, - } - - if tt.name == "context cancel" { - ctx, cancel := context.WithCancel(context.Background()) - cancel() - tt.args.ctx = ctx - } - for k, v := range tt.args.ts { - _ = writer.FlushLog(tt.args.ctx, k, v) - } - ret, err := writer.GetCurrentResolvedTs(tt.args.ctx, tt.args.tableIDs) - if tt.wantErr != nil { - require.True(t, errors.ErrorEqual(tt.wantErr, err), tt.name, err.Error()) - } else { - require.Nil(t, err, tt.name) - require.Equal(t, len(ret), len(tt.wantTs)) - for k, v := range tt.wantTs { - require.Equal(t, v, ret[k]) - } - } - } -} - func TestNewLogWriter(t *testing.T) { _, err := NewLogWriter(context.Background(), nil) require.NotNil(t, err) diff --git a/cdc/sink/mq/codec/avro.go b/cdc/sink/mq/codec/avro.go index c659b503794..ed8faa80e05 100644 --- a/cdc/sink/mq/codec/avro.go +++ b/cdc/sink/mq/codec/avro.go @@ -67,7 +67,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent( nil, nil, e.CommitTs, - model.MqMessageTypeRow, + model.MessageTypeRow, &e.Table.Schema, &e.Table.Table, ) diff --git a/cdc/sink/mq/codec/canal.go b/cdc/sink/mq/codec/canal.go index 909601809b4..eb86a5e300c 100644 --- a/cdc/sink/mq/codec/canal.go +++ b/cdc/sink/mq/codec/canal.go @@ -486,7 +486,7 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage { if err != nil { log.Panic("Error when serializing Canal packet", zap.Error(err)) } - ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil) + ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MessageTypeRow, nil, nil) ret.SetRowsCount(rowCount) d.messages.Reset() d.resetPacket() diff --git a/cdc/sink/mq/codec/canal_flat.go b/cdc/sink/mq/codec/canal_flat.go index b541f604945..e41b9d904be 100644 --- a/cdc/sink/mq/codec/canal_flat.go +++ b/cdc/sink/mq/codec/canal_flat.go @@ -79,7 +79,7 @@ type canalFlatMessageInterface interface { getData() map[string]interface{} 
getMySQLType() map[string]string getJavaSQLType() map[string]int32 - mqMessageType() model.MqMessageType + mqMessageType() model.MessageType eventType() canal.EventType pkNameSet() map[string]struct{} } @@ -153,16 +153,16 @@ func (c *canalFlatMessage) getJavaSQLType() map[string]int32 { return c.SQLType } -func (c *canalFlatMessage) mqMessageType() model.MqMessageType { +func (c *canalFlatMessage) mqMessageType() model.MessageType { if c.IsDDL { - return model.MqMessageTypeDDL + return model.MessageTypeDDL } if c.EventType == tidbWaterMarkType { - return model.MqMessageTypeResolved + return model.MessageTypeResolved } - return model.MqMessageTypeRow + return model.MessageTypeRow } func (c *canalFlatMessage) eventType() canal.EventType { @@ -375,7 +375,7 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage { log.Panic("CanalFlatEventBatchEncoder", zap.Error(err)) return nil } - m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable()) + m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MessageTypeRow, msg.getSchema(), msg.getTable()) m.IncRowsCount() ret[i] = m } @@ -400,9 +400,9 @@ func NewCanalFlatEventBatchDecoder(data []byte, enableTiDBExtension bool) EventB } // HasNext implements the EventBatchDecoder interface -func (b *CanalFlatEventBatchDecoder) HasNext() (model.MqMessageType, bool, error) { +func (b *CanalFlatEventBatchDecoder) HasNext() (model.MessageType, bool, error) { if len(b.data) == 0 { - return model.MqMessageTypeUnknown, false, nil + return model.MessageTypeUnknown, false, nil } var msg canalFlatMessageInterface = &canalFlatMessage{} if b.enableTiDBExtension { @@ -414,7 +414,7 @@ func (b *CanalFlatEventBatchDecoder) HasNext() (model.MqMessageType, bool, error if err := json.Unmarshal(b.data, msg); err != nil { log.Error("canal-json decoder unmarshal data failed", zap.Error(err), zap.ByteString("data", b.data)) - return model.MqMessageTypeUnknown, false, err + return model.MessageTypeUnknown, false, err } b.msg = msg b.data = nil @@ -425,7 +425,7 @@ func (b *CanalFlatEventBatchDecoder) HasNext() (model.MqMessageType, bool, error // NextRowChangedEvent implements the EventBatchDecoder interface // `HasNext` should be called before this. func (b *CanalFlatEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { - if b.msg == nil || b.msg.mqMessageType() != model.MqMessageTypeRow { + if b.msg == nil || b.msg.mqMessageType() != model.MessageTypeRow { return nil, cerrors.ErrCanalDecodeFailed. GenWithStack("not found row changed event message") } @@ -440,7 +440,7 @@ func (b *CanalFlatEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEve // NextDDLEvent implements the EventBatchDecoder interface // `HasNext` should be called before this. func (b *CanalFlatEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) { - if b.msg == nil || b.msg.mqMessageType() != model.MqMessageTypeDDL { + if b.msg == nil || b.msg.mqMessageType() != model.MessageTypeDDL { return nil, cerrors.ErrCanalDecodeFailed. GenWithStack("not found ddl event message") } @@ -453,7 +453,7 @@ func (b *CanalFlatEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) { // NextResolvedEvent implements the EventBatchDecoder interface // `HasNext` should be called before this. 
func (b *CanalFlatEventBatchDecoder) NextResolvedEvent() (uint64, error) { - if b.msg == nil || b.msg.mqMessageType() != model.MqMessageTypeResolved { + if b.msg == nil || b.msg.mqMessageType() != model.MessageTypeResolved { return 0, cerrors.ErrCanalDecodeFailed. GenWithStack("not found resolved event message") } diff --git a/cdc/sink/mq/codec/canal_flat_test.go b/cdc/sink/mq/codec/canal_flat_test.go index 024c660943a..a9b655cb96c 100644 --- a/cdc/sink/mq/codec/canal_flat_test.go +++ b/cdc/sink/mq/codec/canal_flat_test.go @@ -178,7 +178,7 @@ func TestNewCanalFlatEventBatchDecoder4RowMessage(t *testing.T) { ty, hasNext, err := decoder.HasNext() require.Nil(t, err) require.True(t, hasNext) - require.Equal(t, model.MqMessageTypeRow, ty) + require.Equal(t, model.MessageTypeRow, ty) consumed, err := decoder.NextRowChangedEvent() require.Nil(t, err) @@ -259,7 +259,7 @@ func TestNewCanalFlatEventBatchDecoder4DDLMessage(t *testing.T) { ty, hasNext, err := decoder.HasNext() require.Nil(t, err) require.True(t, hasNext) - require.Equal(t, model.MqMessageTypeDDL, ty) + require.Equal(t, model.MessageTypeDDL, ty) consumed, err := decoder.NextDDLEvent() require.Nil(t, err) @@ -276,7 +276,7 @@ func TestNewCanalFlatEventBatchDecoder4DDLMessage(t *testing.T) { ty, hasNext, err = decoder.HasNext() require.Nil(t, err) require.False(t, hasNext) - require.Equal(t, model.MqMessageTypeUnknown, ty) + require.Equal(t, model.MessageTypeUnknown, ty) consumed, err = decoder.NextDDLEvent() require.NotNil(t, err) @@ -338,19 +338,19 @@ func TestEncodeCheckpointEvent(t *testing.T) { require.Nil(t, err) if enable { require.True(t, hasNext) - require.Equal(t, model.MqMessageTypeResolved, ty) + require.Equal(t, model.MessageTypeResolved, ty) consumed, err := decoder.NextResolvedEvent() require.Nil(t, err) require.Equal(t, watermark, consumed) } else { require.False(t, hasNext) - require.Equal(t, model.MqMessageTypeUnknown, ty) + require.Equal(t, model.MessageTypeUnknown, ty) } ty, hasNext, err = decoder.HasNext() require.Nil(t, err) require.False(t, hasNext) - require.Equal(t, model.MqMessageTypeUnknown, ty) + require.Equal(t, model.MessageTypeUnknown, ty) } } diff --git a/cdc/sink/mq/codec/craft.go b/cdc/sink/mq/codec/craft.go index b29a509e1d6..46fe93c106f 100644 --- a/cdc/sink/mq/codec/craft.go +++ b/cdc/sink/mq/codec/craft.go @@ -46,7 +46,7 @@ func (e *CraftEventBatchEncoder) flush() { schema := headers.GetSchema(0) table := headers.GetTable(0) rowsCnt := e.rowChangedBuffer.RowsCount() - mqMessage := NewMQMessage(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table) + mqMessage := NewMQMessage(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MessageTypeRow, &schema, &table) mqMessage.SetRowsCount(rowsCnt) e.messageBuf = append(e.messageBuf, mqMessage) } @@ -123,9 +123,9 @@ type CraftEventBatchDecoder struct { } // HasNext implements the EventBatchDecoder interface -func (b *CraftEventBatchDecoder) HasNext() (model.MqMessageType, bool, error) { +func (b *CraftEventBatchDecoder) HasNext() (model.MessageType, bool, error) { if b.index >= b.headers.Count() { - return model.MqMessageTypeUnknown, false, nil + return model.MessageTypeUnknown, false, nil } return b.headers.GetType(b.index), true, nil } @@ -136,7 +136,7 @@ func (b *CraftEventBatchDecoder) NextResolvedEvent() (uint64, error) { if err != nil { return 0, errors.Trace(err) } - if !hasNext || ty != model.MqMessageTypeResolved { + if !hasNext || ty != model.MessageTypeResolved { return 0, 
cerror.ErrCraftCodecInvalidData.GenWithStack("not found resolved event message") } ts := b.headers.GetTs(b.index) @@ -150,7 +150,7 @@ func (b *CraftEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, if err != nil { return nil, errors.Trace(err) } - if !hasNext || ty != model.MqMessageTypeRow { + if !hasNext || ty != model.MessageTypeRow { return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message") } oldValue, newValue, err := b.decoder.RowChangedEvent(b.index) @@ -188,7 +188,7 @@ func (b *CraftEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) { if err != nil { return nil, errors.Trace(err) } - if !hasNext || ty != model.MqMessageTypeDDL { + if !hasNext || ty != model.MessageTypeDDL { return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found ddl event message") } ddlType, query, err := b.decoder.DDLEvent(b.index) diff --git a/cdc/sink/mq/codec/craft/encoder.go b/cdc/sink/mq/codec/craft/encoder.go index 69938f99a27..67853107d92 100644 --- a/cdc/sink/mq/codec/craft/encoder.go +++ b/cdc/sink/mq/codec/craft/encoder.go @@ -300,7 +300,7 @@ func (e *MessageEncoder) encodeRowChangeEvents(events []rowChangedEvent) *Messag func NewResolvedEventEncoder(allocator *SliceAllocator, ts uint64) *MessageEncoder { return NewMessageEncoder(allocator).encodeHeaders(&Headers{ ts: allocator.oneUint64Slice(ts), - ty: allocator.oneUint64Slice(uint64(model.MqMessageTypeResolved)), + ty: allocator.oneUint64Slice(uint64(model.MessageTypeResolved)), partition: oneNullInt64Slice, schema: oneNullStringSlice, table: oneNullStringSlice, @@ -321,7 +321,7 @@ func NewDDLEventEncoder(allocator *SliceAllocator, ev *model.DDLEvent) *MessageE } return NewMessageEncoder(allocator).encodeHeaders(&Headers{ ts: allocator.oneUint64Slice(ev.CommitTs), - ty: allocator.oneUint64Slice(uint64(model.MqMessageTypeDDL)), + ty: allocator.oneUint64Slice(uint64(model.MessageTypeDDL)), partition: oneNullInt64Slice, schema: allocator.oneNullableStringSlice(schema), table: allocator.oneNullableStringSlice(table), diff --git a/cdc/sink/mq/codec/craft/model.go b/cdc/sink/mq/codec/craft/model.go index 6cf393885c8..37a264ef732 100644 --- a/cdc/sink/mq/codec/craft/model.go +++ b/cdc/sink/mq/codec/craft/model.go @@ -218,8 +218,8 @@ func (h *Headers) reset() { } // GetType returns type of event at given index -func (h *Headers) GetType(index int) model.MqMessageType { - return model.MqMessageType(h.ty[index]) +func (h *Headers) GetType(index int) model.MessageType { + return model.MessageType(h.ty[index]) } // GetTs returns timestamp of event at given index @@ -471,7 +471,7 @@ func (b *RowChangedEventBuffer) AppendRowChangedEvent(ev *model.RowChangedEvent) b.estimatedSize += b.headers.appendHeader( b.allocator, ev.CommitTs, - uint64(model.MqMessageTypeRow), + uint64(model.MessageTypeRow), partition, schema, table, diff --git a/cdc/sink/mq/codec/craft_test.go b/cdc/sink/mq/codec/craft_test.go index ce1809f3ef0..10aefc2abbc 100644 --- a/cdc/sink/mq/codec/craft_test.go +++ b/cdc/sink/mq/codec/craft_test.go @@ -36,7 +36,7 @@ func testBatchCodec( if !hasNext { break } - require.Equal(t, model.MqMessageTypeRow, tp) + require.Equal(t, model.MessageTypeRow, tp) row, err := decoder.NextRowChangedEvent() require.Nil(t, err) require.Equal(t, cs[index], row) @@ -51,7 +51,7 @@ func testBatchCodec( if !hasNext { break } - require.Equal(t, model.MqMessageTypeDDL, tp) + require.Equal(t, model.MessageTypeDDL, tp) ddl, err := decoder.NextDDLEvent() require.Nil(t, err) require.Equal(t, cs[index], ddl) @@ 
-66,7 +66,7 @@ func testBatchCodec( if !hasNext { break } - require.Equal(t, model.MqMessageTypeResolved, tp) + require.Equal(t, model.MessageTypeResolved, tp) ts, err := decoder.NextResolvedEvent() require.Nil(t, err) require.Equal(t, cs[index], ts) @@ -171,7 +171,7 @@ func TestCraftMaxBatchSize(t *testing.T) { break } - require.Equal(t, model.MqMessageTypeRow, v) + require.Equal(t, model.MessageTypeRow, v) _, err = decoder.NextRowChangedEvent() require.Nil(t, err) count++ diff --git a/cdc/sink/mq/codec/decoder.go b/cdc/sink/mq/codec/decoder.go index 7e49769dd97..cffcd428a8f 100644 --- a/cdc/sink/mq/codec/decoder.go +++ b/cdc/sink/mq/codec/decoder.go @@ -22,7 +22,7 @@ type EventBatchDecoder interface { // 1. the type of the next event // 2. a bool if the next event is exist // 3. error - HasNext() (model.MqMessageType, bool, error) + HasNext() (model.MessageType, bool, error) // NextResolvedEvent returns the next resolved event if exists NextResolvedEvent() (uint64, error) // NextRowChangedEvent returns the next row changed event if exists diff --git a/cdc/sink/mq/codec/json.go b/cdc/sink/mq/codec/json.go index f2dcf5d1d10..c1e86bc956a 100644 --- a/cdc/sink/mq/codec/json.go +++ b/cdc/sink/mq/codec/json.go @@ -205,12 +205,12 @@ func formatColumnVal(c column) column { type mqMessageKey struct { // TODO: should we rename it to CRTs - Ts uint64 `json:"ts"` - Schema string `json:"scm,omitempty"` - Table string `json:"tbl,omitempty"` - RowID int64 `json:"rid,omitempty"` - Partition *int64 `json:"ptn,omitempty"` - Type model.MqMessageType `json:"t"` + Ts uint64 `json:"ts"` + Schema string `json:"scm,omitempty"` + Table string `json:"tbl,omitempty"` + RowID int64 `json:"rid,omitempty"` + Partition *int64 `json:"ptn,omitempty"` + Type model.MessageType `json:"t"` } func (m *mqMessageKey) Encode() ([]byte, error) { @@ -269,7 +269,7 @@ func (m *mqMessageDDL) Decode(data []byte) error { func newResolvedMessage(ts uint64) *mqMessageKey { return &mqMessageKey{ Ts: ts, - Type: model.MqMessageTypeResolved, + Type: model.MessageTypeResolved, } } @@ -284,7 +284,7 @@ func rowEventToMqMessage(e *model.RowChangedEvent) (*mqMessageKey, *mqMessageRow Table: e.Table.Table, RowID: e.RowID, Partition: partition, - Type: model.MqMessageTypeRow, + Type: model.MessageTypeRow, } value := &mqMessageRow{} if e.IsDelete() { @@ -356,7 +356,7 @@ func ddlEventtoMqMessage(e *model.DDLEvent) (*mqMessageKey, *mqMessageDDL) { Ts: e.CommitTs, Schema: e.TableInfo.Schema, Table: e.TableInfo.Table, - Type: model.MqMessageTypeDDL, + Type: model.MessageTypeDDL, } value := &mqMessageDDL{ Query: e.Query, @@ -461,7 +461,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent( versionHead := make([]byte, 8) binary.BigEndian.PutUint64(versionHead, BatchVersion1) - d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil)) + d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MessageTypeRow, nil, nil)) d.curBatchSize = 0 } @@ -554,7 +554,7 @@ type JSONEventBatchMixedDecoder struct { } // HasNext implements the EventBatchDecoder interface -func (b *JSONEventBatchMixedDecoder) HasNext() (model.MqMessageType, bool, error) { +func (b *JSONEventBatchMixedDecoder) HasNext() (model.MessageType, bool, error) { if !b.hasNext() { return 0, false, nil } @@ -572,7 +572,7 @@ func (b *JSONEventBatchMixedDecoder) NextResolvedEvent() (uint64, error) { } } b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] - if b.nextKey.Type != 
model.MqMessageTypeResolved { + if b.nextKey.Type != model.MessageTypeResolved { return 0, cerror.ErrJSONCodecInvalidData.GenWithStack("not found resolved event message") } valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) @@ -590,7 +590,7 @@ func (b *JSONEventBatchMixedDecoder) NextRowChangedEvent() (*model.RowChangedEve } } b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MqMessageTypeRow { + if b.nextKey.Type != model.MessageTypeRow { return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found row event message") } valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) @@ -613,7 +613,7 @@ func (b *JSONEventBatchMixedDecoder) NextDDLEvent() (*model.DDLEvent, error) { } } b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MqMessageTypeDDL { + if b.nextKey.Type != model.MessageTypeDDL { return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found ddl event message") } valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) @@ -655,7 +655,7 @@ type JSONEventBatchDecoder struct { } // HasNext implements the EventBatchDecoder interface -func (b *JSONEventBatchDecoder) HasNext() (model.MqMessageType, bool, error) { +func (b *JSONEventBatchDecoder) HasNext() (model.MessageType, bool, error) { if !b.hasNext() { return 0, false, nil } @@ -673,7 +673,7 @@ func (b *JSONEventBatchDecoder) NextResolvedEvent() (uint64, error) { } } b.keyBytes = b.keyBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MqMessageTypeResolved { + if b.nextKey.Type != model.MessageTypeResolved { return 0, cerror.ErrJSONCodecInvalidData.GenWithStack("not found resolved event message") } valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) @@ -691,7 +691,7 @@ func (b *JSONEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, e } } b.keyBytes = b.keyBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MqMessageTypeRow { + if b.nextKey.Type != model.MessageTypeRow { return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found row event message") } valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) @@ -714,7 +714,7 @@ func (b *JSONEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) { } } b.keyBytes = b.keyBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MqMessageTypeDDL { + if b.nextKey.Type != model.MessageTypeDDL { return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found ddl event message") } valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) diff --git a/cdc/sink/mq/codec/json_test.go b/cdc/sink/mq/codec/json_test.go index 45733fa1896..eda03317ca0 100644 --- a/cdc/sink/mq/codec/json_test.go +++ b/cdc/sink/mq/codec/json_test.go @@ -73,7 +73,7 @@ func (s *batchTester) testBatchCodec( if !hasNext { break } - require.Equal(t, model.MqMessageTypeRow, tp) + require.Equal(t, model.MessageTypeRow, tp) row, err := decoder.NextRowChangedEvent() require.Nil(t, err) sortColumnsArrays(row.Columns, row.PreColumns, cs[index].Columns, cs[index].PreColumns) @@ -89,7 +89,7 @@ func (s *batchTester) testBatchCodec( if !hasNext { break } - require.Equal(t, model.MqMessageTypeDDL, tp) + require.Equal(t, model.MessageTypeDDL, tp) ddl, err := decoder.NextDDLEvent() require.Nil(t, err) require.Equal(t, cs[index], ddl) @@ -104,7 +104,7 @@ func (s *batchTester) testBatchCodec( if !hasNext { break } - require.Equal(t, model.MqMessageTypeResolved, tp) + require.Equal(t, model.MessageTypeResolved, tp) ts, err := decoder.NextResolvedEvent() require.Nil(t, err) require.Equal(t, cs[index], ts) @@ -233,7 +233,7 @@ func TestMaxBatchSize(t *testing.T) { 
break } - require.Equal(t, model.MqMessageTypeRow, v) + require.Equal(t, model.MessageTypeRow, v) _, err = decoder.NextRowChangedEvent() require.Nil(t, err) count++ diff --git a/cdc/sink/mq/codec/maxwell.go b/cdc/sink/mq/codec/maxwell.go index 9de5567fbc9..c6efc18e72a 100644 --- a/cdc/sink/mq/codec/maxwell.go +++ b/cdc/sink/mq/codec/maxwell.go @@ -88,7 +88,7 @@ func rowEventToMaxwellMessage(e *model.RowChangedEvent) (*mqMessageKey, *maxwell Schema: e.Table.Schema, Table: e.Table.Table, Partition: partition, - Type: model.MqMessageTypeRow, + Type: model.MessageTypeRow, } value := &maxwellMessage{ Ts: 0, @@ -218,7 +218,7 @@ func ddlEventtoMaxwellMessage(e *model.DDLEvent) (*mqMessageKey, *DdlMaxwellMess Ts: e.CommitTs, Schema: e.TableInfo.Schema, Table: e.TableInfo.Table, - Type: model.MqMessageTypeDDL, + Type: model.MessageTypeDDL, } value := &DdlMaxwellMessage{ Ts: e.CommitTs, @@ -284,7 +284,7 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage { return nil } - ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil) + ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil) ret.SetRowsCount(d.batchSize) d.Reset() return []*MQMessage{ret} diff --git a/cdc/sink/mq/codec/message.go b/cdc/sink/mq/codec/message.go index a7033b60100..a257e6f3702 100644 --- a/cdc/sink/mq/codec/message.go +++ b/cdc/sink/mq/codec/message.go @@ -26,12 +26,12 @@ import ( type MQMessage struct { Key []byte Value []byte - Ts uint64 // reserved for possible output sorting - Schema *string // schema - Table *string // table - Type model.MqMessageType // type - Protocol config.Protocol // protocol - rowsCount int // rows in one MQ Message + Ts uint64 // reserved for possible output sorting + Schema *string // schema + Table *string // table + Type model.MessageType // type + Protocol config.Protocol // protocol + rowsCount int // rows in one MQ Message } // maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client. @@ -72,14 +72,14 @@ func newDDLMQMessage(proto config.Protocol, key, value []byte, event *model.DDLE key, value, event.CommitTs, - model.MqMessageTypeDDL, + model.MessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table, ) } func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) *MQMessage { - return NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved, nil, nil) + return NewMQMessage(proto, key, value, ts, model.MessageTypeResolved, nil, nil) } // NewMQMessage should be used when creating a MQMessage struct. 
@@ -89,7 +89,7 @@ func NewMQMessage( key []byte, value []byte, ts uint64, - ty model.MqMessageType, + ty model.MessageType, schema, table *string, ) *MQMessage { ret := &MQMessage{ diff --git a/cdc/sink/mq/codec/message_test.go b/cdc/sink/mq/codec/message_test.go index d7ef01ef54f..3b05fba738b 100644 --- a/cdc/sink/mq/codec/message_test.go +++ b/cdc/sink/mq/codec/message_test.go @@ -46,12 +46,12 @@ func TestCreate(t *testing.T) { CommitTs: 5678, } - msg := NewMQMessage(config.ProtocolOpen, []byte("key1"), []byte("value1"), rowEvent.CommitTs, model.MqMessageTypeRow, &rowEvent.Table.Schema, &rowEvent.Table.Table) + msg := NewMQMessage(config.ProtocolOpen, []byte("key1"), []byte("value1"), rowEvent.CommitTs, model.MessageTypeRow, &rowEvent.Table.Schema, &rowEvent.Table.Table) require.Equal(t, []byte("key1"), msg.Key) require.Equal(t, []byte("value1"), msg.Value) require.Equal(t, rowEvent.CommitTs, msg.Ts) - require.Equal(t, model.MqMessageTypeRow, msg.Type) + require.Equal(t, model.MessageTypeRow, msg.Type) require.Equal(t, rowEvent.Table.Schema, *msg.Schema) require.Equal(t, rowEvent.Table.Table, *msg.Table) require.Equal(t, config.ProtocolOpen, msg.Protocol) @@ -98,7 +98,7 @@ func TestCreate(t *testing.T) { require.Nil(t, msg.Key) require.Equal(t, []byte("value1"), msg.Value) require.Equal(t, ddlEvent.CommitTs, msg.Ts) - require.Equal(t, model.MqMessageTypeDDL, msg.Type) + require.Equal(t, model.MessageTypeDDL, msg.Type) require.Equal(t, ddlEvent.TableInfo.Schema, *msg.Schema) require.Equal(t, ddlEvent.TableInfo.Table, *msg.Table) require.Equal(t, config.ProtocolMaxwell, msg.Protocol) @@ -107,7 +107,7 @@ func TestCreate(t *testing.T) { require.Equal(t, []byte("key1"), msg.Key) require.Nil(t, msg.Value) require.Equal(t, uint64(1234), msg.Ts) - require.Equal(t, model.MqMessageTypeResolved, msg.Type) + require.Equal(t, model.MessageTypeResolved, msg.Type) require.Nil(t, msg.Schema) require.Nil(t, msg.Table) require.Equal(t, config.ProtocolCanal, msg.Protocol) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 0fd8a3e247e..b440244cb6e 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -532,7 +532,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram } switch tp { - case model.MqMessageTypeDDL: + case model.MessageTypeDDL: // for some protocol, DDL would be dispatched to all partitions, // Consider that DDL a, b, c received from partition-0, the latest DDL is c, // if we receive `a` from partition-1, which would be seemed as DDL regression, @@ -546,7 +546,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram if partition == 0 { c.appendDDL(ddl) } - case model.MqMessageTypeRow: + case model.MessageTypeRow: row, err := decoder.NextRowChangedEvent() if err != nil { log.Panic("decode message value failed", zap.ByteString("value", message.Value)) @@ -590,7 +590,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram eventGroups[tableID] = group } group.Append(row) - case model.MqMessageTypeResolved: + case model.MessageTypeResolved: ts, err := decoder.NextResolvedEvent() if err != nil { log.Panic("decode message value failed", zap.ByteString("value", message.Value)) diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 1242e78276f..1292711cd64 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -16765,6 +16765,154 @@ "yBucketNumber": null, "yBucketSize": null }, + { + "cards": { + "cardPadding": 0, + "cardRound": 0 
+ }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "min": 0, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "The latency distributions of writeLog called by redoManager", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 23 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 175, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "max(rate(ticdc_redo_write_log_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)", + "format": "heatmap", + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "Redo write log duration", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": 1, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": 1, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "upper", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "cards": { + "cardPadding": 0, + "cardRound": 0 + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "min": 0, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "The latency distributions of flushLog called by redoManager", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 23 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 178, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "max(rate(ticdc_redo_flush_log_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)", + "format": "heatmap", + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "Redo flush log duration", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": 1, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": 1, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "upper", + "yBucketNumber": null, + "yBucketSize": null + }, { "aliasColors": {}, "bars": false, @@ -16782,7 +16930,7 @@ "h": 8, "w": 12, "x": 0, - "y": 16 + "y": 31 }, "hiddenSeries": false, "id": 170, @@ -16892,7 +17040,7 @@ "h": 8, "w": 12, "x": 12, - "y": 16 + "y": 31 }, "hiddenSeries": false, "id": 173, @@ -16912,7 +17060,7 @@ "alertThreshold": true }, "percentage": false, - "pluginVersion": "6.1.6", + "pluginVersion": "7.5.11", "pointradius": 2, "points": false, "renderer": "flot", @@ -18980,4 +19128,4 @@ "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", "version": 40 -} +} \ No newline at end of file 
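The two heatmap panels added above query ticdc_redo_write_log_duration_seconds_bucket and ticdc_redo_flush_log_duration_seconds_bucket, and the writer diff now references common.RedoTotalRowsCountGauge. The following is only a minimal sketch, not the actual cdc/redo/common source, of how such metrics could be declared and registered with prometheus/client_golang; the histogram names are taken from the dashboard expressions, while the gauge metric name, label names, and bucket layout are assumptions.

package common // hypothetical sketch of the relocated redo metrics

import "github.com/prometheus/client_golang/prometheus"

var (
	// RedoTotalRowsCountGauge counts rows written by the redo writer (metric name assumed).
	RedoTotalRowsCountGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "redo",
			Name:      "total_rows_count",
			Help:      "total count of rows written by the redo writer",
		}, []string{"namespace", "changefeed"})

	// RedoWriteLogDurationHistogram backs the "Redo write log duration" heatmap.
	RedoWriteLogDurationHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "ticdc",
			Subsystem: "redo",
			Name:      "write_log_duration_seconds",
			Help:      "latency distribution of writeLog called by redoManager",
			Buckets:   prometheus.ExponentialBuckets(0.001, 2.0, 13), // assumed buckets
		}, []string{"namespace", "changefeed"})

	// RedoFlushLogDurationHistogram backs the "Redo flush log duration" heatmap.
	RedoFlushLogDurationHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "ticdc",
			Subsystem: "redo",
			Name:      "flush_log_duration_seconds",
			Help:      "latency distribution of flushLog called by redoManager",
			Buckets:   prometheus.ExponentialBuckets(0.001, 2.0, 13), // assumed buckets
		}, []string{"namespace", "changefeed"})
)

// InitMetrics registers the redo metrics with the given registry.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(RedoTotalRowsCountGauge)
	registry.MustRegister(RedoWriteLogDurationHistogram)
	registry.MustRegister(RedoFlushLogDurationHistogram)
}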
diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index 6ab03d08b29..1cbe8914739 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -97,6 +97,12 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { if err != nil { return err } + if checkpointTs == resolvedTs { + log.Info("apply redo log succeed: checkpointTs == resolvedTs", + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("resolvedTs", resolvedTs)) + return errApplyFinished + } err = ra.rd.ResetReader(ctx, checkpointTs, resolvedTs) if err != nil { return err diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 53cd9ce99a6..fa40460cfe4 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -52,7 +52,7 @@ const ( "consistent": { "level": "none", "max-log-size": 64, - "flush-interval": 1000, + "flush-interval": 2000, "storage": "" } }` @@ -172,7 +172,7 @@ const ( "consistent": { "level": "none", "max-log-size": 64, - "flush-interval": 1000, + "flush-interval": 2000, "storage": "" } }` @@ -209,7 +209,7 @@ const ( "consistent": { "level": "none", "max-log-size": 64, - "flush-interval": 1000, + "flush-interval": 2000, "storage": "" } }` diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 5d9100c90f9..3d8889c9d82 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -40,7 +40,7 @@ var defaultReplicaConfig = &ReplicaConfig{ Consistent: &ConsistentConfig{ Level: "none", MaxLogSize: 64, - FlushIntervalInMs: 1000, + FlushIntervalInMs: 2000, Storage: "", }, } diff --git a/tests/integration_tests/consistent_replicate_nfs/conf/workload b/tests/integration_tests/consistent_replicate_nfs/conf/workload index 6dba9b907dd..7a3fd48df6f 100644 --- a/tests/integration_tests/consistent_replicate_nfs/conf/workload +++ b/tests/integration_tests/consistent_replicate_nfs/conf/workload @@ -1,7 +1,8 @@ threadcount=10 -recordcount=200 +recordcount=5000 operationcount=0 workload=core +fieldcount=100 readallfields=true diff --git a/tests/integration_tests/consistent_replicate_nfs/run.sh b/tests/integration_tests/consistent_replicate_nfs/run.sh index 06b67b1103c..63b7bba6d0d 100644 --- a/tests/integration_tests/consistent_replicate_nfs/run.sh +++ b/tests/integration_tests/consistent_replicate_nfs/run.sh @@ -56,11 +56,11 @@ function run() { run_sql "insert into consistent_replicate_nfs.USERTABLE2 select * from consistent_replicate_nfs.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} # to ensure row changed events have been replicated to TiCDC - sleep 5 + sleep 10 nfs_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) - ensure 20 check_resolved_ts $changefeed_id $current_tso $nfs_download_path + ensure 50 check_resolved_ts $changefeed_id $current_tso $nfs_download_path cleanup_process $CDC_BINARY export GO_FAILPOINTS='' diff --git a/tests/integration_tests/consistent_replicate_s3/conf/workload b/tests/integration_tests/consistent_replicate_s3/conf/workload index 6dba9b907dd..7a3fd48df6f 100644 --- a/tests/integration_tests/consistent_replicate_s3/conf/workload +++ b/tests/integration_tests/consistent_replicate_s3/conf/workload @@ -1,7 +1,8 @@ threadcount=10 -recordcount=200 +recordcount=5000 operationcount=0 workload=core +fieldcount=100 readallfields=true diff --git a/tests/integration_tests/consistent_replicate_s3/run.sh b/tests/integration_tests/consistent_replicate_s3/run.sh index 5f4bae08082..15d17826706 100644 ---
a/tests/integration_tests/consistent_replicate_s3/run.sh +++ b/tests/integration_tests/consistent_replicate_s3/run.sh @@ -89,10 +89,10 @@ function run() { run_sql "insert into consistent_replicate_s3.USERTABLE2 select * from consistent_replicate_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} # to ensure row changed events have been replicated to TiCDC - sleep 5 + sleep 10 current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) - ensure 20 check_resolved_ts $changefeed_id $current_tso $WORK_DIR/redo/meta + ensure 50 check_resolved_ts $changefeed_id $current_tso $WORK_DIR/redo/meta cleanup_process $CDC_BINARY export GO_FAILPOINTS='' diff --git a/tests/integration_tests/consistent_replicate_schedulerv3/conf/workload b/tests/integration_tests/consistent_replicate_schedulerv3/conf/workload index 6dba9b907dd..7a3fd48df6f 100644 --- a/tests/integration_tests/consistent_replicate_schedulerv3/conf/workload +++ b/tests/integration_tests/consistent_replicate_schedulerv3/conf/workload @@ -1,7 +1,8 @@ threadcount=10 -recordcount=200 +recordcount=5000 operationcount=0 workload=core +fieldcount=100 readallfields=true diff --git a/tests/integration_tests/consistent_replicate_schedulerv3/run.sh b/tests/integration_tests/consistent_replicate_schedulerv3/run.sh index d32441ab13f..81ded55caf8 100644 --- a/tests/integration_tests/consistent_replicate_schedulerv3/run.sh +++ b/tests/integration_tests/consistent_replicate_schedulerv3/run.sh @@ -56,11 +56,11 @@ function run() { run_sql "insert into consistent_replicate_schedulerv3.USERTABLE2 select * from consistent_replicate_schedulerv3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} # to ensure row changed events have been replicated to TiCDC - sleep 5 + sleep 10 nfs_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) - ensure 20 check_resolved_ts $changefeed_id $current_tso $nfs_download_path + ensure 50 check_resolved_ts $changefeed_id $current_tso $nfs_download_path cleanup_process $CDC_BINARY export GO_FAILPOINTS=''
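Taken together with the RedoLogWriter change in cdc/redo/writer/writer.go above, the new contract is that callers batch per-table resolved timestamps into one map and flush them in a single call, as the updated writer_test.go shows with map[int64]uint64{tt.args.tableID: tt.args.ts}. Below is a hedged sketch of a caller-side helper; the helper name and the progress map are hypothetical and not part of this patch, only the FlushLog signature comes from the diff.

package redoexample // hypothetical package, for illustration only

import (
	"context"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/redo/writer"
)

// flushTableResolvedTs batches per-table resolved timestamps and hands them to
// the writer in one FlushLog call instead of flushing one table at a time.
func flushTableResolvedTs(
	ctx context.Context,
	w writer.RedoLogWriter,
	progress map[model.TableID]model.Ts, // assumed in-memory per-table resolved-ts snapshot
) error {
	if len(progress) == 0 {
		return nil
	}
	// Copy the snapshot so concurrent updates do not race with the flush.
	rtsMap := make(map[model.TableID]model.Ts, len(progress))
	for tableID, rts := range progress {
		rtsMap[tableID] = rts
	}
	// A single call advances the max commit ts of every table at once,
	// matching FlushLog(ctx, rtsMap) as declared in cdc/redo/writer/writer.go.
	return w.FlushLog(ctx, rtsMap)
}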