diff --git a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/DataChangeRecord.java b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/DataChangeRecord.java index ecbb27ad..627e7645 100644 --- a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/DataChangeRecord.java +++ b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/DataChangeRecord.java @@ -17,6 +17,7 @@ package com.oceanbase.connector.flink.table; import java.io.Serializable; +import java.util.Objects; import java.util.Optional; public class DataChangeRecord implements Record { @@ -36,11 +37,15 @@ static KeyExtractor simple() { return record -> Optional.ofNullable(record.getTable().getKey()) .map( - keys -> - new DataChangeRecordData( - keys.stream() - .map(record::getFieldValue) - .toArray())) + keys -> { + Object[] array = + keys.stream().map(record::getFieldValue).toArray(); + if (array.length == 0) { + return null; + } else { + return new DataChangeRecordData(array); + } + }) .orElse(null); } } @@ -76,6 +81,21 @@ public Object getFieldValue(String fieldName) { return data.getValue(table.getFieldIndex(fieldName)); } + @Override + public boolean equals(Object object) { + if (this == object) return true; + if (object == null || getClass() != object.getClass()) return false; + DataChangeRecord record = (DataChangeRecord) object; + return Objects.equals(table, record.table) + && type == record.type + && Objects.equals(data, record.data); + } + + @Override + public int hashCode() { + return Objects.hash(table, type, data); + } + @Override public String toString() { return "DataChangeRecord{" + "table=" + table + ", type=" + type + ", data=" + data + '}';