Skip to content

Commit

Permalink
refactor(amqp): Using more semconv const
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <bob@vibioh.fr>
  • Loading branch information
ViBiOh committed Apr 2, 2024
1 parent 08294fd commit 08cf9e6
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 25 deletions.
47 changes: 30 additions & 17 deletions pkg/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ type Connection interface {

type Client struct {
tracer trace.Tracer
channel *amqp.Channel
connection Connection
listeners map[string]*listener
reconnectMetric metric.Int64Counter
listenerMetric metric.Int64UpDownCounter
messageMetric metric.Int64Counter
channel *amqp.Channel
listeners map[string]*listener
vhost string
uri string
attributes []attribute.KeyValue
prefetch int
mutex sync.RWMutex
}
Expand Down Expand Up @@ -84,6 +85,11 @@ func NewFromURI(uri string, prefetch int, meterProvider metric.MeterProvider, tr

if tracerProvider != nil {
client.tracer = tracerProvider.Tracer("amqp")

client.attributes = []attribute.KeyValue{
semconv.MessagingSystemRabbitmq,
semconv.NetworkProtocolName("rabbitmq"),
}
}

connection, channel, err := connect(uri, client.prefetch, client.onDisconnect)
Expand Down Expand Up @@ -130,7 +136,16 @@ func (c *Client) Publish(ctx context.Context, payload amqp.Publishing, exchange,
return nil
}

ctx, end := telemetry.StartSpan(ctx, c.tracer, "publish", trace.WithSpanKind(trace.SpanKindProducer))
attributes := c.getAttributes(exchange, routingKey)

ctx, end := telemetry.StartSpan(ctx, c.tracer, "publish",
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
append([]attribute.KeyValue{
semconv.MessagingOperationPublish,
}, attributes...)...,
),
)
defer end(&err)

defer recoverer.Error(&err)
Expand All @@ -139,24 +154,23 @@ func (c *Client) Publish(ctx context.Context, payload amqp.Publishing, exchange,
defer c.mutex.RUnlock()

if err = c.channel.PublishWithContext(ctx, exchange, routingKey, false, false, telemetry.InjectToAmqp(ctx, payload)); err != nil {
c.increase(ctx, "error", exchange, routingKey)
c.increase(ctx, append([]attribute.KeyValue{
semconv.ErrorTypeKey.String("amqp:publish"),
}, attributes...))

return
}

c.increase(ctx, "send", exchange, routingKey)
c.increase(ctx, attributes)

return nil
}

func (c *Client) PublishJSON(ctx context.Context, item any, exchange, routingKey string) (err error) {
func (c *Client) PublishJSON(ctx context.Context, item any, exchange, routingKey string) error {
if c == nil {
return nil
}

ctx, end := telemetry.StartSpan(ctx, c.tracer, "publish", trace.WithSpanKind(trace.SpanKindProducer))
defer end(&err)

payload, err := json.Marshal(item)
if err != nil {
return fmt.Errorf("marshal: %w", err)
Expand All @@ -172,21 +186,20 @@ func (c *Client) PublishJSON(ctx context.Context, item any, exchange, routingKey
return nil
}

func (c *Client) increase(ctx context.Context, name, exchange, routingKey string) {
func (c *Client) increase(ctx context.Context, attributes []attribute.KeyValue) {
if c.messageMetric == nil {
return
}

attributes := []attribute.KeyValue{
semconv.MessagingSystemRabbitmq,
semconv.NetworkProtocolName("rabbitmq"),
semconv.MessagingDestinationName(exchange),
attribute.String("state", name),
}
c.messageMetric.Add(ctx, 1, metric.WithAttributes(attributes...))
}

func (c *Client) getAttributes(exchange, routingKey string) []attribute.KeyValue {
attributes := append([]attribute.KeyValue{semconv.MessagingDestinationName(exchange)}, c.attributes...)

if len(routingKey) != 0 {
attributes = append(attributes, semconv.MessagingRabbitmqDestinationRoutingKey(routingKey))
}

c.messageMetric.Add(ctx, 1, metric.WithAttributes(attributes...))
return attributes
}
8 changes: 7 additions & 1 deletion pkg/amqp/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
)

type QueueResolver func() (string, error)
Expand Down Expand Up @@ -81,9 +83,13 @@ func (c *Client) forward(listener *listener, queueResolver QueueResolver, input
defer close(listener.done)
defer close(output)

attributes := append([]attribute.KeyValue{
semconv.MessagingOperationReceive,
}, c.getAttributes(exchange, routingKey)...)

forward:
for delivery := range input {
c.increase(context.Background(), "consume", exchange, routingKey)
c.increase(context.Background(), attributes)
output <- delivery
}

Expand Down
27 changes: 23 additions & 4 deletions pkg/amqphandler/amqphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"go.opentelemetry.io/otel/trace"
)

type Handler func(context.Context, amqp.Delivery) error

type Service struct {
amqpClient *amqpclient.Client
tracer trace.Tracer
counter metric.Int64Counter
amqpClient *amqpclient.Client
done chan struct{}
handler Handler
counter metric.Int64Counter
exchange string
delayExchange string
exchange string
queue string
routingKey string
attributes []attribute.KeyValue
maxRetry int64
retryInterval time.Duration
inactiveTimeout time.Duration
Expand Down Expand Up @@ -103,6 +105,16 @@ func New(config *Config, amqpClient *amqpclient.Client, metricProvider metric.Me
}

if tracerProvider != nil {
service.attributes = []attribute.KeyValue{
semconv.MessagingSystemRabbitmq,
semconv.NetworkProtocolName("rabbitmq"),
semconv.MessagingDestinationName(service.exchange),
}

if len(service.routingKey) != 0 {
service.attributes = append(service.attributes, semconv.MessagingRabbitmqDestinationRoutingKey(service.routingKey))
}

service.tracer = tracerProvider.Tracer("amqp_handler")
}

Expand Down Expand Up @@ -173,7 +185,14 @@ func (s *Service) Start(ctx context.Context) {
func (s *Service) handleMessage(ctx context.Context, log *slog.Logger, message amqp.Delivery) {
var err error

ctx, end := telemetry.StartSpan(ctx, s.tracer, "receive", trace.WithSpanKind(trace.SpanKindConsumer))
ctx, end := telemetry.StartSpan(ctx, s.tracer, "receive",
trace.WithSpanKind(trace.SpanKindConsumer),
trace.WithAttributes(
append([]attribute.KeyValue{
semconv.MessagingOperationReceive,
}, s.attributes...)...,
),
)
defer end(&err)

defer recoverer.Error(&err)
Expand Down
10 changes: 7 additions & 3 deletions pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ func New(ctx context.Context, config *Config, tracerProvider trace.TracerProvide
instance.attributes = []attribute.KeyValue{
semconv.DBSystemPostgreSQL,
semconv.DBNameKey.String(config.Name),
attribute.String("server.address", config.Host),
attribute.Int("server.port", int(config.Port)),
semconv.DBUser(config.User),
semconv.ServerAddress(config.Host),
semconv.ServerPort(int(config.Port)),
}
}

Expand Down Expand Up @@ -147,7 +148,10 @@ func (s Service) DoAtomic(ctx context.Context, action func(context.Context) erro
return errors.New("no action provided")
}

ctx, end := telemetry.StartSpan(ctx, s.tracer, "transaction", trace.WithSpanKind(trace.SpanKindClient))
ctx, end := telemetry.StartSpan(ctx, s.tracer, "transaction",
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(s.attributes...),
)
defer end(&err)

if readTx(ctx) != nil {
Expand Down

0 comments on commit 08cf9e6

Please sign in to comment.