redo(ticdc): make implementation in manager asynchronously #5683

Merged: 15 commits, merged on Jul 2, 2022
4 changes: 2 additions & 2 deletions cdc/metrics.go
@@ -19,7 +19,7 @@ import (
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/processor"
"github.com/pingcap/tiflow/cdc/puller"
redowriter "github.com/pingcap/tiflow/cdc/redo/writer"
redo "github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/cdc/scheduler"
sink "github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
@@ -59,7 +59,7 @@ func init() {
memory.InitMetrics(registry)
unified.InitMetrics(registry)
leveldb.InitMetrics(registry)
redowriter.InitMetrics(registry)
redo.InitMetrics(registry)
db.InitMetrics(registry)
kafka.InitMetrics(registry)
scheduler.InitMetrics(registry)
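For context, this hunk keeps the existing metrics pattern: each subsystem exposes an InitMetrics hook and cdc/metrics.go registers everything on one registry; only the providing package moves from redo/writer to redo/common. A minimal sketch of that pattern, assuming only the names visible in the diff:

```go
package example

import (
	"github.com/prometheus/client_golang/prometheus"

	redo "github.com/pingcap/tiflow/cdc/redo/common"
)

// newRegistry sketches the central-registration pattern in cdc/metrics.go:
// build one registry and let each subsystem register its collectors on it.
func newRegistry() *prometheus.Registry {
	registry := prometheus.NewRegistry()
	redo.InitMetrics(registry) // now provided by redo/common instead of redo/writer
	return registry
}
```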
20 changes: 10 additions & 10 deletions cdc/model/sink.go
@@ -29,18 +29,18 @@ import (

//go:generate msgp

// MqMessageType is the type of message
type MqMessageType int
// MessageType is the type of message, which is used by MqSink and RedoLog.
type MessageType int

const (
// MqMessageTypeUnknown is unknown type of message key
MqMessageTypeUnknown MqMessageType = iota
// MqMessageTypeRow is row type of message key
MqMessageTypeRow
// MqMessageTypeDDL is ddl type of message key
MqMessageTypeDDL
// MqMessageTypeResolved is resolved type of message key
MqMessageTypeResolved
// MessageTypeUnknown is unknown type of message key
MessageTypeUnknown MessageType = iota
// MessageTypeRow is row type of message key
MessageTypeRow
// MessageTypeDDL is ddl type of message key
MessageTypeDDL
// MessageTypeResolved is resolved type of message key
MessageTypeResolved
)

// ColumnFlagType is for encapsulating the flag operations for different flags.
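The rename keeps the iota layout unchanged, so only identifiers change at call sites. A hypothetical helper (not part of this diff) showing how code might switch on the renamed type:

```go
package example

import "github.com/pingcap/tiflow/cdc/model"

// messageTypeName is a hypothetical helper, not part of this PR; it only
// illustrates switching on the renamed model.MessageType constants.
func messageTypeName(t model.MessageType) string {
	switch t {
	case model.MessageTypeRow:
		return "row"
	case model.MessageTypeDDL:
		return "ddl"
	case model.MessageTypeResolved:
		return "resolved"
	default:
		return "unknown"
	}
}
```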
14 changes: 7 additions & 7 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default.

9 changes: 6 additions & 3 deletions cdc/processor/pipeline/sink.go
@@ -124,17 +124,20 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
}
}()

currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
if resolved.Ts > n.targetTs {
resolved = model.NewResolvedTs(n.targetTs)
}

currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
if n.redoManager != nil && n.redoManager.Enabled() {
// redo log do not support batch resolve mode, hence we
// use `ResolvedMark` to restore a normal resolved ts
resolved = model.NewResolvedTs(resolved.ResolvedMark())
err = n.redoManager.FlushLog(ctx, n.tableID, resolved.Ts)
err = n.redoManager.UpdateResolvedTs(ctx, n.tableID, resolved.Ts)

// fail fast check, the happens before relationship is:
// 1. sorter resolvedTs >= sink resolvedTs >= table redoTs == tableActor resolvedTs
// 2. tableActor resolvedTs >= processor resolvedTs >= global resolvedTs >= barrierTs
redoTs := n.redoManager.GetMinResolvedTs()
if redoTs < currentBarrierTs {
log.Debug("redoTs should not less than current barrierTs",
@@ -143,7 +146,7 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
zap.Uint64("barrierTs", currentBarrierTs))
}

// Fixme(CharlesCheung): remove this check after refactoring redoManager
// TODO: remove this check after SchedulerV3 become the first choice.
if resolved.Ts > redoTs {
resolved = model.NewResolvedTs(redoTs)
}
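The hunk above relies on ResolvedMark to collapse a batch-mode resolved ts into a plain one before handing it to the redo manager. A simplified sketch of those semantics, assuming (as the tests below suggest) that in batch mode only timestamps strictly below Ts are fully resolved:

```go
package example

// Simplified stand-in for model.ResolvedTs; field names follow the tests in
// sink_test.go below. Assumption: batch mode treats only ts < Ts as fully
// resolved, so ResolvedMark returns Ts-1 in that mode.
type ResolvedMode int

const (
	NormalResolvedMode ResolvedMode = iota
	BatchResolvedMode
)

type ResolvedTs struct {
	Mode    ResolvedMode
	Ts      uint64
	BatchID uint64
}

// ResolvedMark restores a normal resolved ts that is safe in either mode,
// which is what flushSink feeds to redoManager.UpdateResolvedTs.
func (r ResolvedTs) ResolvedMark() uint64 {
	if r.Mode == BatchResolvedMode {
		return r.Ts - 1
	}
	return r.Ts
}
```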
25 changes: 22 additions & 3 deletions cdc/processor/pipeline/sink_test.go
@@ -15,6 +15,7 @@ package pipeline

import (
"context"
"math/rand"
"sync"
"testing"
"time"
@@ -138,8 +139,9 @@ func TestState(t *testing.T) {

state := TableStatePrepared
// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID, false)
targetTs := model.Ts(10)
node := newSinkNode(1, &mockSink{}, 0, targetTs, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID, true)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).
ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
@@ -177,6 +179,23 @@ func TestState(t *testing.T) {
require.True(t, ok)
require.Equal(t, TableStateReplicating, node.State())

batchResolved := model.ResolvedTs{
Mode: model.BatchResolvedMode,
Ts: targetTs,
BatchID: rand.Uint64() % 10,
}
msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: targetTs,
Resolved: &batchResolved,
RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, batchResolved, node.getResolvedTs())
require.Equal(t, batchResolved, node.getCheckpointTs())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
@@ -185,7 +204,7 @@ func TestState(t *testing.T) {
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStateStopped, node.State())
require.Equal(t, model.Ts(10), node.CheckpointTs())
require.Equal(t, targetTs, node.CheckpointTs())

// test the stop at ts command
state = TableStatePrepared
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/sorter.go
@@ -168,9 +168,9 @@ func (n *sorterNode) start(
if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs {
lastSentResolvedTs = lastCRTs
lastSendResolvedTsTime = time.Now()
msg := model.NewResolvedPolymorphicEvent(0, lastSentResolvedTs)
ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg))
}
msg := model.NewResolvedPolymorphicEvent(0, lastSentResolvedTs)
ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg))
Comment on lines +171 to -173

Contributor: Is this change related to this PR? It seems to change the resolved-ts forwarding from a level trigger to an edge trigger.

Contributor (Author):
  1. We introduced the level-trigger change in this PR to prevent the FlowControl system from deadlocking in large-transaction scenarios. However, we found that the timed tick event in the tableActor plays the same role, so this redundant level trigger can be removed.
  2. More importantly, lastSentResolvedTs has an initial value of 0, so it was possible to send an event with resolvedTs=0 to sinkNode and redoManager, which would violate the defensive verification in redoManager.
}

// once receive startTs, which means sink should start replicating data to downstream.
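To make the thread above concrete, here is an illustrative contrast between the two trigger styles (names and structure are invented for the sketch, not taken from the sorter code):

```go
package main

import "fmt"

// forwarder is an invented stand-in for the sorter's resolved-ts forwarding.
type forwarder struct {
	lastSentResolvedTs uint64
	send               func(ts uint64)
}

// levelTriggered mirrors the old behavior: it re-emits the current value on
// every call, including the initial 0 that trips redoManager's checks.
func (f *forwarder) levelTriggered(lastCRTs, commitTs uint64) {
	if lastCRTs > f.lastSentResolvedTs && commitTs > lastCRTs {
		f.lastSentResolvedTs = lastCRTs
	}
	f.send(f.lastSentResolvedTs)
}

// edgeTriggered mirrors the new behavior: it emits only when the resolved ts
// actually advances, so 0 is never sent downstream.
func (f *forwarder) edgeTriggered(lastCRTs, commitTs uint64) {
	if lastCRTs > f.lastSentResolvedTs && commitTs > lastCRTs {
		f.lastSentResolvedTs = lastCRTs
		f.send(f.lastSentResolvedTs)
	}
}

func main() {
	f := &forwarder{send: func(ts uint64) { fmt.Println("resolved", ts) }}
	f.levelTriggered(0, 1) // old: prints "resolved 0" despite no progress
	f.edgeTriggered(0, 1)  // new: no output, resolved ts did not advance
	f.edgeTriggered(3, 5)  // new: prints "resolved 3"
}
```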
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table_actor.go
@@ -396,7 +396,7 @@ func (t *tableActor) ResolvedTs() model.Ts {
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if t.redoManager.Enabled() {
return t.sinkNode.ResolvedTs()
return t.redoManager.GetResolvedTs(t.tableID)
}
return t.sortNode.ResolvedTs()
}
9 changes: 6 additions & 3 deletions cdc/processor/pipeline/table_actor_test.go
@@ -95,12 +95,15 @@ func TestTableActorInterface(t *testing.T) {
require.Equal(t, model.Ts(3), table.CheckpointTs())

require.Equal(t, model.Ts(5), table.ResolvedTs())
table.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
table.redoManager, _ = redo.NewMockManager(ctx)
table.sinkNode.resolvedTs.Store(model.NewResolvedTs(6))
require.Equal(t, model.Ts(6), table.ResolvedTs())
table.redoManager.AddTable(table.tableID, 0)
require.Equal(t, model.Ts(0), table.ResolvedTs())
table.redoManager.UpdateResolvedTs(ctx, table.tableID, model.Ts(6))
require.Eventually(t, func() bool { return table.ResolvedTs() == model.Ts(6) },
time.Second*5, time.Millisecond*500)
table.redoManager.Cleanup(ctx)

table.sinkNode.state.Store(TableStateStopped)
require.Equal(t, TableStateStopped, table.State())
8 changes: 4 additions & 4 deletions cdc/processor/processor.go
@@ -891,6 +891,10 @@ func (p *processor) createTablePipelineImpl(
return nil
})

if p.redoManager.Enabled() {
p.redoManager.AddTable(tableID, replicaInfo.StartTs)
}

tableName := p.getTableName(ctx, tableID)

s, err := sink.NewTableSink(p.sink, tableID, p.metricsTableSinkTotalRows)
Comment on lines +894 to 900

Contributor: Is there a sequencing restriction among p.redoManager.AddTable, sink.NewTableSink, and pipeline.NewTableActor?

Contributor (Author): There are no strict ordering restrictions. This is a defensive optimization to prevent access to the related interfaces before the table has been added.

@@ -911,10 +915,6 @@ func (p *processor) createTablePipelineImpl(
return nil, errors.Trace(err)
}

if p.redoManager.Enabled() {
p.redoManager.AddTable(tableID, replicaInfo.StartTs)
}

log.Info("Add table pipeline", zap.Int64("tableID", tableID),
cdcContext.ZapFieldChangefeed(ctx),
zap.String("name", table.Name()),
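Taken together with the FlushLog → UpdateResolvedTs rename and the require.Eventually in the table-actor test above, the manager now applies resolved-ts advances asynchronously: UpdateResolvedTs enqueues work and returns, and readers observe progress only eventually. A minimal sketch of that shape, under stated assumptions (the real manager lives in cdc/redo and differs in detail):

```go
package example

import (
	"context"
	"sync"
)

// update carries one enqueued resolved-ts advance.
type update struct {
	tableID int64
	ts      uint64
}

// asyncManager is an assumed shape, not the real cdc/redo manager: a
// per-table resolved-ts tracker whose updates are applied by a background
// goroutine.
type asyncManager struct {
	mu       sync.RWMutex
	resolved map[int64]uint64
	updates  chan update
}

func newAsyncManager(ctx context.Context) *asyncManager {
	m := &asyncManager{
		resolved: make(map[int64]uint64),
		updates:  make(chan update, 128),
	}
	go m.run(ctx)
	return m
}

// run applies queued advances; it only touches tables registered via
// AddTable first, mirroring why AddTable is moved earlier in this hunk.
func (m *asyncManager) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case u := <-m.updates:
			m.mu.Lock()
			if cur, ok := m.resolved[u.tableID]; ok && u.ts > cur {
				m.resolved[u.tableID] = u.ts
			}
			m.mu.Unlock()
		}
	}
}

// AddTable registers a table before any update may reference it.
func (m *asyncManager) AddTable(tableID int64, startTs uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.resolved[tableID]; !ok {
		m.resolved[tableID] = startTs
	}
}

// UpdateResolvedTs enqueues an advance; it returns before it is visible.
func (m *asyncManager) UpdateResolvedTs(ctx context.Context, tableID int64, ts uint64) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case m.updates <- update{tableID: tableID, ts: ts}:
		return nil
	}
}

// GetResolvedTs returns the last applied advance for a table, which is why
// tests must poll with require.Eventually rather than assert immediately.
func (m *asyncManager) GetResolvedTs(tableID int64) uint64 {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.resolved[tableID]
}
```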
88 changes: 88 additions & 0 deletions cdc/redo/common/metric.go
@@ -0,0 +1,88 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"github.com/prometheus/client_golang/prometheus"
)

const (
namespace = "ticdc"
subsystem = "redo"
)

var (
// RedoWriteBytesGauge records the total number of bytes written to redo log.
RedoWriteBytesGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "write_bytes_total",
Help: "Total number of bytes redo log written",
}, []string{"namespace", "changefeed"})

// RedoFsyncDurationHistogram records the latency distributions of fsync called by redo writer.
RedoFsyncDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "fsync_duration_seconds",
Help: "The latency distributions of fsync called by redo writer",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
}, []string{"namespace", "changefeed"})

// RedoFlushAllDurationHistogram records the latency distributions of flushAll
// called by redo writer.
RedoFlushAllDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "flushall_duration_seconds",
Help: "The latency distributions of flushall called by redo writer",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
}, []string{"namespace", "changefeed"})

// RedoTotalRowsCountGauge records the total number of rows written to redo log.
RedoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_rows_count",
Help: "The total count of rows that are processed by redo writer",
}, []string{"namespace", "changefeed"})

// RedoWriteLogDurationHistogram records the latency distributions of writeLog.
RedoWriteLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "write_log_duration_seconds",
Help: "The latency distributions of writeLog called by redoManager",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
}, []string{"namespace", "changefeed"})

// RedoFlushLogDurationHistogram records the latency distributions of flushLog.
RedoFlushLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "flush_log_duration_seconds",
Help: "The latency distributions of flushLog called by redoManager",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
}, []string{"namespace", "changefeed"})
)

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(RedoFsyncDurationHistogram)
registry.MustRegister(RedoTotalRowsCountGauge)
registry.MustRegister(RedoWriteBytesGauge)
registry.MustRegister(RedoFlushAllDurationHistogram)
registry.MustRegister(RedoWriteLogDurationHistogram)
registry.MustRegister(RedoFlushLogDurationHistogram)
Comment from zhaoxinyu (Contributor), Jun 30, 2022: The two metrics RedoFlushAllDurationHistogram and RedoFlushLogDurationHistogram may be hard for users to differentiate.

Contributor (Author): Compared with RedoFlushAllDurationHistogram, RedoFlushLogDurationHistogram also includes the time spent waiting to acquire a lock. Maybe we can adjust the monitoring after the flush logic of the redo module has been optimized.
}
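For reference, a hedged example of how these vectors are typically observed at a call site; the wrapper function and label values are illustrative, not code from this PR:

```go
package example

import (
	"time"

	"github.com/pingcap/tiflow/cdc/redo/common"
)

// recordFsync is an illustrative call site: observe one fsync latency on the
// histogram defined above, labeled by changefeed namespace and ID.
func recordFsync(start time.Time, namespace, changefeed string) {
	common.RedoFsyncDurationHistogram.
		WithLabelValues(namespace, changefeed).
		Observe(time.Since(start).Seconds())
}
```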