[exporter/kafka] Decide the Kafka topic based on the value of the attribute. #31809

Merged 26 commits on May 3, 2024
Commits
88f8227
Decide the Kafka topic based on the value of the attribute.
pyama86 Mar 18, 2024
1148f19
Stop making unnecessary assertions
pyama86 Mar 18, 2024
881dfea
In most cases, it is better not to have missing logs.
pyama86 Mar 18, 2024
441556d
add change log
pyama86 Mar 19, 2024
28c28ac
Merge branch 'main' into attribute-topic
pyama86 Mar 19, 2024
9b92821
I want to become proficient in English.
pyama86 Mar 31, 2024
f27ac4e
Check for the presence of spans, and if none exist, set a default topic.
pyama86 Mar 31, 2024
1a28d5b
Merge branch 'attribute-topic' of github.com:pyama86/opentelemetry-co…
pyama86 Mar 31, 2024
6ba26f4
Use the first valid attribute that can be obtained.
pyama86 Mar 31, 2024
1da7629
add readme for topic_from_attribute
pyama86 Mar 31, 2024
0079c09
Using interfaces allows for DRY (Don't Repeat Yourself) implementation.
pyama86 Apr 7, 2024
7fd0865
What's related to trace is span.
pyama86 Apr 7, 2024
be1efb8
Write an appropriate description.
pyama86 Apr 7, 2024
dfb1a85
There's no need for assignment.
pyama86 Apr 8, 2024
50568fe
Update exporter/kafkaexporter/config.go
pyama86 Apr 9, 2024
70e58e6
Merge branch 'main' into attribute-topic
pyama86 Apr 24, 2024
44272fa
Needs a clearer explanation
pyama86 Apr 26, 2024
3991e9d
Update exporter/kafkaexporter/README.md
pyama86 Apr 28, 2024
141544e
Merge branch 'main' into attribute-topic
codeboten May 1, 2024
99ea82e
Merge branch 'main' into attribute-topic
pyama86 May 2, 2024
5e9785a
fix test interface
pyama86 May 2, 2024
09c53e8
Merge branch 'main' into attribute-topic
pyama86 May 2, 2024
687522a
Merge branch 'main' into attribute-topic
codeboten May 2, 2024
5942c3c
fix lint
pyama86 May 3, 2024
c959fbe
Merge branch 'attribute-topic' of github.com:pyama86/opentelemetry-co…
pyama86 May 3, 2024
27d13a3
Merge branch 'attribute-topic' of github.com:pyama86/opentelemetry-co…
pyama86 May 3, 2024
27 changes: 27 additions & 0 deletions .chloggen/attribute-topic.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable setting message topics using resource attributes.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31178]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions exporter/kafkaexporter/README.md
@@ -25,6 +25,7 @@ The following settings can be optionally configured:
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
- `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. When set, this option takes precedence over `topic`; if the attribute is missing or its value is empty, the message's topic falls back to the value of the `topic` option.
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
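For illustration, a minimal exporter configuration using the new option might look like the following sketch. The broker address and the `kafka_topic` attribute name are placeholders (the attribute name mirrors the one used in the tests further down):

```yaml
exporters:
  kafka:
    brokers: ["localhost:9092"]        # placeholder broker address
    topic: otlp_spans                  # fallback topic when the attribute is absent or empty
    topic_from_attribute: kafka_topic  # resource attribute whose value becomes the message topic
```

With this configuration, data from a resource carrying `kafka_topic: my-service-spans` would be published to `my-service-spans`, while resources without the attribute fall back to `otlp_spans`.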
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/config.go
@@ -40,6 +40,9 @@ type Config struct {
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
Topic string `mapstructure:"topic"`

// TopicFromAttribute is the name of the resource attribute whose value should be used as the message's topic.
TopicFromAttribute string `mapstructure:"topic_from_attribute"`

// Encoding of messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

35 changes: 26 additions & 9 deletions exporter/kafkaexporter/kafka_exporter.go
@@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -26,7 +27,6 @@ var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
type kafkaTracesProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler TracesMarshaler
logger *zap.Logger
}
@@ -41,7 +41,7 @@ func (ke kafkaErrors) Error() string {
}

func (e *kafkaTracesProducer) tracesPusher(_ context.Context, td ptrace.Traces) error {
messages, err := e.marshaler.Marshal(td, e.topic)
messages, err := e.marshaler.Marshal(td, getTopic(&e.cfg, td.ResourceSpans()))
if err != nil {
return consumererror.NewPermanent(err)
}
@@ -78,13 +78,12 @@ func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error {
type kafkaMetricsProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler MetricsMarshaler
logger *zap.Logger
}

func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pmetric.Metrics) error {
messages, err := e.marshaler.Marshal(md, e.topic)
messages, err := e.marshaler.Marshal(md, getTopic(&e.cfg, md.ResourceMetrics()))
if err != nil {
return consumererror.NewPermanent(err)
}
@@ -121,13 +120,12 @@ func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error
type kafkaLogsProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler LogsMarshaler
logger *zap.Logger
}

func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld plog.Logs) error {
messages, err := e.marshaler.Marshal(ld, e.topic)
messages, err := e.marshaler.Marshal(ld, getTopic(&e.cfg, ld.ResourceLogs()))
if err != nil {
return consumererror.NewPermanent(err)
}
@@ -219,7 +217,6 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m

return &kafkaMetricsProducer{
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
}, nil
@@ -240,7 +237,6 @@ func newTracesExporter(config Config, set exporter.CreateSettings, marshalers ma

return &kafkaTracesProducer{
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
}, nil
@@ -254,9 +250,30 @@ func newLogsExporter(config Config, set exporter.CreateSettings, marshalers map[

return &kafkaLogsProducer{
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
}, nil

}

type resourceSlice[T any] interface {
Len() int
At(int) T
}

type resource interface {
Resource() pcommon.Resource
}

func getTopic[T resource](cfg *Config, resources resourceSlice[T]) string {
if cfg.TopicFromAttribute == "" {
return cfg.Topic
}
for i := 0; i < resources.Len(); i++ {
rv, ok := resources.At(i).Resource().Attributes().Get(cfg.TopicFromAttribute)
if ok && rv.Str() != "" {
return rv.Str()
}
}
return cfg.Topic
}
126 changes: 126 additions & 0 deletions exporter/kafkaexporter/kafka_exporter_test.go
@@ -149,6 +149,25 @@ func TestTracesPusher(t *testing.T) {
require.NoError(t, err)
}

func TestTracesPusher_attr(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaTracesProducer{
cfg: Config{
TopicFromAttribute: "kafka_topic",
},
producer: producer,
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.tracesPusher(context.Background(), testdata.GenerateTraces(2))
require.NoError(t, err)
}

func TestTracesPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
@@ -196,6 +215,25 @@ func TestMetricsDataPusher(t *testing.T) {
require.NoError(t, err)
}

func TestMetricsDataPusher_attr(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaMetricsProducer{
cfg: Config{
TopicFromAttribute: "kafka_topic",
},
producer: producer,
marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.metricsDataPusher(context.Background(), testdata.GenerateMetrics(2))
require.NoError(t, err)
}

func TestMetricsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
@@ -243,6 +281,25 @@ func TestLogsDataPusher(t *testing.T) {
require.NoError(t, err)
}

func TestLogsDataPusher_attr(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaLogsProducer{
cfg: Config{
TopicFromAttribute: "kafka_topic",
},
producer: producer,
marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.logsDataPusher(context.Background(), testdata.GenerateLogs(1))
require.NoError(t, err)
}

func TestLogsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
@@ -311,3 +368,72 @@ func (e logsErrorMarshaler) Marshal(_ plog.Logs, _ string) ([]*sarama.ProducerMe
func (e logsErrorMarshaler) Encoding() string {
panic("implement me")
}

func Test_GetTopic(t *testing.T) {
tests := []struct {
name string
cfg Config
resource interface{}
wantTopic string
}{
{
name: "Valid metric attribute, return topic name",
cfg: Config{
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "resource-attr-val-1",
},
{
name: "Valid trace attribute, return topic name",
cfg: Config{
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
resource: testdata.GenerateTraces(1).ResourceSpans(),
wantTopic: "resource-attr-val-1",
},
{
name: "Valid log attribute, return topic name",
cfg: Config{
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
resource: testdata.GenerateLogs(1).ResourceLogs(),
wantTopic: "resource-attr-val-1",
},
{
name: "Attribute not found",
cfg: Config{
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "defaultTopic",
},
{
name: "TopicFromAttribute not set, return default topic",
cfg: Config{
Topic: "defaultTopic",
},
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "defaultTopic",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
topic := ""
switch r := tt.resource.(type) {
case pmetric.ResourceMetricsSlice:
topic = getTopic(&tt.cfg, r)
case ptrace.ResourceSpansSlice:
topic = getTopic(&tt.cfg, r)
case plog.ResourceLogsSlice:
topic = getTopic(&tt.cfg, r)
}
assert.Equal(t, tt.wantTopic, topic)
})
}
}