Skip to content

Commit

Permalink
Update Kafka version parsing / supported range (elastic#27720)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored and wiwen committed Nov 1, 2021
1 parent 903a126 commit 7058097
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ https:/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- The disk queue is now GA. {pull}27515[27515]
- Allow non-padded base64 data to be decoded by decode_base64_field {pull}27311[27311], {issue}27021[27021]
- The Kafka support library Sarama has been updated to 1.29.1. {pull}27717[27717]
- Kafka is now supported up to version 2.8.0. {pull}27720[27720]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/docs/inputs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ link:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-for-kafka-ecos
[[kafka-input-compatibility]]
==== Compatibility

This input works with all Kafka versions in between 0.11 and 2.1.0. Older versions
This input works with all Kafka versions in between 0.11 and 2.8.0. Older versions
might work as well, but are not supported.

[id="{beatname_lc}-input-{type}-options"]
Expand Down
117 changes: 50 additions & 67 deletions libbeat/common/kafka/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,78 +26,47 @@ import (
// Version is a kafka version
type Version string

// TODO: remove me.
// Compat version overwrite for missing versions in sarama
// Public API is compatible between these versions.
var (
v0_10_2_1 = parseKafkaVersion("0.10.2.1")
v0_11_0_1 = parseKafkaVersion("0.11.0.1")
v0_11_0_2 = parseKafkaVersion("0.11.0.2")
v1_0_1 = parseKafkaVersion("1.0.1")
v1_0_2 = parseKafkaVersion("1.0.2")
v1_1_1 = parseKafkaVersion("1.1.1")

kafkaVersions = map[string]sarama.KafkaVersion{
"0.8.2.0": sarama.V0_8_2_0,
"0.8.2.1": sarama.V0_8_2_1,
"0.8.2.2": sarama.V0_8_2_2,
"0.8.2": sarama.V0_8_2_2,
"0.8": sarama.V0_8_2_2,

"0.9.0.0": sarama.V0_9_0_0,
"0.9.0.1": sarama.V0_9_0_1,
"0.9.0": sarama.V0_9_0_1,
"0.9": sarama.V0_9_0_1,

"0.10.0.0": sarama.V0_10_0_0,
"0.10.0.1": sarama.V0_10_0_1,
"0.10.0": sarama.V0_10_0_1,
"0.10.1.0": sarama.V0_10_1_0,
"0.10.1": sarama.V0_10_1_0,
"0.10.2.0": sarama.V0_10_2_0,
"0.10.2.1": v0_10_2_1,
"0.10.2": v0_10_2_1,
"0.10": v0_10_2_1,

"0.11.0.0": sarama.V0_11_0_0,
"0.11.0.1": v0_11_0_1,
"0.11.0.2": v0_11_0_2,
"0.11.0": v0_11_0_2,
"0.11": v0_11_0_2,

"1.0.0": sarama.V1_0_0_0,
"1.0.1": v1_0_1,
"1.0.2": v1_0_2,
"1.0": v1_0_2,
"1.1.0": sarama.V1_1_0_0,
"1.1.1": v1_1_1,
"1.1": v1_1_1,
"1": v1_1_1,

"2.0.0": sarama.V2_0_0_0,
"2.0.1": sarama.V2_0_1_0,
"2.0": sarama.V2_0_1_0,
"2.1": sarama.V2_1_0_0,
"2.2": sarama.V2_2_0_0,
"2": sarama.V2_1_0_0,
// Sarama expects version strings to be fully expanded, e.g. "1.1.1".
// We also allow versions to be specified as a prefix, e.g. "1",
// understood as referencing the most recent version starting with "1".
// truncatedKafkaVersions stores a lookup of the abbreviations we accept.
truncatedKafkaVersions = map[string]sarama.KafkaVersion{
"0.8.2": sarama.V0_8_2_2,
"0.8": sarama.V0_8_2_2,

"0.9.0": sarama.V0_9_0_1,
"0.9": sarama.V0_9_0_1,

"0.10.0": sarama.V0_10_0_1,
"0.10.1": sarama.V0_10_1_0,
"0.10.2": sarama.V0_10_2_1,
"0.10": sarama.V0_10_2_1,

"0.11.0": sarama.V0_11_0_2,
"0.11": sarama.V0_11_0_2,

"1.0": sarama.V1_0_0_0,
"1.1": sarama.V1_1_1_0,
"1": sarama.V1_1_1_0,

"2.0": sarama.V2_0_1_0,
"2.1": sarama.V2_1_0_0,
"2.2": sarama.V2_2_0_0,
"2.3": sarama.V2_3_0_0,
"2.4": sarama.V2_4_0_0,
"2.5": sarama.V2_5_0_0,
"2.6": sarama.V2_6_0_0,
"2": sarama.V2_6_0_0,
}
)

func parseKafkaVersion(s string) sarama.KafkaVersion {
v, err := sarama.ParseKafkaVersion(s)
if err != nil {
panic(err)
}
return v
}

// Validate that a kafka version is among the possible options
func (v *Version) Validate() error {
if _, ok := kafkaVersions[string(*v)]; !ok {
return fmt.Errorf("unknown/unsupported kafka vesion '%v'", *v)
if _, ok := v.Get(); ok {
return nil
}

return nil
return fmt.Errorf("unknown/unsupported kafka version '%v'", *v)
}

// Unpack a kafka version
Expand All @@ -113,6 +82,20 @@ func (v *Version) Unpack(s string) error {

// Get a sarama kafka version
func (v Version) Get() (sarama.KafkaVersion, bool) {
kv, ok := kafkaVersions[string(v)]
return kv, ok
// First check if it's one of the abbreviations we accept.
// If not, let sarama parse it.
s := string(v)
if version, ok := truncatedKafkaVersions[s]; ok {
return version, true
}
version, err := sarama.ParseKafkaVersion(s)
if err != nil {
return sarama.KafkaVersion{}, false
}
for _, supp := range sarama.SupportedVersions {
if version == supp {
return version, true
}
}
return sarama.KafkaVersion{}, false
}
80 changes: 80 additions & 0 deletions libbeat/common/kafka/version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
"testing"

"github.com/Shopify/sarama"
)

func TestVersionGet(t *testing.T) {
valid := map[Version]sarama.KafkaVersion{
"0.11": sarama.V0_11_0_2,
"1": sarama.V1_1_1_0,
"2.0.0": sarama.V2_0_0_0,
"2.0.1": sarama.V2_0_1_0,
"2.0": sarama.V2_0_1_0,
"2.5": sarama.V2_5_0_0,
}
invalid := []Version{
"1.1.2",
"1.2.3",
"1.3",
"hello",
"2.0.3",
}
for s, expect := range valid {
got, ok := s.Get()
if !ok {
t.Errorf("'%v' should parse as Kafka version %v, got nothing",
s, expect)
} else if got != expect {
t.Errorf("'%v' should parse as Kafka version %v, got %v",
s, expect, got)
}
}
for _, s := range invalid {
got, ok := s.Get()
if ok {
t.Errorf("'%v' is not a valid Kafka version but parsed as %v",
s, got)
}
}
}

func TestSaramaUpdate(t *testing.T) {
// If any of these versions are considered valid by our parsing code,
// it means someone updated sarama without updating the parsing code
// for the new version. Gently remind them.
flagVersions := []Version{"2.8.1", "2.9.0"}
for _, v := range flagVersions {
if _, ok := v.Get(); ok {
t.Fatalf(
"Kafka version %v is now considered valid. Did you update Sarama?\n"+
"If so, remember to:\n"+
"- Update truncatedKafkaVersions in libbeat/common/kafka/version.go\n"+
"- Update the documentation to list the latest version:\n"+
" * libbeat/outputs/kafka/docs/kafka.asciidoc\n"+
" * filebeat/docs/inputs/inputs-kafka.asciidoc\n"+
"- Update TestSaramaUpdate in libbeat/common/kafka/version_test.go\n",
v)

}
}
}
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/docs/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ NOTE: Events bigger than <<kafka-max_message_bytes,`max_message_bytes`>> will be
[[kafka-compatibility]]
==== Compatibility

This output works with all Kafka versions in between 0.11 and 2.2.2. Older versions
This output works with all Kafka versions in between 0.11 and 2.8.0. Older versions
might work as well, but are not supported.

==== Configuration options
Expand Down

0 comments on commit 7058097

Please sign in to comment.