Skip to content

Commit

Permalink
fix canal json sink write row count metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jan 3, 2022
1 parent 60ba6a6 commit 519690b
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,9 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
return nil
}
ret[i] = NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable())
m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable())
m.IncRowsCount()
ret[i] = m
}
c.resolvedBuf = c.resolvedBuf[0:0]
return ret
Expand Down

0 comments on commit 519690b

Please sign in to comment.