Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Fix end span
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper committed Apr 22, 2020
1 parent ad4a107 commit 42fca14
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions kafka/channel/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
// using the distributed tracing extension.
// If the span is not recorded, we just not send these info
span := trace.FromContext(ctx)
defer span.End()
if span.IsRecordingEvents() {
dte := extensions.FromSpanContext(span.SpanContext())
transformers = append(transformers, dte.WriteTransformer())
Expand Down Expand Up @@ -182,10 +183,11 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar
zap.String("sub", string(c.sub.UID)),
)

tracingCtx, err := extractTrace(ctx, message)
tracingCtx, span, err := extractTrace(ctx, message)
if err != nil {
return false, err
}
defer span.End()

err = c.dispatcher.DispatchMessage(tracingCtx, message, nil, destination, reply, deadLetter)
// NOTE: only return `true` here if DispatchMessage actually delivered the message.
Expand Down Expand Up @@ -416,24 +418,24 @@ func newSubscription(spec eventingduck.SubscriberSpec, name string, namespace st
}
}

func extractTrace(inCtx context.Context, message *protocolkafka.Message) (context.Context, error) {
func extractTrace(inCtx context.Context, message *protocolkafka.Message) (context.Context, *trace.Span, error) {
// Recording span are injected only and only if the initial span is recording
if message.ReadEncoding() == binding.EncodingBinary {
dte := extensions.DistributedTracingExtension{}
err := dte.ReadTransformer().Transform(message, nil)
if err != nil {
return nil, err
return nil, nil, err
}
// There is a span!
if dte.TraceParent != "" {
if sc, err := dte.ToSpanContext(); err == nil {
outCtx, _ := trace.StartSpanWithRemoteParent(inCtx, "kafkachannel", sc)
return outCtx, nil
outCtx, s := trace.StartSpanWithRemoteParent(inCtx, "kafkachannel", sc)
return outCtx, s, nil
} else {
return nil, err
return nil, nil, err
}
}
}
outCtx, _ := trace.StartSpan(inCtx, "kafkachannel", trace.WithSampler(trace.NeverSample()))
return outCtx, nil
outCtx, s := trace.StartSpan(inCtx, "kafkachannel", trace.WithSampler(trace.NeverSample()))
return outCtx, s, nil
}

0 comments on commit 42fca14

Please sign in to comment.