Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redo(ticdc): make implementation in manager asynchronous (#5683) #6199

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor"
tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/puller"
redowriter "github.com/pingcap/tiflow/cdc/redo/writer"
redo "github.com/pingcap/tiflow/cdc/redo/common"
sink "github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
"github.com/pingcap/tiflow/cdc/sorter"
Expand Down Expand Up @@ -59,7 +59,7 @@ func init() {
memory.InitMetrics(registry)
unified.InitMetrics(registry)
leveldb.InitMetrics(registry)
redowriter.InitMetrics(registry)
redo.InitMetrics(registry)
db.InitMetrics(registry)
kafka.InitMetrics(registry)
// TiKV client metrics, including metrics about resolved and region cache.
Expand Down
20 changes: 10 additions & 10 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ import (

//go:generate msgp

// MqMessageType is the type of message
type MqMessageType int
// MessageType is the type of message, which is used by MqSink and RedoLog.
type MessageType int

const (
// MqMessageTypeUnknown is unknown type of message key
MqMessageTypeUnknown MqMessageType = iota
// MqMessageTypeRow is row type of message key
MqMessageTypeRow
// MqMessageTypeDDL is ddl type of message key
MqMessageTypeDDL
// MqMessageTypeResolved is resolved type of message key
MqMessageTypeResolved
// MessageTypeUnknown is unknown type of message key
MessageTypeUnknown MessageType = iota
// MessageTypeRow is row type of message key
MessageTypeRow
// MessageTypeDDL is ddl type of message key
MessageTypeDDL
// MessageTypeResolved is resolved type of message key
MessageTypeResolved
)

// ColumnFlagType is for encapsulating the flag operations for different flags.
Expand Down
14 changes: 7 additions & 7 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 53 additions & 15 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -79,6 +80,7 @@ type sinkNode struct {
barrierTs model.Ts

flowController tableFlowController
redoManager redo.LogManager

replicaConfig *config.ReplicaConfig
isTableActorMode bool
Expand All @@ -92,16 +94,17 @@ func newSinkNode(
targetTs model.Ts,
flowController tableFlowController,
splitTxn bool,
redoManager redo.LogManager,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,

tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,
flowController: flowController,
splitTxn: splitTxn,
redoManager: redoManager,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
Expand Down Expand Up @@ -159,17 +162,44 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
err = n.stop(ctx)
}
}()

if resolved.Ts > n.targetTs {
resolved = model.NewResolvedTs(n.targetTs)
}

currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := n.getCheckpointTs()
if n.redoManager != nil && n.redoManager.Enabled() {
// the redo log does not support batch resolve mode, hence we
// use `ResolvedMark` to restore a normal resolved ts
resolved = model.NewResolvedTs(resolved.ResolvedMark())
err = n.redoManager.UpdateResolvedTs(ctx, n.tableID, resolved.Ts)

// fail-fast check; the happens-before relationship is:
// 1. sorter resolvedTs >= sink resolvedTs >= table redoTs == tableActor resolvedTs
// 2. tableActor resolvedTs >= processor resolvedTs >= global resolvedTs >= barrierTs
redoTs := n.redoManager.GetMinResolvedTs()
if redoTs < currentBarrierTs {
log.Debug("redoTs should not less than current barrierTs",
zap.Int64("tableID", n.tableID),
zap.Uint64("redoTs", redoTs),
zap.Uint64("barrierTs", currentBarrierTs))
}

// TODO: remove this check after SchedulerV3 becomes the first choice.
if resolved.Ts > redoTs {
resolved = model.NewResolvedTs(redoTs)
}
}

if resolved.Ts > currentBarrierTs {
resolved = model.NewResolvedTs(currentBarrierTs)
}
if resolved.Ts > n.targetTs {
resolved = model.NewResolvedTs(n.targetTs)
}

currentCheckpointTs := n.getCheckpointTs()
if currentCheckpointTs.EqualOrGreater(resolved) {
return nil
}

checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -200,6 +230,16 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
panic("ProcessorSyncResolvedPreEmit")
})

emitRows := func(rows ...*model.RowChangedEvent) error {
if n.redoManager != nil && n.redoManager.Enabled() {
err := n.redoManager.EmitRowChangedEvents(ctx, n.tableID, rows...)
if err != nil {
return err
}
}
return n.sink.EmitRowChangedEvents(ctx, rows...)
}

if event == nil || event.Row == nil {
log.Warn("skip emit nil event", zap.Any("event", event))
return nil
Expand All @@ -225,13 +265,13 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
return errors.Trace(err)
}
// NOTICE: Please do not change the order, the delete event always comes before the insert event.
return n.sink.EmitRowChangedEvents(ctx, deleteEvent.Row, insertEvent.Row)
return emitRows(deleteEvent.Row, insertEvent.Row)
}
// If the handle key columns are not updated, PreColumns is directly ignored.
event.Row.PreColumns = nil
}

return n.sink.EmitRowChangedEvents(ctx, event.Row)
return emitRows(event.Row)
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
Expand Down Expand Up @@ -323,11 +363,9 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})

var resolved model.ResolvedTs
resolved := model.NewResolvedTs(event.CRTs)
if event.Resolved != nil {
resolved = *(event.Resolved)
} else {
resolved = model.NewResolvedTs(event.CRTs)
}

if err := n.flushSink(ctx, resolved); err != nil {
Expand Down
42 changes: 31 additions & 11 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ package pipeline

import (
"context"
"math/rand"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerrors "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -135,7 +137,8 @@ func TestStatus(t *testing.T) {
})

// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false)
targetTs := model.Ts(10)
node := newSinkNode(1, &mockSink{}, 0, targetTs, &mockFlowController{}, true, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

Expand Down Expand Up @@ -165,17 +168,34 @@ func TestStatus(t *testing.T) {
require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil)))
require.Equal(t, TableStatusRunning, node.Status())

batchResolved := model.ResolvedTs{
Mode: model.BatchResolvedMode,
Ts: targetTs,
BatchID: rand.Uint64() % 10,
}
msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: targetTs,
Resolved: &batchResolved,
RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
ok, err := node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, batchResolved, node.getResolvedTs())
require.Equal(t, batchResolved, node.getCheckpointTs())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
err := node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, uint64(10), node.CheckpointTs())
require.Equal(t, targetTs, node.CheckpointTs())

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false)
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

Expand Down Expand Up @@ -206,7 +226,7 @@ func TestStatus(t *testing.T) {
require.Equal(t, uint64(2), node.CheckpointTs())

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false)
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

Expand Down Expand Up @@ -249,7 +269,7 @@ func TestStopStatus(t *testing.T) {
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}, false)
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

Expand Down Expand Up @@ -287,7 +307,7 @@ func TestManyTs(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

Expand Down Expand Up @@ -449,7 +469,7 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))

// empty row, no Columns and PreColumns.
Expand All @@ -471,7 +491,7 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))

// nil row.
Expand Down Expand Up @@ -529,7 +549,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))

// nil row.
Expand Down Expand Up @@ -674,7 +694,7 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController, false)
sNode := newSinkNode(1, sink, 0, 10, flowController, false, redo.NewDisabledManager())
require.Nil(t, sNode.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
sNode.barrierTs = 10

Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ func (n *sorterNode) start(
if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs {
lastSentResolvedTs = lastCRTs
lastSendResolvedTsTime = time.Now()
msg := model.NewResolvedPolymorphicEvent(0, lastSentResolvedTs)
ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg))
}
msg := model.NewResolvedPolymorphicEvent(0, lastSentResolvedTs)
ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg))
}

for {
Expand Down
Loading