Skip to content

Commit

Permalink
Move receiver builder into internal service (#10781)
Browse files Browse the repository at this point in the history
#### Description

This moves the receiver builder out of the `receiver` package, and into
`service/internal/builders`.
There's no real reason for this struct to be public (folks shouldn't
call it), and making it private will allow us to add profiling support
to it.

#### Link to tracking issue

#10375 (review)
  • Loading branch information
dmathieu committed Aug 21, 2024
1 parent 6764622 commit 454432e
Show file tree
Hide file tree
Showing 15 changed files with 443 additions and 41 deletions.
25 changes: 25 additions & 0 deletions .chloggen/private-receiver-builder.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: receiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate receiver.Builder, and move it into an internal package of the service module

# One or more tracking issues or pull requests related to the change
issues: [10781]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
3 changes: 3 additions & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand All @@ -64,6 +65,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.2.3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.20.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand All @@ -74,6 +76,7 @@ require (
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
Expand Down
13 changes: 6 additions & 7 deletions internal/e2e/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@ func Test_ComponentStatusReporting_SharedInstance(t *testing.T) {
set := service.Settings{
BuildInfo: component.NewDefaultBuildInfo(),
CollectorConf: confmap.New(),
Receivers: receiver.NewBuilder(
map[component.ID]component.Config{
component.NewID(component.MustNewType("test")): &receiverConfig{},
},
map[component.Type]receiver.Factory{
component.MustNewType("test"): newReceiverFactory(),
}),
ReceiversConfigs: map[component.ID]component.Config{
component.NewID(component.MustNewType("test")): &receiverConfig{},
},
ReceiversFactories: map[component.Type]receiver.Factory{
component.MustNewType("test"): newReceiverFactory(),
},
Processors: processortest.NewNopBuilder(),
Exporters: exportertest.NewNopBuilder(),
Connectors: connectortest.NewNopBuilder(),
Expand Down
14 changes: 8 additions & 6 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/otelcol/internal/grpclog"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service"
)

Expand Down Expand Up @@ -187,11 +186,14 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
col.service, err = service.New(ctx, service.Settings{
BuildInfo: col.set.BuildInfo,
CollectorConf: conf,
Receivers: receiver.NewBuilder(cfg.Receivers, factories.Receivers),
Processors: processor.NewBuilder(cfg.Processors, factories.Processors),
Exporters: exporter.NewBuilder(cfg.Exporters, factories.Exporters),
Connectors: connector.NewBuilder(cfg.Connectors, factories.Connectors),
Extensions: extension.NewBuilder(cfg.Extensions, factories.Extensions),

ReceiversConfigs: cfg.Receivers,
ReceiversFactories: factories.Receivers,

Processors: processor.NewBuilder(cfg.Processors, factories.Processors),
Exporters: exporter.NewBuilder(cfg.Exporters, factories.Exporters),
Connectors: connector.NewBuilder(cfg.Connectors, factories.Connectors),
Extensions: extension.NewBuilder(cfg.Extensions, factories.Extensions),
ModuleInfo: extension.ModuleInfo{
Receiver: factories.ReceiverModules,
Processor: factories.ProcessorModules,
Expand Down
12 changes: 11 additions & 1 deletion receiver/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package receiver // import "go.opentelemetry.io/collector/receiver"

import (
"context"
"errors"
"fmt"

"go.uber.org/zap"
Expand All @@ -13,13 +14,22 @@ import (
"go.opentelemetry.io/collector/consumer"
)

// Builder receiver is a helper struct that given a set of Configs and Factories helps with creating receivers.
var errNilNextConsumer = errors.New("nil next Consumer")

// Builder receiver is a helper struct that given a set of Configs and
// Factories helps with creating receivers.
//
// Deprecated: [v0.108.0] this builder is being internalized within the service module,
// and will be removed soon.
type Builder struct {
cfgs map[component.ID]component.Config
factories map[component.Type]Factory
}

// NewBuilder creates a new receiver.Builder to help with creating components form a set of configs and factories.
//
// Deprecated: [v0.108.0] this builder is being internalized within the service module,
// and will be removed soon.
func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder {
return &Builder{cfgs: cfgs, factories: factories}
}
Expand Down
5 changes: 0 additions & 5 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@
package receiver // import "go.opentelemetry.io/collector/receiver"

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/receiver/internal"
)

var (
errNilNextConsumer = errors.New("nil next Consumer")
)

// Traces receiver receives traces.
// Its purpose is to translate data from any format to the collector's internal trace format.
// TracesReceiver feeds a consumer.Traces with data.
Expand Down
3 changes: 3 additions & 0 deletions receiver/receivertest/nop_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type nopReceiver struct {
}

// NewNopBuilder returns a receiver.Builder that constructs nop receivers.
//
// Deprecated: [v0.108.0] this builder is being internalized within the service module,
// and will be removed soon.
func NewNopBuilder() *receiver.Builder {
nopFactory := NewNopFactory()
return receiver.NewBuilder(
Expand Down
129 changes: 129 additions & 0 deletions service/internal/builders/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package builders // import "go.opentelemetry.io/collector/service/internal/builders"

import (
"context"
"errors"
"fmt"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
)

var (
errNilNextConsumer = errors.New("nil next Consumer")
nopType = component.MustNewType("nop")
)

// Receiver is an interface that allows using implementations of the builder
// from different packages.
type Receiver interface {
CreateTraces(context.Context, receiver.Settings, consumer.Traces) (receiver.Traces, error)
CreateMetrics(context.Context, receiver.Settings, consumer.Metrics) (receiver.Metrics, error)
CreateLogs(context.Context, receiver.Settings, consumer.Logs) (receiver.Logs, error)
Factory(component.Type) component.Factory
}

// ReceiverBuilder receiver is a helper struct that given a set of Configs and
// Factories helps with creating receivers.
type ReceiverBuilder struct {
cfgs map[component.ID]component.Config
factories map[component.Type]receiver.Factory
}

// NewReceiver creates a new ReceiverBuilder to help with creating
// components form a set of configs and factories.
func NewReceiver(cfgs map[component.ID]component.Config, factories map[component.Type]receiver.Factory) *ReceiverBuilder {
return &ReceiverBuilder{cfgs: cfgs, factories: factories}
}

// CreateTraces creates a Traces receiver based on the settings and config.
func (b *ReceiverBuilder) CreateTraces(ctx context.Context, set receiver.Settings, next consumer.Traces) (receiver.Traces, error) {
if next == nil {
return nil, errNilNextConsumer
}
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.TracesReceiverStability())
return f.CreateTracesReceiver(ctx, set, cfg, next)
}

// CreateMetrics creates a Metrics receiver based on the settings and config.
func (b *ReceiverBuilder) CreateMetrics(ctx context.Context, set receiver.Settings, next consumer.Metrics) (receiver.Metrics, error) {
if next == nil {
return nil, errNilNextConsumer
}
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.MetricsReceiverStability())
return f.CreateMetricsReceiver(ctx, set, cfg, next)
}

// CreateLogs creates a Logs receiver based on the settings and config.
func (b *ReceiverBuilder) CreateLogs(ctx context.Context, set receiver.Settings, next consumer.Logs) (receiver.Logs, error) {
if next == nil {
return nil, errNilNextConsumer
}
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.LogsReceiverStability())
return f.CreateLogsReceiver(ctx, set, cfg, next)
}

func (b *ReceiverBuilder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}

// logStabilityLevel logs the stability level of a component. The log level is set to info for
// undefined, unmaintained, deprecated and development. The log level is set to debug
// for alpha, beta and stable.
func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) {
if sl >= component.StabilityLevelAlpha {
logger.Debug(sl.LogMessage())
} else {
logger.Info(sl.LogMessage())
}
}

// NewNopReceiverConfigsAndFactories returns a configuration and factories that allows building a new nop receiver.
func NewNopReceiverConfigsAndFactories() (map[component.ID]component.Config, map[component.Type]receiver.Factory) {
nopFactory := receivertest.NewNopFactory()
configs := map[component.ID]component.Config{
component.NewID(nopType): nopFactory.CreateDefaultConfig(),
}
factories := map[component.Type]receiver.Factory{
nopType: nopFactory,
}

return configs, factories
}
Loading

0 comments on commit 454432e

Please sign in to comment.