consumer: add BlockRebalanceOnPoll option, AllowRebalance
One strength of Sarama's consumer group is that it is easier to reason
about where record processing logic falls within the group management
lifecycle: if you are processing records, you can be sure that the group
will not rebalance underneath you. This is also a risk, though, if your
processing logic is so long that your group member is booted from the
group.

This client took the opposite approach of separating the group
management logic from the consuming logic. This essentially eliminated
the risk of long processing booting the member, but made it much more
difficult to reason about when to commit. There are plenty of
disclaimers about potential duplicates, and any non-transactional
consumer should expect duplicates at some point, but if a user wants to
opt for the simpler approach of consuming & processing within one group
generation, we should support that. In fact, if a user's processing loop
is very fast, we really should encourage it.

Helps #137.
twmb committed Mar 1, 2022
1 parent 39af436 commit cffbee7
Showing 3 changed files with 203 additions and 30 deletions.
54 changes: 51 additions & 3 deletions pkg/kgo/config.go
@@ -164,6 +164,8 @@ type cfg struct {

	adjustOffsetsBeforeAssign func(ctx context.Context, offsets map[string]map[int32]Offset) (map[string]map[int32]Offset, error)

	blockRebalanceOnPoll bool

	setAssigned bool
	setRevoked  bool
	setLost     bool
@@ -1383,6 +1385,43 @@ func RequireStableFetchOffsets() GroupOpt {
	return groupOpt{func(cfg *cfg) { cfg.requireStable = true }}
}

// BlockRebalanceOnPoll switches the client to block rebalances whenever you
// poll until you explicitly call AllowRebalance. This option also ensures that
// any OnPartitions{Assigned,Revoked,Lost} callbacks are only called when you
// allow rebalances; they cannot be called if you have polled and are
// processing records.
//
// By default, a consumer group is managed completely independently of
// consuming. A rebalance may occur at any moment. If you poll records, and
// then a rebalance happens, and then you commit, you may be committing to
// partitions you no longer own. This will result in duplicates. In the worst
// case, you could rewind commits that a different member has already made
// (risking duplicates if another rebalance were to happen before that other
// member commits again).
//
// By blocking rebalancing after you poll until you call AllowRebalance, you
// can be sure that you commit records that your member currently owns.
// However, the big tradeoff is that by blocking rebalances, you put your group
// member at risk of waiting so long that it is kicked from the group for
// exceeding the rebalance timeout. For comparison, Sarama blocks rebalancing
// by default; this option makes kgo behave more like Sarama.
//
// If you use this option, you should ensure that you always process records
// quickly and that your OnPartitions{Assigned,Revoked,Lost} callbacks are
// fast. It is recommended that you also use PollRecords rather than
// PollFetches so that you can bound how many records you process at once. You
// must always call AllowRebalance when you are done processing the records
// you received. Only rebalances that lose partitions are blocked; rebalances
// that are strictly net additions or non-modifications do not block (the On
// callbacks are always blocked so that you can ensure their serialization).
//
// This function can largely replace any commit logic you may want to do in
// OnPartitionsRevoked.
func BlockRebalanceOnPoll() GroupOpt {
	return groupOpt{func(cfg *cfg) { cfg.blockRebalanceOnPoll = true }}
}
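
A minimal usage sketch (an editor's addition, not part of this commit) of enabling the option when constructing a client; the broker, group, and topic names are placeholders, and the surrounding options are the client's usual configuration options:

cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.ConsumerGroup("my-group"),
	kgo.ConsumeTopics("my-topic"),
	kgo.BlockRebalanceOnPoll(),
)
if err != nil {
	// handle the client construction error
}
defer cl.Close()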

// AdjustFetchOffsetsFn sets the function to be called when a group is joined
// after offsets are fetched for those partitions so that a user can adjust them
// before consumption begins.
@@ -1410,7 +1449,10 @@ func AdjustFetchOffsetsFn(adjustOffsetsBeforeAssign func(context.Context, map[st
// only canceled if the client is closed.
//
// This function is not called concurrent with any other On callback, and this
// function is given a new map that the user is free to modify.
// function is given a new map that the user is free to modify. This function
// can be called at any time you are polling or processing records. If you want
// to ensure this function is called serially with processing, consider the
// BlockRebalanceOnPoll option.
func OnPartitionsAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) GroupOpt {
	return groupOpt{func(cfg *cfg) { cfg.onAssigned, cfg.setAssigned = onAssigned, true }}
}
@@ -1438,7 +1480,10 @@ func OnPartitionsAssigned(onAssigned func(context.Context, *Client, map[string][
// OnPartitionsRevoked.
//
// This function is not called concurrent with any other On callback, and this
// function is given a new map that the user is free to modify.
// function is given a new map that the user is free to modify. This function
// can be called at any time you are polling or processing records. If you want
// to ensure this function is called serially with processing, consider the
// BlockRebalanceOnPoll option.
func OnPartitionsRevoked(onRevoked func(context.Context, *Client, map[string][]int32)) GroupOpt {
	return groupOpt{func(cfg *cfg) { cfg.onRevoked, cfg.setRevoked = onRevoked, true }}
}
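
As an illustration of the final-commit logic this callback is commonly used for (an editor's sketch, not part of this commit; it assumes manual commits and is passed to NewClient alongside the other group options, using the client's CommitUncommittedOffsets helper):

kgo.OnPartitionsRevoked(func(ctx context.Context, cl *kgo.Client, revoked map[string][]int32) {
	// Flush whatever this member has consumed but not yet committed
	// before the revoked partitions move to another member.
	if err := cl.CommitUncommittedOffsets(ctx); err != nil {
		// log the error; the next owner may reprocess some records
	}
}),
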
@@ -1454,7 +1499,10 @@ func OnPartitionsRevoked(onRevoked func(context.Context, *Client, map[string][]i
// lost and revoked, you can use OnPartitionsLostAsRevoked as a shortcut.
//
// This function is not called concurrent with any other On callback, and this
// function is given a new map that the user is free to modify.
// function is given a new map that the user is free to modify. This function
// can be called at any time you are polling or processing records. If you want
// to ensure this function is called serially with processing, consider the
// BlockRebalanceOnPoll option.
func OnPartitionsLost(onLost func(context.Context, *Client, map[string][]int32)) GroupOpt {
	return groupOpt{func(cfg *cfg) { cfg.onLost, cfg.setLost = onLost, true }}
}
100 changes: 100 additions & 0 deletions pkg/kgo/consumer.go
@@ -166,12 +166,74 @@ type consumer struct {
	sourcesReadyCond        *sync.Cond
	sourcesReadyForDraining []*source
	fakeReadyForDraining    []Fetch

	pollWaitMu    sync.Mutex
	pollWaitC     *sync.Cond
	pollWaitState uint64 // 0 == nothing, low 32 bits: # pollers, high 32: # waiting rebalances
}

func (c *consumer) loadPaused() pausedTopics { return c.paused.Load().(pausedTopics) }
func (c *consumer) clonePaused() pausedTopics { return c.paused.Load().(pausedTopics).clone() }
func (c *consumer) storePaused(p pausedTopics) { c.paused.Store(p) }

func (c *consumer) waitAndAddPoller() {
	if !c.cl.cfg.blockRebalanceOnPoll {
		return
	}
	c.pollWaitMu.Lock()
	defer c.pollWaitMu.Unlock()
	for c.pollWaitState>>32 != 0 {
		c.pollWaitC.Wait()
	}
	// Rebalance always takes priority, but if there are no active
	// rebalances, our poll blocks rebalances.
	c.pollWaitState++
}

func (c *consumer) unaddPoller() {
	if !c.cl.cfg.blockRebalanceOnPoll {
		return
	}
	c.pollWaitMu.Lock()
	defer c.pollWaitMu.Unlock()
	c.pollWaitState--
	c.pollWaitC.Broadcast()
}

func (c *consumer) allowRebalance() {
	if !c.cl.cfg.blockRebalanceOnPoll {
		return
	}
	c.pollWaitMu.Lock()
	defer c.pollWaitMu.Unlock()
	// When allowing rebalances, the user is explicitly saying all pollers
	// are done. We mask them out.
	c.pollWaitState &= math.MaxUint32 << 32
	c.pollWaitC.Broadcast()
}

func (c *consumer) waitAndAddRebalance() {
	if !c.cl.cfg.blockRebalanceOnPoll {
		return
	}
	c.pollWaitMu.Lock()
	defer c.pollWaitMu.Unlock()
	c.pollWaitState += 1 << 32
	for c.pollWaitState&math.MaxUint32 != 0 {
		c.pollWaitC.Wait()
	}
}

func (c *consumer) unaddRebalance() {
	if !c.cl.cfg.blockRebalanceOnPoll {
		return
	}
	c.pollWaitMu.Lock()
	defer c.pollWaitMu.Unlock()
	c.pollWaitState -= 1 << 32
	c.pollWaitC.Broadcast()
}
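
The single pollWaitState word above packs two counters: the low 32 bits count active pollers and the high 32 bits count waiting rebalances. A standalone illustration of that packing (an editor's sketch, not part of the commit):

package main

import (
	"fmt"
	"math"
)

func main() {
	var state uint64

	state++          // a poller enters PollRecords
	state++          // a second concurrent poller
	state += 1 << 32 // a rebalance arrives and waits for pollers to drain

	fmt.Println(state&math.MaxUint32, state>>32) // 2 pollers, 1 waiting rebalance

	// AllowRebalance masks out every poller at once, leaving only the
	// waiting rebalances in the high bits.
	state &= math.MaxUint32 << 32
	fmt.Println(state&math.MaxUint32, state>>32) // 0 pollers, 1 waiting rebalance
}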

// BufferedFetchRecords returns the number of records currently buffered from
// fetching within the client.
//
@@ -198,6 +260,7 @@ func (c *consumer) init(cl *Client) {
	c.cl = cl
	c.paused.Store(make(pausedTopics))
	c.sourcesReadyCond = sync.NewCond(&c.sourcesReadyMu)
	c.pollWaitC = sync.NewCond(&c.pollWaitMu)

	if len(cl.cfg.topics) == 0 && len(cl.cfg.partitions) == 0 {
		return // not consuming
@@ -253,6 +316,12 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er
// has no topic, a partition of 0, and a partition error of ErrClientClosed.
// This can be used to detect if the client is closing and to break out of a
// poll loop.
//
// If you are group consuming, a rebalance can happen under the hood while you
// process the returned fetches. This can result in duplicate work, and you may
// accidentally commit to partitions that you no longer own. You can prevent
// this by using BlockRebalanceOnPoll, but this comes with different tradeoffs.
// See the documentation on BlockRebalanceOnPoll for more information.
func (cl *Client) PollFetches(ctx context.Context) Fetches {
	return cl.PollRecords(ctx, 0)
}
@@ -273,6 +342,12 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
// has no topic, a partition of 0, and a partition error of ErrClientClosed.
// This can be used to detect if the client is closing and to break out of a
// poll loop.
//
// If you are group consuming, a rebalance can happen under the hood while you
// process the returned fetches. This can result in duplicate work, and you may
// accidentally commit to partitions that you no longer own. You can prevent
// this by using BlockRebalanceOnPoll, but this comes with different tradeoffs.
// See the documentation on BlockRebalanceOnPoll for more information.
func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
	if maxPollRecords == 0 {
		maxPollRecords = -1
@@ -283,6 +358,15 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {

	var fetches Fetches
	fill := func() {
		if c.cl.cfg.blockRebalanceOnPoll {
			c.waitAndAddPoller()
			defer func() {
				if len(fetches) == 0 {
					c.unaddPoller()
				}
			}()
		}

		// A group can grab the consumer lock then the group mu and
		// assign partitions. The group mu is grabbed to update its
		// uncommitted map. Assigning partitions clears sources ready
@@ -387,6 +471,19 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
	return fetches
}

// AllowRebalance allows a consumer group to rebalance if it was blocked by you
// polling records in tandem with the BlockRebalanceOnPoll option.
//
// You can poll many times before calling this function; this function
// internally resets the poll count and allows any blocked rebalances to
// continue. Rebalances take priority: if a rebalance is blocked, and you allow
// rebalances and then immediately poll, your poll will be blocked until the
// rebalance completes. Internally, this function simply waits for lost
// partitions to stop being fetched before allowing you to poll again.
func (cl *Client) AllowRebalance() {
	cl.consumer.allowRebalance()
}
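
A hedged sketch (an editor's addition, not part of this commit) of the poll, process, commit, AllowRebalance loop this option is designed for; processRecord stands in for user logic, and the assumed imports are context, errors, and the kgo package itself:

func consumeLoop(ctx context.Context, cl *kgo.Client, processRecord func(*kgo.Record)) {
	for {
		fetches := cl.PollRecords(ctx, 1000) // bound the work done per poll
		for _, e := range fetches.Errors() {
			// ErrClientClosed (documented above) signals the client is
			// closing; treat it as the signal to stop consuming.
			if errors.Is(e.Err, kgo.ErrClientClosed) {
				return
			}
		}
		fetches.EachRecord(processRecord)
		// Rebalances are blocked between the poll and this point, so the
		// commit below cannot land on partitions this member already lost.
		if err := cl.CommitRecords(ctx, fetches.Records()...); err != nil {
			// log or otherwise handle the commit error
		}
		cl.AllowRebalance()
	}
}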

// PauseFetchTopics sets the client to no longer fetch the given topics and
// returns all currently paused topics. Paused topics persist until resumed.
// You can call this function with no topics to simply receive the list of
@@ -546,6 +643,9 @@ func (c *consumer) purgeTopics(topics []string) {
		purgeAssignments[topic] = nil
	}

	c.waitAndAddRebalance()
	defer c.unaddRebalance()

	c.mu.Lock()
	defer c.mu.Unlock()
