From 09949a605cde0209506c2c50ef8e8620dd431e3f Mon Sep 17 00:00:00 2001 From: anna-cross Date: Wed, 20 Nov 2024 16:17:22 -0500 Subject: [PATCH] Append CreatedDate and CreatedBy if its not part of payload or nil --- pubsub/client.go | 2 +- pubsub/utils.go | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pubsub/client.go b/pubsub/client.go index 469fe02..516cbb9 100644 --- a/pubsub/client.go +++ b/pubsub/client.go @@ -393,7 +393,7 @@ func (c *Client) prepareRecord(codec *goavro.Codec, r opencdc.Record) ([]byte, e if err != nil { return nil, errors.Errorf("failed to extract payload data: %w", err) } - avroPrepared, err := validateAndPreparePayload(data, codec.Schema()) + avroPrepared, err := validateAndPreparePayload(data, codec.Schema(), c.userID) if err != nil { return nil, errors.Errorf("error validating and preparing avro data:%w", err) } diff --git a/pubsub/utils.go b/pubsub/utils.go index 2984fb2..48ff736 100644 --- a/pubsub/utils.go +++ b/pubsub/utils.go @@ -19,6 +19,7 @@ import ( "crypto/x509" "encoding/json" "strings" + "time" "github.com/conduitio/conduit-commons/opencdc" "github.com/go-errors/errors" @@ -98,7 +99,7 @@ func invalidReplayIDErr(err error) bool { return strings.Contains(strings.ToLower(err.Error()), "replay id validation failed") } -func validateAndPreparePayload(dataMap opencdc.StructuredData, avroSchema string) (map[string]interface{}, error) { +func validateAndPreparePayload(dataMap opencdc.StructuredData, avroSchema string, userID string) (map[string]interface{}, error) { var schema map[string]interface{} if err := json.Unmarshal([]byte(avroSchema), &schema); err != nil { return nil, err @@ -140,6 +141,14 @@ func validateAndPreparePayload(dataMap opencdc.StructuredData, avroSchema string } } + if val, ok := avroRecord["CreatedDate"]; !ok || val == nil { + avroRecord["CreatedDate"] = time.Now().Unix() + } + + if val, ok := avroRecord["CreatedById"]; !ok || val == nil { + avroRecord["CreatedById"] = userID + } + return avroRecord, nil }