Skip to content

Commit

Permalink
Add metadata to read skd.Record
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Feb 2, 2024
1 parent beb9ecf commit 04937e1
Showing 1 changed file with 38 additions and 6 deletions.
44 changes: 38 additions & 6 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *Source) Configure(ctx context.Context, cfg map[string]string) error {
if err != nil {
return fmt.Errorf("invalid config: %w", err)
}

return nil
}

Expand Down Expand Up @@ -77,12 +77,9 @@ func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
return rec, errors.New("source message channel closed")
}

pos := Position{
DeliveryTag: msg.DeliveryTag,
QueueName: s.queue.Name,
}
pos := Position{msg.DeliveryTag, s.queue.Name}
sdkPos := pos.ToSdkPosition()
var metadata sdk.Metadata
metadata := metadataFromMessage(msg)
var key sdk.Data
var payload sdk.Data = sdk.RawData(msg.Body)

Expand Down Expand Up @@ -146,3 +143,38 @@ func parseSdkPosition(pos sdk.Position) (Position, error) {

return p, nil
}

func metadataFromMessage(msg amqp091.Delivery) sdk.Metadata {
metadata := sdk.Metadata{}

setKey := func(key string, v any) {
if s, ok := v.(string); ok {
metadata[key] = s
return
}

metadata[key] = fmt.Sprintf("%v", v)
}

setKey("rabbitmq.queueName", msg.MessageCount)
setKey("rabbitmq.contentType", msg.ContentType)
setKey("rabbitmq.contentEncoding", msg.ContentEncoding)
setKey("rabbitmq.deliveryMode", msg.DeliveryMode)
setKey("rabbitmq.priority", msg.Priority)
setKey("rabbitmq.correlationId", msg.CorrelationId)
setKey("rabbitmq.replyTo", msg.ReplyTo)
setKey("rabbitmq.expiration", msg.Expiration)
setKey("rabbitmq.messageId", msg.MessageId)
setKey("rabbitmq.timestamp", msg.Timestamp)
setKey("rabbitmq.type", msg.Type)
setKey("rabbitmq.userId", msg.UserId)
setKey("rabbitmq.appId", msg.AppId)
setKey("rabbitmq.consumerTag", msg.ConsumerTag)
setKey("rabbitmq.messageCount", msg.MessageCount)
setKey("rabbitmq.deliveryTag", msg.DeliveryTag)
setKey("rabbitmq.redelivered", msg.Redelivered)
setKey("rabbitmq.exchange", msg.Exchange)
setKey("rabbitmq.routingKey", msg.RoutingKey)

return metadata
}

0 comments on commit 04937e1

Please sign in to comment.