Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 20, 2024
1 parent 27ec948 commit 898eb78
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions processes/consumer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo
return "", fmt.Errorf("cannot unmarshall event: %w", err)
}

if len(topicConfig.tc.PrimaryKeysOverride) > 0 {
data, err := _event.GetData(map[string]any{}, topicConfig.tc)
if err != nil {
tags["what"] = "get_data_err"
return "", fmt.Errorf("failed to get data: %w", err)
}

for _, primaryKey := range topicConfig.tc.PrimaryKeysOverride {
value, isOk := data[primaryKey]
if !isOk {
tags["what"] = "missing_pk"
return "", fmt.Errorf("missing primary key: %q", primaryKey)
}

pkMap[primaryKey] = value
}
}

tags["op"] = _event.Operation()
evt, err := event.ToMemoryEvent(_event, pkMap, topicConfig.tc, cfg.Mode)
if err != nil {
Expand Down

0 comments on commit 898eb78

Please sign in to comment.