[exporter/kafka] Implement partitioning for OTLP metrics (#31315)
**Description:** Add resource-attribute-based partitioning for OTLP metrics.

Our backend needs the ability to distribute metrics across Kafka partitions based on
resource attributes, so this change adds a new configuration flag for it.
Some of the existing code for partitioning traces by trace ID is reused.

Judging by the existing issues, several other users are waiting for this feature as well.
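
For context, a condensed, standalone sketch of the partitioning mechanism: the message key is a hash of the resource attributes, so all metrics from the same resource land on the same Kafka partition. The real logic lives in `pdata_marshaler.go` in this diff; the helper name, attribute values, and `main` wrapper here are illustrative, and the `github.com/IBM/sarama` import path is assumed to match the one the exporter uses.

```go
package main

import (
	"fmt"

	"github.com/IBM/sarama"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// partitionKey derives a stable Kafka message key from the resource attributes
// of one ResourceMetrics entry: equal attribute sets hash to the same bytes,
// so metrics from the same resource always map to the same partition.
func partitionKey(rm pmetric.ResourceMetrics) sarama.ByteEncoder {
	hash := pdatautil.MapHash(rm.Resource().Attributes())
	return sarama.ByteEncoder(hash[:])
}

func main() {
	md := pmetric.NewMetrics()
	rm := md.ResourceMetrics().AppendEmpty()
	rm.Resource().Attributes().PutStr("service.name", "my_service_name")
	rm.Resource().Attributes().PutStr("service.instance.id", "kek_x_1")
	fmt.Printf("partition key: %x\n", partitionKey(rm))
}
```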

**Link to tracking Issue:**
[31675](#31675)

This feature was also mentioned in these issues:
[29433](#29433),
[30666](#30666)


**Testing:**
Added tests for the hashing utility.
Added tests for marshaling that assert the expected keys and number of messages.
Tested locally with the host metrics receiver and a chained OTLP metrics receiver.

**Documentation:**
Added a changelog entry.
Documented the new flag in the Kafka exporter README.

---------

Co-authored-by: Curtis Robert <crobert@splunk.com>
SHaaD94 and crobert-1 authored Apr 29, 2024
1 parent 556c237 commit cec2153
Showing 11 changed files with 168 additions and 22 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafka-exporter-key-by-metric-resources.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add an ability to publish kafka messages with message key based on metric resource attributes - it will allow partitioning metrics in Kafka.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29433, 30666, 31675]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
1 change: 1 addition & 0 deletions exporter/kafkaexporter/README.md
@@ -36,6 +36,7 @@ The following settings can be optionally configured:
- The following encodings are valid *only* for **logs**.
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
- `auth`
- `plain_text`
- `username`: The username to use.
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/config.go
@@ -48,6 +48,8 @@ type Config struct {
// trace ID as the message key by default.
PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

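// PartitionMetricsByResourceAttributes configures the exporter to include a hash of
// the sorted resource attributes as the message key in metric messages sent to kafka.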
PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata Metadata `mapstructure:"metadata"`
23 changes: 13 additions & 10 deletions exporter/kafkaexporter/config_test.go
@@ -55,11 +55,12 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
@@ -109,11 +110,12 @@
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
@@ -165,6 +167,7 @@
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
ResolveCanonicalBootstrapServersOnly: true,
7 changes: 5 additions & 2 deletions exporter/kafkaexporter/factory.go
@@ -38,6 +38,8 @@ const (
defaultCompression = "none"
// default from sarama.NewConfig()
defaultFluxMaxMessages = 0
// partitioning metrics by resource attributes is disabled by default
defaultPartitionMetricsByResourceAttributesEnabled = false
)

// FactoryOption applies changes to kafkaExporterFactory.
@@ -97,8 +99,9 @@ func createDefaultConfig() component.Config {
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
Topic: "",
Encoding: defaultEncoding,
Topic: "",
Encoding: defaultEncoding,
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
1 change: 1 addition & 0 deletions exporter/kafkaexporter/go.mod
@@ -10,6 +10,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.99.0
github.com/openzipkin/zipkin-go v0.4.2
6 changes: 6 additions & 0 deletions exporter/kafkaexporter/kafka_exporter.go
@@ -211,6 +211,12 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
if config.PartitionMetricsByResourceAttributes {
if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok {
keyableMarshaler.Key()
}
}

return &kafkaMetricsProducer{
cfg: config,
topic: config.Topic,
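
The hunk above is where the new flag takes effect: the marshaler is switched into keyed mode only if it implements the `KeyableMetricsMarshaler` interface defined in `pdata_marshaler.go` below. Here is a minimal sketch of what that toggle changes, written as if it lived inside the `kafkaexporter` package so it can reach the unexported constructor; the function name and topic string are illustrative.

```go
package kafkaexporter

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// examplePartitionByResourceAttributes (hypothetical helper) shows the effect of
// the flag: without Key(), Marshal returns a single unkeyed message for the whole
// batch; with Key(), it returns one message per ResourceMetrics, keyed by the
// hash of that resource's attributes.
func examplePartitionByResourceAttributes() error {
	md := pmetric.NewMetrics()
	md.ResourceMetrics().AppendEmpty().Resource().Attributes().PutStr("service.name", "svc-a")
	md.ResourceMetrics().AppendEmpty().Resource().Attributes().PutStr("service.name", "svc-b")

	marshaler := newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding)
	if keyable, ok := marshaler.(KeyableMetricsMarshaler); ok {
		keyable.Key() // the same opt-in newMetricsExporter performs when the flag is set
	}

	msgs, err := marshaler.Marshal(md, "otlp_metrics")
	if err != nil {
		return err
	}
	fmt.Printf("messages: %d\n", len(msgs)) // 2: one keyed message per resource
	return nil
}
```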
64 changes: 64 additions & 0 deletions exporter/kafkaexporter/marshaler_test.go
@@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
)
@@ -71,6 +72,69 @@ func TestDefaultLogsMarshalers(t *testing.T) {
}
}

func TestOTLPMetricsJsonMarshaling(t *testing.T) {
tests := []struct {
name string
keyEnabled bool
messagePartitionKeys []sarama.Encoder
}{
{
name: "partitioning_disabled",
keyEnabled: false,
messagePartitionKeys: []sarama.Encoder{nil},
},
{
name: "partitioning_enabled",
keyEnabled: true,
messagePartitionKeys: []sarama.Encoder{
sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16},
sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metric := pmetric.NewMetrics()
r := pcommon.NewResource()
r.Attributes().PutStr("service.name", "my_service_name")
r.Attributes().PutStr("service.instance.id", "kek_x_1")
r.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource())

rm := metric.ResourceMetrics().At(0)
rm.SetSchemaUrl(conventions.SchemaURL)

sm := rm.ScopeMetrics().AppendEmpty()
pmetric.NewScopeMetrics()
m := sm.Metrics().AppendEmpty()
m.SetEmptyGauge()
m.Gauge().DataPoints().AppendEmpty().SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(1, 0)))
m.Gauge().DataPoints().At(0).Attributes().PutStr("gauage_attribute", "attr")
m.Gauge().DataPoints().At(0).SetDoubleValue(1.0)

r1 := pcommon.NewResource()
r1.Attributes().PutStr("service.instance.id", "kek_x_2")
r1.Attributes().PutStr("service.name", "my_service_name")
r1.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource())

standardMarshaler := metricsMarshalers()["otlp_json"]
keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler)
require.True(t, ok, "Must be a KeyableMetricsMarshaler")
if tt.keyEnabled {
keyableMarshaler.Key()
}

msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX")
require.NoError(t, err, "Must have marshaled the data without error")

require.Len(t, msgs, len(tt.messagePartitionKeys), "Number of messages must be %d, but was %d", len(tt.messagePartitionKeys), len(msgs))

for i := 0; i < len(tt.messagePartitionKeys); i++ {
require.Equal(t, tt.messagePartitionKeys[i], msgs[i].Key, "message %d has incorrect key", i)
}
})
}
}

func TestOTLPTracesJsonMarshaling(t *testing.T) {
t.Parallel()

57 changes: 47 additions & 10 deletions exporter/kafkaexporter/pdata_marshaler.go
@@ -11,6 +11,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)

type pdataLogsMarshaler struct {
@@ -42,36 +43,72 @@ func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha
}
}

// KeyableMetricsMarshaler is an extension of the MetricsMarshaler interface intended to provide partition key capabilities
// for metrics messages
type KeyableMetricsMarshaler interface {
MetricsMarshaler
Key()
}

type pdataMetricsMarshaler struct {
marshaler pmetric.Marshaler
encoding string
keyed bool
}

// Key configures the pdataMetricsMarshaler to set the message key on the kafka messages
func (p *pdataMetricsMarshaler) Key() {
p.keyed = true
}

func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) {
bts, err := p.marshaler.MarshalMetrics(ld)
if err != nil {
return nil, err
}
return []*sarama.ProducerMessage{
{
var msgs []*sarama.ProducerMessage
if p.keyed {
metrics := ld.ResourceMetrics()

for i := 0; i < metrics.Len(); i++ {
resourceMetrics := metrics.At(i)
var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes())

newMetrics := pmetric.NewMetrics()
resourceMetrics.CopyTo(newMetrics.ResourceMetrics().AppendEmpty())

bts, err := p.marshaler.MarshalMetrics(newMetrics)
if err != nil {
return nil, err
}
msgs = append(msgs, &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bts),
Key: sarama.ByteEncoder(hash[:]),
})
}
} else {
bts, err := p.marshaler.MarshalMetrics(ld)
if err != nil {
return nil, err
}
msgs = append(msgs, &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bts),
},
}, nil
})
}

return msgs, nil
}

func (p pdataMetricsMarshaler) Encoding() string {
return p.encoding
}

func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string) MetricsMarshaler {
return pdataMetricsMarshaler{
return &pdataMetricsMarshaler{
marshaler: marshaler,
encoding: encoding,
}
}

// KeyableTracesMarshaler is an extension of the TracesMarshaler interface inteded to provide partition key capabilities
// KeyableTracesMarshaler is an extension of the TracesMarshaler interface intended to provide partition key capabilities
// for trace messages
type KeyableTracesMarshaler interface {
TracesMarshaler
1 change: 1 addition & 0 deletions exporter/kafkaexporter/testdata/config.yaml
@@ -13,6 +13,7 @@ kafka:
required_acks: -1 # WaitForAll
timeout: 10s
partition_traces_by_id: true
partition_metrics_by_resource_attributes: true
auth:
plain_text:
username: jdoe
1 change: 1 addition & 0 deletions receiver/kafkareceiver/go.mod
@@ -64,6 +64,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.99.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
