Skip to content

Commit

Permalink
sticky balancer: try for better topic distribution among members
Browse files Browse the repository at this point in the history
The sticky balancer currently strives for ultimate stickiness, with no
regard to trying to balance topic partitions among members equally. When
adding a member, it is often the case that an entire topic's partitions
shift to the other member, while the first member has the other topic.

By sorting by partition number before balancing, when the algorithm
steals partitions from the end of an existing member to give to the new
member, we ensure that we divvy up the topics equally to both members
while still ensuring stickiness.

This is likely not perfect but it goes a long way.
  • Loading branch information
twmb committed Oct 21, 2023
1 parent 6a961da commit 1429d47
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/kgo/internal/sticky/sticky.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,24 @@ func (b *balancer) assignUnassignedAndInitGraph() {
}

b.tryRestickyStales(topicPotentials, partitionConsumers)

// For each member, we now sort their current partitions by partition,
// then topic. Sorting the lowest numbers first means that once we
// steal from the end (when adding a member), we steal equally across
// all topics. This benefits the standard case the most, where all
// members consume equally.
for memberNum := range b.plan {
partNums := b.plan[memberNum]
sort.Slice(partNums, func(i, j int) bool {
lpNum, rpNum := partNums[i], partNums[j]
ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum]
li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum]
lt, rt := li.topic, ri.topic
lp, rp := lpNum-li.partNum, rpNum-ri.partNum
return lp < rp || (lp == rp && lt < rt)
})
}

for _, potentials := range topicPotentials {
(&membersByPartitions{potentials, b.plan}).init()
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/kgo/internal/sticky/sticky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,30 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testi
testPlanUsage(t, plan3, topics, nil)
}

// Test_stickyAddEqualMove balances a single member across two 16-partition
// topics, then adds a second member (with the first member's previous plan
// encoded into its user data) and rebalances. After the rebalance, every
// member must own exactly 8 partitions of each topic.
func Test_stickyAddEqualMove(t *testing.T) {
	t.Parallel()

	topics := map[string]int32{"foo": 16, "bar": 16}
	members := []GroupMember{
		{ID: "1", Topics: []string{"foo", "bar"}},
	}

	// First generation: member "1" is alone in the group.
	firstPlan := Balance(members, topics)

	// Second generation: member "1" reports what it previously owned and
	// member "2" joins with the same subscriptions.
	members[0].UserData = udEncode(1, 1, firstPlan["1"])
	joiner := GroupMember{ID: "2", Topics: []string{"foo", "bar"}}
	members = append(members, joiner)

	secondPlan := Balance(members, topics)
	testEqualDivvy(t, secondPlan, 16, members)
	testPlanUsage(t, secondPlan, topics, nil)

	// Each (member, topic) pair must hold exactly half of the topic; any
	// deviation is reported once, with the full plan attached.
	even := true
	for _, id := range []string{"1", "2"} {
		for _, topic := range []string{"foo", "bar"} {
			if len(secondPlan[id][topic]) != 8 {
				even = false
			}
		}
	}
	if !even {
		t.Errorf("bad distribution: %v", secondPlan)
	}
}

func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 1429d47

Please sign in to comment.