diff --git a/source.go b/source.go index f521a2e..f04af08 100644 --- a/source.go +++ b/source.go @@ -37,7 +37,7 @@ func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { if err != nil { return fmt.Errorf("invalid config: %w", err) } - + return nil } @@ -77,12 +77,9 @@ func (s *Source) Read(ctx context.Context) (sdk.Record, error) { return rec, errors.New("source message channel closed") } - pos := Position{ - DeliveryTag: msg.DeliveryTag, - QueueName: s.queue.Name, - } + pos := Position{msg.DeliveryTag, s.queue.Name} sdkPos := pos.ToSdkPosition() - var metadata sdk.Metadata + metadata := metadataFromMessage(msg) var key sdk.Data var payload sdk.Data = sdk.RawData(msg.Body) @@ -146,3 +143,38 @@ func parseSdkPosition(pos sdk.Position) (Position, error) { return p, nil } + +func metadataFromMessage(msg amqp091.Delivery) sdk.Metadata { + metadata := sdk.Metadata{} + + setKey := func(key string, v any) { + if s, ok := v.(string); ok { + metadata[key] = s + return + } + + metadata[key] = fmt.Sprintf("%v", v) + } + + setKey("rabbitmq.queueName", msg.MessageCount) + setKey("rabbitmq.contentType", msg.ContentType) + setKey("rabbitmq.contentEncoding", msg.ContentEncoding) + setKey("rabbitmq.deliveryMode", msg.DeliveryMode) + setKey("rabbitmq.priority", msg.Priority) + setKey("rabbitmq.correlationId", msg.CorrelationId) + setKey("rabbitmq.replyTo", msg.ReplyTo) + setKey("rabbitmq.expiration", msg.Expiration) + setKey("rabbitmq.messageId", msg.MessageId) + setKey("rabbitmq.timestamp", msg.Timestamp) + setKey("rabbitmq.type", msg.Type) + setKey("rabbitmq.userId", msg.UserId) + setKey("rabbitmq.appId", msg.AppId) + setKey("rabbitmq.consumerTag", msg.ConsumerTag) + setKey("rabbitmq.messageCount", msg.MessageCount) + setKey("rabbitmq.deliveryTag", msg.DeliveryTag) + setKey("rabbitmq.redelivered", msg.Redelivered) + setKey("rabbitmq.exchange", msg.Exchange) + setKey("rabbitmq.routingKey", msg.RoutingKey) + + return metadata +}