Add support for non-unique keys in Kafka output headers #30369

Merged: rdner merged 2 commits into elastic:main from enable-multiple-output-kafka-headers on Feb 14, 2022

rdner
Member

@rdner rdner commented Feb 12, 2022

What does this PR do?

According to the Kafka documentation, header keys are not required to
be unique, so the Kafka output must support duplicate header keys too.

This PR also documents the headers configuration, so customers can start using it.

This is a follow up to #29940

Why is it important?

The headers feature started as a community PR, so it clearly matters to some users. This PR brings the implementation closer to the original Kafka feature. As the documentation states:

A Header is a key-value pair, and multiple headers can be included with the key, value, and timestamp in each Kafka message.

source https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/connect/header/Header.html

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
    - [ ] I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

How to test this PR locally

  1. Run Kafka locally in its standard configuration

  2. Run filebeat with the following configuration:

filebeat.inputs:
  - type: log
    paths:
      - "/path/to/your/file.log"

output.kafka:
  hosts: ["localhost:9092"]
  topic: "test-topic"
  headers:
    - key: "same-key"
      value: "value1"
    - key: "same-key"
      value: "value2"
    - key: "another-key"
      value: "another value"

(replace /path/to/your/file.log with your real path)

  3. Write a line to the file /path/to/your/file.log
  4. Check that the message has headers with this command:
kafka-console-consumer --from-beginning --property print.headers=true --topic test-topic --bootstrap-server localhost:9092

For example, when I appended a line "test message" to my file I got this output:

same-key:value1,same-key:value2,another-key:another value       {"@timestamp":"2022-02-13T08:30:20.645Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.2.0"},"log":{"offset":0,"file":{"path":"/Users/rdner/Projects/tests/input.log"}},"message":"test message","input":{"type":"log"},"ecs":{"version":"8.0.0"},"host":{"name":"MacBook-Pro.localdomain"},"agent":{"ephemeral_id":"1ac3ddd7-c294-4f22-a081-e6baeab977eb","id":"4fd6b173-0f13-4792-888a-b54dcd85496c","name":"MacBook-Pro.localdomain","type":"filebeat","version":"8.2.0"}}

Note the headers at the beginning of the line.
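
To check the output programmatically rather than by eye, the header prefix can be split from the JSON payload. The following is a minimal Python sketch and is not part of this PR. It assumes the console consumer separates the headers from the value with a tab and separates individual headers with commas, and `parse_consumer_line` is a hypothetical helper name. The sample line is shortened from the output above.

```python
import json


def parse_consumer_line(line):
    """Split one console-consumer line into (headers, event).

    Assumes "<key>:<value>,<key>:<value><TAB><json>" formatting. A header
    value containing "," cannot be told apart from a separator here.
    """
    header_part, _, payload = line.partition("\t")
    headers = []
    for item in header_part.split(","):
        key, _, value = item.partition(":")
        headers.append((key, value))
    return headers, json.loads(payload)


line = 'same-key:value1,same-key:value2,another-key:another value\t{"message":"test message"}'
headers, event = parse_consumer_line(line)
print(headers)           # list of (key, value) pairs, duplicates kept in order
print(event["message"])
```

The headers are returned as a list of pairs rather than a dictionary so that both `same-key` entries survive.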

Related issues

@rdner rdner self-assigned this Feb 12, 2022
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Feb 12, 2022
@mergify
Contributor

mergify bot commented Feb 12, 2022

This pull request does not have a backport label. Could you fix it @rdner? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v./d./d./d is the label to automatically backport to the 7./d branch. /d is the digit

NOTE: backport-skip has been added to this pull request.

@mergify mergify bot added the backport-skip Skip notification from the automated backport with mergify label Feb 12, 2022
@elasticmachine
Collaborator

elasticmachine commented Feb 12, 2022

💚 Build Succeeded

Build stats

  • Start Time: 2022-02-12T21:09:56.240+0000

  • Duration: 58 min 56 sec

Test stats 🧪

Test Results
Failed 0
Passed 28215
Skipped 3032
Total 31247

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

@rdner
Member Author

rdner commented Feb 12, 2022

/test

@rdner rdner added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Feb 14, 2022
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Feb 14, 2022
@rdner rdner requested a review from kvch February 14, 2022 07:16
Contributor

@kvch kvch left a comment

The code LGTM. However, I am on the fence about this change. It would be nice if we had a similar configuration for all outputs. In the Elasticsearch output, we use the previous format, the one you just changed: https://github.com/elastic/beats/blob/main/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl#L44-L46

Can't we provide multiple values in one header by separating them with a comma? Example: My-Header: first-value, second-value.

@rdner
Member Author

rdner commented Feb 14, 2022

@kvch I don't think we can separate values with a comma. How would one then set a value that itself contains a comma?
This change is not to support something extra but rather to reflect the actual Kafka feature and their spec, this is how it's configured in Kafka.

@kvch
Contributor

kvch commented Feb 14, 2022

My idea is based on the RFC of the HTTP protocol:

Multiple message-header fields with the same field-name MAY be
present in a message if and only if the entire field-value for that
header field is defined as a comma-separated list [i.e., #(values)].

According to the RFC values that contain commas are quoted:

Because commas (",") are used as a generic delimiter between
field-values, they need to be treated with care if they are allowed
in the field-value. Typically, components that might contain a comma
are protected with double-quotes using the quoted-string ABNF
production.

Ref: https://datatracker.ietf.org/doc/html/rfc7231
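
For illustration, HTTP-style list parsing with quoted values could look like the sketch below. It is Python, using the standard `csv` module as a stand-in for the RFC's quoted-string production. That is only an approximation, since `csv` escapes a quote by doubling it rather than with a backslash. `split_field_values` is a hypothetical name, not something that exists in Beats.

```python
import csv


def split_field_values(field_value):
    """Split a comma-separated field value, honouring double-quoted items."""
    # skipinitialspace lets a quote that follows ", " open a quoted item.
    reader = csv.reader([field_value], skipinitialspace=True)
    return [item.strip() for item in next(reader)]


plain = split_field_values("first-value, second-value")
quoted = split_field_values('"1,2,3,4", plain')
print(plain)   # two items
print(quoted)  # the quoted item keeps its commas
```

With this scheme, a value that contains a comma has to be quoted by the user, and the quotes are consumed by the parser instead of reaching the header value.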

If Kafka treats headers the same way as HTTP does, there is no need for your PR. Users can already configure multiple values for the same keys. But I haven't really found any information about how Kafka treats such headers. That is the bedrock of my question in my previous comment.

This change is not to support something extra but rather to reflect the actual Kafka feature and their spec, this is how it's configured in Kafka.

Right. Our configuration should not reflect how Kafka itself is configured. We should be consistent throughout our own configuration. For example, we should let people configure headers the same way in all outputs, regardless of how the outputs themselves implement headers. That should not concern our users.

@kvch
Contributor

kvch commented Feb 14, 2022

Also, even if Kafka does not support it, we can still implement the comma-separated list approach.

@rdner
Member Author

rdner commented Feb 14, 2022

@kvch I'm not sure I understand the comparison with HTTP, Kafka headers are not related to HTTP headers in any manner. Let's say I, as a user, want to set the following headers for each message sent to Kafka:

{
  "key": "first-key",
  "value": "1,2,3,4"
},
{
  "key": "second-key",
  "value": "value1"
},
{
  "key": "second-key",
  "value": "value2"
}

These headers should be attached to the message as it's represented above in the same order without any transformations. If we don't support this config structure, it's not possible to satisfy the following requirements:

  1. Preserve the order – using maps does not preserve the order.
  2. Have multiple keys with the same name – it's supported by Kafka, so I believe we must support it too, otherwise we have compatibility problems and bad UX
  3. Pass the initial value through unchanged – if we chose to support comma-separated values, we could not keep the value 1,2,3,4 from my example unchanged.
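
The three requirements above can be illustrated with a short sketch. It is written in Python for brevity and is not the Beats implementation. `expand_commas` is a hypothetical stand-in for a comma-separated scheme, and the UTF-8 encoding step assumes a client that takes header values as bytes.

```python
# Headers as an ordered list of (key, value) pairs, mirroring the list-based config.
headers = [
    ("first-key", "1,2,3,4"),
    ("second-key", "value1"),
    ("second-key", "value2"),
]

# Requirement 2: a map holds one value per key, so the first "second-key" is lost.
# (Python dicts happen to keep insertion order; Go maps do not, which is requirement 1.)
as_map = dict(headers)
print(as_map)


def expand_commas(mapping):
    """Hypothetical comma-splitting scheme: one header per comma-separated part."""
    return [(key, part) for key, value in mapping.items() for part in value.split(",")]


# Requirement 3: the scheme cannot tell "1,2,3,4" apart from four separate values.
expanded = expand_commas({"first-key": "1,2,3,4", "second-key": "value1,value2"})
print(expanded)

# The list form passes every pair through unchanged and in order.
wire_headers = [(key, value.encode("utf-8")) for key, value in headers]
print(wire_headers)
```

Only the list of pairs keeps both `second-key` entries and the literal value `1,2,3,4` at the same time.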

We cannot just change how this feature works on our side if we say this parameter is Kafka headers. It's going to be very confusing for our customers that they cannot do the same thing as any Kafka client does.

For me personally, the argument for keeping the Kafka headers configuration consistent with the HTTP headers in Elasticsearch is quite weak. I don't see any reason why they should be related and consistent; they are different things.

@rdner rdner requested a review from cmacknz February 14, 2022 13:56
@rdner
Member Author

rdner commented Feb 14, 2022

The requirements I listed were taken from the initial design document for Kafka Headers:

Key duplication and ordering: Duplicate headers with the same key must be supported.
The order of headers must be retained throughout a record's end-to-end lifetime: from producer to consumer.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers#KIP82AddRecordHeaders-Keyduplicationandordering

@kvch
Contributor

kvch commented Feb 14, 2022

@kvch I'm not sure I understand the comparison with HTTP, Kafka headers are not related to HTTP headers in any manner.

I understand that Kafka headers are not related to HTTP headers. I was just curious if Kafka allowed setting values in headers similarly to HTTP headers. The documentation you referred to was quite short and did not provide much info about what a value can be or about the ordering, etc. But thanks for explaining it in detail.

The headers can also have a schema. Is that something we should support?

@rdner
Member Author

rdner commented Feb 14, 2022

@kvch

The headers can also have a schema. Is that something we should support?

Well, there has been no request from the community yet. The headers feature was a community contribution (linked PR).

UPD
Didn't find any mention of header schemas in the spec document though.

@rdner rdner merged commit 85f4be1 into elastic:main Feb 14, 2022
@rdner rdner deleted the enable-multiple-output-kafka-headers branch February 14, 2022 14:29
@cmacknz
Member

cmacknz commented Feb 14, 2022

LGTM, good catch in making this consistent with the spec.

v1v added a commit to v1v/beats that referenced this pull request Feb 21, 2022
…into feature/use-with-kind-k8s-env

* 'feature/use-with-kind-k8s-env' of github.com:v1v/beats: (52 commits)
  ci: home is declared within withBeatsEnv
  ci: use withKindEnv step
  ci: use getBranchesFromAliases and support next-patch-8 (elastic#30400)
  Update fields.yml (elastic#29609)
  Heartbeat: fix browser metrics and trace mappings (elastic#30258)
  Apply light edits to 8.0 changelog (elastic#30351)
  packetbeat/beater: make sure Npcap installation runs before interfaces are needed (elastic#30396)
  Add a ring-buffer reporter to libbeat (elastic#28750)
  Osquerybeat: Add install verification for osquerybeat (elastic#30388)
  update windows matrix support (elastic#30373)
  Refactor of metricbeat process-gathering metrics and system/process (elastic#30076)
  adjust next changelog wording (elastic#30371)
  [Metricbeat] azure: move event report into loop validDim loop (elastic#29945)
  fix: report GitHub Check before the cache (elastic#30372)
  Add support for non-unique keys in Kafka output headers (elastic#30369)
  ci: 6 major branch reached EOL (elastic#30357)
  reduce Elastic Agent shut down time by stopping processes concurrently (elastic#29650)
  [Filebeat] Add message to register encode/decode debug logs (elastic#30271)
  [libbeat] kafka message header support (elastic#29940)
  Heartbeat: set duration to zero for syntax errors (elastic#30227)
  ...
Labels
backport-skip Skip notification from the automated backport with mergify docs enhancement libbeat Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team v8.2.0
4 participants