From cd95b2ca01036f3516238cd2bfc2b6d93ffd5ae8 Mon Sep 17 00:00:00 2001 From: sakulali Date: Thu, 2 Nov 2023 05:46:53 +0800 Subject: [PATCH] [receiver/wavefront] wrap metrics receiver under carbon receiver instead of using export function (#27259) **Description:** Wavefrontreceiver is very similar to carbonreceiver: it is TCP based in which each received text line represents a single metric data point. In order to avoid using exported function `carbonreceiver.New(...)`, we can wrap metrics receiver under carbon receiver. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27248 **Testing:** make chlog-validate go test for wavefrontreceiver **Documentation:** --------- Co-authored-by: Pablo Baeyens --- .chloggen/fix-wavefront-metric-factory.yaml | 27 ++++++++ receiver/wavefrontreceiver/factory.go | 29 ++------- receiver/wavefrontreceiver/receiver.go | 72 +++++++++++++++++++++ 3 files changed, 104 insertions(+), 24 deletions(-) create mode 100755 .chloggen/fix-wavefront-metric-factory.yaml create mode 100644 receiver/wavefrontreceiver/receiver.go diff --git a/.chloggen/fix-wavefront-metric-factory.yaml b/.chloggen/fix-wavefront-metric-factory.yaml new file mode 100755 index 000000000000..cd48462ce930 --- /dev/null +++ b/.chloggen/fix-wavefront-metric-factory.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: wavefrontreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Wrap metrics receiver under carbon receiver instead of using export function + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27248] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/wavefrontreceiver/factory.go b/receiver/wavefrontreceiver/factory.go index c7595bea5f67..6c9f44bcabc0 100644 --- a/receiver/wavefrontreceiver/factory.go +++ b/receiver/wavefrontreceiver/factory.go @@ -5,14 +5,13 @@ package wavefrontreceiver // import "github.com/open-telemetry/opentelemetry-col import ( "context" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver/internal/metadata" ) @@ -42,27 +41,9 @@ func createMetricsReceiver( cfg component.Config, consumer consumer.Metrics, ) (receiver.Metrics, error) { - - rCfg := cfg.(*Config) - - // Wavefront is very similar to Carbon: it is TCP based in which each received - // text line represents a single metric data point. They differ on the format - // of their textual representation. - // - // The Wavefront receiver leverages the Carbon receiver code by implementing - // a dedicated parser for its format. - carbonCfg := carbonreceiver.Config{ - NetAddr: confignet.NetAddr{ - Endpoint: rCfg.Endpoint, - Transport: "tcp", - }, - TCPIdleTimeout: rCfg.TCPIdleTimeout, - Parser: &protocol.Config{ - Type: "plaintext", // TODO: update after other parsers are implemented for Carbon receiver. - Config: &WavefrontParser{ - ExtractCollectdTags: rCfg.ExtractCollectdTags, - }, - }, + rCfg, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("a wavefront receiver config was expected by the receiver factory, but got %T", rCfg) } - return carbonreceiver.New(params, carbonCfg, consumer) + return newMetricsReceiver(rCfg, params, consumer), nil } diff --git a/receiver/wavefrontreceiver/receiver.go b/receiver/wavefrontreceiver/receiver.go new file mode 100644 index 000000000000..2a3c5c6cac41 --- /dev/null +++ b/receiver/wavefrontreceiver/receiver.go @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package wavefrontreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" +) + +var _ receiver.Metrics = (*metricsReceiver)(nil) + +type metricsReceiver struct { + cfg *Config + set receiver.CreateSettings + nextConsumer consumer.Metrics + carbonReceiver receiver.Metrics +} + +func newMetricsReceiver(cfg *Config, set receiver.CreateSettings, nextConsumer consumer.Metrics) *metricsReceiver { + return &metricsReceiver{ + cfg: cfg, + set: set, + nextConsumer: nextConsumer, + } +} + +func (r *metricsReceiver) Start(ctx context.Context, host component.Host) error { + fact := carbonreceiver.NewFactory() + + // Wavefront is very similar to Carbon: it is TCP based in which each received + // text line represents a single metric data point. They differ on the format + // of their textual representation. + // + // The Wavefront receiver leverages the Carbon receiver code by implementing + // a dedicated parser for its format. + carbonCfg := &carbonreceiver.Config{ + NetAddr: confignet.NetAddr{ + Endpoint: r.cfg.Endpoint, + Transport: "tcp", + }, + TCPIdleTimeout: r.cfg.TCPIdleTimeout, + Parser: &protocol.Config{ + Type: "plaintext", // TODO: update after other parsers are implemented for Carbon receiver. + Config: &WavefrontParser{ + ExtractCollectdTags: r.cfg.ExtractCollectdTags, + }, + }, + } + + carbonReceiver, err := fact.CreateMetricsReceiver(ctx, r.set, carbonCfg, r.nextConsumer) + if err != nil { + return err + } + r.carbonReceiver = carbonReceiver + + return r.carbonReceiver.Start(ctx, host) +} + +func (r *metricsReceiver) Shutdown(ctx context.Context) error { + if r.carbonReceiver != nil { + return r.carbonReceiver.Shutdown(ctx) + } + return nil +}