Skip to content

Commit

Permalink
kversion: fix version detection for Kafka v2.7 through 3.4
Browse files Browse the repository at this point in the history
Kafka v3.4 added envelope support for the Envelope request to the zk
based broker, so I added that to kversion.

This made Envelope a required request for version detection ever since
it was introduced -- v2.7 for raft -- and pinned version detection to
"at least 2.7" for all cluster between 2.7 and 3.4.

We now ignore the envelope key when version guessing because ultimately
other keys are differentiating enough. This allows version detection to
now again correctly guess 2.7 through 3.4.

Tested against zk 3.1, 3.2, 3.5, as well as kraft 3.5 manually, and adds
a unit test for the 3.1 versions.

Closes #536.
  • Loading branch information
twmb committed Aug 17, 2023
1 parent 83cb9fe commit 5978156
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
16 changes: 12 additions & 4 deletions pkg/kversion/kversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func SkipKeys(keys ...int16) VersionGuessOpt {
}

// TryRaftBroker changes from guessing the version for a classical ZooKeeper
// based broker to guessing for a raft based broker (v2.8.0+).
// based broker to guessing for a raft based broker (v2.8+).
//
// Note that with raft, there can be a TryRaftController attempt as well.
func TryRaftBroker() VersionGuessOpt {
Expand All @@ -147,7 +147,7 @@ func TryRaftBroker() VersionGuessOpt {

// TryRaftController changes from guessing the version for a classical
// ZooKeeper based broker to guessing for a raft based controller broker
// (v2.8.0+).
// (v2.8+).
//
// Note that with raft, there can be a TryRaftBroker attempt as well. Odds are
// that if you are an end user speaking to a raft based Kafka cluster, you are
Expand All @@ -164,7 +164,7 @@ type guessCfg struct {

// VersionGuess attempts to guess which version of Kafka these versions belong
// to. If an exact match can be determined, this returns a string in the format
// v0.#.# or v#.# (depending on whether Kafka is pre-1.0.0 or post). For
// v0.#.# or v#.# (depending on whether Kafka is pre-1.0 or post). For
// example, v0.8.0 or v2.7.
//
// Patch numbers are not included in the guess as it is not possible to
Expand Down Expand Up @@ -253,7 +253,15 @@ func (g guess) String() string {
func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
cfg := guessCfg{
listener: zkBroker,
skipKeys: []int16{4, 5, 6, 7, 27},
// Envelope was added in 2.7 for kraft and zkBroker in 3.4; we
// need to skip it for 2.7 through 3.4 otherwise the version
// detection fails. We can just skip it generally since there
// are enough differentiating factors that accurately detecting
// envelope doesn't matter.
//
// TODO: add introduced-version to differentiate some specific
// keys.
skipKeys: []int16{4, 5, 6, 7, 27, 58},
}
for _, opt := range opts {
opt.apply(&cfg)
Expand Down
72 changes: 72 additions & 0 deletions pkg/kversion/kversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,75 @@ func TestEqual(t *testing.T) {
t.Errorf("unexpectedly not equal after backing v0.8.1 down to v0.8.0, opposite direction")
}
}

func TestVersionProbeKafka3_1(t *testing.T) {
versions := map[int16]int16{
0: 9, // Produce
1: 13, // Fetch
2: 7, // ListOffsets
3: 12, // Metadata
4: 5, // LeaderAndISR
5: 3, // StopReplica
6: 7, // UpdateMetadata
7: 3, // ControlledShutdown
8: 8, // OffsetCommit
9: 8, // OffsetFetch
10: 4, // FindCoordinator
11: 7, // JoinGroup
12: 4, // Heartbeat
13: 4, // LeaveGroup
14: 5, // SyncGroup
15: 5, // DescribeGroups
16: 4, // ListGroups
17: 1, // SASLHandshake
18: 3, // ApiVersions
19: 7, // CreateTopics
20: 6, // DeleteTopics
21: 2, // DeleteRecords
22: 4, // InitProducerID
23: 4, // OffsetForLeaderEpoch
24: 3, // AddPartitionsToTxn
25: 3, // AddOffsetsToTxn
26: 3, // EndTxn
27: 1, // WriteTxnMarkers
28: 3, // TxnOffsetCommit
29: 2, // DescribeACLs
30: 2, // CreateACLs
31: 2, // DeleteACLs
32: 4, // DescribeConfigs
33: 2, // AlterConfigs
34: 2, // AlterReplicaLogDirs
35: 2, // DescribeLogDirs
36: 2, // SASLAuthenticate
37: 3, // CreatePartitions
38: 2, // CreateDelegationToken
39: 2, // RenewDelegationToken
40: 2, // ExpireDelegationToken
41: 2, // DescribeDelegationToken
42: 2, // DeleteGroups
43: 2, // ElectLeaders
44: 1, // IncrementalAlterConfigs
45: 0, // AlterPartitionAssignments
46: 0, // ListPartitionReassignments
47: 0, // OffsetDelete
48: 1, // DescribeClientQuotas
49: 1, // AlterClientQuotas
50: 0, // DescribeUserSCRAMCredentials
51: 0, // AlterUserSCRAMCredentials
56: 0, // AlterPartition
57: 0, // UpdateFeatures
60: 0, // DescribeCluster
61: 0, // DescribeProducers
65: 0, // DescribeTransactions
66: 0, // ListTransactions
67: 0, // AllocateProducerIDs
}

var vs Versions
for k, v := range versions {
vs.SetMaxKeyVersion(k, v)
}
if guess := vs.VersionGuess(); guess != "v3.1" {
t.Errorf("unexpected version guess, got %s != exp %s", guess, "v3.1")
}
}

0 comments on commit 5978156

Please sign in to comment.