Skip to content

Commit

Permalink
add some log
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jun 21, 2022
1 parent 86d0299 commit d07c5a6
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 4 deletions.
19 changes: 19 additions & 0 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type sinkNode struct {
checkpointTs atomic.Value
targetTs model.Ts
barrierTs model.Ts
startTs model.Ts

flowController tableFlowController
redoManager redo.LogManager
Expand All @@ -91,6 +92,7 @@ func newSinkNode(
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,
startTs: startTs,
flowController: flowController,
redoManager: redoManager,
}
Expand Down Expand Up @@ -144,6 +146,10 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
}
}()

log.Warn("[sinkNode]flush sink",
zap.Int64("tableID", n.tableID),
zap.Any("resolvedTs", resolved))

// flush redo log
currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
if resolved.Ts > n.targetTs {
Expand Down Expand Up @@ -330,9 +336,22 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo

resolved := model.NewResolvedTs(event.CRTs)
if event.Resolved != nil {
if event.Resolved.Ts != event.CRTs {
log.Panic("resolved ts should equal to crts",
zap.Any("resolved", event.Resolved),
zap.Uint64("crts", event.CRTs),
zap.Uint64("startTs", n.startTs))
}
resolved = *(event.Resolved)
}

if resolved.Ts < n.ResolvedTs() {
log.Panic("resolved ts regressed in sinkNode",
zap.Any("resolved", resolved),
zap.Any("old", n.getResolvedTs()),
zap.Uint64("startTs", n.startTs))
}

if err := n.flushSink(ctx, resolved); err != nil {
return false, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (n *sorterNode) start(
return nil
})
n.eg.Go(func() error {
lastSentResolvedTs := uint64(0)
lastSentResolvedTs := n.resolvedTs // startTs
lastSendResolvedTsTime := time.Now() // the time at which we last sent a resolved-ts.
lastCRTs := uint64(0) // the commit-ts of the last row changed we sent.

Expand Down
16 changes: 15 additions & 1 deletion cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ type ManagerImpl struct {

rtsMap map[model.TableID]model.Ts
rtsMapMu sync.RWMutex

rowCount uint64
}

// NewManager creates a new Manager
Expand Down Expand Up @@ -261,6 +263,11 @@ func (m *ManagerImpl) EmitRowChangedEvents(
tableID model.TableID,
rows ...*model.RowChangedEvent,
) error {
c := atomic.AddUint64(&m.rowCount, uint64(len(rows)))
log.Error("[redo]EmitRowChangedEvents",
zap.Int64("tableID", int64(tableID)),
zap.Any("rows", rows),
zap.Uint64("rowCount", c))
timer := time.NewTimer(logBufferTimeout)
defer timer.Stop()
select {
Expand All @@ -283,6 +290,9 @@ func (m *ManagerImpl) FlushLog(
tableID model.TableID,
resolvedTs uint64,
) error {
log.Error("[redo]FlushLog",
zap.Int64("tableID", int64(tableID)),
zap.Uint64("resolvedTs", resolvedTs))
timer := time.NewTimer(logBufferTimeout)
defer timer.Stop()
select {
Expand All @@ -309,12 +319,15 @@ func (m *ManagerImpl) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) err
func (m *ManagerImpl) GetResolvedTs(tableID model.TableID) model.Ts {
m.rtsMapMu.Lock()
defer m.rtsMapMu.Unlock()
log.Error("[redo] GetResolvedTs",
zap.Int64("TableID", tableID),
zap.Uint64("ResolvedTs", m.rtsMap[tableID]))
return m.rtsMap[tableID]
}

// GetMinResolvedTs returns the minimum resolved ts of all tables in this redo log manager
func (m *ManagerImpl) GetMinResolvedTs() model.Ts {
log.Error("", zap.Uint64("minResolvedTs", atomic.LoadUint64(&m.minResolvedTs)))
log.Error("[redo] GetMinResolvedTs", zap.Uint64("minResolvedTs", atomic.LoadUint64(&m.minResolvedTs)))
return atomic.LoadUint64(&m.minResolvedTs)
}

Expand Down Expand Up @@ -364,6 +377,7 @@ func (m *ManagerImpl) Cleanup(ctx context.Context) error {
func (m *ManagerImpl) flushLog(
ctx context.Context, tableRtsMap map[model.TableID]model.Ts,
) (map[model.TableID]model.Ts, error) {
log.Error("flushing ......", zap.Any("tableRtsMap", tableRtsMap))
emptyRtsMap := make(map[model.TableID]model.Ts)
err := m.writer.FlushLog(ctx, tableRtsMap)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/consistent_replicate_nfs/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ function run() {
run_sql "insert into consistent_replicate_nfs.USERTABLE2 select * from consistent_replicate_nfs.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# to ensure row changed events have been replicated to TiCDC
sleep 5
sleep 30

nfs_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id
current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/consistent_replicate_s3/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export S3_ENDPOINT=127.0.0.1:24927
rm -rf "$WORK_DIR"
mkdir -p "$WORK_DIR"
pkill -9 minio || true
bin/minio server --address $S3_ENDPOINT "$WORK_DIR/s3" &
bin/minio server --address $S3_ENDPOINT "/root/tmp/s3" &
MINIO_PID=$!
i=0
while ! curl -o /dev/null -v -s "http://$S3_ENDPOINT/"; do
Expand Down
Binary file added tests/utils/cdc_state_checker/cdc_state_checker
Binary file not shown.

0 comments on commit d07c5a6

Please sign in to comment.