client: create a sink & source for all partition replicas
Previously, when using preferred read replicas, if a replica was not the
leader for any partition, its sink & source was never created. The leader
would say "consume from 3", but the client would not have broker 3 saved,
because it only saves a sink & source per leader.

We now create a sink & source for every replica of a partition as well.
When a leader says "consume from 3", we should now have broker 3 saved,
because it appears in the Replicas field of a MetadataResponseTopicPartition.

This also tidies up some repeated locking code for sinks and sources.
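
To make the failure mode concrete, here is a minimal, hedged sketch. The names
(the sinkAndSource stand-in, redirectToPreferredReplica) and the map shape are
hypothetical simplifications for illustration, not the client's actual
fetch-handling code; the point is only that a "consume from 3" redirect can be
honored only if an entry for that broker already exists.

// Hypothetical, simplified illustration; not the real client code.
package main

import "fmt"

// sinkAndSource stands in for the client's per-broker sink & source pair.
type sinkAndSource struct{ broker int32 }

// redirectToPreferredReplica mimics the lookup done when a fetch response
// says "consume from replica X": the redirect only works if an entry for X
// was created earlier.
func redirectToPreferredReplica(sinksAndSources map[int32]sinkAndSource, preferred int32) bool {
	_, ok := sinksAndSources[preferred]
	return ok
}

func main() {
	// Before this commit: only the partition leader (broker 1) was saved.
	sinksAndSources := map[int32]sinkAndSource{1: {broker: 1}}
	fmt.Println(redirectToPreferredReplica(sinksAndSources, 3)) // false: broker 3 was never saved

	// After this commit: every replica from the metadata response is saved.
	for _, replica := range []int32{1, 2, 3} {
		if _, exists := sinksAndSources[replica]; !exists {
			sinksAndSources[replica] = sinkAndSource{broker: replica}
		}
	}
	fmt.Println(redirectToPreferredReplica(sinksAndSources, 3)) // true
}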
twmb committed Dec 7, 2021
1 parent 8325ba7 commit 6bbdaa2
Showing 3 changed files with 30 additions and 22 deletions.
9 changes: 9 additions & 0 deletions pkg/kgo/client.go
@@ -92,6 +92,15 @@ type sinkAndSource struct {
 	source *source
 }
 
+func (cl *Client) allSinksAndSources(fn func(sns sinkAndSource)) {
+	cl.sinksAndSourcesMu.Lock()
+	defer cl.sinksAndSourcesMu.Unlock()
+
+	for _, sns := range cl.sinksAndSources {
+		fn(sns)
+	}
+}
+
 type hostport struct {
 	host string
 	port int32
32 changes: 10 additions & 22 deletions pkg/kgo/consumer.go
@@ -425,13 +425,9 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s
 // paused. Resuming topics that are not currently paused is a per-topic no-op.
 // See the documentation on PauseFetchTopics for more details.
 func (cl *Client) ResumeFetchTopics(topics ...string) {
-	defer func() {
-		cl.sinksAndSourcesMu.Lock()
-		for _, sns := range cl.sinksAndSources {
-			sns.source.maybeConsume()
-		}
-		cl.sinksAndSourcesMu.Unlock()
-	}()
+	defer cl.allSinksAndSources(func(sns sinkAndSource) {
+		sns.source.maybeConsume()
+	})
 
 	c := &cl.consumer
 	c.pausedMu.Lock()
@@ -447,13 +443,9 @@ func (cl *Client) ResumeFetchTopics(topics ...string) {
 // per-topic no-op. See the documentation on PauseFetchPartitions for more
 // details.
 func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) {
-	defer func() {
-		cl.sinksAndSourcesMu.Lock()
-		for _, sns := range cl.sinksAndSources {
-			sns.source.maybeConsume()
-		}
-		cl.sinksAndSourcesMu.Unlock()
-	}()
+	defer cl.allSinksAndSources(func(sns sinkAndSource) {
+		sns.source.maybeConsume()
+	})
 
 	c := &cl.consumer
 	c.pausedMu.Lock()
@@ -1123,11 +1115,9 @@ func (c *consumer) stopSession() (listOrEpochLoads, *topicsPartitions) {
 	// our num-fetches manager without worrying about a source trying to
 	// register itself.
 
-	c.cl.sinksAndSourcesMu.Lock()
-	for _, sns := range c.cl.sinksAndSources {
+	c.cl.allSinksAndSources(func(sns sinkAndSource) {
 		sns.source.session.reset()
-	}
-	c.cl.sinksAndSourcesMu.Unlock()
+	})
 
 	// At this point, if we begin fetching anew, then the sources will not
 	// be using stale fetch sessions.
@@ -1166,11 +1156,9 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
 
 	c.sessionChangeMu.Unlock()
 
-	c.cl.sinksAndSourcesMu.Lock()
-	for _, sns := range c.cl.sinksAndSources {
+	c.cl.allSinksAndSources(func(sns sinkAndSource) {
 		sns.source.maybeConsume()
-	}
-	c.cl.sinksAndSourcesMu.Unlock()
+	})
 
 	// At this point, any source that was not consuming because it saw the
 	// session was stopped has been notified to potentially start consuming
11 changes: 11 additions & 0 deletions pkg/kgo/metadata.go
@@ -445,6 +445,17 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
 			}
 			cl.sinksAndSources[p.leader] = sns
 		}
+		for _, replica := range partMeta.Replicas {
+			if replica < 0 {
+				continue
+			}
+			if _, exists = cl.sinksAndSources[replica]; !exists {
+				cl.sinksAndSources[replica] = sinkAndSource{
+					sink:   cl.newSink(replica),
+					source: cl.newSource(replica),
+				}
+			}
+		}
 		cl.sinksAndSourcesMu.Unlock()
 		p.records.sink = sns.sink
 		p.cursor.source = sns.source
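
For context, preferred read replicas only come into play when the client
advertises its rack, so a consumer configured roughly like the sketch below
exercises the path this commit fixes. This is a hedged usage sketch, not part
of the commit: the broker address, topic, and rack name are placeholders, and
it assumes a cluster with follower fetching (KIP-392) enabled server-side.

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Rack tells brokers which rack this client is in; with follower
	// fetching enabled server-side, the leader may respond with a
	// preferred read replica, redirecting fetches to a same-rack follower
	// that is not the partition leader.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder address
		kgo.ConsumeTopics("foo"),          // placeholder topic
		kgo.Rack("rack-a"),                // placeholder rack
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// With this commit, the redirect target always has a sink & source
	// saved, so consumption continues from the follower.
	fetches := cl.PollFetches(context.Background())
	fetches.EachRecord(func(r *kgo.Record) {
		_ = r
	})
}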
