diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go index 359a3ff796..90678c4ed2 100644 --- a/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go +++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go @@ -51,24 +51,6 @@ func WrapConsumeEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, consu return out } -func WrapProduceEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, translateFn func(E) TE) chan E { - if in == nil { - return nil - } - out := make(chan E, 1) - go func() { - defer close(out) - for evt := range in { - tEvt := translateFn(evt) - if msg, ok := tEvt.KafkaMessage(); ok { - tr.TrackProduceOffsets(msg) - } - out <- evt - } - }() - return out -} - func (tr *KafkaTracer) StartConsumeSpan(msg Message) ddtrace.Span { opts := []tracer.StartSpanOption{ tracer.ServiceName(tr.consumerServiceName), diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go index 9b233280d4..5a711f29b2 100644 --- a/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go +++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go @@ -30,6 +30,24 @@ func WrapProduceChannel[M any, TM Message](tr *KafkaTracer, out chan M, translat return in } +func WrapProduceEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, translateFn func(E) TE) chan E { + if in == nil { + return nil + } + out := make(chan E, 1) + go func() { + defer close(out) + for evt := range in { + tEvt := translateFn(evt) + if msg, ok := tEvt.KafkaMessage(); ok { + tr.TrackProduceOffsets(msg) + } + out <- evt + } + }() + return out +} + func (tr *KafkaTracer) StartProduceSpan(msg Message) ddtrace.Span { opts := []tracer.StartSpanOption{ tracer.ServiceName(tr.producerServiceName),