From 42fca146b4048bab796aaf485c6bb4855b25f76a Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 22 Apr 2020 11:35:35 +0200 Subject: [PATCH] Fix end span Signed-off-by: Francesco Guardiani --- kafka/channel/pkg/dispatcher/dispatcher.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/kafka/channel/pkg/dispatcher/dispatcher.go b/kafka/channel/pkg/dispatcher/dispatcher.go index 0aeb3f43ce..257f382c89 100644 --- a/kafka/channel/pkg/dispatcher/dispatcher.go +++ b/kafka/channel/pkg/dispatcher/dispatcher.go @@ -103,6 +103,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat // using the distributed tracing extension. // If the span is not recorded, we just not send these info span := trace.FromContext(ctx) + defer span.End() if span.IsRecordingEvents() { dte := extensions.FromSpanContext(span.SpanContext()) transformers = append(transformers, dte.WriteTransformer()) @@ -182,10 +183,11 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar zap.String("sub", string(c.sub.UID)), ) - tracingCtx, err := extractTrace(ctx, message) + tracingCtx, span, err := extractTrace(ctx, message) if err != nil { return false, err } + defer span.End() err = c.dispatcher.DispatchMessage(tracingCtx, message, nil, destination, reply, deadLetter) // NOTE: only return `true` here if DispatchMessage actually delivered the message. @@ -416,24 +418,24 @@ func newSubscription(spec eventingduck.SubscriberSpec, name string, namespace st } } -func extractTrace(inCtx context.Context, message *protocolkafka.Message) (context.Context, error) { +func extractTrace(inCtx context.Context, message *protocolkafka.Message) (context.Context, *trace.Span, error) { // Recording span are injected only and only if the initial span is recording if message.ReadEncoding() == binding.EncodingBinary { dte := extensions.DistributedTracingExtension{} err := dte.ReadTransformer().Transform(message, nil) if err != nil { - return nil, err + return nil, nil, err } // There is a span! if dte.TraceParent != "" { if sc, err := dte.ToSpanContext(); err == nil { - outCtx, _ := trace.StartSpanWithRemoteParent(inCtx, "kafkachannel", sc) - return outCtx, nil + outCtx, s := trace.StartSpanWithRemoteParent(inCtx, "kafkachannel", sc) + return outCtx, s, nil } else { - return nil, err + return nil, nil, err } } } - outCtx, _ := trace.StartSpan(inCtx, "kafkachannel", trace.WithSampler(trace.NeverSample())) - return outCtx, nil + outCtx, s := trace.StartSpan(inCtx, "kafkachannel", trace.WithSampler(trace.NeverSample())) + return outCtx, s, nil }