Skip to content

Commit

Permalink
group consumer: revoke all on LeaveGroup, properly blocking commit
Browse files Browse the repository at this point in the history
Leaving the group cancels the group context. This context is used in
defaultRevoke, meaning that leaving a group prevents an offset commit in
onRevoke to be successful. Now, we use the client context, which is only
canceled after unsetting a consumer, which waits for the group to be
left.

Additionally, we now revoke all offsets if cooperative if we are leaving
the group. Previously, we would rely on the manage logic to call
onRevoke, but this also sometimes calls onLost. Technically with onLost,
it is "too late" to commit at that point. We do not want to imply that
if a user specifies their own OnLost.

Finally, we default onLost to defaultRevoke. We implicitly did this
before by falling into onRevoke if onLost was nil, but we may as well
explicitly do this.
  • Loading branch information
twmb committed Apr 7, 2021
1 parent 887c134 commit 6318b15
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ func (cl *Client) AssignGroup(group string, opts ...GroupOpt) {
}
if c.cl.cfg.txnID == nil {
g.onRevoked = g.defaultRevoke
g.onLost = g.defaultRevoke
} else {
g.autocommitDisable = true
}
Expand Down Expand Up @@ -605,15 +606,19 @@ const (
//
// Lastly, for cooperative consumers, this must selectively delete what was
// lost from the uncommitted map.
func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32) {
if !g.cooperative { // stage == revokeThisSession if not cooperative
func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leaving bool) {
if !g.cooperative || leaving { // stage == revokeThisSession if not cooperative
// If we are an eager consumer, we stop fetching all of our
// current partitions as we will be revoking them.
g.c.mu.Lock()
g.c.assignPartitions(nil, assignInvalidateAll)
g.c.mu.Unlock()

g.cl.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "revoking", g.nowAssigned)
if !g.cooperative {
g.cl.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "revoking", g.nowAssigned)
} else {
g.cl.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "revoking", g.nowAssigned)
}
if g.onRevoked != nil {
g.onRevoked(g.ctx, g.nowAssigned)
}
Expand Down Expand Up @@ -736,7 +741,7 @@ func (s *assignRevokeSession) prerevoke(g *groupConsumer, lost map[string][]int3
go func() {
defer close(s.prerevokeDone)
if g.cooperative && len(lost) > 0 {
g.revoke(revokeLastSession, lost)
g.revoke(revokeLastSession, lost, false)
}
}()
return s.prerevokeDone
Expand Down Expand Up @@ -765,13 +770,11 @@ func (s *assignRevokeSession) assign(g *groupConsumer, newAssigned map[string][]
// This may not run before returning from the heartbeat loop: if we encounter a
// fatal error, we return before revoking so that we can instead call onLost in
// the manage loop.
func (s *assignRevokeSession) revoke(g *groupConsumer) <-chan struct{} {
func (s *assignRevokeSession) revoke(g *groupConsumer, leaving bool) <-chan struct{} {
go func() {
defer close(s.revokeDone)
<-s.assignDone
if g.onRevoked != nil {
g.revoke(revokeThisSession, nil)
}
g.revoke(revokeThisSession, nil, leaving)
}()
return s.revokeDone
}
Expand Down Expand Up @@ -960,7 +963,10 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
// Now we call the user provided revoke callback, even
// if cooperative: if cooperative, this only revokes
// partitions we no longer want to consume.
revoked = s.revoke(g)
//
// If the err is context.Canceled, the group is being
// left and we revoke everything.
revoked = s.revoke(g, err == context.Canceled)
}
// Since we errored, while waiting for the revoke to finish, we
// update our metadata. A leader may have re-joined with new
Expand Down Expand Up @@ -1930,7 +1936,10 @@ func (cl *Client) CommitOffsets(
func (g *groupConsumer) defaultRevoke(_ context.Context, _ map[string][]int32) {
if !g.autocommitDisable {
un := g.getUncommitted()
g.cl.BlockingCommitOffsets(g.ctx, un, func(_ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
// We use the client's context rather than the group context,
// because this could come from the group being left. The group
// context will already be canceled.
g.cl.BlockingCommitOffsets(g.cl.ctx, un, func(_ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
if err != nil {
if err != ErrNotGroup && err != context.Canceled {
g.cl.cfg.logger.Log(LogLevelError, "default revoke BlockingCommitOffsets failed", "err", err)
Expand Down

0 comments on commit 6318b15

Please sign in to comment.