diff --git a/lib/debezium/keys.go b/lib/debezium/keys.go index 4f5f393be..840e95ae2 100644 --- a/lib/debezium/keys.go +++ b/lib/debezium/keys.go @@ -53,7 +53,6 @@ func ParsePartitionKey(key []byte, cdcKeyFormat string) (map[string]any, error) return parsePartitionKeyStruct(key) case kafkalib.StringKeyFmt: return parsePartitionKeyString(key) - } return nil, fmt.Errorf("format: %s is not supported", cdcKeyFormat) } diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 2df7cabbf..12bd80237 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -57,6 +57,24 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo return "", fmt.Errorf("cannot unmarshall event: %w", err) } + if len(topicConfig.tc.PrimaryKeysOverride) > 0 { + data, err := _event.GetData(map[string]any{}, topicConfig.tc) + if err != nil { + tags["what"] = "get_data_err" + return "", fmt.Errorf("failed to get data: %w", err) + } + + for _, primaryKey := range topicConfig.tc.PrimaryKeysOverride { + value, isOk := data[primaryKey] + if !isOk { + tags["what"] = "missing_pk" + return "", fmt.Errorf("missing primary key: %q", primaryKey) + } + + pkMap[primaryKey] = value + } + } + tags["op"] = _event.Operation() evt, err := event.ToMemoryEvent(_event, pkMap, topicConfig.tc, cfg.Mode) if err != nil {