From adfca099252a479f921c8f208820991b68f5f01b Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Tue, 30 Apr 2024 13:01:00 +0100 Subject: [PATCH 1/5] Add a otelcol.exporter.kafka component. --- CHANGELOG.md | 4 + .../sources/reference/compatibility/_index.md | 1 + .../components/otelcol.exporter.kafka.md | 323 ++++++++++++++++++ .../components/otelcol.receiver.kafka.md | 2 +- internal/component/all/all.go | 1 + .../component/otelcol/exporter/kafka/kafka.go | 179 ++++++++++ .../otelcol/exporter/kafka/kafka_test.go | 240 +++++++++++++ .../otelcolconvert/converter_kafkaexporter.go | 72 ++++ .../otelcolconvert/testdata/kafka.alloy | 58 +++- .../otelcolconvert/testdata/kafka.yaml | 46 ++- .../tools/docs_generator/docs_updated_test.go | 4 +- 11 files changed, 912 insertions(+), 18 deletions(-) create mode 100644 docs/sources/reference/components/otelcol.exporter.kafka.md create mode 100644 internal/component/otelcol/exporter/kafka/kafka.go create mode 100644 internal/component/otelcol/exporter/kafka/kafka_test.go create mode 100644 internal/converter/internal/otelcolconvert/converter_kafkaexporter.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 458cd752ce..1c1a619572 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,10 @@ Main (unreleased) - Update Public preview `remotecfg` to use `alloy-remote-config` instead of `agent-remote-config`. The API has been updated to use the term `collector` over `agent`. (@erikbaranowski) +### Features + +- Adding an `otelcol.exporter.kafka` component for sending OTLP metrics, logs, and traces to Kafka. + ### Enhancements - (_Public preview_) Add native histogram support to `otelcol.receiver.prometheus`. (@wildum) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index f391fe7f21..5747b2720e 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -285,6 +285,7 @@ The following components, grouped by namespace, _export_ OpenTelemetry `otelcol. - [otelcol.connector.servicegraph](../components/otelcol.connector.servicegraph) - [otelcol.connector.spanlogs](../components/otelcol.connector.spanlogs) - [otelcol.connector.spanmetrics](../components/otelcol.connector.spanmetrics) +- [otelcol.exporter.kafka](../components/otelcol.exporter.kafka) - [otelcol.exporter.loadbalancing](../components/otelcol.exporter.loadbalancing) - [otelcol.exporter.logging](../components/otelcol.exporter.logging) - [otelcol.exporter.loki](../components/otelcol.exporter.loki) diff --git a/docs/sources/reference/components/otelcol.exporter.kafka.md b/docs/sources/reference/components/otelcol.exporter.kafka.md new file mode 100644 index 0000000000..a13b20c1e2 --- /dev/null +++ b/docs/sources/reference/components/otelcol.exporter.kafka.md @@ -0,0 +1,323 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol.exporter.kafka/ +description: Learn about otelcol.exporter.kafka +title: otelcol.exporter.kafka +--- + +# otelcol.exporter.kafka + +`otelcol.exporter.kafka` accepts logs, metrics, and traces telemetry data from +other `otelcol` components and sends it to Kafka. + +> **NOTE**: `otelcol.exporter.kafka` is a wrapper over the upstream +> OpenTelemetry Collector `kafka` exporter from the `otelcol-contrib` +> distribution. Bug reports or feature requests will be redirected to the +> upstream repository, if necessary. + +Multiple `otelcol.exporter.kafka` components can be specified by giving them +different labels. 
+

## Usage

```alloy
otelcol.exporter.kafka "LABEL" {
  protocol_version = "PROTOCOL_VERSION"
}
```

## Arguments

The following arguments are supported:

Name | Type | Description | Default | Required
------------------------------------------ | --------------- | ----------------------------------------------------------------------------------- | -------------------- | --------
`protocol_version` | `string` | Kafka protocol version to use. | | yes
`brokers` | `list(string)` | Kafka brokers to connect to. | `["localhost:9092"]` | no
`topic` | `string` | Kafka topic to read from. | _See below_ | no
`topic_from_attribute` | `string` | A resource attribute whose value should be used as the message's topic. | `""` | no
`encoding` | `string` | Encoding of payload sent to Kafka. | `"otlp_proto"` | no
`client_id` | `string` | Client ID to use. The ID will be used for all produce requests. | `"sarama"` | no
`timeout` | `duration` | The timeout for every attempt to send data to the backend. | `"5s"` | no
`resolve_canonical_bootstrap_servers_only` | `bool` | Whether to resolve then reverse-lookup broker IPs during startup. | `false` | no
`partition_traces_by_id` | `bool` | Whether to include the trace ID as the message key in trace messages sent to Kafka. | `false` | no
`partition_metrics_by_resource_attributes` | `bool` | Whether to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to Kafka. | `false` | no

If `topic` is not set, different topics will be used for different telemetry signals:

* Metrics will be received from an `otlp_metrics` topic.
* Traces will be received from an `otlp_spans` topic.
* Logs will be received from an `otlp_logs` topic.

If `topic` is set to a specific value, then only the signal type that corresponds to the data stored in the topic must be set in the output block.
For example, if `topic` is set to `"my_telemetry"`, then the `"my_telemetry"` topic can only contain either metrics, logs, or traces.
If it contains only metrics, then `otelcol.exporter.kafka` should be configured to process only metrics.

When `topic_from_attribute` is set, it will take precedence over `topic`.

The `encoding` argument determines how to encode messages sent to Kafka.
`encoding` must be one of the following strings:
* Encodings which work for traces, logs, and metrics:
  * `"otlp_proto"`: Encode messages as OTLP protobuf.
  * `"otlp_json"`: Encode messages as OTLP JSON.
* Encodings which work only for traces:
  * `"jaeger_proto"`: The payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
  * `"jaeger_json"`: The payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
  * `"zipkin_proto"`: The payload is serialized to Zipkin v2 proto Span.
  * `"zipkin_json"`: The payload is serialized to Zipkin v2 JSON Span.
* Encodings which work only for logs:
  * `"raw"`: If the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.

`partition_traces_by_id` does not have any effect on Jaeger encodings, since the Jaeger exporters include the trace ID as the message key by default. 
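For example, the following sketch (the broker address and topic name are placeholders) sends all three signals to a single Kafka topic, encoded as OTLP JSON:

```alloy
otelcol.exporter.kafka "single_topic" {
  protocol_version = "2.0.0"
  brokers          = ["broker:9092"]
  topic            = "my_telemetry"
  encoding         = "otlp_json"
}
```
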
## Blocks

The following blocks are supported inside the definition of `otelcol.exporter.kafka`:

Hierarchy | Block | Description | Required
-------------------------------- | ------------------- | --------------------------------------------------------------------------- | --------
authentication | [authentication][] | Configures authentication for connecting to Kafka brokers. | no
authentication > plaintext | [plaintext][] | Authenticates against Kafka brokers with plaintext. | no
authentication > sasl | [sasl][] | Authenticates against Kafka brokers with SASL. | no
authentication > sasl > aws_msk | [aws_msk][] | Additional SASL parameters when using AWS_MSK_IAM. | no
authentication > tls | [tls][] | Configures TLS for connecting to the Kafka brokers. | no
authentication > kerberos | [kerberos][] | Authenticates against Kafka brokers with Kerberos. | no
metadata | [metadata][] | Configures how to retrieve metadata from Kafka brokers. | no
metadata > retry | [retry][] | Configures how to retry metadata retrieval. | no
retry_on_failure | [retry_on_failure][] | Configures retry mechanism for failed requests. | no
sending_queue | [sending_queue][] | Configures batching of data before sending. | no
producer | [producer][] | Kafka producer configuration. | no
debug_metrics | [debug_metrics][] | Configures the metrics which this component generates to monitor its state. | no

The `>` symbol indicates deeper levels of nesting.
For example, `authentication > tls` refers to a `tls` block defined inside an `authentication` block.

[authentication]: #authentication-block
[plaintext]: #plaintext-block
[sasl]: #sasl-block
[aws_msk]: #aws_msk-block
[tls]: #tls-block
[kerberos]: #kerberos-block
[metadata]: #metadata-block
[retry]: #retry-block
[retry_on_failure]: #retry_on_failure-block
[sending_queue]: #sending_queue-block
[producer]: #producer-block
[debug_metrics]: #debug_metrics-block

### authentication block

The `authentication` block holds the definition of different authentication
mechanisms to use when connecting to Kafka brokers. It doesn't support any
arguments and is configured fully through inner blocks.

### plaintext block

The `plaintext` block configures `PLAIN` authentication against Kafka brokers.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`username` | `string` | Username to use for `PLAIN` authentication. | | yes
`password` | `secret` | Password to use for `PLAIN` authentication. | | yes

### sasl block

The `sasl` block configures SASL authentication against Kafka brokers.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`username` | `string` | Username to use for SASL authentication. | | yes
`password` | `secret` | Password to use for SASL authentication. | | yes
`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes
`version` | `number` | Version of the SASL Protocol to use when authenticating. | `0` | no

The `mechanism` argument can be set to one of the following strings:

* `"PLAIN"`
* `"AWS_MSK_IAM"`
* `"SCRAM-SHA-256"`
* `"SCRAM-SHA-512"`

When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block][aws_msk] must also be provided.

The `version` argument can be set to either `0` or `1`.

### aws_msk block

The `aws_msk` block configures extra parameters for SASL authentication when
using the `AWS_MSK_IAM` mechanism. 
The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`region` | `string` | AWS region the MSK cluster is based in. | | yes
`broker_addr` | `string` | MSK address to connect to for authentication. | | yes

### tls block

The `tls` block configures TLS settings used for connecting to the Kafka
brokers. If the `tls` block isn't provided, TLS won't be used for
communication.

{{< docs/shared lookup="reference/components/otelcol-tls-client-block.md" source="alloy" version="" >}}

### kerberos block

The `kerberos` block configures Kerberos authentication against the Kafka
broker.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`service_name` | `string` | Kerberos service name. | | no
`realm` | `string` | Kerberos realm. | | no
`use_keytab` | `string` | Enables using keytab instead of password. | | no
`username` | `string` | Kerberos username to authenticate as. | | yes
`password` | `secret` | Kerberos password to authenticate with. | | no
`config_file` | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`). | | no
`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no

When `use_keytab` is `false`, the `password` argument is required. When
`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is
used for authentication instead. At most one of `password` or `keytab_file`
must be provided.

### metadata block

The `metadata` block configures how to retrieve and store metadata from the
Kafka broker.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no

If the `include_all_topics` argument is `true`, `otelcol.exporter.kafka`
maintains a full set of metadata for all topics rather than the minimal set
that has been necessary so far. Including the full set of metadata is more
convenient for users but can consume a substantial amount of memory if you have
many topics and partitions.

Retrieving metadata may fail if the Kafka broker is starting up at the same
time as the `otelcol.exporter.kafka` component. The [`retry` child
block][retry] can be provided to customize retry behavior.

### retry block

The `retry` block configures how to retry retrieving metadata when retrieval
fails.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no
`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no

### retry_on_failure block

The `retry_on_failure` block configures how failed requests to Kafka are retried.

{{< docs/shared lookup="reference/components/otelcol-retry-block.md" source="alloy" version="" >}}

### sending_queue block

The `sending_queue` block configures an in-memory buffer of batches before data is sent to the Kafka brokers.

{{< docs/shared lookup="reference/components/otelcol-queue-block.md" source="alloy" version="" >}}

### producer block

The `producer` block configures how messages are produced to Kafka. 
The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`max_message_bytes` | `number` | The maximum permitted size of a message in bytes. | `1000000` | no
`required_acks` | `number` | Controls when a message is regarded as transmitted. | `1` | no
`compression` | `string` | Compression codec to use when producing messages. | `"none"` | no
`flush_max_messages` | `number` | The maximum number of messages to send in a single broker request. `0` means unlimited. | `0` | no

Refer to the [sarama documentation][RequiredAcks] for more information on `required_acks`.

`compression` can be set to one of `none`, `gzip`, `snappy`, `lz4`, or `zstd`.
Refer to the [sarama documentation][CompressionCodec] for more information.

[RequiredAcks]: https://pkg.go.dev/github.com/IBM/sarama@v1.43.2#RequiredAcks
[CompressionCodec]: https://pkg.go.dev/github.com/IBM/sarama@v1.43.2#CompressionCodec

### debug_metrics block

{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="" >}}

## Exported fields

The following fields are exported and can be referenced by other components:

Name | Type | Description
--------|--------------------|-----------------------------------------------------------------
`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to.

`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics, logs, or traces).

## Component health

`otelcol.exporter.kafka` is only reported as unhealthy if given an invalid
configuration.

## Debug information

`otelcol.exporter.kafka` does not expose any component-specific debug
information.

## Example

This example forwards telemetry data through a batch processor before sending it to Kafka:

```alloy
otelcol.receiver.otlp "default" {
  http {}
  grpc {}

  output {
    metrics = [otelcol.processor.batch.default.input]
    logs    = [otelcol.processor.batch.default.input]
    traces  = [otelcol.processor.batch.default.input]
  }
}

otelcol.processor.batch "default" {
  output {
    metrics = [otelcol.exporter.kafka.default.input]
    logs    = [otelcol.exporter.kafka.default.input]
    traces  = [otelcol.exporter.kafka.default.input]
  }
}

otelcol.exporter.kafka "default" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"
}
```

## Compatible components

`otelcol.exporter.kafka` has exports that can be consumed by the following components:

- Components that consume [OpenTelemetry `otelcol.Consumer`](../../compatibility/#opentelemetry-otelcolconsumer-consumers)

{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
{{< /admonition >}}
\ No newline at end of file
diff --git a/docs/sources/reference/components/otelcol.receiver.kafka.md b/docs/sources/reference/components/otelcol.receiver.kafka.md
index e0664d1ced..acd01b63ac 100644
--- a/docs/sources/reference/components/otelcol.receiver.kafka.md
+++ b/docs/sources/reference/components/otelcol.receiver.kafka.md
@@ -40,7 +40,7 @@ Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`brokers` | `array(string)` | Kafka brokers to connect to. | | yes
`protocol_version` | `string` | Kafka protocol version to use. | | yes
-`topic` | `string` | Kafka topic to read from. | | no
+`topic` | `string` | Kafka topic to read from. 
| _See below_ | no `encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no `group_id` | `string` | Consumer group to consume messages from. | `"otel-collector"` | no `client_id` | `string` | Consumer client ID to use. | `"otel-collector"` | no diff --git a/internal/component/all/all.go b/internal/component/all/all.go index f852e62341..760153c3fd 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -67,6 +67,7 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/connector/servicegraph" // Import otelcol.connector.servicegraph _ "github.com/grafana/alloy/internal/component/otelcol/connector/spanlogs" // Import otelcol.connector.spanlogs _ "github.com/grafana/alloy/internal/component/otelcol/connector/spanmetrics" // Import otelcol.connector.spanmetrics + _ "github.com/grafana/alloy/internal/component/otelcol/exporter/kafka" // Import otelcol.exporter.kafka _ "github.com/grafana/alloy/internal/component/otelcol/exporter/loadbalancing" // Import otelcol.exporter.loadbalancing _ "github.com/grafana/alloy/internal/component/otelcol/exporter/logging" // Import otelcol.exporter.logging _ "github.com/grafana/alloy/internal/component/otelcol/exporter/loki" // Import otelcol.exporter.loki diff --git a/internal/component/otelcol/exporter/kafka/kafka.go b/internal/component/otelcol/exporter/kafka/kafka.go new file mode 100644 index 0000000000..2b0a5f0d8c --- /dev/null +++ b/internal/component/otelcol/exporter/kafka/kafka.go @@ -0,0 +1,179 @@ +// Package kafka provides an otelcol.exporter.kafka component. +package kafka + +import ( + "time" + + "github.com/IBM/sarama" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/exporter" + alloy_kafka_receiver "github.com/grafana/alloy/internal/component/otelcol/receiver/kafka" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/syntax" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + otelcomponent "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterhelper" + otelextension "go.opentelemetry.io/collector/extension" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.exporter.kafka", + Stability: featuregate.StabilityGenerallyAvailable, + Args: Arguments{}, + Exports: otelcol.ConsumerExports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := kafkaexporter.NewFactory() + return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + }, + }) +} + +// Arguments configures the otelcol.exporter.kafka component. 
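//
// The fields mirror the upstream exporter's configuration and are mapped onto
// the upstream kafkaexporter.Config type by the Convert method below.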
+type Arguments struct { + ProtocolVersion string `alloy:"protocol_version,attr"` + Brokers []string `alloy:"brokers,attr,optional"` + ResolveCanonicalBootstrapServersOnly bool `alloy:"resolve_canonical_bootstrap_servers_only,attr,optional"` + ClientID string `alloy:"client_id,attr,optional"` + Topic string `alloy:"topic,attr,optional"` + TopicFromAttribute string `alloy:"topic_from_attribute,attr,optional"` + Encoding string `alloy:"encoding,attr,optional"` + PartitionTracesByID bool `alloy:"partition_traces_by_id,attr,optional"` + PartitionMetricsByResourceAttributes bool `alloy:"partition_metrics_by_resource_attributes,attr,optional"` + Timeout time.Duration `alloy:"timeout,attr,optional"` + + Authentication alloy_kafka_receiver.AuthenticationArguments `alloy:"authentication,block,optional"` + Metadata alloy_kafka_receiver.MetadataArguments `alloy:"metadata,block,optional"` + Retry otelcol.RetryArguments `alloy:"retry_on_failure,block,optional"` + Queue otelcol.QueueArguments `alloy:"sending_queue,block,optional"` + Producer Producer `alloy:"producer,block,optional"` + + // DebugMetrics configures component internal metrics. Optional. + DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` +} + +// Producer defines configuration for producer +type Producer struct { + // Maximum message bytes the producer will accept to produce. + MaxMessageBytes int `alloy:"max_message_bytes,attr,optional"` + + // RequiredAcks Number of acknowledgements required to assume that a message has been sent. + // https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#RequiredAcks + // The options are: + // 0 -> NoResponse. doesn't send any response + // 1 -> WaitForLocal. waits for only the local commit to succeed before responding ( default ) + // -1 -> WaitForAll. waits for all in-sync replicas to commit before responding. + RequiredAcks int `alloy:"required_acks,attr,optional"` + + // Compression Codec used to produce messages + // https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#CompressionCodec + // The options are: 'none', 'gzip', 'snappy', 'lz4', and 'zstd' + Compression string `alloy:"compression,attr,optional"` + + // The maximum number of messages the producer will send in a single + // broker request. Defaults to 0 for unlimited. Similar to + // `queue.buffering.max.messages` in the JVM producer. + FlushMaxMessages int `alloy:"flush_max_messages,attr,optional"` +} + +// Convert converts args into the upstream type. +func (args Producer) Convert() kafkaexporter.Producer { + return kafkaexporter.Producer{ + MaxMessageBytes: args.MaxMessageBytes, + RequiredAcks: sarama.RequiredAcks(args.RequiredAcks), + Compression: args.Compression, + FlushMaxMessages: args.FlushMaxMessages, + } +} + +var ( + _ syntax.Validator = (*Arguments)(nil) + _ syntax.Defaulter = (*Arguments)(nil) + _ exporter.Arguments = (*Arguments)(nil) +) + +// SetToDefault implements syntax.Defaulter. 
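//
// The values mirror the defaults of the upstream Kafka exporter and its
// underlying sarama client.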
+func (args *Arguments) SetToDefault() { + *args = Arguments{ + Encoding: "otlp_proto", + Brokers: []string{"localhost:9092"}, + ClientID: "sarama", + Timeout: 5 * time.Second, + Metadata: alloy_kafka_receiver.MetadataArguments{ + IncludeAllTopics: true, + Retry: alloy_kafka_receiver.MetadataRetryArguments{ + MaxRetries: 3, + Backoff: 250 * time.Millisecond, + }, + }, + Producer: Producer{ + MaxMessageBytes: 1000000, + RequiredAcks: 1, + Compression: "none", + FlushMaxMessages: 0, + }, + } + args.Retry.SetToDefault() + args.Queue.SetToDefault() + args.DebugMetrics.SetToDefault() +} + +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + otelCfg, err := args.Convert() + if err != nil { + return err + } + kafkaCfg := otelCfg.(*kafkaexporter.Config) + return kafkaCfg.Validate() +} + +// Convert implements exporter.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + input := make(map[string]interface{}) + input["auth"] = args.Authentication.Convert() + + var result kafkaexporter.Config + err := mapstructure.Decode(input, &result) + if err != nil { + return nil, err + } + + result.Brokers = args.Brokers + result.ResolveCanonicalBootstrapServersOnly = args.ResolveCanonicalBootstrapServersOnly + result.ProtocolVersion = args.ProtocolVersion + result.ClientID = args.ClientID + result.Topic = args.Topic + result.TopicFromAttribute = args.TopicFromAttribute + result.Encoding = args.Encoding + result.PartitionTracesByID = args.PartitionTracesByID + result.PartitionMetricsByResourceAttributes = args.PartitionMetricsByResourceAttributes + result.TimeoutSettings = exporterhelper.TimeoutSettings{ + Timeout: args.Timeout, + } + result.Metadata = args.Metadata.Convert() + result.BackOffConfig = *args.Retry.Convert() + result.QueueSettings = *args.Queue.Convert() + result.Producer = args.Producer.Convert() + + return &result, nil +} + +// Extensions implements exporter.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements exporter.Arguments. +func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// DebugMetricsConfig implements exporter.Arguments. 
+func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments { + return args.DebugMetrics +} diff --git a/internal/component/otelcol/exporter/kafka/kafka_test.go b/internal/component/otelcol/exporter/kafka/kafka_test.go new file mode 100644 index 0000000000..85d2154a46 --- /dev/null +++ b/internal/component/otelcol/exporter/kafka/kafka_test.go @@ -0,0 +1,240 @@ +package kafka_test + +import ( + "testing" + "time" + + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/exporter/kafka" + "github.com/grafana/alloy/syntax" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + "github.com/stretchr/testify/require" +) + +func TestArguments_UnmarshalAlloy(t *testing.T) { + tests := []struct { + testName string + cfg string + expected map[string]interface{} + }{ + { + testName: "Defaults", + cfg: ` + protocol_version = "2.0.0" + `, + expected: map[string]interface{}{ + "brokers": []string{"localhost:9092"}, + "protocol_version": "2.0.0", + "resolve_canonical_bootstrap_servers_only": false, + "client_id": "sarama", + "topic": "", + "topic_from_attribute": "", + "encoding": "otlp_proto", + "partition_traces_by_id": false, + "partition_metrics_by_resource_attributes": false, + "timeout": 5 * time.Second, + "authentication": map[string]interface{}{}, + + "metadata": map[string]interface{}{ + "full": true, + "retry": map[string]interface{}{ + "max": 3, + "backoff": 250 * time.Millisecond, + }, + }, + "retry_on_failure": map[string]interface{}{ + "enabled": true, + "initial_interval": 5 * time.Second, + "randomization_factor": 0.5, + "multiplier": 1.5, + "max_interval": 30 * time.Second, + "max_elapsed_time": 5 * time.Minute, + }, + "sending_queue": map[string]interface{}{ + "enabled": true, + "num_consumers": 10, + "queue_size": 1000, + }, + "producer": map[string]interface{}{ + "max_message_bytes": 1000000, + "required_acks": 1, + "compression": "none", + "flush_max_messages": 0, + }, + }, + }, + { + testName: "Explicit", + cfg: ` + protocol_version = "2.0.0" + brokers = ["redpanda:123"] + resolve_canonical_bootstrap_servers_only = true + client_id = "my-client" + topic = "my-topic" + topic_from_attribute = "my-attr" + encoding = "otlp_json" + partition_traces_by_id = true + partition_metrics_by_resource_attributes = true + timeout = "12s" + + authentication { + plaintext { + username = "user" + password = "pass" + } + } + + metadata { + include_all_topics = false + retry { + max_retries = 5 + backoff = "511ms" + } + } + + retry_on_failure { + enabled = true + initial_interval = "10s" + randomization_factor = 0.1 + multiplier = 2.0 + max_interval = "61s" + max_elapsed_time = "11m" + } + + sending_queue { + enabled = true + num_consumers = 11 + queue_size = 1001 + } + + producer { + max_message_bytes = 2000001 + required_acks = 0 + compression = "gzip" + flush_max_messages = 101 + } + `, + expected: map[string]interface{}{ + "brokers": []string{"redpanda:123"}, + "protocol_version": "2.0.0", + "resolve_canonical_bootstrap_servers_only": true, + "client_id": "my-client", + "topic": "my-topic", + "topic_from_attribute": "my-attr", + "encoding": "otlp_json", + "partition_traces_by_id": true, + "partition_metrics_by_resource_attributes": true, + "timeout": 12 * time.Second, + "auth": map[string]interface{}{ + "plain_text": map[string]interface{}{ + "username": "user", + "password": "pass", + }, + }, + + "metadata": map[string]interface{}{ + 
"full": false, + "retry": map[string]interface{}{ + "max": 5, + "backoff": 511 * time.Millisecond, + }, + }, + "retry_on_failure": map[string]interface{}{ + "enabled": true, + "initial_interval": 10 * time.Second, + "randomization_factor": 0.1, + "multiplier": 2.0, + "max_interval": 61 * time.Second, + "max_elapsed_time": 11 * time.Minute, + }, + "sending_queue": map[string]interface{}{ + "enabled": true, + "num_consumers": 11, + "queue_size": 1001, + }, + "producer": map[string]interface{}{ + "max_message_bytes": 2000001, + "required_acks": 0, + "compression": "gzip", + "flush_max_messages": 101, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var expected kafkaexporter.Config + err := mapstructure.Decode(tc.expected, &expected) + require.NoError(t, err) + + var args kafka.Arguments + err = syntax.Unmarshal([]byte(tc.cfg), &args) + require.NoError(t, err) + + actualPtr, err := args.Convert() + require.NoError(t, err) + + actual := actualPtr.(*kafkaexporter.Config) + + require.Equal(t, expected, *actual) + }) + } +} + +func TestDebugMetricsConfig(t *testing.T) { + tests := []struct { + testName string + agentCfg string + expected otelcolCfg.DebugMetricsArguments + }{ + { + testName: "default", + agentCfg: ` + protocol_version = "2.0.0" + `, + expected: otelcolCfg.DebugMetricsArguments{ + DisableHighCardinalityMetrics: true, + Level: otelcolCfg.LevelDetailed, + }, + }, + { + testName: "explicit_false", + agentCfg: ` + protocol_version = "2.0.0" + debug_metrics { + disable_high_cardinality_metrics = false + } + `, + expected: otelcolCfg.DebugMetricsArguments{ + DisableHighCardinalityMetrics: false, + Level: otelcolCfg.LevelDetailed, + }, + }, + { + testName: "explicit_true", + agentCfg: ` + protocol_version = "2.0.0" + debug_metrics { + disable_high_cardinality_metrics = true + } + `, + expected: otelcolCfg.DebugMetricsArguments{ + DisableHighCardinalityMetrics: true, + Level: otelcolCfg.LevelDetailed, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args kafka.Arguments + require.NoError(t, syntax.Unmarshal([]byte(tc.agentCfg), &args)) + _, err := args.Convert() + require.NoError(t, err) + + require.Equal(t, tc.expected, args.DebugMetricsConfig()) + }) + } +} diff --git a/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go new file mode 100644 index 0000000000..4ade6e5396 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go @@ -0,0 +1,72 @@ +package otelcolconvert + +import ( + "fmt" + + "github.com/grafana/alloy/internal/component/otelcol/exporter/kafka" + "github.com/grafana/alloy/internal/converter/diag" + "github.com/grafana/alloy/internal/converter/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + "go.opentelemetry.io/collector/component" +) + +func init() { + converters = append(converters, kafkaExporterConverter{}) +} + +type kafkaExporterConverter struct{} + +func (kafkaExporterConverter) Factory() component.Factory { return kafkaexporter.NewFactory() } + +func (kafkaExporterConverter) InputComponentName() string { + return "otelcol.exporter.kafka" +} + +func (kafkaExporterConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics { + var diags diag.Diagnostics + + label := state.AlloyComponentLabel() + + args := toKafkaExporter(state, id, cfg.(*kafkaexporter.Config)) + 
block := common.NewBlockWithOverride([]string{"otelcol", "exporter", "kafka"}, label, args) + + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)), + ) + + state.Body().AppendBlock(block) + return diags +} + +func toKafkaExporter(state *State, id component.InstanceID, cfg *kafkaexporter.Config) *kafka.Arguments { + return &kafka.Arguments{ + Brokers: cfg.Brokers, + ProtocolVersion: cfg.ProtocolVersion, + ResolveCanonicalBootstrapServersOnly: cfg.ResolveCanonicalBootstrapServersOnly, + ClientID: cfg.ClientID, + Topic: cfg.Topic, + TopicFromAttribute: cfg.TopicFromAttribute, + Encoding: cfg.Encoding, + PartitionTracesByID: cfg.PartitionTracesByID, + PartitionMetricsByResourceAttributes: cfg.PartitionMetricsByResourceAttributes, + Timeout: cfg.Timeout, + + Authentication: toKafkaAuthentication(encodeMapstruct(cfg.Authentication)), + Metadata: toKafkaMetadata(cfg.Metadata), + Retry: toRetryArguments(cfg.BackOffConfig), + Queue: toQueueArguments(cfg.QueueSettings), + Producer: toKafkaProducer(cfg.Producer), + + DebugMetrics: common.DefaultValue[kafka.Arguments]().DebugMetrics, + } +} + +func toKafkaProducer(cfg kafkaexporter.Producer) kafka.Producer { + return kafka.Producer{ + MaxMessageBytes: cfg.MaxMessageBytes, + Compression: cfg.Compression, + RequiredAcks: int(cfg.RequiredAcks), + FlushMaxMessages: cfg.FlushMaxMessages, + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/kafka.alloy b/internal/converter/internal/otelcolconvert/testdata/kafka.alloy index b98eabaf2f..bdb45cfc7b 100644 --- a/internal/converter/internal/otelcolconvert/testdata/kafka.alloy +++ b/internal/converter/internal/otelcolconvert/testdata/kafka.alloy @@ -11,8 +11,8 @@ otelcol.receiver.kafka "default" { sasl { username = "fakeusername" password = "fakepassword" - mechanism = "somemechanism" - version = 5 + mechanism = "AWS_MSK_IAM" + version = 1 aws_msk { region = "us-east-1" @@ -33,14 +33,56 @@ otelcol.receiver.kafka "default" { } output { - metrics = [otelcol.exporter.otlp.default.input] - logs = [otelcol.exporter.otlp.default.input] - traces = [otelcol.exporter.otlp.default.input] + metrics = [otelcol.exporter.kafka.default.input] + logs = [otelcol.exporter.kafka.default.input] + traces = [otelcol.exporter.kafka.default.input] } } -otelcol.exporter.otlp "default" { - client { - endpoint = "database:4317" +otelcol.exporter.kafka "default" { + protocol_version = "2.0.0" + brokers = ["redpanda:9092"] + resolve_canonical_bootstrap_servers_only = true + client_id = "otelcol" + topic_from_attribute = "my_topic" + encoding = "otlp_json" + partition_traces_by_id = true + partition_metrics_by_resource_attributes = true + timeout = "11s" + + authentication { + plaintext { + username = "fakeusername" + password = "fakepassword" + } + + sasl { + username = "fakeusername" + password = "fakepassword" + mechanism = "SCRAM-SHA-256" + version = 1 + + aws_msk { + region = "us-east-1" + broker_addr = "broker:9092" + } + } + + tls { + insecure = true + } + + kerberos { + service_name = "someservice" + realm = "myrealm" + username = "fakeusername" + password = "fakepassword" + } + } + + producer { + max_message_bytes = 1000001 + compression = "snappy" + flush_max_messages = 11 } } diff --git a/internal/converter/internal/otelcolconvert/testdata/kafka.yaml b/internal/converter/internal/otelcolconvert/testdata/kafka.yaml index fb8455bbf5..d5f563c3de 100644 --- a/internal/converter/internal/otelcolconvert/testdata/kafka.yaml +++ 
b/internal/converter/internal/otelcolconvert/testdata/kafka.yaml
@@ -9,8 +9,8 @@ receivers:
       sasl:
         username: fakeusername
         password: fakepassword
-        mechanism: somemechanism
-        version: 5
+        mechanism: AWS_MSK_IAM
+        version: 1
         aws_msk:
           region: us-east-1
           broker_addr: broker:9092
@@ -24,20 +24,52 @@ receivers:
 
 exporters:
-  otlp:
-    endpoint: database:4317
+  kafka:
+    brokers: redpanda:9092
+    protocol_version: 2.0.0
+    resolve_canonical_bootstrap_servers_only: true
+    client_id: otelcol
+    topic_from_attribute: my_topic
+    partition_traces_by_id: true
+    partition_metrics_by_resource_attributes: true
+    timeout: 11s
+    encoding: otlp_json
+    auth:
+      plain_text:
+        username: fakeusername
+        password: fakepassword
+      sasl:
+        username: fakeusername
+        password: fakepassword
+        mechanism: SCRAM-SHA-256
+        version: 1
+        aws_msk:
+          region: us-east-1
+          broker_addr: broker:9092
+      tls:
+        insecure: true
+      kerberos:
+        username: fakeusername
+        password: fakepassword
+        service_name: someservice
+        realm: myrealm
+    producer:
+      max_message_bytes: 1000001
+      compression: snappy
+      required_acks: 0
+      flush_max_messages: 11
 
 service:
   pipelines:
     metrics:
       receivers: [kafka]
       processors: []
-      exporters: [otlp]
+      exporters: [kafka]
    logs:
       receivers: [kafka]
       processors: []
-      exporters: [otlp]
+      exporters: [kafka]
    traces:
       receivers: [kafka]
       processors: []
-      exporters: [otlp]
+      exporters: [kafka]
diff --git a/internal/tools/docs_generator/docs_updated_test.go b/internal/tools/docs_generator/docs_updated_test.go
index 65358f908a..2a67fd3dda 100644
--- a/internal/tools/docs_generator/docs_updated_test.go
+++ b/internal/tools/docs_generator/docs_updated_test.go
@@ -64,12 +64,12 @@ func runForGenerator(t *testing.T, g generator.DocsGenerator) {
 	}
 
 	actual, err := g.Read()
-	require.NoError(t, err, "failed to read existing generated docs for %q, try running 'go generate ./docs'", g.Name())
+	require.NoError(t, err, "failed to read existing generated docs for %q, try running 'go generate ./internal/tools/docs_generator/'", g.Name())
 
 	require.Contains(
 		t,
 		actual,
 		strings.TrimSpace(generated),
-		"outdated docs detected when running %q, try updating with 'go generate ./docs'",
+		"outdated docs detected when running %q, try updating with 'go generate ./internal/tools/docs_generator/'",
 		g.Name(),
 	)
 }

From 2a6ecb4bd005777fbf609fcb24ef6a3ee4f37351 Mon Sep 17 00:00:00 2001
From: Paulin Todev
Date: Wed, 5 Jun 2024 18:11:22 +0100
Subject: [PATCH 2/5] Apply suggestions from code review

Co-authored-by: William Dumont
---
 CHANGELOG.md                                           | 2 +-
 .../internal/otelcolconvert/converter_kafkaexporter.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1c1a619572..142646a0b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,7 +17,7 @@ Main (unreleased)
 
 ### Features
 
-- Adding an `otelcol.exporter.kafka` component for sending OTLP metrics, logs, and traces to Kafka.
+- Add an `otelcol.exporter.kafka` component to send OTLP metrics, logs, and traces to Kafka. 
### Enhancements diff --git a/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go index 4ade6e5396..90d845d281 100644 --- a/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go +++ b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go @@ -39,7 +39,7 @@ func (kafkaExporterConverter) ConvertAndAppend(state *State, id component.Instan return diags } -func toKafkaExporter(state *State, id component.InstanceID, cfg *kafkaexporter.Config) *kafka.Arguments { +func toKafkaExporter(cfg *kafkaexporter.Config) *kafka.Arguments { return &kafka.Arguments{ Brokers: cfg.Brokers, ProtocolVersion: cfg.ProtocolVersion, From 772d9ab1656f87ae433093f629d7f164d2341624 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 5 Jun 2024 18:45:03 +0100 Subject: [PATCH 3/5] Suggestions from code review --- .../components/otelcol.exporter.kafka.md | 109 ++---------- .../components/otelcol.receiver.kafka.md | 94 +---------- .../otelcol-kafka-authentication-kerberos.md | 24 +++ .../otelcol-kafka-authentication-plaintext.md | 13 ++ ...elcol-kafka-authentication-sasl-aws_msk.md | 14 ++ .../otelcol-kafka-authentication-sasl.md | 26 +++ .../otelcol-kafka-authentication.md | 8 + .../otelcol-kafka-metadata-retry.md | 14 ++ .../components/otelcol-kafka-metadata.md | 22 +++ internal/component/otelcol/config_kafka.go | 155 +++++++++++++++++ .../component/otelcol/exporter/kafka/kafka.go | 15 +- .../component/otelcol/receiver/kafka/kafka.go | 159 +----------------- .../otelcolconvert/converter_kafkaexporter.go | 2 +- .../otelcolconvert/converter_kafkareceiver.go | 30 ++-- 14 files changed, 326 insertions(+), 359 deletions(-) create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-authentication.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-metadata.md create mode 100644 internal/component/otelcol/config_kafka.go diff --git a/docs/sources/reference/components/otelcol.exporter.kafka.md b/docs/sources/reference/components/otelcol.exporter.kafka.md index a13b20c1e2..b4d573e4a1 100644 --- a/docs/sources/reference/components/otelcol.exporter.kafka.md +++ b/docs/sources/reference/components/otelcol.exporter.kafka.md @@ -9,6 +9,9 @@ title: otelcol.exporter.kafka `otelcol.exporter.kafka` accepts logs, metrics, and traces telemetry data from other `otelcol` components and sends it to Kafka. +It is important to use `otelcol.exporter.kafka` together with `otelcol.processor.batch` +in order to make sure `otelcol.exporter.kafka` doesn't slow down due to sending Kafka a huge number of small payloads. + > **NOTE**: `otelcol.exporter.kafka` is a wrapper over the upstream > OpenTelemetry Collector `kafka` exporter from the `otelcol-contrib` > distribution. 
Bug reports or feature requests will be redirected to the
@@ -33,7 +36,7 @@ Name | Type | Description
------------------------------------------ | --------------- | ----------------------------------------------------------------------------------- | -------------------- | --------
`protocol_version` | `string` | Kafka protocol version to use. | | yes
`brokers` | `list(string)` | Kafka brokers to connect to. | `["localhost:9092"]` | no
-`topic` | `string` | Kafka topic to read from. | _See below_ | no
+`topic` | `string` | Kafka topic to send to. | _See below_ | no
`topic_from_attribute` | `string` | A resource attribute whose value should be used as the message's topic. | `""` | no
`encoding` | `string` | Encoding of payload sent to Kafka. | `"otlp_proto"` | no
`client_id` | `string` | Client ID to use. The ID will be used for all produce requests. | `"sarama"` | no
@@ -44,13 +47,11 @@

If `topic` is not set, different topics will be used for different telemetry signals:

-* Metrics will be received from an `otlp_metrics` topic.
-* Traces will be received from an `otlp_spans` topic.
-* Logs will be received from an `otlp_logs` topic.
+* Metrics will be sent to an `otlp_metrics` topic.
+* Traces will be sent to an `otlp_spans` topic.
+* Logs will be sent to an `otlp_logs` topic.

-If `topic` is set to a specific value, then only the signal type that corresponds to the data stored in the topic must be set in the output block.
-For example, if `topic` is set to `"my_telemetry"`, then the `"my_telemetry"` topic can only contain either metrics, logs, or traces.
-If it contains only metrics, then `otelcol.exporter.kafka` should be configured to process only metrics.
+If `topic` is set, the same topic will be used for all telemetry signals: metrics, logs, and traces.

When `topic_from_attribute` is set, it will take precedence over `topic`.
@@ -106,56 +107,19 @@

### authentication block

-The `authentication` block holds the definition of different authentication
-mechanisms to use when connecting to Kafka brokers. It doesn't support any
-arguments and is configured fully through inner blocks.
+{{< docs/shared lookup="reference/components/otelcol-kafka-authentication.md" source="alloy" version="" >}}

### plaintext block

-The `plaintext` block configures `PLAIN` authentication against Kafka brokers.
-
-The following arguments are supported:
-
-Name | Type | Description | Default | Required
----- | ---- | ----------- | ------- | --------
-`username` | `string` | Username to use for `PLAIN` authentication. | | yes
-`password` | `secret` | Password to use for `PLAIN` authentication. | | yes
+{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-plaintext.md" source="alloy" version="" >}}

### sasl block

-The `sasl` block configures SASL authentication against Kafka brokers.
-
-The following arguments are supported:
-
-Name | Type | Description | Default | Required
----- | ---- | ----------- | ------- | --------
-`username` | `string` | Username to use for SASL authentication. | | yes
-`password` | `secret` | Password to use for SASL authentication. | | yes
-`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes
-`version` | `number` | Version of the SASL Protocol to use when authenticating. 
| `0` | no - -The `mechanism` argument can be set to one of the following strings: - -* `"PLAIN"` -* `"AWS_MSK_IAM"` -* `"SCRAM-SHA-256"` -* `"SCRAM-SHA-512"` - -When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block][aws_msk] must also be provided. - -The `version` argument can be set to either `0` or `1`. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl.md" source="alloy" version="" >}} ### aws_msk block -The `aws_msk` block configures extra parameters for SASL authentication when -using the `AWS_MSK_IAM` mechanism. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`region` | `string` | AWS region the MSK cluster is based in. | | yes -`broker_addr` | `string` | MSK address to connect to for authentication. | | yes +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl-aws_msk.md" source="alloy" version="" >}} ### tls block @@ -167,58 +131,15 @@ communication. ### kerberos block -The `kerberos` block configures Kerberos authentication against the Kafka -broker. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`service_name` | `string` | Kerberos service name. | | no -`realm` | `string` | Kerberos realm. | | no -`use_keytab` | `string` | Enables using keytab instead of password. | | no -`username` | `string` | Kerberos username to authenticate as. | | yes -`password` | `secret` | Kerberos password to authenticate with. | | no -`config_file` | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`). | | no -`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no - -When `use_keytab` is `false`, the `password` argument is required. When -`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is -used for authentication instead. At most one of `password` or `keytab_file` -must be provided. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-kerberos.md" source="alloy" version="" >}} ### metadata block -The `metadata` block configures how to retrieve and store metadata from the -Kafka broker. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no - -If the `include_all_topics` argument is `true`, `otelcol.exporter.kafka` -maintains a full set of metadata for all topics rather than the minimal set -that has been necessary so far. Including the full set of metadata is more -convenient for users but can consume a substantial amount of memory if you have -many topics and partitions. - -Retrieving metadata may fail if the Kafka broker is starting up at the same -time as the `otelcol.exporter.kafka` component. The [`retry` child -block][retry] can be provided to customize retry behavior. +{{< docs/shared lookup="reference/components/otelcol-kafka-metadata.md" source="alloy" version="" >}} ### retry block -The `retry` block configures how to retry retrieving metadata when retrieval -fails. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no -`backoff` | `duration` | Time to wait between retries. 
| `"250ms"` | no +{{< docs/shared lookup="reference/components/otelcol-kafka-metadata-retry.md" source="alloy" version="" >}} ### retry_on_failure block diff --git a/docs/sources/reference/components/otelcol.receiver.kafka.md b/docs/sources/reference/components/otelcol.receiver.kafka.md index acd01b63ac..ba71f33030 100644 --- a/docs/sources/reference/components/otelcol.receiver.kafka.md +++ b/docs/sources/reference/components/otelcol.receiver.kafka.md @@ -118,56 +118,19 @@ The `>` symbol indicates deeper levels of nesting. For example, ### authentication block -The `authentication` block holds the definition of different authentication -mechanisms to use when connecting to Kafka brokers. It doesn't support any -arguments and is configured fully through inner blocks. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication.md" source="alloy" version="" >}} ### plaintext block -The `plaintext` block configures `PLAIN` authentication against Kafka brokers. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`username` | `string` | Username to use for `PLAIN` authentication. | | yes -`password` | `secret` | Password to use for `PLAIN` authentication. | | yes +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-plaintext.md" source="alloy" version="" >}} ### sasl block -The `sasl` block configures SASL authentication against Kafka brokers. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`username` | `string` | Username to use for SASL authentication. | | yes -`password` | `secret` | Password to use for SASL authentication. | | yes -`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes -`version` | `number` | Version of the SASL Protocol to use when authenticating. | `0` | no - -The `mechanism` argument can be set to one of the following strings: - -* `"PLAIN"` -* `"AWS_MSK_IAM"` -* `"SCRAM-SHA-256"` -* `"SCRAM-SHA-512"` - -When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block][aws_msk] must also be provided. - -The `version` argument can be set to either `0` or `1`. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl.md" source="alloy" version="" >}} ### aws_msk block -The `aws_msk` block configures extra parameters for SASL authentication when -using the `AWS_MSK_IAM` mechanism. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`region` | `string` | AWS region the MSK cluster is based in. | | yes -`broker_addr` | `string` | MSK address to connect to for authentication. | | yes +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl-aws_msk.md" source="alloy" version="" >}} ### tls block @@ -179,58 +142,15 @@ communication. ### kerberos block -The `kerberos` block configures Kerberos authentication against the Kafka -broker. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`service_name` | `string` | Kerberos service name. | | no -`realm` | `string` | Kerberos realm. | | no -`use_keytab` | `string` | Enables using keytab instead of password. | | no -`username` | `string` | Kerberos username to authenticate as. | | yes -`password` | `secret` | Kerberos password to authenticate with. 
| | no
-`config_file` | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`). | | no
-`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no
-
-When `use_keytab` is `false`, the `password` argument is required. When
-`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is
-used for authentication instead. At most one of `password` or `keytab_file`
-must be provided.
+{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-kerberos.md" source="alloy" version="" >}}

### metadata block

-The `metadata` block configures how to retrieve and store metadata from the
-Kafka broker.
-
-The following arguments are supported:
-
-Name | Type | Description | Default | Required
----- | ---- | ----------- | ------- | --------
-`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no
-
-If the `include_all_topics` argument is `true`, `otelcol.receiver.kafka`
-maintains a full set of metadata for all topics rather than the minimal set
-that has been necessary so far. Including the full set of metadata is more
-convenient for users but can consume a substantial amount of memory if you have
-many topics and partitions.
-
-Retrieving metadata may fail if the Kafka broker is starting up at the same
-time as the `otelcol.receiver.kafka` component. The [`retry` child
-block][retry] can be provided to customize retry behavior.
+{{< docs/shared lookup="reference/components/otelcol-kafka-metadata.md" source="alloy" version="" >}}

### retry block

-The `retry` block configures how to retry retrieving metadata when retrieval
-fails.
-
-The following arguments are supported:
-
-Name | Type | Description | Default | Required
----- | ---- | ----------- | ------- | --------
-`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no
-`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no
+{{< docs/shared lookup="reference/components/otelcol-kafka-metadata-retry.md" source="alloy" version="" >}}

### autocommit block

diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md
new file mode 100644
index 0000000000..2fcbfea2c7
--- /dev/null
+++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md
@@ -0,0 +1,24 @@
+---
+description: Shared content, otelcol kafka kerberos authentication
+headless: true
+---
+
The `kerberos` block configures Kerberos authentication against the Kafka
broker.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`service_name` | `string` | Kerberos service name. | | no
`realm` | `string` | Kerberos realm. | | no
`use_keytab` | `bool` | Enables using keytab instead of password. | | no
`username` | `string` | Kerberos username to authenticate as. | | yes
`password` | `secret` | Kerberos password to authenticate with. | | no
`config_file` | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`). | | no
`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no

When `use_keytab` is `false`, the `password` argument is required. When
`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is
used for authentication instead. At most one of `password` or `keytab_file`
must be provided. 
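For example, a keytab-based configuration might look like the following sketch, where the service name, realm, principal, and file paths are placeholders:

```alloy
kerberos {
  service_name = "kafka"
  realm        = "EXAMPLE.COM"
  username     = "alloy"
  use_keytab   = true
  keytab_file  = "/etc/security/kafka.keytab"
  config_file  = "/etc/krb5.conf"
}
```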
diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md new file mode 100644 index 0000000000..e7b47a0fe7 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md @@ -0,0 +1,13 @@ +--- +description: Shared content, otelcol kafka plain text authentication +headless: true +--- + +The `plaintext` block configures plain text authentication against Kafka brokers. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`username` | `string` | Username to use for plain text authentication. | | yes +`password` | `secret` | Password to use for plain text authentication. | | yes diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md new file mode 100644 index 0000000000..8f35ef3552 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md @@ -0,0 +1,14 @@ +--- +description: Shared content, otelcol kafka sasl aws_msk authentication +headless: true +--- + +The `aws_msk` block configures extra parameters for SASL authentication when +using the `AWS_MSK_IAM` mechanism. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`region` | `string` | AWS region the MSK cluster is based in. | | yes +`broker_addr` | `string` | MSK address to connect to for authentication. | | yes diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md new file mode 100644 index 0000000000..0c2f20844d --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md @@ -0,0 +1,26 @@ +--- +description: Shared content, otelcol kafka sasl authentication +headless: true +--- + +The `sasl` block configures SASL authentication against Kafka brokers. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`username` | `string` | Username to use for SASL authentication. | | yes +`password` | `secret` | Password to use for SASL authentication. | | yes +`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes +`version` | `number` | Version of the SASL Protocol to use when authenticating. | `0` | no + +The `mechanism` argument can be set to one of the following strings: + +* `"PLAIN"` +* `"AWS_MSK_IAM"` +* `"SCRAM-SHA-256"` +* `"SCRAM-SHA-512"` + +When `mechanism` is set to `"AWS_MSK_IAM"`, the `aws_msk` child block must also be provided. + +The `version` argument can be set to either `0` or `1`. diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication.md new file mode 100644 index 0000000000..df02b37731 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication.md @@ -0,0 +1,8 @@ +--- +description: Shared content, otelcol kafka authentication +headless: true +--- + +The `authentication` block holds the definition of different authentication +mechanisms to use when connecting to Kafka brokers. 
It doesn't support any +arguments and is configured fully through inner blocks. diff --git a/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md b/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md new file mode 100644 index 0000000000..52465b8192 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md @@ -0,0 +1,14 @@ +--- +description: Shared content, otelcol kafka metadata retry +headless: true +--- + +The `retry` block configures how to retry retrieving metadata when retrieval +fails. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no +`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no diff --git a/docs/sources/shared/reference/components/otelcol-kafka-metadata.md b/docs/sources/shared/reference/components/otelcol-kafka-metadata.md new file mode 100644 index 0000000000..2ee3428230 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-metadata.md @@ -0,0 +1,22 @@ +--- +description: Shared content, otelcol kafka metadata +headless: true +--- + +The `metadata` block configures how to retrieve and store metadata from the +Kafka broker. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no + +If the `include_all_topics` argument is `true`, +a full set of metadata for all topics is maintained rather than the minimal set +that has been necessary so far. Including the full set of metadata is more +convenient for users but can consume a substantial amount of memory if you have +many topics and partitions. + +Retrieving metadata may fail if the Kafka broker is starting up at the same +time as the Alloy component. The `retry` child block can be provided to customize retry behavior. diff --git a/internal/component/otelcol/config_kafka.go b/internal/component/otelcol/config_kafka.go new file mode 100644 index 0000000000..1178e85e4c --- /dev/null +++ b/internal/component/otelcol/config_kafka.go @@ -0,0 +1,155 @@ +package otelcol + +import ( + "time" + + "github.com/grafana/alloy/syntax/alloytypes" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" +) + +// KafkaAuthenticationArguments configures how to authenticate to the Kafka broker. +type KafkaAuthenticationArguments struct { + Plaintext *KafkaPlaintextArguments `alloy:"plaintext,block,optional"` + SASL *KafkaSASLArguments `alloy:"sasl,block,optional"` + TLS *TLSClientArguments `alloy:"tls,block,optional"` + Kerberos *KafkaKerberosArguments `alloy:"kerberos,block,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaAuthenticationArguments) Convert() map[string]interface{} { + auth := make(map[string]interface{}) + + if args.Plaintext != nil { + conv := args.Plaintext.Convert() + auth["plain_text"] = &conv + } + if args.SASL != nil { + conv := args.SASL.Convert() + auth["sasl"] = &conv + } + if args.TLS != nil { + auth["tls"] = args.TLS.Convert() + } + if args.Kerberos != nil { + conv := args.Kerberos.Convert() + auth["kerberos"] = &conv + } + + return auth +} + +// KafkaPlaintextArguments configures plaintext authentication against the Kafka +// broker. 
+type KafkaPlaintextArguments struct { + Username string `alloy:"username,attr"` + Password alloytypes.Secret `alloy:"password,attr"` +} + +// Convert converts args into the upstream type. +func (args KafkaPlaintextArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "username": args.Username, + "password": string(args.Password), + } +} + +// KafkaSASLArguments configures SASL authentication against the Kafka broker. +type KafkaSASLArguments struct { + Username string `alloy:"username,attr"` + Password alloytypes.Secret `alloy:"password,attr"` + Mechanism string `alloy:"mechanism,attr"` + Version int `alloy:"version,attr,optional"` + AWSMSK KafkaAWSMSKArguments `alloy:"aws_msk,block,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaSASLArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "username": args.Username, + "password": string(args.Password), + "mechanism": args.Mechanism, + "version": args.Version, + "aws_msk": args.AWSMSK.Convert(), + } +} + +// KafkaAWSMSKArguments exposes additional SASL authentication measures required to +// use the AWS_MSK_IAM mechanism. +type KafkaAWSMSKArguments struct { + Region string `alloy:"region,attr"` + BrokerAddr string `alloy:"broker_addr,attr"` +} + +// Convert converts args into the upstream type. +func (args KafkaAWSMSKArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "region": args.Region, + "broker_addr": args.BrokerAddr, + } +} + +// KafkaKerberosArguments configures Kerberos authentication against the Kafka +// broker. +type KafkaKerberosArguments struct { + ServiceName string `alloy:"service_name,attr,optional"` + Realm string `alloy:"realm,attr,optional"` + UseKeyTab bool `alloy:"use_keytab,attr,optional"` + Username string `alloy:"username,attr"` + Password alloytypes.Secret `alloy:"password,attr,optional"` + ConfigPath string `alloy:"config_file,attr,optional"` + KeyTabPath string `alloy:"keytab_file,attr,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaKerberosArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "service_name": args.ServiceName, + "realm": args.Realm, + "use_keytab": args.UseKeyTab, + "username": args.Username, + "password": string(args.Password), + "config_file": args.ConfigPath, + "keytab_file": args.KeyTabPath, + } +} + +// KafkaMetadataArguments configures how the Alloy component will +// retrieve metadata from the Kafka broker. +type KafkaMetadataArguments struct { + IncludeAllTopics bool `alloy:"include_all_topics,attr,optional"` + Retry KafkaMetadataRetryArguments `alloy:"retry,block,optional"` +} + +func (args *KafkaMetadataArguments) SetToDefault() { + *args = KafkaMetadataArguments{ + IncludeAllTopics: true, + Retry: KafkaMetadataRetryArguments{ + MaxRetries: 3, + Backoff: 250 * time.Millisecond, + }, + } +} + +// Convert converts args into the upstream type. +func (args KafkaMetadataArguments) Convert() kafkaexporter.Metadata { + return kafkaexporter.Metadata{ + Full: args.IncludeAllTopics, + Retry: args.Retry.Convert(), + } +} + +// KafkaMetadataRetryArguments configures how to retry retrieving metadata from the +// Kafka broker. Retrying is useful to avoid race conditions when the Kafka +// broker is starting at the same time as the Alloy component. 
+type KafkaMetadataRetryArguments struct { + MaxRetries int `alloy:"max_retries,attr,optional"` + Backoff time.Duration `alloy:"backoff,attr,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaMetadataRetryArguments) Convert() kafkaexporter.MetadataRetry { + return kafkaexporter.MetadataRetry{ + Max: args.MaxRetries, + Backoff: args.Backoff, + } +} diff --git a/internal/component/otelcol/exporter/kafka/kafka.go b/internal/component/otelcol/exporter/kafka/kafka.go index 2b0a5f0d8c..01260863dd 100644 --- a/internal/component/otelcol/exporter/kafka/kafka.go +++ b/internal/component/otelcol/exporter/kafka/kafka.go @@ -9,7 +9,6 @@ import ( "github.com/grafana/alloy/internal/component/otelcol" otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/exporter" - alloy_kafka_receiver "github.com/grafana/alloy/internal/component/otelcol/receiver/kafka" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/syntax" "github.com/mitchellh/mapstructure" @@ -46,11 +45,11 @@ type Arguments struct { PartitionMetricsByResourceAttributes bool `alloy:"partition_metrics_by_resource_attributes,attr,optional"` Timeout time.Duration `alloy:"timeout,attr,optional"` - Authentication alloy_kafka_receiver.AuthenticationArguments `alloy:"authentication,block,optional"` - Metadata alloy_kafka_receiver.MetadataArguments `alloy:"metadata,block,optional"` - Retry otelcol.RetryArguments `alloy:"retry_on_failure,block,optional"` - Queue otelcol.QueueArguments `alloy:"sending_queue,block,optional"` - Producer Producer `alloy:"producer,block,optional"` + Authentication otelcol.KafkaAuthenticationArguments `alloy:"authentication,block,optional"` + Metadata otelcol.KafkaMetadataArguments `alloy:"metadata,block,optional"` + Retry otelcol.RetryArguments `alloy:"retry_on_failure,block,optional"` + Queue otelcol.QueueArguments `alloy:"sending_queue,block,optional"` + Producer Producer `alloy:"producer,block,optional"` // DebugMetrics configures component internal metrics. Optional. 
DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` @@ -103,9 +102,9 @@ func (args *Arguments) SetToDefault() { Brokers: []string{"localhost:9092"}, ClientID: "sarama", Timeout: 5 * time.Second, - Metadata: alloy_kafka_receiver.MetadataArguments{ + Metadata: otelcol.KafkaMetadataArguments{ IncludeAllTopics: true, - Retry: alloy_kafka_receiver.MetadataRetryArguments{ + Retry: otelcol.KafkaMetadataRetryArguments{ MaxRetries: 3, Backoff: 250 * time.Millisecond, }, diff --git a/internal/component/otelcol/receiver/kafka/kafka.go b/internal/component/otelcol/receiver/kafka/kafka.go index b6ff5917e8..186f381f4c 100644 --- a/internal/component/otelcol/receiver/kafka/kafka.go +++ b/internal/component/otelcol/receiver/kafka/kafka.go @@ -11,9 +11,7 @@ import ( otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/receiver" "github.com/grafana/alloy/internal/featuregate" - "github.com/grafana/alloy/syntax/alloytypes" "github.com/mitchellh/mapstructure" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" otelcomponent "go.opentelemetry.io/collector/component" otelextension "go.opentelemetry.io/collector/extension" @@ -44,11 +42,11 @@ type Arguments struct { ResolveCanonicalBootstrapServersOnly bool `alloy:"resolve_canonical_bootstrap_servers_only,attr,optional"` - Authentication AuthenticationArguments `alloy:"authentication,block,optional"` - Metadata MetadataArguments `alloy:"metadata,block,optional"` - AutoCommit AutoCommitArguments `alloy:"autocommit,block,optional"` - MessageMarking MessageMarkingArguments `alloy:"message_marking,block,optional"` - HeaderExtraction HeaderExtraction `alloy:"header_extraction,block,optional"` + Authentication otelcol.KafkaAuthenticationArguments `alloy:"authentication,block,optional"` + Metadata otelcol.KafkaMetadataArguments `alloy:"metadata,block,optional"` + AutoCommit AutoCommitArguments `alloy:"autocommit,block,optional"` + MessageMarking MessageMarkingArguments `alloy:"message_marking,block,optional"` + HeaderExtraction HeaderExtraction `alloy:"header_extraction,block,optional"` // DebugMetrics configures component internal metrics. Optional. DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` @@ -142,153 +140,6 @@ func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { return args.Output } -// AuthenticationArguments configures how to authenticate to the Kafka broker. -type AuthenticationArguments struct { - Plaintext *PlaintextArguments `alloy:"plaintext,block,optional"` - SASL *SASLArguments `alloy:"sasl,block,optional"` - TLS *otelcol.TLSClientArguments `alloy:"tls,block,optional"` - Kerberos *KerberosArguments `alloy:"kerberos,block,optional"` -} - -// Convert converts args into the upstream type. -func (args AuthenticationArguments) Convert() map[string]interface{} { - auth := make(map[string]interface{}) - - if args.Plaintext != nil { - conv := args.Plaintext.Convert() - auth["plain_text"] = &conv - } - if args.SASL != nil { - conv := args.SASL.Convert() - auth["sasl"] = &conv - } - if args.TLS != nil { - auth["tls"] = args.TLS.Convert() - } - if args.Kerberos != nil { - conv := args.Kerberos.Convert() - auth["kerberos"] = &conv - } - - return auth -} - -// PlaintextArguments configures plaintext authentication against the Kafka -// broker. 
-type PlaintextArguments struct { - Username string `alloy:"username,attr"` - Password alloytypes.Secret `alloy:"password,attr"` -} - -// Convert converts args into the upstream type. -func (args PlaintextArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "username": args.Username, - "password": string(args.Password), - } -} - -// SASLArguments configures SASL authentication against the Kafka broker. -type SASLArguments struct { - Username string `alloy:"username,attr"` - Password alloytypes.Secret `alloy:"password,attr"` - Mechanism string `alloy:"mechanism,attr"` - Version int `alloy:"version,attr,optional"` - AWSMSK AWSMSKArguments `alloy:"aws_msk,block,optional"` -} - -// Convert converts args into the upstream type. -func (args SASLArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "username": args.Username, - "password": string(args.Password), - "mechanism": args.Mechanism, - "version": args.Version, - "aws_msk": args.AWSMSK.Convert(), - } -} - -// AWSMSKArguments exposes additional SASL authentication measures required to -// use the AWS_MSK_IAM mechanism. -type AWSMSKArguments struct { - Region string `alloy:"region,attr"` - BrokerAddr string `alloy:"broker_addr,attr"` -} - -// Convert converts args into the upstream type. -func (args AWSMSKArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "region": args.Region, - "broker_addr": args.BrokerAddr, - } -} - -// KerberosArguments configures Kerberos authentication against the Kafka -// broker. -type KerberosArguments struct { - ServiceName string `alloy:"service_name,attr,optional"` - Realm string `alloy:"realm,attr,optional"` - UseKeyTab bool `alloy:"use_keytab,attr,optional"` - Username string `alloy:"username,attr"` - Password alloytypes.Secret `alloy:"password,attr,optional"` - ConfigPath string `alloy:"config_file,attr,optional"` - KeyTabPath string `alloy:"keytab_file,attr,optional"` -} - -// Convert converts args into the upstream type. -func (args KerberosArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "service_name": args.ServiceName, - "realm": args.Realm, - "use_keytab": args.UseKeyTab, - "username": args.Username, - "password": string(args.Password), - "config_file": args.ConfigPath, - "keytab_file": args.KeyTabPath, - } -} - -// MetadataArguments configures how the otelcol.receiver.kafka component will -// retrieve metadata from the Kafka broker. -type MetadataArguments struct { - IncludeAllTopics bool `alloy:"include_all_topics,attr,optional"` - Retry MetadataRetryArguments `alloy:"retry,block,optional"` -} - -func (args *MetadataArguments) SetToDefault() { - *args = MetadataArguments{ - IncludeAllTopics: true, - Retry: MetadataRetryArguments{ - MaxRetries: 3, - Backoff: 250 * time.Millisecond, - }, - } -} - -// Convert converts args into the upstream type. -func (args MetadataArguments) Convert() kafkaexporter.Metadata { - return kafkaexporter.Metadata{ - Full: args.IncludeAllTopics, - Retry: args.Retry.Convert(), - } -} - -// MetadataRetryArguments configures how to retry retrieving metadata from the -// Kafka broker. Retrying is useful to avoid race conditions when the Kafka -// broker is starting at the same time as the otelcol.receiver.kafka component. -type MetadataRetryArguments struct { - MaxRetries int `alloy:"max_retries,attr,optional"` - Backoff time.Duration `alloy:"backoff,attr,optional"` -} - -// Convert converts args into the upstream type. 
-func (args MetadataRetryArguments) Convert() kafkaexporter.MetadataRetry { - return kafkaexporter.MetadataRetry{ - Max: args.MaxRetries, - Backoff: args.Backoff, - } -} - // AutoCommitArguments configures how to automatically commit updated topic // offsets back to the Kafka broker. type AutoCommitArguments struct { diff --git a/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go index 90d845d281..0c735ee45e 100644 --- a/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go +++ b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go @@ -27,7 +27,7 @@ func (kafkaExporterConverter) ConvertAndAppend(state *State, id component.Instan label := state.AlloyComponentLabel() - args := toKafkaExporter(state, id, cfg.(*kafkaexporter.Config)) + args := toKafkaExporter(cfg.(*kafkaexporter.Config)) block := common.NewBlockWithOverride([]string{"otelcol", "exporter", "kafka"}, label, args) diags.Add( diff --git a/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go b/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go index 7888c04f4a..654227f81c 100644 --- a/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go +++ b/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go @@ -76,8 +76,8 @@ func toKafkaReceiver(state *State, id component.InstanceID, cfg *kafkareceiver.C } } -func toKafkaAuthentication(cfg map[string]any) kafka.AuthenticationArguments { - return kafka.AuthenticationArguments{ +func toKafkaAuthentication(cfg map[string]any) otelcol.KafkaAuthenticationArguments { + return otelcol.KafkaAuthenticationArguments{ Plaintext: toKafkaPlaintext(encodeMapstruct(cfg["plain_text"])), SASL: toKafkaSASL(encodeMapstruct(cfg["sasl"])), TLS: toKafkaTLSClientArguments(encodeMapstruct(cfg["tls"])), @@ -85,23 +85,23 @@ func toKafkaAuthentication(cfg map[string]any) kafka.AuthenticationArguments { } } -func toKafkaPlaintext(cfg map[string]any) *kafka.PlaintextArguments { +func toKafkaPlaintext(cfg map[string]any) *otelcol.KafkaPlaintextArguments { if cfg == nil { return nil } - return &kafka.PlaintextArguments{ + return &otelcol.KafkaPlaintextArguments{ Username: cfg["username"].(string), Password: alloytypes.Secret(cfg["password"].(string)), } } -func toKafkaSASL(cfg map[string]any) *kafka.SASLArguments { +func toKafkaSASL(cfg map[string]any) *otelcol.KafkaSASLArguments { if cfg == nil { return nil } - return &kafka.SASLArguments{ + return &otelcol.KafkaSASLArguments{ Username: cfg["username"].(string), Password: alloytypes.Secret(cfg["password"].(string)), Mechanism: cfg["mechanism"].(string), @@ -110,12 +110,12 @@ func toKafkaSASL(cfg map[string]any) *kafka.SASLArguments { } } -func toKafkaAWSMSK(cfg map[string]any) kafka.AWSMSKArguments { +func toKafkaAWSMSK(cfg map[string]any) otelcol.KafkaAWSMSKArguments { if cfg == nil { - return kafka.AWSMSKArguments{} + return otelcol.KafkaAWSMSKArguments{} } - return kafka.AWSMSKArguments{ + return otelcol.KafkaAWSMSKArguments{ Region: cfg["region"].(string), BrokerAddr: cfg["broker_addr"].(string), } @@ -136,12 +136,12 @@ func toKafkaTLSClientArguments(cfg map[string]any) *otelcol.TLSClientArguments { return &res } -func toKafkaKerberos(cfg map[string]any) *kafka.KerberosArguments { +func toKafkaKerberos(cfg map[string]any) *otelcol.KafkaKerberosArguments { if cfg == nil { return nil } - return &kafka.KerberosArguments{ + return &otelcol.KafkaKerberosArguments{ ServiceName: 
cfg["service_name"].(string), Realm: cfg["realm"].(string), UseKeyTab: cfg["use_keytab"].(bool), @@ -152,15 +152,15 @@ func toKafkaKerberos(cfg map[string]any) *kafka.KerberosArguments { } } -func toKafkaMetadata(cfg kafkaexporter.Metadata) kafka.MetadataArguments { - return kafka.MetadataArguments{ +func toKafkaMetadata(cfg kafkaexporter.Metadata) otelcol.KafkaMetadataArguments { + return otelcol.KafkaMetadataArguments{ IncludeAllTopics: cfg.Full, Retry: toKafkaRetry(cfg.Retry), } } -func toKafkaRetry(cfg kafkaexporter.MetadataRetry) kafka.MetadataRetryArguments { - return kafka.MetadataRetryArguments{ +func toKafkaRetry(cfg kafkaexporter.MetadataRetry) otelcol.KafkaMetadataRetryArguments { + return otelcol.KafkaMetadataRetryArguments{ MaxRetries: cfg.Max, Backoff: cfg.Backoff, } From 58d77774fc749bc33b4472dc93f4cb9516994fea Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Thu, 6 Jun 2024 17:44:46 +0100 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../reference/components/otelcol.exporter.kafka.md | 12 ++++++------ .../otelcol-kafka-authentication-plaintext.md | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/sources/reference/components/otelcol.exporter.kafka.md b/docs/sources/reference/components/otelcol.exporter.kafka.md index b4d573e4a1..c1c2b81fdd 100644 --- a/docs/sources/reference/components/otelcol.exporter.kafka.md +++ b/docs/sources/reference/components/otelcol.exporter.kafka.md @@ -10,12 +10,12 @@ title: otelcol.exporter.kafka other `otelcol` components and sends it to Kafka. It is important to use `otelcol.exporter.kafka` together with `otelcol.processor.batch` -in order to make sure `otelcol.exporter.kafka` doesn't slow down due to sending Kafka a huge number of small payloads. +to make sure `otelcol.exporter.kafka` doesn't slow down due to sending Kafka a huge number of small payloads. -> **NOTE**: `otelcol.exporter.kafka` is a wrapper over the upstream -> OpenTelemetry Collector `kafka` exporter from the `otelcol-contrib` -> distribution. Bug reports or feature requests will be redirected to the -> upstream repository, if necessary. +{{< admonition type="note" >}} +`otelcol.exporter.kafka` is a wrapper over the upstream OpenTelemetry Collector `kafka` exporter from the `otelcol-contrib` distribution. +Bug reports or feature requests will be redirected to the upstream repository, if necessary. +{{< /admonition >}} Multiple `otelcol.exporter.kafka` components can be specified by giving them different labels. @@ -169,7 +169,7 @@ Name | Type | Description | Default | Required Refer to the [sarama documentation][RequiredAcks] for more information on `required_acks`. `compression` could be set to either `none`, `gzip`, `snappy`, `lz4`, or `zstd`. -Refer to the [sarama documentation][CompressionCodec] for more information. +Refer to the [Sarama documentation][CompressionCodec] for more information. 
[RequiredAcks]: https://pkg.go.dev/github.com/IBM/sarama@v1.43.2#RequiredAcks [CompressionCodec]: https://pkg.go.dev/github.com/IBM/sarama@v1.43.2#CompressionCodec diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md index e7b47a0fe7..705d3f1d81 100644 --- a/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md @@ -1,5 +1,5 @@ --- -description: Shared content, otelcol kafka plain text authentication +description: Shared content, otelcol kafka plaintext authentication headless: true --- From 5f8e5fe1448c22de010104a42e9e06e6bdbb8ae4 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Thu, 6 Jun 2024 18:06:59 +0100 Subject: [PATCH 5/5] Prettify casing --- .../components/otelcol-kafka-authentication-kerberos.md | 2 +- .../components/otelcol-kafka-authentication-plaintext.md | 2 +- .../components/otelcol-kafka-authentication-sasl-aws_msk.md | 2 +- .../reference/components/otelcol-kafka-authentication-sasl.md | 2 +- .../shared/reference/components/otelcol-kafka-authentication.md | 2 +- .../shared/reference/components/otelcol-kafka-metadata-retry.md | 2 +- .../shared/reference/components/otelcol-kafka-metadata.md | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md index 2fcbfea2c7..ff0e469c63 100644 --- a/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md @@ -1,5 +1,5 @@ --- -description: Shared content, otelcol kafka kerberos authentication +description: Shared content, otelcol Kafka Kerberos authentication headless: true --- diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md index 705d3f1d81..bf6d51962a 100644 --- a/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md @@ -1,5 +1,5 @@ --- -description: Shared content, otelcol kafka plaintext authentication +description: Shared content, otelcol Kafka plaintext authentication headless: true --- diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md index 8f35ef3552..5061b9d5cb 100644 --- a/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md @@ -1,5 +1,5 @@ --- -description: Shared content, otelcol kafka sasl aws_msk authentication +description: Shared content, otelcol Kafka SASL AWS_MSK authentication headless: true --- diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md index 0c2f20844d..a15d2b59f5 100644 --- a/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md @@ -1,5 +1,5 @@ --- 
-description: Shared content, otelcol kafka sasl authentication +description: Shared content, otelcol Kafka SASL authentication headless: true --- diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication.md index df02b37731..f0df3d7078 100644 --- a/docs/sources/shared/reference/components/otelcol-kafka-authentication.md +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication.md @@ -1,5 +1,5 @@ --- -description: Shared content, otelcol kafka authentication +description: Shared content, otelcol Kafka authentication headless: true --- diff --git a/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md b/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md index 52465b8192..f0ca24f486 100644 --- a/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md +++ b/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md @@ -1,5 +1,5 @@ --- -description: Shared content, otelcol kafka metadata retry +description: Shared content, otelcol Kafka metadata retry headless: true --- diff --git a/docs/sources/shared/reference/components/otelcol-kafka-metadata.md b/docs/sources/shared/reference/components/otelcol-kafka-metadata.md index 2ee3428230..09de4bc85c 100644 --- a/docs/sources/shared/reference/components/otelcol-kafka-metadata.md +++ b/docs/sources/shared/reference/components/otelcol-kafka-metadata.md @@ -1,5 +1,5 @@ --- -description: Shared content, otelcol kafka metadata +description: Shared content, otelcol Kafka metadata headless: true ---