Skip to content

Commit

Permalink
kgo: patch ConsumePartitions regression from 1.11.5
Browse files Browse the repository at this point in the history
AddConsumeTopics was patched in 1.11.5 to actually work for the direct
consumer. Unfortunately, this broke ConsumePartitions by always
consuming all partitions. The prior commits introduced tests ensuring
we do not regress on either of these issues. This commit introduces the
fix.

This also fixes two other small things:
* We retry more on ILLEGAL_GENERATION while committing, which was seen
once in tests (on GH, slow). We also sleep 10ms before retrying, which
is short but hopefully enough of a pause when retrying.
* We change blockingMetadataFn to wait for the function if the function
is called. This was noticed in the TestDirectPartitionPurge test.
  • Loading branch information
twmb committed Feb 5, 2023
1 parent a4b03b2 commit edd0985
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 18 deletions.
10 changes: 10 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,12 +707,19 @@ func (c *consumer) purgeTopics(topics []string) {
for _, topic := range topics {
delete(c.d.using, topic)
delete(c.d.reSeen, topic)
delete(c.d.m, topic)
}
}
}

// AddConsumeTopics adds new topics to be consumed. This function is a no-op if
// the client is configured to consume via regex.
//
// Note that if you are directly consuming and specified ConsumePartitions,
// this function will not add the rest of the partitions for a topic unless the
// topic has been previously purged. That is, if you directly consumed only one
// of five partitions originally, this will not add the other four until the
// entire topic is purged.
func (cl *Client) AddConsumeTopics(topics ...string) {
c := &cl.consumer
if len(topics) == 0 || c.g == nil && c.d == nil || cl.cfg.regex {
Expand All @@ -728,6 +735,9 @@ func (cl *Client) AddConsumeTopics(topics ...string) {
c.g.tps.storeTopics(topics)
} else {
c.d.tps.storeTopics(topics)
for _, topic := range topics {
c.d.m.addt(topic)
}
}
cl.triggerUpdateMetadataNow("from AddConsumeTopics")
}
Expand Down
39 changes: 25 additions & 14 deletions pkg/kgo/consumer_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package kgo

type directConsumer struct {
cfg *cfg
tps *topicsPartitions // data for topics that the user assigned
reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not
using map[string]map[int32]struct{} // topics we are currently using (this only grows)
tps *topicsPartitions // data for topics that the user assigned
using mtmps // topics we are currently using
m mtmps // mirrors cfg.topics and cfg.partitions, but can change with Purge or Add
reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not
}

func (c *consumer) initDirect() {
d := &directConsumer{
cfg: &c.cl.cfg,
tps: newTopicsPartitions(),
reSeen: make(map[string]bool),
using: make(map[string]map[int32]struct{}),
using: make(mtmps),
m: make(mtmps),
}
c.d = d

Expand All @@ -21,11 +23,15 @@ func (c *consumer) initDirect() {
}

var topics []string
for topic := range d.cfg.topics {
for topic, partitions := range d.cfg.partitions {
topics = append(topics, topic)
for partition := range partitions {
d.m.add(topic, partition)
}
}
for topic := range d.cfg.partitions {
for topic := range d.cfg.topics {
topics = append(topics, topic)
d.m.addt(topic)
}
d.tps.storeTopics(topics) // prime topics to load if non-regex (this is of no benefit if regex)
}
Expand Down Expand Up @@ -59,7 +65,7 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {

toUse := make(map[string]map[int32]Offset, 10)
for topic, topicPartitions := range topics {
useTopic := true
var useTopic bool
if d.cfg.regex {
want, seen := d.reSeen[topic]
if !seen {
Expand All @@ -75,6 +81,8 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
d.reSeen[topic] = want
}
useTopic = want
} else {
useTopic = d.m.onlyt(topic)
}

// If the above detected that we want to keep this topic, we
Expand All @@ -95,14 +103,17 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
}

// Lastly, if this topic has some specific partitions pinned,
// we set those.
for partition, offset := range d.cfg.partitions[topic] {
toUseTopic, exists := toUse[topic]
if !exists {
toUseTopic = make(map[int32]Offset, 10)
toUse[topic] = toUseTopic
// we set those. We only use partitions from topics that have
// not been purged.
for topic := range d.m {
for partition, offset := range d.cfg.partitions[topic] {
toUseTopic, exists := toUse[topic]
if !exists {
toUseTopic = make(map[int32]Offset, 10)
toUse[topic] = toUseTopic
}
toUseTopic[partition] = offset
}
toUseTopic[partition] = offset
}
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2596,10 +2596,10 @@ func (g *groupConsumer) commitAcrossRebalance(
return
}

// We retry up to twice (three tries total): cooperative rebalancing
// We retry four times, for five tries total: cooperative rebalancing
// uses two back to back rebalances, and the commit could
// pathologically end during both. A third failure is unexpected.
if g.cooperative && tries < 3 {
// pathologically end during both.
if g.cooperative && tries < 5 {
origDone := onDone
onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
retry := err == nil
Expand Down Expand Up @@ -2653,6 +2653,7 @@ func (g *groupConsumer) commitAcrossRebalance(
if retry {
go func() {
g.cl.cfg.logger.Log(LogLevelInfo, "CommitOffsets spanned a rebalance, we are cooperative and did not lose any partition we were trying to commit, recommitting", "err", retryErr)
time.Sleep(10 * time.Millisecond)
g.mu.Lock()
defer g.mu.Unlock()
g.commitAcrossRebalance(ctx, uncommitted, origDone, tries+1)
Expand Down
9 changes: 8 additions & 1 deletion pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,15 @@ func (cl *Client) triggerUpdateMetadataNow(why string) {
}

func (cl *Client) blockingMetadataFn(fn func()) {
var wg sync.WaitGroup
wg.Add(1)
waitfn := func() {
defer wg.Done()
fn()
}
select {
case cl.blockingMetadataFnCh <- fn:
case cl.blockingMetadataFnCh <- waitfn:
wg.Wait()
case <-cl.ctx.Done():
}
}
Expand Down
45 changes: 45 additions & 0 deletions pkg/kgo/topics_and_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
)

/////////////
// HELPERS // -- ugly types to eliminate the toil of nil maps and lookups
/////////////

func dupmsi32(m map[string]int32) map[string]int32 {
d := make(map[string]int32, len(m))
for t, ps := range m {
Expand Down Expand Up @@ -71,6 +75,43 @@ func (m mtps) String() string {
return sb.String()
}

type mtmps map[string]map[int32]struct{} // map of topics to map of partitions

func (m *mtmps) add(t string, p int32) {
if *m == nil {
*m = make(mtmps)
}
mps := (*m)[t]
if mps == nil {
mps = make(map[int32]struct{})
(*m)[t] = mps
}
mps[p] = struct{}{}
}

func (m *mtmps) addt(t string) {
if *m == nil {
*m = make(mtmps)
}
mps := (*m)[t]
if mps == nil {
mps = make(map[int32]struct{})
(*m)[t] = mps
}
}

func (m mtmps) onlyt(t string) bool {
if m == nil {
return false
}
ps, exists := m[t]
return exists && len(ps) == 0
}

////////////
// PAUSED // -- types for pausing topics and partitions
////////////

type pausedTopics map[string]pausedPartitions

type pausedPartitions struct {
Expand Down Expand Up @@ -174,6 +215,10 @@ func (m pausedTopics) clone() pausedTopics {
return dup
}

//////////
// GUTS // -- the key types for storing important metadata for topics & partitions
//////////

func newTopicPartitions() *topicPartitions {
parts := new(topicPartitions)
parts.v.Store(new(topicPartitionsData))
Expand Down

0 comments on commit edd0985

Please sign in to comment.