diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index 99b62f91d..8916f3922 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -173,6 +173,16 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf } } else { retMap = s.Payload.afterMap + // TODO: Remove this code. + for key, value := range pkMap { + retData, isOk := retMap[key] + if !isOk { + return nil, fmt.Errorf("key %q not found in data", key) + } else if retData != value { + return nil, fmt.Errorf("value mismatch for key %q: expected %v, got %v", key, retData, value) + } + } + retMap[constants.DeleteColumnMarker] = false retMap[constants.OnlySetDeleteColumnMarker] = false } diff --git a/lib/cdc/mongo/debezium_test.go b/lib/cdc/mongo/debezium_test.go index b650a25aa..87d262505 100644 --- a/lib/cdc/mongo/debezium_test.go +++ b/lib/cdc/mongo/debezium_test.go @@ -154,7 +154,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() { evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) - evtData, err := evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{}) + evtData, err := evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{}) assert.NoError(m.T(), err) _, isOk := evtData[constants.UpdateColumnMarker] assert.False(m.T(), isOk) @@ -163,12 +163,12 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() { assert.Equal(m.T(), evtData["last_name"], "Tang") assert.Equal(m.T(), evtData["email"], "robin@example.com") - evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{}) + evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{}) assert.NoError(m.T(), err) _, isOk = evtDataWithIncludedAt[constants.UpdateColumnMarker] assert.False(m.T(), isOk) - evtDataWithIncludedAt, err = evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true}) + evtDataWithIncludedAt, err = evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true}) assert.NoError(m.T(), err) assert.Equal(m.T(), ext.NewExtendedTime(time.Date(2022, time.November, 18, 6, 35, 21, 0, time.UTC), ext.TimestampTzKindType, ext.ISO8601), evtDataWithIncludedAt[constants.DatabaseUpdatedColumnMarker]) diff --git a/processes/consumer/process_test.go b/processes/consumer/process_test.go index 1695b9f84..2d5f8301b 100644 --- a/processes/consumer/process_test.go +++ b/processes/consumer/process_test.go @@ -134,7 +134,7 @@ func TestProcessMessageFailures(t *testing.T) { }, "payload": { "before": null, - "after": "{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", + "after": "{\"_id\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", "patch": null, "filter": null, "updateDescription": null, @@ -177,7 +177,7 @@ func TestProcessMessageFailures(t *testing.T) { var rowData map[string]any for _, row := range td.Rows() { - if row["_id"] == int64(1004) { + if row["_id"] == "1004" { rowData = row } }