Skip to content

Commit

Permalink
fixed delete message send mq bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed May 27, 2019
1 parent e808c6e commit 058e1fc
Showing 1 changed file with 12 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,34 +222,27 @@ public static Message[] messagePartition(Message message, Integer partitionsNum,
} else {
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
int hashCode = database.hashCode();
CanalEntry.EventType eventType = rowChange.getEventType();
List<CanalEntry.Column> columns = null;
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}

if (hashMode.autoPkHash) {
// isEmpty use default pkNames
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
for (CanalEntry.Column column : columns) {
if (column.getIsKey()) {
hashCode = hashCode ^ column.getValue().hashCode();
}
}
} else {
try {
CanalEntry.EventType eventType = RowChange.parseFrom(entry.getStoreValue()).getEventType();
if(eventType == CanalEntry.EventType.DELETE){
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
hashCode = hashCode ^ column.getValue().hashCode();
}
}
}
else {
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
hashCode = hashCode ^ column.getValue().hashCode();
}
}
for (CanalEntry.Column column : columns) {
if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
hashCode = hashCode ^ column.getValue().hashCode();
}
}
catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}

int pkHash = Math.abs(hashCode) % partitionsNum;
Expand Down

0 comments on commit 058e1fc

Please sign in to comment.