Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4192
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Jan 13, 2022
1 parent 2a587ec commit f65fa37
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 1 deletion.
1 change: 1 addition & 0 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
}

mqMessage.Key = evlp
mqMessage.IncRowsCount()
a.resultBuf = append(a.resultBuf, mqMessage)

return EncoderNeedAsyncWrite, nil
Expand Down
8 changes: 7 additions & 1 deletion cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage,

// Build implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) Build() []*MQMessage {
if len(d.messages.Messages) == 0 {
rowCount := len(d.messages.Messages)
if rowCount == 0 {
return nil
}

Expand All @@ -390,7 +391,12 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage {
if err != nil {
log.Panic("Error when serializing Canal packet", zap.Error(err))
}
<<<<<<< HEAD
ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
=======
ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(rowCount)
>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192))
d.messages.Reset()
d.resetPacket()
return []*MQMessage{ret}
Expand Down
6 changes: 6 additions & 0 deletions cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,13 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
return nil
}
<<<<<<< HEAD
ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table)
=======
m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable())
m.IncRowsCount()
ret[i] = m
>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192))
}
c.resolvedBuf = c.resolvedBuf[0:0]
return ret
Expand Down
2 changes: 2 additions & 0 deletions cdc/sink/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func (s *canalFlatSuite) TestBatching(c *check.C) {
c.Assert(msgs, check.HasLen, int(resolvedTs-lastResolved))

for j := range msgs {
c.Assert(msgs[j].GetRowsCount(), check.Equals, 1)

var msg canalFlatMessage
err := json.Unmarshal(msgs[j].Value, &msg)
c.Assert(err, check.IsNil)
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *canalBatchSuite) TestCanalEventBatchEncoder(c *check.C) {
c.Assert(res, check.HasLen, 1)
c.Assert(res[0].Key, check.IsNil)
c.Assert(len(res[0].Value), check.Equals, size)
c.Assert(res[0].GetRowsCount(), check.Equals, len(cs))

packet := &canal.Packet{}
err := proto.Unmarshal(res[0].Value, packet)
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco
if len(cs) > 0 {
res := encoder.Build()
c.Assert(res, check.HasLen, 1)
c.Assert(res[0].GetRowsCount(), check.Equals, len(cs))
decoder, err := newDecoder(res[0].Key, res[0].Value)
c.Assert(err, check.IsNil)
checkRowDecoder(decoder, cs)
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,12 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage {
return nil
}

<<<<<<< HEAD
ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
=======
ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(d.batchSize)
>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192))
d.Reset()
return []*MQMessage{ret}
}
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/maxwell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (s *maxwellbatchSuite) testmaxwellBatchCodec(c *check.C, newEncoder func()
continue
}
c.Assert(messages, check.HasLen, 1)
c.Assert(messages[0].GetRowsCount(), check.Equals, len(cs))
c.Assert(len(messages[0].Key)+len(messages[0].Value), check.Equals, size)
}

Expand Down

0 comments on commit f65fa37

Please sign in to comment.