Skip to content

Commit

Permalink
client improvement: retry more when input brokers are invalid
Browse files Browse the repository at this point in the history
If a user passes two seeds and one is invalid, we can still operate with
some degree of success. Previously, we would fail a request immediately
if it was attempted on the invalid seed. However, if the next seed is
valid, we should be able to recover, eventually load metadata, and
always be successful.

We will now retry on broker-specific network errors if they are not
context related, and if the next broker we would retry on is different.
  • Loading branch information
twmb committed Aug 18, 2021
1 parent f0d458b commit ec0c992
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 7 deletions.
32 changes: 25 additions & 7 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,10 @@ func (cl *Client) shouldRetry(tries int, err error) bool {
return (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && int64(tries) < cl.cfg.retries
}

// shouldRetryNext reports whether a request that failed with err may be
// attempted again on a different broker.
//
// It returns true only when err is a skippable broker error (as decided by
// isSkippableBrokerErr) and the number of tries so far is still below the
// client's configured retry limit (cl.cfg.retries).
func (cl *Client) shouldRetryNext(tries int, err error) bool {
	if !isSkippableBrokerErr(err) {
		return false
	}
	return int64(tries) < cl.cfg.retries
}

type retriable struct {
cl *Client
br func() (*broker, error)
Expand All @@ -601,9 +605,11 @@ func (r *retriable) Request(ctx context.Context, req kmsg.Request) (kmsg.Respons
tries := 0
tryStart := time.Now()
retryTimeout := r.cl.cfg.retryTimeout(req.Key())

next, nextErr := r.br()
start:
tries++
br, err := r.br()
br, err := next, nextErr
r.last = br
var resp kmsg.Response
var retryErr error
Expand All @@ -613,13 +619,25 @@ start:
retryErr = r.parseRetryErr(resp)
}
}
if err != nil || retryErr != nil {
if retryTimeout == 0 || time.Since(tryStart) <= retryTimeout {
if (r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr)) &&
(r.limitRetries == 0 || tries < r.limitRetries) &&
r.cl.waitTries(ctx, tries) {

goto start
if err != nil || retryErr != nil {
if r.limitRetries == 0 || tries < r.limitRetries {
if retryTimeout == 0 || time.Since(tryStart) <= retryTimeout {
// If this broker / request had a retriable error, we can
// just retry now. If the error is *not* retriable but
// is a broker-specific network error, and the next
// broker is different than the current, we also retry.
if r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr) {
if r.cl.waitTries(ctx, tries) {
next, err = r.br()
goto start
}
} else if r.cl.shouldRetryNext(tries, err) {
next, nextErr = r.br()
if next != br && r.cl.waitTries(ctx, tries) {
goto start
}
}
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package kgo

import (
"context"
"errors"
"fmt"
"net"
"os"
)

Expand Down Expand Up @@ -45,6 +47,24 @@ func isRetriableBrokerErr(err error) bool {
return false
}

// isSkippableBrokerErr reports whether err is a broker-specific network
// error that justifies skipping the current broker and trying the next one.
//
// Such errors are not retriable against the same broker, but another broker
// may still succeed. For example, if the seeds contain one invalid and one
// valid address, dialing the invalid seed fails permanently, yet the request
// can still go through on the valid seed.
//
// An error qualifies when its chain contains a *net.OpError and does not
// contain context.Canceled or context.DeadlineExceeded.
func isSkippableBrokerErr(err error) bool {
	var opErr *net.OpError
	if !errors.As(err, &opErr) {
		// Not a network operation error at all: nothing to skip past.
		return false
	}

	// A context error buried inside the OpError means the caller gave up;
	// moving to another broker would not help.
	fromContext := errors.Is(err, context.Canceled) ||
		errors.Is(err, context.DeadlineExceeded)
	return !fromContext
}

var (
//////////////
// INTERNAL // -- when used multiple times or checked in different areas of the client
Expand Down

0 comments on commit ec0c992

Please sign in to comment.