diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index 72850d766..c5f07b451 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -3,6 +3,7 @@ package mongo import ( "encoding/json" "fmt" + "log/slog" "reflect" "time" @@ -175,8 +176,16 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf retMap = s.Payload.afterMap // We need this because there's an edge case with Debezium // Where _id gets rewritten as id in the partition key. - for k, v := range pkMap { - retMap[k] = v + for key, value := range pkMap { + retData, isOk := retMap[key] + if !isOk { + slog.Warn("key not found in retMap", slog.String("key", key), slog.Any("retData", retData)) + } else if retData != value { + slog.Warn("value mismatch", slog.String("key", key), slog.Any("value", value), slog.Any("retData", retData)) + } + + // TODO: Preserve behavior. + retMap[key] = value } retMap[constants.DeleteColumnMarker] = false