Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Kafka version parsing / supported range #27720

Merged
merged 8 commits into from
Sep 3, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- update ECS field definitions to ECS 1.11.0. {pull}27107[27107]
- The disk queue is now GA. {pull}27515[27515]
- Allow non-padded base64 data to be decoded by decode_base64_field {pull}27311[27311], {issue}27021[27021]
- The Kafka support library Sarama has been updated to 1.29.1. {pull}27717[27717]
- Kafka is now supported up to version 2.8.0. {pull}27720[27720]

*Auditbeat*

Expand Down
1,210 changes: 1,155 additions & 55 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion filebeat/docs/inputs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ link:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-for-kafka-ecos
[[kafka-input-compatibility]]
==== Compatibility

This input works with all Kafka versions in between 0.11 and 2.1.0. Older versions
This input works with all Kafka versions in between 0.11 and 2.8.0. Older versions
might work as well, but are not supported.

[id="{beatname_lc}-input-{type}-options"]
Expand Down
18 changes: 11 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.4.3
github.com/golang/snappy v0.0.1
github.com/golang/snappy v0.0.3
github.com/gomodule/redigo v1.8.3
github.com/google/flatbuffers v1.7.2-0.20170925184458-7a6b2bf521e9
github.com/google/go-cmp v0.5.2
github.com/google/go-cmp v0.5.4
github.com/google/gopacket v1.1.18-0.20191009163724-0ad7f2610e34
github.com/google/uuid v1.1.2
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
Expand Down Expand Up @@ -139,7 +139,7 @@ require (
github.com/prometheus/common v0.7.0
github.com/prometheus/procfs v0.0.11
github.com/prometheus/prometheus v2.5.0+incompatible
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/samuel/go-parser v0.0.0-20130731160455-ca8abbf65d0e // indirect
github.com/samuel/go-thrift v0.0.0-20140522043831-2187045faa54
github.com/sanathkr/yaml v1.0.1-0.20170819201035-0056894fa522 // indirect
Expand All @@ -155,7 +155,7 @@ require (
github.com/ugorji/go/codec v1.1.8
github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/xdg/scram v1.0.3
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect
go.elastic.co/apm v1.11.0
go.elastic.co/apm/module/apmelasticsearch v1.7.2
Expand All @@ -168,19 +168,23 @@ require (
go.uber.org/zap v1.14.0
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
golang.org/x/lint v0.0.0-20200130185559-910be7a94367
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.org/x/text v0.3.5
golang.org/x/text v0.3.6
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.1.1
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb
google.golang.org/grpc v1.29.1
google.golang.org/protobuf v1.25.0
gopkg.in/inf.v0 v0.9.1
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.5.0
gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528
gopkg.in/yaml.v2 v2.3.0
gotest.tools v2.2.0+incompatible
Expand All @@ -193,7 +197,7 @@ require (

replace (
github.com/Microsoft/go-winio => github.com/bi-zone/go-winio v0.4.15
github.com/Shopify/sarama => github.com/elastic/sarama v1.19.1-0.20210120173147-5c8cb347d877
github.com/Shopify/sarama => github.com/elastic/sarama v1.19.1-0.20210823122811-11c3ef800752
github.com/cucumber/godog => github.com/cucumber/godog v0.8.1
github.com/docker/docker => github.com/docker/engine v0.0.0-20191113042239-ea84732a7725
github.com/docker/go-plugins-helpers => github.com/elastic/go-plugins-helpers v0.0.0-20200207104224-bdf17607b79f
Expand Down
68 changes: 41 additions & 27 deletions go.sum

Large diffs are not rendered by default.

117 changes: 50 additions & 67 deletions libbeat/common/kafka/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,78 +26,47 @@ import (
// Version is a kafka version
type Version string

// TODO: remove me.
// Compat version overwrite for missing versions in sarama
// Public API is compatible between these versions.
var (
v0_10_2_1 = parseKafkaVersion("0.10.2.1")
v0_11_0_1 = parseKafkaVersion("0.11.0.1")
v0_11_0_2 = parseKafkaVersion("0.11.0.2")
v1_0_1 = parseKafkaVersion("1.0.1")
v1_0_2 = parseKafkaVersion("1.0.2")
v1_1_1 = parseKafkaVersion("1.1.1")

kafkaVersions = map[string]sarama.KafkaVersion{
"0.8.2.0": sarama.V0_8_2_0,
"0.8.2.1": sarama.V0_8_2_1,
"0.8.2.2": sarama.V0_8_2_2,
"0.8.2": sarama.V0_8_2_2,
"0.8": sarama.V0_8_2_2,

"0.9.0.0": sarama.V0_9_0_0,
"0.9.0.1": sarama.V0_9_0_1,
"0.9.0": sarama.V0_9_0_1,
"0.9": sarama.V0_9_0_1,

"0.10.0.0": sarama.V0_10_0_0,
"0.10.0.1": sarama.V0_10_0_1,
"0.10.0": sarama.V0_10_0_1,
"0.10.1.0": sarama.V0_10_1_0,
"0.10.1": sarama.V0_10_1_0,
"0.10.2.0": sarama.V0_10_2_0,
"0.10.2.1": v0_10_2_1,
"0.10.2": v0_10_2_1,
"0.10": v0_10_2_1,

"0.11.0.0": sarama.V0_11_0_0,
"0.11.0.1": v0_11_0_1,
"0.11.0.2": v0_11_0_2,
"0.11.0": v0_11_0_2,
"0.11": v0_11_0_2,

"1.0.0": sarama.V1_0_0_0,
"1.0.1": v1_0_1,
"1.0.2": v1_0_2,
"1.0": v1_0_2,
"1.1.0": sarama.V1_1_0_0,
"1.1.1": v1_1_1,
"1.1": v1_1_1,
"1": v1_1_1,

"2.0.0": sarama.V2_0_0_0,
"2.0.1": sarama.V2_0_1_0,
"2.0": sarama.V2_0_1_0,
"2.1": sarama.V2_1_0_0,
"2.2": sarama.V2_2_0_0,
"2": sarama.V2_1_0_0,
// Sarama expects version strings to be fully expanded, e.g. "1.1.1".
// We also allow versions to be specified as a prefix, e.g. "1",
// understood as referencing the most recent version starting with "1".
// truncatedKafkaVersions stores a lookup of the abbreviations we accept.
truncatedKafkaVersions = map[string]sarama.KafkaVersion{
"0.8.2": sarama.V0_8_2_2,
"0.8": sarama.V0_8_2_2,

"0.9.0": sarama.V0_9_0_1,
"0.9": sarama.V0_9_0_1,

"0.10.0": sarama.V0_10_0_1,
"0.10.1": sarama.V0_10_1_0,
"0.10.2": sarama.V0_10_2_1,
"0.10": sarama.V0_10_2_1,

"0.11.0": sarama.V0_11_0_2,
"0.11": sarama.V0_11_0_2,

"1.0": sarama.V1_0_0_0,
"1.1": sarama.V1_1_1_0,
"1": sarama.V1_1_1_0,

"2.0": sarama.V2_0_1_0,
"2.1": sarama.V2_1_0_0,
"2.2": sarama.V2_2_0_0,
"2.3": sarama.V2_3_0_0,
"2.4": sarama.V2_4_0_0,
"2.5": sarama.V2_5_0_0,
"2.6": sarama.V2_6_0_0,
"2": sarama.V2_6_0_0,
}
)

func parseKafkaVersion(s string) sarama.KafkaVersion {
v, err := sarama.ParseKafkaVersion(s)
if err != nil {
panic(err)
}
return v
}

// Validate that a kafka version is among the possible options
func (v *Version) Validate() error {
if _, ok := kafkaVersions[string(*v)]; !ok {
return fmt.Errorf("unknown/unsupported kafka vesion '%v'", *v)
if _, ok := v.Get(); ok {
return nil
}

return nil
return fmt.Errorf("unknown/unsupported kafka version '%v'", *v)
}

// Unpack a kafka version
Expand All @@ -113,6 +82,20 @@ func (v *Version) Unpack(s string) error {

// Get a sarama kafka version
func (v Version) Get() (sarama.KafkaVersion, bool) {
kv, ok := kafkaVersions[string(v)]
return kv, ok
// First check if it's one of the abbreviations we accept.
// If not, let sarama parse it.
s := string(v)
if version, ok := truncatedKafkaVersions[s]; ok {
return version, true
}
version, err := sarama.ParseKafkaVersion(s)
if err != nil {
return sarama.KafkaVersion{}, false
}
for _, supp := range sarama.SupportedVersions {
if version == supp {
return version, true
}
}
return sarama.KafkaVersion{}, false
}
80 changes: 80 additions & 0 deletions libbeat/common/kafka/version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
"testing"

"github.com/Shopify/sarama"
)

func TestVersionGet(t *testing.T) {
valid := map[Version]sarama.KafkaVersion{
"0.11": sarama.V0_11_0_2,
"1": sarama.V1_1_1_0,
"2.0.0": sarama.V2_0_0_0,
"2.0.1": sarama.V2_0_1_0,
"2.0": sarama.V2_0_1_0,
"2.5": sarama.V2_5_0_0,
}
invalid := []Version{
"1.1.2",
"1.2.3",
"1.3",
"hello",
"2.0.3",
}
for s, expect := range valid {
got, ok := s.Get()
if !ok {
t.Errorf("'%v' should parse as Kafka version %v, got nothing",
s, expect)
} else if got != expect {
t.Errorf("'%v' should parse as Kafka version %v, got %v",
s, expect, got)
}
}
for _, s := range invalid {
got, ok := s.Get()
if ok {
t.Errorf("'%v' is not a valid Kafka version but parsed as %v",
s, got)
}
}
}

func TestSaramaUpdate(t *testing.T) {
// If any of these versions are considered valid by our parsing code,
// it means someone updated sarama without updating the parsing code
// for the new version. Gently remind them.
flagVersions := []Version{"2.8.1", "2.9.0"}
for _, v := range flagVersions {
if _, ok := v.Get(); ok {
t.Fatalf(
"Kafka version %v is now considered valid. Did you update Sarama?\n"+
"If so, remember to:\n"+
"- Update truncatedKafkaVersions in libbeat/common/kafka/version.go\n"+
"- Update the documentation to list the latest version:\n"+
" * libbeat/outputs/kafka/docs/kafka.asciidoc\n"+
" * filebeat/docs/inputs/inputs-kafka.asciidoc\n"+
"- Update TestSaramaUpdate in libbeat/common/kafka/version_test.go\n",
v)

}
}
}
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/docs/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ NOTE: Events bigger than <<kafka-max_message_bytes,`max_message_bytes`>> will be
[[kafka-compatibility]]
==== Compatibility

This output works with all Kafka versions in between 0.11 and 2.2.2. Older versions
This output works with all Kafka versions in between 0.11 and 2.8.0. Older versions
might work as well, but are not supported.

==== Configuration options
Expand Down