Commit

This is an automated cherry-pick of pingcap#5683
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
CharlesCheung96 authored and ti-chi-bot committed Jul 6, 2022
1 parent 41ab14c commit 34e7a0e
Showing 45 changed files with 954 additions and 548 deletions.
7 changes: 6 additions & 1 deletion cdc/metrics.go
@@ -20,7 +20,12 @@ import (
"github.com/pingcap/tiflow/cdc/processor"
tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/puller"
<<<<<<< HEAD
redowriter "github.com/pingcap/tiflow/cdc/redo/writer"
=======
redo "github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/cdc/scheduler"
>>>>>>> f952bf02f (redo(ticdc): make implementation in manager asynchronously (#5683))
sink "github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
"github.com/pingcap/tiflow/cdc/sorter"
@@ -59,7 +64,7 @@ func init() {
memory.InitMetrics(registry)
unified.InitMetrics(registry)
leveldb.InitMetrics(registry)
redowriter.InitMetrics(registry)
redo.InitMetrics(registry)
db.InitMetrics(registry)
kafka.InitMetrics(registry)
// TiKV client metrics, including metrics about resolved and region cache.
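For context, the hunk above only changes which package provides the redo metrics (cdc/redo/writer → cdc/redo/common). Below is a minimal, self-contained sketch of the InitMetrics(registry) pattern that cdc/metrics.go relies on; the metric name and labels are illustrative assumptions, not the actual tiflow collectors.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// redoWriteBytesGauge is a hypothetical collector standing in for the redo
// metrics that moved from cdc/redo/writer to cdc/redo/common in this commit.
var redoWriteBytesGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "redo",
		Name:      "write_bytes_total",
		Help:      "Total number of bytes written by the redo log.",
	}, []string{"namespace", "changefeed"})

// InitMetrics mirrors the convention used in cdc/metrics.go: each sub-package
// exposes a single entry point that registers all of its collectors.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(redoWriteBytesGauge)
}

func main() {
	registry := prometheus.NewRegistry()
	InitMetrics(registry)
	redoWriteBytesGauge.WithLabelValues("default", "cf-1").Add(1024)
	fmt.Println("redo metrics registered")
}
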
20 changes: 10 additions & 10 deletions cdc/model/sink.go
@@ -29,18 +29,18 @@ import (

//go:generate msgp

// MqMessageType is the type of message
type MqMessageType int
// MessageType is the type of message, which is used by MqSink and RedoLog.
type MessageType int

const (
// MqMessageTypeUnknown is unknown type of message key
MqMessageTypeUnknown MqMessageType = iota
// MqMessageTypeRow is row type of message key
MqMessageTypeRow
// MqMessageTypeDDL is ddl type of message key
MqMessageTypeDDL
// MqMessageTypeResolved is resolved type of message key
MqMessageTypeResolved
// MessageTypeUnknown is unknown type of message key
MessageTypeUnknown MessageType = iota
// MessageTypeRow is row type of message key
MessageTypeRow
// MessageTypeDDL is ddl type of message key
MessageTypeDDL
// MessageTypeResolved is resolved type of message key
MessageTypeResolved
)

// ColumnFlagType is for encapsulating the flag operations for different flags.
14 changes: 7 additions & 7 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default.

32 changes: 32 additions & 0 deletions cdc/processor/pipeline/sink.go
@@ -141,8 +141,40 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
err = n.stop(ctx)
}
}()
<<<<<<< HEAD
currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs)
=======

if resolved.Ts > n.targetTs {
resolved = model.NewResolvedTs(n.targetTs)
}

currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
if n.redoManager != nil && n.redoManager.Enabled() {
// the redo log does not support batch resolve mode, hence we
// use `ResolvedMark` to restore a normal resolved ts
resolved = model.NewResolvedTs(resolved.ResolvedMark())
err = n.redoManager.UpdateResolvedTs(ctx, n.tableID, resolved.Ts)

// fail-fast check; the happens-before relationship is:
// 1. sorter resolvedTs >= sink resolvedTs >= table redoTs == tableActor resolvedTs
// 2. tableActor resolvedTs >= processor resolvedTs >= global resolvedTs >= barrierTs
redoTs := n.redoManager.GetMinResolvedTs()
if redoTs < currentBarrierTs {
log.Debug("redoTs should not less than current barrierTs",
zap.Int64("tableID", n.tableID),
zap.Uint64("redoTs", redoTs),
zap.Uint64("barrierTs", currentBarrierTs))
}

// TODO: remove this check after SchedulerV3 become the first choice.
if resolved.Ts > redoTs {
resolved = model.NewResolvedTs(redoTs)
}
}

>>>>>>> f952bf02f (redo(ticdc): make implementation in manager asynchronously (#5683))
if resolved.Ts > currentBarrierTs {
resolved.Ts = currentBarrierTs
}
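The new flushSink logic above caps the resolved ts three times: by targetTs, then by the redo manager's minimum resolved ts, then by the barrier ts. A minimal sketch of that ordering, assuming simplified stand-in types rather than the real sinkNode:

package main

import "fmt"

type resolvedTs struct{ ts uint64 }

type redoManager interface {
	Enabled() bool
	GetMinResolvedTs() uint64
}

// clampResolved mirrors the ordering established above: targetTs, then the
// redo manager's minimum resolved ts, then the barrier ts.
func clampResolved(resolved resolvedTs, targetTs, barrierTs uint64, mgr redoManager) resolvedTs {
	if resolved.ts > targetTs {
		resolved.ts = targetTs
	}
	if mgr != nil && mgr.Enabled() {
		// The sink must not advance past what the redo log has recorded.
		if redoTs := mgr.GetMinResolvedTs(); resolved.ts > redoTs {
			resolved.ts = redoTs
		}
	}
	if resolved.ts > barrierTs {
		resolved.ts = barrierTs
	}
	return resolved
}

type mockRedo struct{ minTs uint64 }

func (m mockRedo) Enabled() bool            { return true }
func (m mockRedo) GetMinResolvedTs() uint64 { return m.minTs }

func main() {
	r := clampResolved(resolvedTs{ts: 120}, 110, 100, mockRedo{minTs: 90})
	fmt.Println(r.ts) // 90: the redo log lags behind, so the sink waits for it
}
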
32 changes: 32 additions & 0 deletions cdc/processor/pipeline/sink_test.go
@@ -15,6 +15,7 @@ package pipeline

import (
"context"
"math/rand"
"sync"
"testing"
"time"
@@ -135,9 +136,18 @@ func TestStatus(t *testing.T) {
})

// test stop at targetTs
<<<<<<< HEAD
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())
=======
targetTs := model.Ts(10)
node := newSinkNode(1, &mockSink{}, 0, targetTs, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID, true)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).
ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
>>>>>>> f952bf02f (redo(ticdc): make implementation in manager asynchronously (#5683))

require.Nil(t, node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(20), nil)))
@@ -165,14 +175,36 @@ func TestStatus(t *testing.T) {
require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil)))
require.Equal(t, TableStatusRunning, node.Status())

batchResolved := model.ResolvedTs{
Mode: model.BatchResolvedMode,
Ts: targetTs,
BatchID: rand.Uint64() % 10,
}
msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: targetTs,
Resolved: &batchResolved,
RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, batchResolved, node.getResolvedTs())
require.Equal(t, batchResolved, node.getCheckpointTs())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
err := node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
<<<<<<< HEAD
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, uint64(10), node.CheckpointTs())
=======
require.Equal(t, TableStateStopped, node.State())
require.Equal(t, targetTs, node.CheckpointTs())
>>>>>>> f952bf02f (redo(ticdc): make implementation in manager asynchronously (#5683))

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
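The new assertions above exercise model.ResolvedTs in BatchResolvedMode. As a rough sketch of the idea (the field layout and the ResolvedMark behaviour here are assumptions for illustration, not the actual model package): a batch resolved ts also carries a BatchID, and ResolvedMark() collapses it back to a timestamp that is safe to treat as fully resolved.

package main

import "fmt"

type ResolvedMode int

const (
	NormalResolvedMode ResolvedMode = iota
	BatchResolvedMode
)

type ResolvedTs struct {
	Mode    ResolvedMode
	Ts      uint64
	BatchID uint64
}

// ResolvedMark returns a ts at or below which every event is known to be emitted.
// In this sketch, a batch at Ts may still be incomplete, so only Ts-1 is safe.
func (r ResolvedTs) ResolvedMark() uint64 {
	if r.Mode == BatchResolvedMode {
		return r.Ts - 1
	}
	return r.Ts
}

func main() {
	batch := ResolvedTs{Mode: BatchResolvedMode, Ts: 10, BatchID: 3}
	fmt.Println(batch.ResolvedMark()) // 9
}
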
33 changes: 33 additions & 0 deletions cdc/processor/pipeline/sorter.go
@@ -156,10 +156,43 @@ func (n *sorterNode) start(
lastSendResolvedTsTime := time.Now() // the time at which we last sent a resolved-ts.
lastCRTs := uint64(0) // the commit-ts of the last row changed we sent.

<<<<<<< HEAD
metricsTableMemoryHistogram := tableMemoryHistogram.
WithLabelValues(ctx.ChangefeedVars().ID.Namespace, ctx.ChangefeedVars().ID.ID)
metricsTicker := time.NewTicker(flushMemoryMetricsDuration)
defer metricsTicker.Stop()
=======
resolvedTsInterpolateFunc := func(commitTs uint64) {
// checks the condition: cur_event_commit_ts > prev_event_commit_ts > last_resolved_ts
// If this is true, it implies that (1) the last transaction has finished, and we are
// processing the first event in a new transaction, (2) a resolved-ts is safe to be
// sent, but it has not yet. This means that we can interpolate prev_event_commit_ts
// as a resolved-ts, improving the frequency at which the sink flushes.
if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs {
lastSentResolvedTs = lastCRTs
lastSendResolvedTsTime = time.Now()
msg := model.NewResolvedPolymorphicEvent(0, lastSentResolvedTs)
ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg))
}
}

// once startTs is received, the sink should start replicating data to the downstream.
var startTs model.Ts
select {
case <-stdCtx.Done():
return nil
case startTs = <-n.startTsCh:
}

select {
case <-stdCtx.Done():
return nil
case <-n.preparedCh:
}

n.state.Store(TableStateReplicating)
eventSorter.EmitStartTs(stdCtx, startTs)
>>>>>>> f952bf02f (redo(ticdc): make implementation in manager asynchronously (#5683))

for {
// We must call `sorter.Output` before receiving resolved events.
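The resolvedTsInterpolateFunc introduced above emits the previous transaction's commit ts as a resolved ts as soon as a strictly larger commit ts shows up. A simplified, self-contained sketch of that interpolation rule (not the actual sorterNode):

package main

import "fmt"

type interpolator struct {
	lastCRTs           uint64 // commit ts of the last row forwarded downstream
	lastSentResolvedTs uint64 // last resolved ts sent downstream
}

// onRow is called for every row-changed event before it is forwarded.
// It returns a resolved ts to emit, or 0 if nothing can be interpolated.
func (i *interpolator) onRow(commitTs uint64) uint64 {
	var emit uint64
	if i.lastCRTs > i.lastSentResolvedTs && commitTs > i.lastCRTs {
		// The previous transaction is complete: every event with commit ts
		// <= lastCRTs has already been sent, so lastCRTs is safe to resolve.
		i.lastSentResolvedTs = i.lastCRTs
		emit = i.lastCRTs
	}
	i.lastCRTs = commitTs
	return emit
}

func main() {
	it := &interpolator{}
	for _, ts := range []uint64{5, 5, 7, 9} {
		if r := it.onRow(ts); r != 0 {
			fmt.Printf("row commitTs=%d -> interpolated resolvedTs=%d\n", ts, r)
		}
	}
	// Prints resolvedTs=5 when commitTs=7 arrives, and resolvedTs=7 at commitTs=9.
}
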
5 changes: 5 additions & 0 deletions cdc/processor/pipeline/table_actor.go
@@ -430,8 +430,13 @@ func (t *tableActor) ResolvedTs() model.Ts {
// will be able to cooperate replication status directly. Then we will add
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
<<<<<<< HEAD
if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) {
return t.sinkNode.ResolvedTs().Ts
=======
if t.redoManager.Enabled() {
return t.redoManager.GetResolvedTs(t.tableID)
>>>>>>> f952bf02f (redo(ticdc): make implementation in manager asynchronously (#5683))
}
return t.sortNode.ResolvedTs()
}
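After this change the redo manager, when enabled, becomes the source of the table-level resolved ts instead of the sink or sorter nodes. A minimal sketch of that selection, with simplified stand-in types rather than the real tableActor:

package main

import "fmt"

type redoManager interface {
	Enabled() bool
	GetResolvedTs(tableID int64) uint64
}

type tableActor struct {
	tableID        int64
	redo           redoManager
	sorterResolved uint64
}

// ResolvedTs mirrors the new branch: with redo enabled, the redo manager's
// per-table resolved ts is returned; otherwise the sorter node's value is used.
func (t *tableActor) ResolvedTs() uint64 {
	if t.redo != nil && t.redo.Enabled() {
		return t.redo.GetResolvedTs(t.tableID)
	}
	return t.sorterResolved
}

type fakeRedo struct{ ts uint64 }

func (f fakeRedo) Enabled() bool                      { return true }
func (f fakeRedo) GetResolvedTs(tableID int64) uint64 { return f.ts }

func main() {
	t := &tableActor{tableID: 1, redo: fakeRedo{ts: 42}, sorterResolved: 100}
	fmt.Println(t.ResolvedTs()) // 42: the redo log is behind the sorter, so it wins
}
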
27 changes: 27 additions & 0 deletions cdc/processor/pipeline/table_actor_test.go
@@ -80,6 +80,7 @@ func TestTableActorInterface(t *testing.T) {
tableID, markID := tbl.ID()
require.Equal(t, int64(1), tableID)
require.Equal(t, int64(2), markID)
<<<<<<< HEAD
require.Equal(t, "t1", tbl.Name())
require.Equal(t, TableStatusInitializing, tbl.Status())
sink.status.Store(TableStatusStopped)
@@ -93,6 +94,32 @@
tbl.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual)
sink.resolvedTs.Store(model.NewResolvedTs(6))
require.Equal(t, model.Ts(6), tbl.ResolvedTs())
=======
require.Equal(t, "t1", table.Name())
require.Equal(t, TableStatePreparing, table.State())

table.sortNode.state.Store(TableStatePrepared)
require.Equal(t, TableStatePrepared, table.State())

require.Equal(t, uint64(1), table.Workload().Workload)

table.sinkNode.checkpointTs.Store(model.NewResolvedTs(3))
require.Equal(t, model.Ts(3), table.CheckpointTs())

require.Equal(t, model.Ts(5), table.ResolvedTs())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
table.redoManager, _ = redo.NewMockManager(ctx)
table.redoManager.AddTable(table.tableID, 0)
require.Equal(t, model.Ts(0), table.ResolvedTs())
table.redoManager.UpdateResolvedTs(ctx, table.tableID, model.Ts(6))
require.Eventually(t, func() bool { return table.ResolvedTs() == model.Ts(6) },
time.Second*5, time.Millisecond*500)
table.redoManager.Cleanup(ctx)

table.sinkNode.state.Store(TableStateStopped)
require.Equal(t, TableStateStopped, table.State())
>>>>>>> f952bf02f (redo(ticdc): make implementation in manager asynchronously (#5683))
}

func TestTableActorCancel(t *testing.T) {
14 changes: 10 additions & 4 deletions cdc/processor/processor.go
@@ -793,7 +793,17 @@ func (p *processor) createTablePipelineImpl(
return nil
})

<<<<<<< HEAD
tableName, err := p.getTableName(ctx, tableID, replicaInfo)
=======
if p.redoManager.Enabled() {
p.redoManager.AddTable(tableID, replicaInfo.StartTs)
}

tableName := p.getTableName(ctx, tableID)

s, err := sink.NewTableSink(p.sink, tableID, p.metricsTableSinkTotalRows)
>>>>>>> f952bf02f (redo(ticdc): make implementation in manager asynchronously (#5683))
if err != nil {
return nil, errors.Trace(err)
}
@@ -830,10 +840,6 @@
)
}

if p.redoManager.Enabled() {
p.redoManager.AddTable(tableID, replicaInfo.StartTs)
}

log.Info("Add table pipeline", zap.Int64("tableID", tableID),
cdcContext.ZapFieldChangefeed(ctx),
zap.String("name", table.Name()),
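This hunk moves the redoManager.AddTable call ahead of the table sink and pipeline creation. The diff itself does not state the motivation; the sketch below (hypothetical helper names, not the real processor) simply illustrates the new ordering, under which the table's redo resolved ts already exists when the rest of the pipeline is built:

package main

import "fmt"

type redoManager struct{ tables map[int64]uint64 }

func (m *redoManager) Enabled() bool { return m.tables != nil }

// AddTable records the table's starting resolved ts.
func (m *redoManager) AddTable(tableID int64, startTs uint64) { m.tables[tableID] = startTs }

func (m *redoManager) GetResolvedTs(tableID int64) uint64 { return m.tables[tableID] }

// createTablePipeline mimics the reordered flow: register with the redo
// manager first, then build the table sink and pipeline nodes.
func createTablePipeline(m *redoManager, tableID int64, startTs uint64) {
	if m.Enabled() {
		m.AddTable(tableID, startTs)
	}
	// ... the table sink and pipeline nodes would be created here, and they
	// can read m.GetResolvedTs(tableID) from their very first tick onward.
	fmt.Printf("table %d starts with redo resolved ts %d\n", tableID, m.GetResolvedTs(tableID))
}

func main() {
	m := &redoManager{tables: map[int64]uint64{}}
	createTablePipeline(m, 47, 400)
}
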