diff --git a/.chloggen/pulsar_receiver_enhance.yaml b/.chloggen/pulsar_receiver_enhance.yaml new file mode 100755 index 000000000000..b938bbf9e5b9 --- /dev/null +++ b/.chloggen/pulsar_receiver_enhance.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pulsarreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: remove `topic`,`subscription`,`encoding`,`consumer_name` configuration items but introduce `trace`,`metric`,`log` items to make pulsar receiver more configurable. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [28685] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/pulsarreceiver/README.md b/receiver/pulsarreceiver/README.md index 9e8b7dfe67fd..92ee5487bd6d 100644 --- a/receiver/pulsarreceiver/README.md +++ b/receiver/pulsarreceiver/README.md @@ -17,49 +17,78 @@ Pulsar receiver receives logs, metrics, and traces from Pulsar. ## Getting Started The following settings can be optionally configured: + - `endpoint` (default = pulsar://localhost:6650): The url of pulsar cluster. -- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the pulsar topic to consume from. -- `encoding` (default = otlp_proto): The encoding of the payload sent to pulsar. Available encodings: - - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`. - - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`. - - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`. - - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans. - - `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans. - - `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans. -- `consumer_name`: specifies the consumer name. +- `trace` + - `topic`: The name of the pulsar topic. (default = ``, skip start traces exporter is topic not set) + - `encoding` + - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`. + - `otlp_json`: the payload is json deserialized to `ExportTraceServiceRequest`. + - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`. + - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`. + - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans. + - `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans. + - `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans. + - `subscription`: (default = otlp_subscription): the subscription name of consumer. + - `consumer_name`: specifies the consumer name. +- `metric` + - `topic`: The name of the pulsar topic. (default = ``, skip start metrics exporter is topic not set) + - `encoding` + - `otlp_proto`: the payload is deserialized to `ExportMetricServiceRequest`. + - `otlp_json`: the payload is json deserialized to `ExportMetricServiceRequest`. + - `subscription`: (default = otlp_subscription): the subscription name of consumer. + - `consumer_name`: specifies the consumer name. +- `log` + - `topic`: The name of the pulsar topic. (default = ``, skip start logs exporter is topic not set) + - `encoding` + - `otlp_proto`: the payload is deserialized to `ExportLogServiceRequest`. + - `otlp_json`: the payload is json deserialized to `ExportLogServiceRequest`. + - `subscription`: (default = otlp_subscription): the subscription name of consumer. + - `consumer_name`: specifies the consumer name. - `auth` - - `tls` - - `cert_file`: - - `key_file`: - - `token` + - `tls` + - `cert_file`: + - `key_file`: - `token` - - `oauth2` - - `issuer_url`: - - `client_id`: - - `audience`: - - `athenz` - - `provider_domain`: - - `tenant_domain`: - - `tenant_service`: - - `private_key`: - - `key_id`: - - `principal_header`: - - `zts_url`: -- `subscription` (default = otlp_subscription): the subscription name of consumer. + - `token` + - `oauth2` + - `issuer_url`: + - `client_id`: + - `audience`: + - `athenz` + - `provider_domain`: + - `tenant_domain`: + - `tenant_service`: + - `private_key`: + - `key_id`: + - `principal_header`: + - `zts_url`: - `tls_trust_certs_file_path`: path to the CA cert. For a client this verifies the server certificate. Should only be used if `insecure` is set to true. -- `tls_allow_insecure_connection`: configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false) - +- `tls_allow_insecure_connection`: configure whether the Pulsar client accept untrusted TLS certificate from broker ( + default: false) Example configuration: + ```yaml receivers: pulsar: endpoint: pulsar://localhost:6650 - topic: otlp-spans - subscription: otlp_spans_sub - consumer_name: otlp_spans_sub_1 - encoding: otlp_proto + trace: + topic: pulsar://public/default/otlp-spans + subscription: otlp_spans_sub + consumer_name: otlp_spans_sub_1 + encoding: otlp_proto + metric: + topic: pulsar://public/default/otlp-metrics + subscription: otlp_metrics_sub + consumer_name: otlp_metrics_sub_1 + encoding: otlp_proto + log: + topic: pulsar://public/default/otlp-logs + subscription: otlp_logs_sub + consumer_name: otlp_logs_sub_1 + encoding: otlp_proto auth: tls: cert_file: cert.pem diff --git a/receiver/pulsarreceiver/config.go b/receiver/pulsarreceiver/config.go index 20b799e0cd3c..529f200d2a60 100644 --- a/receiver/pulsarreceiver/config.go +++ b/receiver/pulsarreceiver/config.go @@ -11,18 +11,14 @@ import ( "go.opentelemetry.io/collector/config/configopaque" ) +var errMissTopicName = errors.New("miss topic name") + type Config struct { // Configure the service URL for the Pulsar service. - Endpoint string `mapstructure:"endpoint"` - // The topic of pulsar to consume logs,metrics,traces. (default = "otlp_traces" for traces, - // "otlp_metrics" for metrics, "otlp_logs" for logs) - Topic string `mapstructure:"topic"` - // The Subscription that receiver will be consuming messages from (default "otlp_subscription") - Subscription string `mapstructure:"subscription"` - // Encoding of the messages (default "otlp_proto") - Encoding string `mapstructure:"encoding"` - // Name specifies the consumer name. - ConsumerName string `mapstructure:"consumer_name"` + Endpoint string `mapstructure:"endpoint"` + Trace ReceiverOption `mapstructure:"trace"` + Log ReceiverOption `mapstructure:"log"` + Metric ReceiverOption `mapstructure:"metric"` // Set the path to the trusted TLS certificate file TLSTrustCertsFilePath string `mapstructure:"tls_trust_certs_file_path"` // Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false) @@ -62,6 +58,30 @@ type OAuth2 struct { Audience string `mapstructure:"audience"` } +type ReceiverOption struct { + // The topic of pulsar to consume logs,metrics,traces. (default = "") + Topic string `mapstructure:"topic"` + // The Subscription that receiver will be consuming messages from (default "otlp_subscription") + Subscription string `mapstructure:"subscription"` + // Encoding of the messages (default "otlp_proto") + Encoding string `mapstructure:"encoding"` + // Name specifies the consumer name. + ConsumerName string `mapstructure:"consumer_name"` +} + +func (opt *ReceiverOption) validate() error { + if len(opt.Encoding) == 0 { + opt.Encoding = defaultEncoding + } + if len(opt.Topic) == 0 { + return errMissTopicName + } + if len(opt.Subscription) == 0 { + opt.Subscription = defaultSubscription + } + return nil +} + var _ component.Config = (*Config)(nil) // Validate checks the receiver configuration is valid @@ -118,20 +138,31 @@ func (cfg *Config) clientOptions() pulsar.ClientOptions { return options } -func (cfg *Config) consumerOptions() (pulsar.ConsumerOptions, error) { +func (cfg *Config) consumerOptions(option ReceiverOption) pulsar.ConsumerOptions { options := pulsar.ConsumerOptions{ Type: pulsar.Failover, - Topic: cfg.Topic, - SubscriptionName: cfg.Subscription, + Topic: option.Topic, + SubscriptionName: option.Subscription, } - if len(cfg.ConsumerName) > 0 { - options.Name = cfg.ConsumerName + if len(option.ConsumerName) > 0 { + options.Name = option.ConsumerName } + return options +} - if options.SubscriptionName == "" || options.Topic == "" { - return options, errors.New("topic and subscription is required") +func (cfg *Config) createConsumer(option ReceiverOption) (pulsar.Client, pulsar.Consumer, error) { + client, err := pulsar.NewClient(cfg.clientOptions()) + if err != nil { + return nil, nil, err } - return options, nil + consumerOpts := cfg.consumerOptions(option) + consumer, err := client.Subscribe(consumerOpts) + if err != nil { + // Close the client if err happens + client.Close() + return nil, nil, err + } + return client, consumer, nil } diff --git a/receiver/pulsarreceiver/config_test.go b/receiver/pulsarreceiver/config_test.go index dfa0ea121b2c..3038c917bbe3 100644 --- a/receiver/pulsarreceiver/config_test.go +++ b/receiver/pulsarreceiver/config_test.go @@ -26,11 +26,25 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, component.UnmarshalConfig(sub, cfg)) assert.Equal(t, &Config{ - Topic: "otel-pulsar", - Endpoint: "pulsar://localhost:6500", - ConsumerName: "otel-collector", - Subscription: "otel-collector", - Encoding: defaultEncoding, + Endpoint: "pulsar://localhost:6500", + Trace: ReceiverOption{ + Topic: "otel-pulsar", + ConsumerName: "otel-collector", + Subscription: "otel-collector", + Encoding: defaultEncoding, + }, + Metric: ReceiverOption{ + Topic: "otel-pulsar", + ConsumerName: "otel-collector", + Subscription: "otel-collector", + Encoding: defaultEncoding, + }, + Log: ReceiverOption{ + Topic: "otel-pulsar", + ConsumerName: "otel-collector", + Subscription: "otel-collector", + Encoding: defaultEncoding, + }, TLSTrustCertsFilePath: "ca.pem", Authentication: Authentication{TLS: &TLS{CertFile: "cert.pem", KeyFile: "key.pem"}}, }, diff --git a/receiver/pulsarreceiver/factory.go b/receiver/pulsarreceiver/factory.go index e084d6473a31..c96b193e9300 100644 --- a/receiver/pulsarreceiver/factory.go +++ b/receiver/pulsarreceiver/factory.go @@ -4,10 +4,7 @@ package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" import ( - "context" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver/internal/metadata" @@ -15,126 +12,43 @@ import ( const ( defaultEncoding = "otlp_proto" - defaultTraceTopic = "otlp_spans" - defaultMeticsTopic = "otlp_metrics" - defaultLogsTopic = "otlp_logs" defaultConsumerName = "" + defaultTopicName = "" defaultSubscription = "otlp_subscription" defaultServiceURL = "pulsar://localhost:6650" ) -// FactoryOption applies changes to PulsarExporterFactory. -type FactoryOption func(factory *pulsarReceiverFactory) - -// withTracesUnmarshalers adds Unmarshalers. -func withTracesUnmarshalers(tracesUnmarshalers ...TracesUnmarshaler) FactoryOption { - return func(factory *pulsarReceiverFactory) { - for _, unmarshaler := range tracesUnmarshalers { - factory.tracesUnmarshalers[unmarshaler.Encoding()] = unmarshaler - } - } -} - -// withMetricsUnmarshalers adds MetricsUnmarshalers. -func withMetricsUnmarshalers(metricsUnmarshalers ...MetricsUnmarshaler) FactoryOption { - return func(factory *pulsarReceiverFactory) { - for _, unmarshaler := range metricsUnmarshalers { - factory.metricsUnmarshalers[unmarshaler.Encoding()] = unmarshaler - } - } -} - -// withLogsUnmarshalers adds LogsUnmarshalers. -func withLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption { - return func(factory *pulsarReceiverFactory) { - for _, unmarshaler := range logsUnmarshalers { - factory.logsUnmarshalers[unmarshaler.Encoding()] = unmarshaler - } - } -} - // NewFactory creates Pulsar receiver factory. -func NewFactory(options ...FactoryOption) receiver.Factory { - - f := &pulsarReceiverFactory{ - tracesUnmarshalers: defaultTracesUnmarshalers(), - metricsUnmarshalers: defaultMetricsUnmarshalers(), - logsUnmarshalers: defaultLogsUnmarshalers(), - } - for _, o := range options { - o(f) - } +func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithTraces(f.createTracesReceiver, metadata.TracesStability), - receiver.WithMetrics(f.createMetricsReceiver, metadata.MetricsStability), - receiver.WithLogs(f.createLogsReceiver, metadata.LogsStability), + receiver.WithTraces(createTracesReceiver, metadata.TracesStability), + receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), + receiver.WithLogs(createLogsReceiver, metadata.LogsStability), ) } -type pulsarReceiverFactory struct { - tracesUnmarshalers map[string]TracesUnmarshaler - metricsUnmarshalers map[string]MetricsUnmarshaler - logsUnmarshalers map[string]LogsUnmarshaler -} - -func (f *pulsarReceiverFactory) createTracesReceiver( - _ context.Context, - set receiver.CreateSettings, - cfg component.Config, - nextConsumer consumer.Traces, -) (receiver.Traces, error) { - c := *(cfg.(*Config)) - if len(c.Topic) == 0 { - c.Topic = defaultTraceTopic - } - r, err := newTracesReceiver(c, set, f.tracesUnmarshalers, nextConsumer) - if err != nil { - return nil, err - } - return r, nil -} - -func (f *pulsarReceiverFactory) createMetricsReceiver( - _ context.Context, - set receiver.CreateSettings, - cfg component.Config, - nextConsumer consumer.Metrics, -) (receiver.Metrics, error) { - c := *(cfg.(*Config)) - if len(c.Topic) == 0 { - c.Topic = defaultMeticsTopic - } - r, err := newMetricsReceiver(c, set, f.metricsUnmarshalers, nextConsumer) - if err != nil { - return nil, err - } - return r, nil -} - -func (f *pulsarReceiverFactory) createLogsReceiver( - _ context.Context, - set receiver.CreateSettings, - cfg component.Config, - nextConsumer consumer.Logs, -) (receiver.Logs, error) { - c := *(cfg.(*Config)) - if len(c.Topic) == 0 { - c.Topic = defaultLogsTopic - } - r, err := newLogsReceiver(c, set, f.logsUnmarshalers, nextConsumer) - if err != nil { - return nil, err - } - return r, nil -} - func createDefaultConfig() component.Config { return &Config{ - Encoding: defaultEncoding, - ConsumerName: defaultConsumerName, - Subscription: defaultSubscription, - Endpoint: defaultServiceURL, + Trace: ReceiverOption{ + Topic: defaultTopicName, + Encoding: defaultEncoding, + ConsumerName: defaultConsumerName, + Subscription: defaultSubscription, + }, + Log: ReceiverOption{ + Topic: defaultTopicName, + Encoding: defaultEncoding, + ConsumerName: defaultConsumerName, + Subscription: defaultSubscription, + }, + Metric: ReceiverOption{ + Topic: defaultTopicName, + Encoding: defaultEncoding, + ConsumerName: defaultConsumerName, + Subscription: defaultSubscription, + }, + Endpoint: defaultServiceURL, } } diff --git a/receiver/pulsarreceiver/factory_test.go b/receiver/pulsarreceiver/factory_test.go index cffcd4840d87..3473af5ebc07 100644 --- a/receiver/pulsarreceiver/factory_test.go +++ b/receiver/pulsarreceiver/factory_test.go @@ -4,205 +4,44 @@ package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" import ( - "context" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/receiver/receivertest" ) func TestCreateDefaultConfig(t *testing.T) { cfg := createDefaultConfig() assert.Equal(t, &Config{ - Topic: "", - Encoding: defaultEncoding, - ConsumerName: defaultConsumerName, - Subscription: defaultSubscription, + Trace: ReceiverOption{ + Topic: defaultTopicName, + Encoding: defaultEncoding, + ConsumerName: defaultConsumerName, + Subscription: defaultSubscription, + }, + Metric: ReceiverOption{ + Topic: defaultTopicName, + Encoding: defaultEncoding, + ConsumerName: defaultConsumerName, + Subscription: defaultSubscription, + }, + Log: ReceiverOption{ + Topic: defaultTopicName, + Encoding: defaultEncoding, + ConsumerName: defaultConsumerName, + Subscription: defaultSubscription, + }, Endpoint: defaultServiceURL, Authentication: Authentication{}, }, cfg) } -// trace -func TestCreateTracesReceiver_err_addr(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "invalid:6650" - - f := pulsarReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshalers()} - r, err := f.createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - -func TestCreateTracesReceiver_err_marshallers(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = defaultServiceURL - - f := pulsarReceiverFactory{tracesUnmarshalers: make(map[string]TracesUnmarshaler)} - r, err := f.createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - -func Test_CreateTraceReceiver(t *testing.T) { - cfg := createDefaultConfig().(*Config) - f := pulsarReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshalers()} - recv, err := f.createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) +func TestValidateReceiverOption(t *testing.T) { + opt := ReceiverOption{Topic: "pulsar"} + err := opt.validate() require.NoError(t, err) - assert.NotNil(t, recv) -} - -func TestWithTracesUnmarshalers(t *testing.T) { - unmarshaler := &customTracesUnmarshaler{} - f := NewFactory(withTracesUnmarshalers(unmarshaler)) - cfg := createDefaultConfig().(*Config) - - t.Run("custom_encoding", func(t *testing.T) { - cfg.Encoding = unmarshaler.Encoding() - receiver, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - require.NotNil(t, receiver) - }) - t.Run("default_encoding", func(t *testing.T) { - cfg.Encoding = defaultEncoding - receiver, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - assert.NotNil(t, receiver) - }) -} - -// metrics -func TestCreateMetricsReceiver_err_addr(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "invalid:6650" - - f := pulsarReceiverFactory{metricsUnmarshalers: defaultMetricsUnmarshalers()} - r, err := f.createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - -func TestCreateMetricsReceiver_err_marshallers(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = defaultServiceURL - - f := pulsarReceiverFactory{metricsUnmarshalers: make(map[string]MetricsUnmarshaler)} - r, err := f.createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - -func Test_CreateMetricsReceiver(t *testing.T) { - cfg := createDefaultConfig().(*Config) - f := pulsarReceiverFactory{metricsUnmarshalers: defaultMetricsUnmarshalers()} - - recv, err := f.createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - assert.NotNil(t, recv) -} - -func TestWithMetricsUnmarshalers(t *testing.T) { - unmarshaler := &customMetricsUnmarshaler{} - f := NewFactory(withMetricsUnmarshalers(unmarshaler)) - cfg := createDefaultConfig().(*Config) - - t.Run("custom_encoding", func(t *testing.T) { - cfg.Encoding = unmarshaler.Encoding() - receiver, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - require.NotNil(t, receiver) - }) - t.Run("default_encoding", func(t *testing.T) { - cfg.Encoding = defaultEncoding - receiver, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - assert.NotNil(t, receiver) - }) -} - -// logs -func TestCreateLogsReceiver_err_addr(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "invalid:6650" - - f := pulsarReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers()} - r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - -func TestCreateLogsReceiver_err_marshallers(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = defaultServiceURL - - f := pulsarReceiverFactory{logsUnmarshalers: make(map[string]LogsUnmarshaler)} - r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - -func Test_CreateLogsReceiver(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = defaultServiceURL - - f := pulsarReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers()} - recv, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - assert.NotNil(t, recv) -} - -func TestWithLogsUnmarshalers(t *testing.T) { - unmarshaler := &customLogsUnmarshaler{} - f := NewFactory(withLogsUnmarshalers(unmarshaler)) - cfg := createDefaultConfig().(*Config) - - t.Run("custom_encoding", func(t *testing.T) { - cfg.Encoding = unmarshaler.Encoding() - exporter, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - require.NotNil(t, exporter) - }) - t.Run("default_encoding", func(t *testing.T) { - cfg.Encoding = defaultEncoding - exporter, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - assert.NotNil(t, exporter) - }) -} - -type customTracesUnmarshaler struct { -} - -type customMetricsUnmarshaler struct { -} - -type customLogsUnmarshaler struct { -} - -func (c customTracesUnmarshaler) Unmarshal([]byte) (ptrace.Traces, error) { - panic("implement me") -} - -func (c customTracesUnmarshaler) Encoding() string { - return "custom" -} - -func (c customMetricsUnmarshaler) Unmarshal([]byte) (pmetric.Metrics, error) { - panic("implement me") -} - -func (c customMetricsUnmarshaler) Encoding() string { - return "custom" -} - -func (c customLogsUnmarshaler) Unmarshal([]byte) (plog.Logs, error) { - panic("implement me") -} - -func (c customLogsUnmarshaler) Encoding() string { - return "custom" + require.Equal(t, opt.Topic, "pulsar") + require.Equal(t, opt.Encoding, defaultEncoding) + require.Equal(t, opt.Subscription, defaultSubscription) + require.Equal(t, opt.ConsumerName, defaultConsumerName) } diff --git a/receiver/pulsarreceiver/pulsar_receiver.go b/receiver/pulsarreceiver/pulsar_receiver.go deleted file mode 100644 index c4b08991ce30..000000000000 --- a/receiver/pulsarreceiver/pulsar_receiver.go +++ /dev/null @@ -1,320 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" - -import ( - "context" - "errors" - "strings" - "time" - - "github.com/apache/pulsar-client-go/pulsar" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" - "go.uber.org/zap" -) - -var errUnrecognizedEncoding = errors.New("unrecognized encoding") - -const alreadyClosedError = "AlreadyClosedError" - -type pulsarTracesConsumer struct { - tracesConsumer consumer.Traces - topic string - client pulsar.Client - cancel context.CancelFunc - consumer pulsar.Consumer - unmarshaler TracesUnmarshaler - settings receiver.CreateSettings - consumerOptions pulsar.ConsumerOptions -} - -func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]TracesUnmarshaler, nextConsumer consumer.Traces) (*pulsarTracesConsumer, error) { - unmarshaler := unmarshalers[config.Encoding] - if nil == unmarshaler { - return nil, errUnrecognizedEncoding - } - - options := config.clientOptions() - client, err := pulsar.NewClient(options) - if err != nil { - return nil, err - } - - consumerOptions, err := config.consumerOptions() - if err != nil { - return nil, err - } - - return &pulsarTracesConsumer{ - tracesConsumer: nextConsumer, - topic: config.Topic, - unmarshaler: unmarshaler, - settings: set, - client: client, - consumerOptions: consumerOptions, - }, nil -} - -func (c *pulsarTracesConsumer) Start(context.Context, component.Host) error { - ctx, cancel := context.WithCancel(context.Background()) - c.cancel = cancel - - _consumer, err := c.client.Subscribe(c.consumerOptions) - if err == nil { - c.consumer = _consumer - go func() { - if e := consumerTracesLoop(ctx, c); e != nil { - c.settings.Logger.Error("consume traces loop occurs an error", zap.Error(e)) - } - }() - } - - return err -} - -func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error { - unmarshaler := c.unmarshaler - traceConsumer := c.tracesConsumer - - for { - message, err := c.consumer.Receive(ctx) - if err != nil { - if strings.Contains(err.Error(), alreadyClosedError) { - return err - } - if errors.Is(err, context.Canceled) { - c.settings.Logger.Info("exiting consume traces loop") - return err - } - c.settings.Logger.Error("failed to receive traces message from Pulsar, waiting for one second before retrying", zap.Error(err)) - time.Sleep(time.Second) - continue - } - - traces, err := unmarshaler.Unmarshal(message.Payload()) - if err != nil { - c.settings.Logger.Error("failed to unmarshaler traces message", zap.Error(err)) - c.consumer.Ack(message) - return err - } - - if err := traceConsumer.ConsumeTraces(context.Background(), traces); err != nil { - c.settings.Logger.Error("consume traces failed", zap.Error(err)) - } - c.consumer.Ack(message) - } -} - -func (c *pulsarTracesConsumer) Shutdown(context.Context) error { - if c.cancel == nil { - return nil - } - c.cancel() - c.consumer.Close() - c.client.Close() - return nil -} - -type pulsarMetricsConsumer struct { - metricsConsumer consumer.Metrics - unmarshaler MetricsUnmarshaler - topic string - client pulsar.Client - consumer pulsar.Consumer - cancel context.CancelFunc - settings receiver.CreateSettings - consumerOptions pulsar.ConsumerOptions -} - -func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]MetricsUnmarshaler, nextConsumer consumer.Metrics) (*pulsarMetricsConsumer, error) { - unmarshaler := unmarshalers[config.Encoding] - if nil == unmarshaler { - return nil, errUnrecognizedEncoding - } - - options := config.clientOptions() - client, err := pulsar.NewClient(options) - if err != nil { - return nil, err - } - - consumerOptions, err := config.consumerOptions() - if err != nil { - return nil, err - } - - return &pulsarMetricsConsumer{ - metricsConsumer: nextConsumer, - topic: config.Topic, - unmarshaler: unmarshaler, - settings: set, - client: client, - consumerOptions: consumerOptions, - }, nil -} - -func (c *pulsarMetricsConsumer) Start(context.Context, component.Host) error { - ctx, cancel := context.WithCancel(context.Background()) - c.cancel = cancel - - _consumer, err := c.client.Subscribe(c.consumerOptions) - if err == nil { - c.consumer = _consumer - - go func() { - if e := consumeMetricsLoop(ctx, c); e != nil { - c.settings.Logger.Error("consume metrics loop occurs an error", zap.Error(e)) - } - }() - } - - return err -} - -func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error { - unmarshaler := c.unmarshaler - metricsConsumer := c.metricsConsumer - - for { - message, err := c.consumer.Receive(ctx) - if err != nil { - if strings.Contains(err.Error(), alreadyClosedError) { - return err - } - if errors.Is(err, context.Canceled) { - c.settings.Logger.Info("exiting consume metrics loop") - return err - } - - c.settings.Logger.Error("failed to receive metrics message from Pulsar, waiting for one second before retrying", zap.Error(err)) - time.Sleep(time.Second) - continue - } - - metrics, err := unmarshaler.Unmarshal(message.Payload()) - if err != nil { - c.settings.Logger.Error("failed to unmarshaler metrics message", zap.Error(err)) - c.consumer.Ack(message) - return err - } - - if err := metricsConsumer.ConsumeMetrics(context.Background(), metrics); err != nil { - c.settings.Logger.Error("consume traces failed", zap.Error(err)) - } - - c.consumer.Ack(message) - } -} - -func (c *pulsarMetricsConsumer) Shutdown(context.Context) error { - if c.cancel == nil { - return nil - } - c.cancel() - c.consumer.Close() - c.client.Close() - return nil -} - -type pulsarLogsConsumer struct { - logsConsumer consumer.Logs - unmarshaler LogsUnmarshaler - topic string - client pulsar.Client - consumer pulsar.Consumer - cancel context.CancelFunc - settings receiver.CreateSettings - consumerOptions pulsar.ConsumerOptions -} - -func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*pulsarLogsConsumer, error) { - unmarshaler := unmarshalers[config.Encoding] - if nil == unmarshaler { - return nil, errUnrecognizedEncoding - } - - options := config.clientOptions() - client, err := pulsar.NewClient(options) - if err != nil { - return nil, err - } - - consumerOptions, err := config.consumerOptions() - if err != nil { - return nil, err - } - - return &pulsarLogsConsumer{ - logsConsumer: nextConsumer, - topic: config.Topic, - cancel: nil, - unmarshaler: unmarshaler, - settings: set, - client: client, - consumerOptions: consumerOptions, - }, nil -} - -func (c *pulsarLogsConsumer) Start(context.Context, component.Host) error { - ctx, cancel := context.WithCancel(context.Background()) - c.cancel = cancel - - _consumer, err := c.client.Subscribe(c.consumerOptions) - if err == nil { - c.consumer = _consumer - go func() { - if e := consumeLogsLoop(ctx, c); e != nil { - c.settings.Logger.Error("consume logs loop occurs an error", zap.Error(e)) - } - }() - } - - return err -} - -func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error { - unmarshaler := c.unmarshaler - logsConsumer := c.logsConsumer - - for { - message, err := c.consumer.Receive(ctx) - if err != nil { - if strings.Contains(err.Error(), alreadyClosedError) { - return err - } - if errors.Is(err, context.Canceled) { - c.settings.Logger.Info("exiting consume traces loop canceled") - return err - } - c.settings.Logger.Error("failed to receive logs message from Pulsar, waiting for one second before retrying", zap.Error(err)) - time.Sleep(time.Second) - continue - } - - logs, err := unmarshaler.Unmarshal(message.Payload()) - if err != nil { - c.settings.Logger.Error("failed to unmarshaler logs message", zap.Error(err)) - c.consumer.Ack(message) - return err - } - - if err := logsConsumer.ConsumeLogs(context.Background(), logs); err != nil { - c.settings.Logger.Error("consume traces failed", zap.Error(err)) - } - - c.consumer.Ack(message) - } -} - -func (c *pulsarLogsConsumer) Shutdown(context.Context) error { - if c.cancel == nil { - return nil - } - c.cancel() - c.consumer.Close() - c.client.Close() - return nil -} diff --git a/receiver/pulsarreceiver/pulsar_receiver_test.go b/receiver/pulsarreceiver/pulsar_receiver_test.go deleted file mode 100644 index a221c368c7ba..000000000000 --- a/receiver/pulsarreceiver/pulsar_receiver_test.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/receiver/receivertest" -) - -func Test_newTracesReceiver_err(t *testing.T) { - c := Config{ - Encoding: defaultEncoding, - } - _, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop()) - assert.Error(t, err) -} diff --git a/receiver/pulsarreceiver/receiver_logs.go b/receiver/pulsarreceiver/receiver_logs.go new file mode 100644 index 000000000000..3c4a326cc2aa --- /dev/null +++ b/receiver/pulsarreceiver/receiver_logs.go @@ -0,0 +1,116 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" +import ( + "context" + "errors" + "strings" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" +) + +const alreadyClosedError = "AlreadyClosedError" + +type pulsarLogsReceiver struct { + logsConsumer consumer.Logs + unmarshaler LogsUnmarshaler + client pulsar.Client + consumer pulsar.Consumer + cancel context.CancelFunc + settings receiver.CreateSettings + config Config +} + +func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*pulsarLogsReceiver, error) { + option := config.Log + if err := option.validate(); err != nil { + return nil, err + } + unmarshaler := unmarshalers[option.Encoding] + if unmarshaler == nil { + return nil, errUnrecognizedEncoding + } + return &pulsarLogsReceiver{ + config: config, + logsConsumer: nextConsumer, + cancel: nil, + unmarshaler: unmarshaler, + settings: set, + }, nil +} + +func (c *pulsarLogsReceiver) Start(context.Context, component.Host) error { + client, _consumer, err := c.config.createConsumer(c.config.Log) + if err != nil { + return err + } + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + c.client = client + c.consumer = _consumer + go func() { + if e := consumeLogsLoop(ctx, c); e != nil { + c.settings.Logger.Error("consume logs loop occurs an error", zap.Error(e)) + } + }() + return err +} + +func consumeLogsLoop(ctx context.Context, c *pulsarLogsReceiver) error { + unmarshaler := c.unmarshaler + logsConsumer := c.logsConsumer + + for { + message, err := c.consumer.Receive(ctx) + if err != nil { + if strings.Contains(err.Error(), alreadyClosedError) { + return err + } + if errors.Is(err, context.Canceled) { + c.settings.Logger.Info("exiting consume traces loop canceled") + return err + } + c.settings.Logger.Error("failed to receive logs message from Pulsar, waiting for one second before retrying", zap.Error(err)) + time.Sleep(time.Second) + continue + } + + logs, err := unmarshaler.Unmarshal(message.Payload()) + if err != nil { + c.settings.Logger.Error("failed to unmarshaler logs message", zap.Error(err)) + c.consumer.Ack(message) + return err + } + + if err := logsConsumer.ConsumeLogs(context.Background(), logs); err != nil { + c.settings.Logger.Error("consume traces failed", zap.Error(err)) + } + + c.consumer.Ack(message) + } +} + +func (c *pulsarLogsReceiver) Shutdown(context.Context) error { + if c.cancel == nil { + return nil + } + c.cancel() + c.consumer.Close() + c.client.Close() + return nil +} + +func createLogsReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, nextConsumer consumer.Logs) (receiver.Logs, error) { + c := *(cfg.(*Config)) + r, err := newLogsReceiver(c, set, defaultLogsUnmarshalers(), nextConsumer) + if err != nil { + return nil, err + } + return r, nil +} diff --git a/receiver/pulsarreceiver/receiver_logs_test.go b/receiver/pulsarreceiver/receiver_logs_test.go new file mode 100644 index 000000000000..8f073127ab0d --- /dev/null +++ b/receiver/pulsarreceiver/receiver_logs_test.go @@ -0,0 +1,43 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestCreateLogsReceiver_err_marshallers(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = defaultServiceURL + cfg.Log.Encoding = "unknown" + cfg.Log.Topic = "pulsar://public/default/otlp_logs" + r, err := createLogsReceiver(context.TODO(), receivertest.NewNopCreateSettings(), cfg, nil) + require.Error(t, err) + assert.Nil(t, r) +} + +func TestCreateLogsReceiver_err_topic(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = defaultServiceURL + cfg.Log.Encoding = defaultEncoding + cfg.Log.Topic = defaultTopicName + r, err := createLogsReceiver(context.TODO(), receivertest.NewNopCreateSettings(), cfg, nil) + require.Error(t, err) + assert.Nil(t, r) +} + +func Test_CreateLogsReceiver(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = defaultServiceURL + cfg.Log.Encoding = defaultEncoding + cfg.Log.Topic = "pulsar://public/default/otlp_logs" + + recv, err := createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + require.NoError(t, err) + assert.NotNil(t, recv) +} diff --git a/receiver/pulsarreceiver/receiver_metrics.go b/receiver/pulsarreceiver/receiver_metrics.go new file mode 100644 index 000000000000..6c19f2aab913 --- /dev/null +++ b/receiver/pulsarreceiver/receiver_metrics.go @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" +import ( + "context" + "errors" + "strings" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" +) + +type pulsarMetricsReceiver struct { + metricsConsumer consumer.Metrics + unmarshaler MetricsUnmarshaler + client pulsar.Client + consumer pulsar.Consumer + cancel context.CancelFunc + settings receiver.CreateSettings + config Config +} + +func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]MetricsUnmarshaler, nextConsumer consumer.Metrics) (*pulsarMetricsReceiver, error) { + option := config.Metric + if err := option.validate(); err != nil { + return nil, err + } + unmarshaler := unmarshalers[option.Encoding] + if unmarshaler == nil { + return nil, errUnrecognizedEncoding + } + + return &pulsarMetricsReceiver{ + metricsConsumer: nextConsumer, + unmarshaler: unmarshaler, + settings: set, + config: config, + }, nil +} + +func (c *pulsarMetricsReceiver) Start(context.Context, component.Host) error { + client, _consumer, err := c.config.createConsumer(c.config.Log) + if err != nil { + return err + } + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + c.client = client + c.consumer = _consumer + go func() { + if e := consumeMetricsLoop(ctx, c); e != nil { + c.settings.Logger.Error("consume metrics loop occurs an error", zap.Error(e)) + } + }() + return err +} + +func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsReceiver) error { + unmarshaler := c.unmarshaler + metricsConsumer := c.metricsConsumer + + for { + message, err := c.consumer.Receive(ctx) + if err != nil { + if strings.Contains(err.Error(), alreadyClosedError) { + return err + } + if errors.Is(err, context.Canceled) { + c.settings.Logger.Info("exiting consume metrics loop") + return err + } + + c.settings.Logger.Error("failed to receive metrics message from Pulsar, waiting for one second before retrying", zap.Error(err)) + time.Sleep(time.Second) + continue + } + + metrics, err := unmarshaler.Unmarshal(message.Payload()) + if err != nil { + c.settings.Logger.Error("failed to unmarshaler metrics message", zap.Error(err)) + c.consumer.Ack(message) + return err + } + + if err := metricsConsumer.ConsumeMetrics(context.Background(), metrics); err != nil { + c.settings.Logger.Error("consume traces failed", zap.Error(err)) + } + + c.consumer.Ack(message) + } +} + +func (c *pulsarMetricsReceiver) Shutdown(context.Context) error { + if c.cancel == nil { + return nil + } + c.cancel() + c.consumer.Close() + c.client.Close() + return nil +} + +func createMetricsReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) { + c := *(cfg.(*Config)) + r, err := newMetricsReceiver(c, set, defaultMetricsUnmarshalers(), nextConsumer) + if err != nil { + return nil, err + } + return r, nil +} diff --git a/receiver/pulsarreceiver/receiver_metrics_test.go b/receiver/pulsarreceiver/receiver_metrics_test.go new file mode 100644 index 000000000000..b24583a0f06e --- /dev/null +++ b/receiver/pulsarreceiver/receiver_metrics_test.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestCreateMetricsReceiver_err_marshallers(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = defaultServiceURL + cfg.Metric.Encoding = "unknown" + cfg.Metric.Topic = "pulsar://public/default/otlp_metrics" + + r, err := createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + require.Error(t, err) + assert.Nil(t, r) +} + +func TestCreateMetricsReceiver_err_topic(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = defaultServiceURL + cfg.Metric.Encoding = "unknown" + cfg.Metric.Topic = defaultTopicName + + r, err := createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + require.Error(t, err) + assert.Nil(t, r) +} + +func Test_CreateMetricsReceiver(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = defaultServiceURL + cfg.Metric.Encoding = defaultEncoding + cfg.Metric.Topic = "pulsar://public/default/otlp_metrics" + + recv, err := createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + require.NoError(t, err) + assert.NotNil(t, recv) +} diff --git a/receiver/pulsarreceiver/receiver_traces.go b/receiver/pulsarreceiver/receiver_traces.go new file mode 100644 index 000000000000..47fa7ef18288 --- /dev/null +++ b/receiver/pulsarreceiver/receiver_traces.go @@ -0,0 +1,113 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" +import ( + "context" + "errors" + "strings" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" +) + +type pulsarTracesReceiver struct { + tracesConsumer consumer.Traces + client pulsar.Client + cancel context.CancelFunc + consumer pulsar.Consumer + unmarshaler TracesUnmarshaler + settings receiver.CreateSettings + config Config +} + +func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]TracesUnmarshaler, nextConsumer consumer.Traces) (*pulsarTracesReceiver, error) { + option := config.Trace + if err := option.validate(); err != nil { + return nil, err + } + unmarshaler := unmarshalers[option.Encoding] + if unmarshaler == nil { + return nil, errUnrecognizedEncoding + } + + return &pulsarTracesReceiver{ + tracesConsumer: nextConsumer, + unmarshaler: unmarshaler, + settings: set, + config: config, + }, nil +} + +func (c *pulsarTracesReceiver) Start(context.Context, component.Host) error { + client, _consumer, err := c.config.createConsumer(c.config.Log) + if err != nil { + return err + } + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + c.client = client + c.consumer = _consumer + go func() { + if e := consumeTracesLoop(ctx, c); e != nil { + c.settings.Logger.Error("consume traces loop occurs an error", zap.Error(e)) + } + }() + return err +} + +func consumeTracesLoop(ctx context.Context, c *pulsarTracesReceiver) error { + unmarshaler := c.unmarshaler + traceConsumer := c.tracesConsumer + + for { + message, err := c.consumer.Receive(ctx) + if err != nil { + if strings.Contains(err.Error(), alreadyClosedError) { + return err + } + if errors.Is(err, context.Canceled) { + c.settings.Logger.Info("exiting consume traces loop") + return err + } + c.settings.Logger.Error("failed to receive traces message from Pulsar, waiting for one second before retrying", zap.Error(err)) + time.Sleep(time.Second) + continue + } + + traces, err := unmarshaler.Unmarshal(message.Payload()) + if err != nil { + c.settings.Logger.Error("failed to unmarshaler traces message", zap.Error(err)) + c.consumer.Ack(message) + return err + } + + if err := traceConsumer.ConsumeTraces(context.Background(), traces); err != nil { + c.settings.Logger.Error("consume traces failed", zap.Error(err)) + } + c.consumer.Ack(message) + } +} + +func (c *pulsarTracesReceiver) Shutdown(context.Context) error { + if c.cancel == nil { + return nil + } + c.cancel() + c.consumer.Close() + c.client.Close() + return nil +} + +func createTracesReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) { + c := *(cfg.(*Config)) + r, err := newTracesReceiver(c, set, defaultTracesUnmarshalers(), nextConsumer) + if err != nil { + return nil, err + } + return r, nil +} diff --git a/receiver/pulsarreceiver/receiver_traces_test.go b/receiver/pulsarreceiver/receiver_traces_test.go new file mode 100644 index 000000000000..0182f75236ef --- /dev/null +++ b/receiver/pulsarreceiver/receiver_traces_test.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestCreateTracesReceiver_err_marshallers(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = defaultServiceURL + cfg.Trace.Encoding = "unknown" + cfg.Trace.Topic = "pulsar://public/default/otlp_metrics" + + r, err := createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + require.Error(t, err) + assert.Nil(t, r) +} + +func TestCreateTracesReceiver_err_topic(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = defaultServiceURL + cfg.Trace.Encoding = defaultEncoding + cfg.Trace.Topic = defaultTopicName + + r, err := createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + require.Error(t, err) + assert.Nil(t, r) +} + +func Test_CreateTraceReceiver(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = defaultServiceURL + cfg.Trace.Encoding = defaultEncoding + cfg.Trace.Topic = "pulsar://public/default/otlp_spans" + + recv, err := createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + require.NoError(t, err) + assert.NotNil(t, recv) +} diff --git a/receiver/pulsarreceiver/testdata/config.yml b/receiver/pulsarreceiver/testdata/config.yml index 40de9955e068..9fd5b9ba57bd 100644 --- a/receiver/pulsarreceiver/testdata/config.yml +++ b/receiver/pulsarreceiver/testdata/config.yml @@ -1,8 +1,20 @@ pulsar: - topic: otel-pulsar endpoint: pulsar://localhost:6500 - consumer_name: otel-collector - subscription: otel-collector + trace: + topic: otel-pulsar + consumer_name: otel-collector + subscription: otel-collector + encoding: otlp_proto + metric: + topic: otel-pulsar + consumer_name: otel-collector + subscription: otel-collector + encoding: otlp_proto + log: + topic: otel-pulsar + consumer_name: otel-collector + subscription: otel-collector + encoding: otlp_proto tls_trust_certs_file_path: ca.pem tls_allow_insecure_connection: false auth: diff --git a/receiver/pulsarreceiver/unmarshaler.go b/receiver/pulsarreceiver/unmarshaler.go index 12ae7178b52f..3028d6d8a430 100644 --- a/receiver/pulsarreceiver/unmarshaler.go +++ b/receiver/pulsarreceiver/unmarshaler.go @@ -4,6 +4,8 @@ package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" import ( + "errors" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -12,6 +14,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2" ) +var errUnrecognizedEncoding = errors.New("unrecognized encoding") + // copy from kafka receiver // TracesUnmarshaler deserializes the message body. @@ -44,6 +48,7 @@ type LogsUnmarshaler interface { // defaultTracesUnmarshalers returns map of supported encodings with TracesUnmarshaler. func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { otlpPb := newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataTracesUnmarshaler(&ptrace.JSONUnmarshaler{}, "otlp_json") jaegerProto := jaegerProtoSpanUnmarshaler{} jaegerJSON := jaegerJSONSpanUnmarshaler{} zipkinProto := newPdataTracesUnmarshaler(zipkinv2.NewProtobufTracesUnmarshaler(false, false), "zipkin_proto") @@ -51,6 +56,7 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { zipkinThrift := newPdataTracesUnmarshaler(zipkinv1.NewThriftTracesUnmarshaler(), "zipkin_thrift") return map[string]TracesUnmarshaler{ otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, jaegerProto.Encoding(): jaegerProto, jaegerJSON.Encoding(): jaegerJSON, zipkinProto.Encoding(): zipkinProto, @@ -61,14 +67,18 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { otlpPb := newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataMetricsUnmarshaler(&pmetric.JSONUnmarshaler{}, "otlp_json") return map[string]MetricsUnmarshaler{ - otlpPb.Encoding(): otlpPb, + otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, } } func defaultLogsUnmarshalers() map[string]LogsUnmarshaler { otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataLogsUnmarshaler(&plog.JSONUnmarshaler{}, "otlp_json") return map[string]LogsUnmarshaler{ - otlpPb.Encoding(): otlpPb, + otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, } } diff --git a/receiver/pulsarreceiver/unmarshaler_test.go b/receiver/pulsarreceiver/unmarshaler_test.go index eacdc46eef0b..191a7d7f4088 100644 --- a/receiver/pulsarreceiver/unmarshaler_test.go +++ b/receiver/pulsarreceiver/unmarshaler_test.go @@ -14,6 +14,7 @@ import ( func TestDefaultTracesUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", "jaeger_proto", "jaeger_json", "zipkin_proto", @@ -34,6 +35,7 @@ func TestDefaultTracesUnMarshaler(t *testing.T) { func TestDefaultMetricsUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", } marshalers := defaultMetricsUnmarshalers() assert.Equal(t, len(expectedEncodings), len(marshalers)) @@ -49,6 +51,7 @@ func TestDefaultMetricsUnMarshaler(t *testing.T) { func TestDefaultLogsUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", } marshalers := defaultLogsUnmarshalers() assert.Equal(t, len(expectedEncodings), len(marshalers))