consumer group: bugfix cooperative rebalancing losing offset fetches
An immediate rebalance can kill offset fetching but keep assignments,
and when the rebalance completes, the client does not continue fetching
offsets from prior assignments that the group member still has.

This commit fixes that by persisting a bit of state across rebalances
for the cooperative consumer. See the embedded comments for more
details.

Closes #98.
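
For context, the bug only affects cooperative balancing. Below is a minimal sketch (not part of this commit) of a client that exercises the affected path, assuming a local broker; the group and topic names ("my-group", "my-topic") are placeholders.

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("my-topic"),
		// Cooperative balancing keeps prior assignments across a rebalance;
		// before this fix, an offset fetch interrupted by an immediate
		// rebalance was not resumed for those retained partitions.
		kgo.Balancers(kgo.CooperativeStickyBalancer()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	for {
		fetches := cl.PollFetches(context.Background())
		fetches.EachRecord(func(r *kgo.Record) {
			log.Printf("consumed %s/%d@%d", r.Topic, r.Partition, r.Offset)
		})
	}
}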
twmb committed Oct 25, 2021
1 parent 696392e commit 4f2e7fe
Showing 1 changed file with 97 additions and 8 deletions.
105 changes: 97 additions & 8 deletions pkg/kgo/consumer_group.go
@@ -51,6 +51,15 @@ type groupConsumer struct {
lastAssigned map[string][]int32 // only updated in join&sync loop
nowAssigned map[string][]int32 // only updated in join&sync loop

// Fetching ensures we continue fetching offsets across cooperative
// rebalance if an offset fetch returns early due to an immediate
// rebalance. See the large comment on adjustCooperativeFetchOffsets
// for more details.
//
// This is modified only in that function, or in the manage loop on a
// hard error once the heartbeat/fetch has returned.
fetching map[string]map[int32]struct{}

// leader is whether we are the leader right now. This is set to false
//
// - set to false at the beginning of a join group session
@@ -284,6 +293,7 @@ func (g *groupConsumer) manage() {

g.nowAssigned = nil
g.lastAssigned = nil
g.fetching = nil

g.leader.set(false)
}
@@ -653,7 +663,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {
defer close(fetchDone)
defer close(fetchErrCh)
g.cfg.logger.Log(LogLevelInfo, "fetching offsets for added partitions", "group", g.cfg.group, "added", added)
fetchErrCh <- g.fetchOffsets(ctx, added)
fetchErrCh <- g.fetchOffsets(ctx, added, lost)
}()
} else {
close(fetchDone)
@@ -1062,27 +1072,100 @@ func (g *groupConsumer) joinGroupProtocols() []kmsg.JoinGroupRequestProtocol {
return protos
}

// If we are cooperatively consuming, we have a potential problem: if fetch
// offsets is canceled due to an immediate rebalance, when we resume, we will
// not re-fetch offsets for partitions we were previously assigned and are
// still assigned. We will only fetch offsets for new assignments.
//
// To work around that issue, we track everything we are fetching in g.fetching
// and only clear g.fetching if fetchOffsets returns with no error.
//
// Now, if fetching returns early due to an error, when we rejoin and re-fetch,
// we will resume fetching what we were previously:
//
// * first we remove what was lost
// * then we add anything new
// * then we translate our total set into the "added" list to be fetched on return
//
// Any time a group is completely lost, the manage loop clears fetching. When
// cooperative consuming, a hard error is basically losing the entire state and
// rejoining from scratch.
func (g *groupConsumer) adjustCooperativeFetchOffsets(added, lost map[string][]int32) map[string][]int32 {
if g.fetching != nil {
// We were fetching previously: remove anything lost.
for topic, partitions := range lost {
ft := g.fetching[topic]
if ft == nil {
continue // we were not fetching this topic
}
for _, partition := range partitions {
delete(ft, partition)
}
if len(ft) == 0 {
delete(g.fetching, topic)
}
}
} else {
// We were not fetching previously: start a new map for what we
// are adding.
g.fetching = make(map[string]map[int32]struct{})
}

// Merge everything we are newly fetching to our fetching map.
for topic, partitions := range added {
ft := g.fetching[topic]
if ft == nil {
ft = make(map[int32]struct{}, len(partitions))
g.fetching[topic] = ft
}
for _, partition := range partitions {
ft[partition] = struct{}{}
}
}

// Now translate our full set (previously fetching ++ newly fetching --
// lost) into a new "added" map to be fetched.
added = make(map[string][]int32, len(g.fetching))
for topic, partitions := range g.fetching {
ps := make([]int32, 0, len(partitions))
for partition := range partitions {
ps = append(ps, partition)
}
added[topic] = ps
}
return added
}
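
To make the bookkeeping concrete, here is a standalone re-statement of the same merge logic (illustrative only, not part of the patch): the set that ends up being fetched is (previously fetching minus lost) plus added.

package main

import "fmt"

// adjust mirrors the merge above with plain maps: drop lost partitions from
// the carried-over fetching set, add new assignments, and return the union
// as the list to fetch.
func adjust(fetching map[string]map[int32]struct{}, added, lost map[string][]int32) (map[string]map[int32]struct{}, map[string][]int32) {
	if fetching == nil {
		fetching = make(map[string]map[int32]struct{})
	}
	for topic, partitions := range lost {
		ft := fetching[topic]
		if ft == nil {
			continue // we were not fetching this topic
		}
		for _, p := range partitions {
			delete(ft, p)
		}
		if len(ft) == 0 {
			delete(fetching, topic)
		}
	}
	for topic, partitions := range added {
		ft := fetching[topic]
		if ft == nil {
			ft = make(map[int32]struct{}, len(partitions))
			fetching[topic] = ft
		}
		for _, p := range partitions {
			ft[p] = struct{}{}
		}
	}
	out := make(map[string][]int32, len(fetching))
	for topic, partitions := range fetching {
		for p := range partitions {
			out[topic] = append(out[topic], p)
		}
	}
	return fetching, out
}

func main() {
	// First join: assigned t/0 and t/1, but the offset fetch is interrupted
	// by an immediate rebalance, so the fetching state persists.
	fetching, toFetch := adjust(nil, map[string][]int32{"t": {0, 1}}, nil)
	fmt.Println(toFetch) // map[t:[0 1]] (partition order may vary)

	// Rejoin: partition 1 was revoked, partition 2 is newly added. The next
	// fetch covers 0 (resumed) and 2 (new), not just 2.
	_, toFetch = adjust(fetching, map[string][]int32{"t": {2}}, map[string][]int32{"t": {1}})
	fmt.Println(toFetch) // map[t:[0 2]] (partition order may vary)
}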

// fetchOffsets is issued once we join a group to see what the prior commits
// were for the partitions we were assigned.
func (g *groupConsumer) fetchOffsets(ctx context.Context, newAssigned map[string][]int32) error {
func (g *groupConsumer) fetchOffsets(ctx context.Context, added, lost map[string][]int32) (err error) {
// If cooperative consuming, we may have to resume fetches. See the
// comment on adjustCooperativeFetchOffsets. If we successfully fetch,
// we clear what we were fetching.
if g.cooperative {
added = g.adjustCooperativeFetchOffsets(added, lost)
defer func() {
if err == nil {
g.fetching = nil
}
}()
}

// Our client maps the v0 to v7 format to v8+ when sharding this
// request, if we are only requesting one group, as well as maps the
// response back, so we do not need to worry about v8+ here.
start:
req := kmsg.NewPtrOffsetFetchRequest()
req.Group = g.cfg.group
req.RequireStable = g.cfg.requireStable
for topic, partitions := range newAssigned {
for topic, partitions := range added {
reqTopic := kmsg.NewOffsetFetchRequestTopic()
reqTopic.Topic = topic
reqTopic.Partitions = partitions
req.Topics = append(req.Topics, reqTopic)
}

var (
resp *kmsg.OffsetFetchResponse
err error
)
var resp *kmsg.OffsetFetchResponse

fetchDone := make(chan struct{})
go func() {
@@ -1092,7 +1175,7 @@ start:
select {
case <-fetchDone:
case <-ctx.Done():
g.cfg.logger.Log(LogLevelError, "fetch offsets failed due to context cancelation", "group", g.cfg.group)
g.cfg.logger.Log(LogLevelInfo, "fetch offsets failed due to context cancelation", "group", g.cfg.group)
return ctx.Err()
}
if err != nil {
@@ -1126,6 +1209,12 @@
goto start
}
}
g.cfg.logger.Log(LogLevelError, "fetch offsets failed",
"group", g.cfg.group,
"topic", rTopic.Topic,
"partition", rPartition.Partition,
"err", err,
)
return err
}
offset := Offset{
