From 898eb7837286a87ae97ad92aae5a9a388772b284 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 20 Sep 2024 16:37:08 -0700 Subject: [PATCH] Checkpoint. --- processes/consumer/process.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 2df7cabbf..12bd80237 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -57,6 +57,24 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo return "", fmt.Errorf("cannot unmarshall event: %w", err) } + if len(topicConfig.tc.PrimaryKeysOverride) > 0 { + data, err := _event.GetData(map[string]any{}, topicConfig.tc) + if err != nil { + tags["what"] = "get_data_err" + return "", fmt.Errorf("failed to get data: %w", err) + } + + for _, primaryKey := range topicConfig.tc.PrimaryKeysOverride { + value, isOk := data[primaryKey] + if !isOk { + tags["what"] = "missing_pk" + return "", fmt.Errorf("missing primary key: %q", primaryKey) + } + + pkMap[primaryKey] = value + } + } + tags["op"] = _event.Operation() evt, err := event.ToMemoryEvent(_event, pkMap, topicConfig.tc, cfg.Mode) if err != nil {