Skip to content

Commit

Permalink
goroutine per consumer: add proper locking
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Sep 17, 2021
1 parent c153b9a commit 43cdcad
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions examples/goroutine_per_partition_consuming/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -62,10 +63,13 @@ func (pc *pconsumer) consume(topic string, partition int32) {
}

type splitConsume struct {
mu sync.Mutex // gaurds assigning / losing vs. polling
consumers map[string]map[int32]pconsumer
}

func (s *splitConsume) assigned(_ context.Context, cl *kgo.Client, assigned map[string][]int32) {
s.mu.Lock()
defer s.mu.Unlock()
for topic, partitions := range assigned {
if s.consumers[topic] == nil {
s.consumers[topic] = make(map[int32]pconsumer)
Expand All @@ -82,6 +86,8 @@ func (s *splitConsume) assigned(_ context.Context, cl *kgo.Client, assigned map[
}

func (s *splitConsume) lost(_ context.Context, cl *kgo.Client, lost map[string][]int32) {
s.mu.Lock()
defer s.mu.Unlock()
for topic, partitions := range lost {
ptopics := s.consumers[topic]
for _, partition := range partitions {
Expand Down Expand Up @@ -148,6 +154,8 @@ func (s *splitConsume) poll(cl *kgo.Client) {
panic(err)
})
fetches.EachTopic(func(t kgo.FetchTopic) {
s.mu.Lock()
defer s.mu.Unlock()
tconsumers := s.consumers[t.Topic]
if tconsumers == nil {
return
Expand Down

0 comments on commit 43cdcad

Please sign in to comment.