From 78fc0c316497145f45b335797000c4a1872497d3 Mon Sep 17 00:00:00 2001 From: Huy Vo Date: Thu, 20 May 2021 10:37:39 -0700 Subject: [PATCH 1/3] Make changes to awsxrayreceiver --- .../awsxrayreceiver/internal/udppoller/poller.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/receiver/awsxrayreceiver/internal/udppoller/poller.go b/receiver/awsxrayreceiver/internal/udppoller/poller.go index 4db2870be472..f379c49fd18c 100644 --- a/receiver/awsxrayreceiver/internal/udppoller/poller.go +++ b/receiver/awsxrayreceiver/internal/udppoller/poller.go @@ -84,6 +84,8 @@ type poller struct { // all segments read by the poller will be sent to this channel segChan chan RawSegment + + obsrecv *obsreport.Receiver } // New creates a new UDP poller @@ -113,6 +115,7 @@ func New(cfg *Config, logger *zap.Logger) (Poller, error) { maxPollerCount: cfg.NumOfPollerToStart, shutDown: make(chan struct{}), segChan: make(chan RawSegment, segChanSize), + obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: cfg.ReceiverID, Transport: cfg.Transport}), }, nil } @@ -168,10 +171,8 @@ func (p *poller) poll() { case <-p.shutDown: return default: - ctx := obsreport.StartTraceDataReceiveOp( + ctx := p.obsrecv.StartTraceDataReceiveOp( p.receiverLongLivedCtx, - p.receiverID, - Transport, obsreport.WithLongLivedCtx()) bufPointer := &buffer @@ -180,11 +181,11 @@ func (p *poller) poll() { // TODO: We may want to attempt to shutdown/clean the broken socket and open a new one // with the same address p.logger.Error("Irrecoverable socket read error. Exiting poller", zap.Error(err)) - obsreport.EndTraceDataReceiveOp(ctx, awsxray.TypeStr, 1, err) + p.obsrecv.EndTraceDataReceiveOp(ctx, awsxray.TypeStr, 1, err) return } else if errors.As(err, &errRecv) { p.logger.Error("Recoverable socket read error", zap.Error(err)) - obsreport.EndTraceDataReceiveOp(ctx, awsxray.TypeStr, 1, err) + p.obsrecv.EndTraceDataReceiveOp(ctx, awsxray.TypeStr, 1, err) continue } @@ -196,7 +197,7 @@ func (p *poller) poll() { if errors.As(err, &errRecv) { p.logger.Error("Failed to split segment header and body", zap.Error(err)) - obsreport.EndTraceDataReceiveOp(ctx, awsxray.TypeStr, 1, err) + p.obsrecv.EndTraceDataReceiveOp(ctx, awsxray.TypeStr, 1, err) continue } @@ -205,7 +206,7 @@ func (p *poller) poll() { zap.String("header format", header.Format), zap.Int("header version", header.Version), ) - obsreport.EndTraceDataReceiveOp(ctx, awsxray.TypeStr, 1, + p.obsrecv.EndTraceDataReceiveOp(ctx, awsxray.TypeStr, 1, errors.New("dropped span due to missing body that contains segment")) continue } From f0fa967d3df0e732df01c27ca206ddb823aac1cd Mon Sep 17 00:00:00 2001 From: Huy Vo Date: Thu, 20 May 2021 13:45:59 -0700 Subject: [PATCH 2/3] Make changes to carbonreceiver --- receiver/carbonreceiver/reporter.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/receiver/carbonreceiver/reporter.go b/receiver/carbonreceiver/reporter.go index a6af5093aa66..976e46894924 100644 --- a/receiver/carbonreceiver/reporter.go +++ b/receiver/carbonreceiver/reporter.go @@ -32,6 +32,7 @@ type reporter struct { spanName string logger *zap.Logger sugaredLogger *zap.SugaredLogger // Used for generic debug logging + obsrecv *obsreport.Receiver } var _ transport.Reporter = (*reporter)(nil) @@ -42,6 +43,7 @@ func newReporter(id config.ComponentID, logger *zap.Logger) transport.Reporter { spanName: id.String() + ".receiver", logger: logger, sugaredLogger: logger.Sugar(), + obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: "tcp"}), } } @@ -51,7 +53,7 @@ func newReporter(id config.ComponentID, logger *zap.Logger) transport.Reporter { // returned span. func (r *reporter) OnDataReceived(ctx context.Context) context.Context { ctx = obsreport.ReceiverContext(ctx, r.id, "tcp") - return obsreport.StartMetricsReceiveOp(ctx, r.id, "tcp") + return r.obsrecv.StartMetricsReceiveOp(ctx) } // OnTranslationError is used to report a translation error from original @@ -95,7 +97,7 @@ func (r *reporter) OnMetricsProcessed( }) } - obsreport.EndMetricsReceiveOp(ctx, "carbon", numReceivedMetricPoints, err) + r.obsrecv.EndMetricsReceiveOp(ctx, "carbon", numReceivedMetricPoints, err) } func (r *reporter) OnDebugf(template string, args ...interface{}) { From 9e8b82185673826de009df7cc0dcad27290594d3 Mon Sep 17 00:00:00 2001 From: Huy Vo Date: Thu, 20 May 2021 13:54:10 -0700 Subject: [PATCH 3/3] Add entry to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0518a9d32bd3..707fc2a812e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ The OpenTelemetry Collector Contrib contains everything in the [opentelemetry-co - `groupbytrace` processor: Added workers for queue processing (#2902) - `resourcedetection` processor: Add docker detector (#2775) - `tailsampling` processor: Support regex on span attribute filtering (#3335_ +- Change obsreport helpers for receiver to use the new pattern created in Collector (#3439,#3443) ## 🧰 Bug fixes 🧰