diff --git a/pkg/amqp/amqp.go b/pkg/amqp/amqp.go index bae449bd..1a081865 100644 --- a/pkg/amqp/amqp.go +++ b/pkg/amqp/amqp.go @@ -32,14 +32,15 @@ type Connection interface { type Client struct { tracer trace.Tracer - channel *amqp.Channel connection Connection - listeners map[string]*listener reconnectMetric metric.Int64Counter listenerMetric metric.Int64UpDownCounter messageMetric metric.Int64Counter + channel *amqp.Channel + listeners map[string]*listener vhost string uri string + attributes []attribute.KeyValue prefetch int mutex sync.RWMutex } @@ -84,6 +85,11 @@ func NewFromURI(uri string, prefetch int, meterProvider metric.MeterProvider, tr if tracerProvider != nil { client.tracer = tracerProvider.Tracer("amqp") + + client.attributes = []attribute.KeyValue{ + semconv.MessagingSystemRabbitmq, + semconv.NetworkProtocolName("rabbitmq"), + } } connection, channel, err := connect(uri, client.prefetch, client.onDisconnect) @@ -130,7 +136,16 @@ func (c *Client) Publish(ctx context.Context, payload amqp.Publishing, exchange, return nil } - ctx, end := telemetry.StartSpan(ctx, c.tracer, "publish", trace.WithSpanKind(trace.SpanKindProducer)) + attributes := c.getAttributes(exchange, routingKey) + + ctx, end := telemetry.StartSpan(ctx, c.tracer, "publish", + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + append([]attribute.KeyValue{ + semconv.MessagingOperationPublish, + }, attributes...)..., + ), + ) defer end(&err) defer recoverer.Error(&err) @@ -139,24 +154,23 @@ func (c *Client) Publish(ctx context.Context, payload amqp.Publishing, exchange, defer c.mutex.RUnlock() if err = c.channel.PublishWithContext(ctx, exchange, routingKey, false, false, telemetry.InjectToAmqp(ctx, payload)); err != nil { - c.increase(ctx, "error", exchange, routingKey) + c.increase(ctx, append([]attribute.KeyValue{ + semconv.ErrorTypeKey.String("amqp:publish"), + }, attributes...)) return } - c.increase(ctx, "send", exchange, routingKey) + c.increase(ctx, attributes) return nil } -func (c *Client) PublishJSON(ctx context.Context, item any, exchange, routingKey string) (err error) { +func (c *Client) PublishJSON(ctx context.Context, item any, exchange, routingKey string) error { if c == nil { return nil } - ctx, end := telemetry.StartSpan(ctx, c.tracer, "publish", trace.WithSpanKind(trace.SpanKindProducer)) - defer end(&err) - payload, err := json.Marshal(item) if err != nil { return fmt.Errorf("marshal: %w", err) @@ -172,21 +186,20 @@ func (c *Client) PublishJSON(ctx context.Context, item any, exchange, routingKey return nil } -func (c *Client) increase(ctx context.Context, name, exchange, routingKey string) { +func (c *Client) increase(ctx context.Context, attributes []attribute.KeyValue) { if c.messageMetric == nil { return } - attributes := []attribute.KeyValue{ - semconv.MessagingSystemRabbitmq, - semconv.NetworkProtocolName("rabbitmq"), - semconv.MessagingDestinationName(exchange), - attribute.String("state", name), - } + c.messageMetric.Add(ctx, 1, metric.WithAttributes(attributes...)) +} + +func (c *Client) getAttributes(exchange, routingKey string) []attribute.KeyValue { + attributes := append([]attribute.KeyValue{semconv.MessagingDestinationName(exchange)}, c.attributes...) if len(routingKey) != 0 { attributes = append(attributes, semconv.MessagingRabbitmqDestinationRoutingKey(routingKey)) } - c.messageMetric.Add(ctx, 1, metric.WithAttributes(attributes...)) + return attributes } diff --git a/pkg/amqp/listen.go b/pkg/amqp/listen.go index 82ce806d..f1e70844 100644 --- a/pkg/amqp/listen.go +++ b/pkg/amqp/listen.go @@ -8,6 +8,8 @@ import ( "time" amqp "github.com/rabbitmq/amqp091-go" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" ) type QueueResolver func() (string, error) @@ -81,9 +83,13 @@ func (c *Client) forward(listener *listener, queueResolver QueueResolver, input defer close(listener.done) defer close(output) + attributes := append([]attribute.KeyValue{ + semconv.MessagingOperationReceive, + }, c.getAttributes(exchange, routingKey)...) + forward: for delivery := range input { - c.increase(context.Background(), "consume", exchange, routingKey) + c.increase(context.Background(), attributes) output <- delivery } diff --git a/pkg/amqphandler/amqphandler.go b/pkg/amqphandler/amqphandler.go index 39eabf86..49c99f16 100644 --- a/pkg/amqphandler/amqphandler.go +++ b/pkg/amqphandler/amqphandler.go @@ -17,21 +17,23 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "go.opentelemetry.io/otel/trace" ) type Handler func(context.Context, amqp.Delivery) error type Service struct { - amqpClient *amqpclient.Client tracer trace.Tracer + counter metric.Int64Counter + amqpClient *amqpclient.Client done chan struct{} handler Handler - counter metric.Int64Counter - exchange string delayExchange string + exchange string queue string routingKey string + attributes []attribute.KeyValue maxRetry int64 retryInterval time.Duration inactiveTimeout time.Duration @@ -103,6 +105,16 @@ func New(config *Config, amqpClient *amqpclient.Client, metricProvider metric.Me } if tracerProvider != nil { + service.attributes = []attribute.KeyValue{ + semconv.MessagingSystemRabbitmq, + semconv.NetworkProtocolName("rabbitmq"), + semconv.MessagingDestinationName(service.exchange), + } + + if len(service.routingKey) != 0 { + service.attributes = append(service.attributes, semconv.MessagingRabbitmqDestinationRoutingKey(service.routingKey)) + } + service.tracer = tracerProvider.Tracer("amqp_handler") } @@ -173,7 +185,14 @@ func (s *Service) Start(ctx context.Context) { func (s *Service) handleMessage(ctx context.Context, log *slog.Logger, message amqp.Delivery) { var err error - ctx, end := telemetry.StartSpan(ctx, s.tracer, "receive", trace.WithSpanKind(trace.SpanKindConsumer)) + ctx, end := telemetry.StartSpan(ctx, s.tracer, "receive", + trace.WithSpanKind(trace.SpanKindConsumer), + trace.WithAttributes( + append([]attribute.KeyValue{ + semconv.MessagingOperationReceive, + }, s.attributes...)..., + ), + ) defer end(&err) defer recoverer.Error(&err) diff --git a/pkg/db/db.go b/pkg/db/db.go index 04cfc6d4..143503f9 100644 --- a/pkg/db/db.go +++ b/pkg/db/db.go @@ -94,8 +94,9 @@ func New(ctx context.Context, config *Config, tracerProvider trace.TracerProvide instance.attributes = []attribute.KeyValue{ semconv.DBSystemPostgreSQL, semconv.DBNameKey.String(config.Name), - attribute.String("server.address", config.Host), - attribute.Int("server.port", int(config.Port)), + semconv.DBUser(config.User), + semconv.ServerAddress(config.Host), + semconv.ServerPort(int(config.Port)), } } @@ -147,7 +148,10 @@ func (s Service) DoAtomic(ctx context.Context, action func(context.Context) erro return errors.New("no action provided") } - ctx, end := telemetry.StartSpan(ctx, s.tracer, "transaction", trace.WithSpanKind(trace.SpanKindClient)) + ctx, end := telemetry.StartSpan(ctx, s.tracer, "transaction", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(s.attributes...), + ) defer end(&err) if readTx(ctx) != nil {