Skip to content

Commit

Permalink
pkg/kgo: patch AddConsumeTopics
Browse files Browse the repository at this point in the history
* This previously did not work for the direct consumer
* We now allow this work work for clients that do not consume anything
to begin with
* We now allow a consumer to join a group with no topic interests, so
that the end-user can add interests later
  • Loading branch information
twmb committed Jan 28, 2023
1 parent 99dba97 commit f613fb8
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 13 deletions.
3 changes: 0 additions & 3 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,6 @@ func (cfg *cfg) validate() error {
}

if len(cfg.group) > 0 {
if len(cfg.topics) == 0 {
return errors.New("unable to consume from a group when no topics are specified")
}
if len(cfg.partitions) != 0 {
return errors.New("invalid direct-partition consuming option when consuming as a group")
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,10 @@ func (c *consumer) init(cl *Client) {
c.sourcesReadyCond = sync.NewCond(&c.sourcesReadyMu)
c.pollWaitC = sync.NewCond(&c.pollWaitMu)

if len(cl.cfg.topics) == 0 && len(cl.cfg.partitions) == 0 {
return // not consuming
if len(cl.cfg.topics) > 0 || len(cl.cfg.partitions) > 0 {
defer cl.triggerUpdateMetadataNow("querying metadata for consumer initialization") // we definitely want to trigger a metadata update
}

defer cl.triggerUpdateMetadataNow("client initialization") // we definitely want to trigger a metadata update

if len(cl.cfg.group) == 0 {
c.initDirect()
} else {
Expand Down
6 changes: 1 addition & 5 deletions pkg/kgo/consumer_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {

toUse := make(map[string]map[int32]Offset, 10)
for topic, topicPartitions := range topics {
// If we are using regex topics, we have to check all
// topic regexes to see if any match on this topic.
var useTopic bool
useTopic := true
if d.cfg.regex {
want, seen := d.reSeen[topic]
if !seen {
Expand All @@ -77,8 +75,6 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
d.reSeen[topic] = want
}
useTopic = want
} else {
_, useTopic = d.cfg.topics[topic]
}

// If the above detected that we want to keep this topic, we
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ func (s *sink) produce(sem <-chan struct{}) bool {

batches := req.batches.sliced()
s.doSequenced(req, func(br *broker, resp kmsg.Response, err error) {
s.cl.producer.decInflight()
s.handleReqResp(br, req, resp, err)
s.cl.producer.decInflight()
batches.eachOwnerLocked((*recBatch).decInflight)
<-sem
})
Expand Down

0 comments on commit f613fb8

Please sign in to comment.