Skip to content

Commit

Permalink
fix maxwell.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Jan 4, 2022
1 parent 543bd4d commit 56df902
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 0 deletions.
1 change: 1 addition & 0 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage {
}

ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(d.batchSize)
d.Reset()
return []*MQMessage{ret}
}
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/maxwell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (s *maxwellbatchSuite) testmaxwellBatchCodec(c *check.C, newEncoder func()
continue
}
c.Assert(messages, check.HasLen, 1)
c.Assert(messages[0].GetRowsCount(), check.Equals, len(cs))
c.Assert(len(messages[0].Key)+len(messages[0].Value), check.Equals, size)
}

Expand Down

0 comments on commit 56df902

Please sign in to comment.