consumer: backoff when list offsets or load epoch has any err
We used to loop immediately to retry if we had any retriable error. We
may as well always back off to ensure we have no request spin loops.
twmb committed Apr 22, 2021
1 parent 9c27589 commit a3c2a5c
Showing 2 changed files with 24 additions and 20 deletions.
42 changes: 22 additions & 20 deletions pkg/kgo/consumer.go
@@ -1032,16 +1032,24 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool)
// Called within a consumer session, this function handles results from list
// offsets or epoch loads.
func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
- var (
- // Retriable errors are retried immediately, while
- // non-retriable errors are retried after a 1s backoff. It is
- // unlikely that the client will be able to recover, but we may
- // as well fetch every second to (a) force the user to notice
- // errors, and (b) allow the user to auth the client at
- // runtime.
- reloads listOrEpochLoads
- slowReloads listOrEpochLoads
- )
+ // All errors are retried after a 1s backoff. We either have request
+ // level retriable errors (unknown partition, etc) or non-retriable
+ // errors (auth), or we have request issuing errors (no dial,
+ // connection cut repeatedly).
+ //
+ // For retriable request errors, we may as well back off a little bit
+ // to allow Kafka to harmonize if the topic exists / etc.
+ //
+ // For non-retriable request errors, we may as well retry to both (a)
+ // allow the user more signals about a problem that they can maybe fix
+ // within Kafka (i.e. the auth), and (b) force the user to notice
+ // errors.
+ //
+ // For request issuing errors, we may as well continue to retry because
+ // there is not much else we can do. RequestWith already retries, but
+ // returns when the retry limit is hit. We will backoff 1s and then
+ // allow RequestWith to continue requesting and backing off.
+ var reloads listOrEpochLoads
defer func() {
// When we are done handling results, we have finished loading
// all the topics and partitions. We remove them from tracking
@@ -1052,21 +1060,17 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
}
s.listOrEpochMu.Unlock()

- // We now add our immediate reloads back to the session. We are
- // still in the context of the live session itself because this
- // handling function is run with a session worker.
- reloads.loadWithSession(s)
- if !slowReloads.isEmpty() {
+ if !reloads.isEmpty() {
s.incWorker()
go func() {
- // Before we dec our worker, we must add the slow
+ // Before we dec our worker, we must add the
// reloads back into the session's waiting loads.
// Doing so allows a concurrent stopSession to
// track the waiting loads, whereas if we did not
// add things back to the session, we could abandon
// loading these offsets and have a stuck cursor.
defer s.decWorker()
- defer slowReloads.loadWithSession(s)
+ defer reloads.loadWithSession(s)
after := time.NewTimer(time.Second)
defer after.Stop()
select {
@@ -1097,12 +1101,10 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
use()

default: // from ErrorCode in a response
+ reloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
if !kerr.IsRetriable(load.err) && !isRetriableBrokerErr(load.err) { // non-retriable response error; signal such in a response
s.c.addFakeReadyForDraining(load.topic, load.partition, load.err)
- slowReloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
- continue
}
- reloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
}
}
}
2 changes: 2 additions & 0 deletions pkg/kgo/consumer_group.go
@@ -517,6 +517,8 @@ func (g *groupConsumer) leave() (wait func()) {
"group", g.id,
"memberID", g.memberID, // lock not needed now since nothing can change it (manageDone)
)
+ // If we error when leaving, there is not much
+ // we can do. We may as well just return.
(&kmsg.LeaveGroupRequest{
Group: g.id,
MemberID: g.memberID,
