From ad92673bd6dbf154b8c73968f34d1e836099dd35 Mon Sep 17 00:00:00 2001 From: Marco Schmidlin Date: Tue, 7 Feb 2023 18:40:45 +0100 Subject: [PATCH] fix(amqplib): use extracted context for message consuming (#1354) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(amqplib): use extracted context for message consuming - fixes baggage propagation * style(amqplib): fix linting --------- Co-authored-by: Gerhard Stöbich Co-authored-by: Amir Blum --- .../instrumentation-amqplib/src/amqplib.ts | 2 +- .../test/amqplib-callbacks.test.ts | 46 ++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/plugins/node/instrumentation-amqplib/src/amqplib.ts b/plugins/node/instrumentation-amqplib/src/amqplib.ts index 5d51a3dde5..643fea0fbc 100644 --- a/plugins/node/instrumentation-amqplib/src/amqplib.ts +++ b/plugins/node/instrumentation-amqplib/src/amqplib.ts @@ -451,7 +451,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { msg[MESSAGE_STORED_SPAN] = span; } - context.with(trace.setSpan(context.active(), span), () => { + context.with(trace.setSpan(parentContext, span), () => { onMessage.call(this, msg); }); diff --git a/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts b/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts index e9fbdf96e8..9f2ab1d81e 100644 --- a/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts +++ b/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts @@ -28,7 +28,7 @@ import { MessagingDestinationKindValues, SemanticAttributes, } from '@opentelemetry/semantic-conventions'; -import { context, SpanKind } from '@opentelemetry/api'; +import { Baggage, context, propagation, SpanKind } from '@opentelemetry/api'; import { asyncConfirmSend, asyncConsume, shouldTest } from './utils'; import { censoredUrl, @@ -36,12 +36,27 @@ import { TEST_RABBITMQ_HOST, TEST_RABBITMQ_PORT, } from './config'; +import { + CompositePropagator, + W3CBaggagePropagator, + W3CTraceContextPropagator, +} from '@opentelemetry/core'; const msgPayload = 'payload from test'; const queueName = 'queue-name-from-unittest'; describe('amqplib instrumentation callback model', () => { let conn: amqpCallback.Connection; + before(() => { + propagation.setGlobalPropagator( + new CompositePropagator({ + propagators: [ + new W3CBaggagePropagator(), + new W3CTraceContextPropagator(), + ], + }) + ); + }); before(function (done) { if (!shouldTest) { this.skip(); @@ -186,6 +201,35 @@ describe('amqplib instrumentation callback model', () => { }); }); + it('baggage is available while consuming', done => { + const baggageContext = propagation.setBaggage( + context.active(), + propagation.createBaggage({ + key1: { value: 'value1' }, + }) + ); + context.with(baggageContext, () => { + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + let extractedBaggage: Baggage | undefined; + asyncConsume( + channel, + queueName, + [ + msg => { + extractedBaggage = propagation.getActiveBaggage(); + }, + ], + { + noAck: true, + } + ).then(() => { + expect(extractedBaggage).toBeDefined(); + expect(extractedBaggage!.getEntry('key1')).toBeDefined(); + done(); + }); + }); + }); + it('end span with ack sync', done => { channel.sendToQueue(queueName, Buffer.from(msgPayload));