Skip to content

Commit

Permalink
producer: retry logic changes
Browse files Browse the repository at this point in the history
producerID() could return errReloadProducerID, which would fail all
buffered records. Instead, we want to bump all records load errors by 1,
because this is actually a completely retriable error.

AddPartitionsToTxn was permanently looped, but we can use this same
bump-and-retry logic to be more consistent and remove an infinite loop.

---

When addressing KAFKA-12671, I put the lastRespSuccessful in the wrong
spot. It's now moved to a more obviously right spot. The addressing was
still successful in integration testing because we still waited for all
produce responses to finish before issuing EndTxn, and in integration
tests, all requests are successful (no request failures).

I also accidentally used incDrains/decDrains instead of
incIssues/decIssues. This was non-impactful, as both drains and issues
are waited on when aborting, but it was less clear. It may be worth it
to just merge them anyway, and I may do that soon.

---

Finally, we should set the initial failing state for a recBuf to whether
there is an ErrorCode. This will help us avoid trying to produce if we
know we loaded an error, which may make what follows this easier.
  • Loading branch information
twmb committed Apr 22, 2021
1 parent 8c21fcf commit 0554ad5
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 20 deletions.
2 changes: 2 additions & 0 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var (
// stopped due to a concurrent metadata response.
errChosenBrokerDead = errors.New("the internal broker struct chosen to issue this requesthas died--either the broker id is migrating or no longer exists")

errProducerIDLoadFail = errors.New("unable to initialize a producer ID due to request retry limits")

// A temporary error returned when Kafka replies with a different
// correlation ID than we were expecting for the request the client
// issued.
Expand Down
1 change: 1 addition & 0 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func (cl *Client) fetchTopicMetadata(reqTopics []string) (map[string]*topicParti
maxRecordBatchBytes: cl.maxRecordBatchBytesForTopic(topicMeta.Topic),

recBufsIdx: -1,
failing: partMeta.ErrorCode != 0,
},

cursor: &cursor{
Expand Down
39 changes: 38 additions & 1 deletion pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,16 @@ func (cl *Client) producerID() (int64, int16, error) {
if keep {
id = newID
cl.producer.id.Store(id)
} else {
// If we are not keeping the producer ID,
// we will return our old ID but with a
// static error that we can check or bubble
// up where needed.
id = &producerID{
id: id.id,
epoch: id.epoch,
err: errProducerIDLoadFail,
}
}
}
}
Expand Down Expand Up @@ -459,7 +469,7 @@ func (cl *Client) addUnknownTopicRecord(pr promisedRec) {
if unknown == nil {
unknown = &unknownTopicProduces{
buffered: make([]promisedRec, 0, 100),
wait: make(chan error, 1),
wait: make(chan error, 5),
}
cl.unknownTopics[pr.Topic] = unknown
}
Expand Down Expand Up @@ -577,6 +587,33 @@ func (cl *Client) Flush(ctx context.Context) error {
}
}

// Bumps the tries for all buffered records in the client.
//
// This is called whenever there is a problematic error that would affect the
// state of all buffered records as a whole:
//
// - if we cannot init a producer ID due to RequestWith errors, producing is useless
// - if we cannot add partitions to a txn due to RequestWith errors, producing is useless
//
// Note that these are specifically due to RequestWith errors, not due to
// receiving a response that has a retriable error code. That is, if our
// request keeps dying.
func (cl *Client) bumpRepeatedLoadErr(err error) {
// Bump the load-error count on every known partition's record buffer.
for _, partitions := range cl.loadTopics() {
for _, partition := range partitions.load().partitions {
partition.records.bumpRepeatedLoadErr(err)
}
}
// Records buffered for not-yet-loaded topics are parked waiting on a
// channel rather than in a recBuf; signal them with the error too.
cl.unknownTopicsMu.Lock()
defer cl.unknownTopicsMu.Unlock()
for _, unknown := range cl.unknownTopics {
// Non-blocking send: if the wait channel's buffer is already
// full, a pending error signal exists and the waiter will see
// it -- drop this one rather than block while holding the lock.
select {
case unknown.wait <- err:
default:
}
}
}

// Clears all buffered records in the client with the given error.
func (cl *Client) failBufferedRecords(err error) {
for _, partitions := range cl.loadTopics() {
Expand Down
49 changes: 30 additions & 19 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,20 +264,30 @@ func (s *sink) produce(sem <-chan struct{}) bool {
}()

// producerID can fail from:
// - retry failure
// - auth failure
// - transactional: a produce failure that failed the producer ID
// - AddPartitionsToTxn failure (see just below)
// All of these are fatal. Recovery may be possible with EndTransaction
// in specific cases, but regardless, all buffered records must fail.
//
// All but the first error is fatal. Recovery may be possible with
// EndTransaction in specific cases, but regardless, all buffered
// records must fail.
//
// We init the producer ID before creating a request to ensure we are
// always using the latest id/epoch with the proper sequence numbers.
id, epoch, err := s.cl.producerID()
if err != nil {
if err != errClientClosing {
switch err {
case errProducerIDLoadFail:
s.cl.bumpRepeatedLoadErr(err)
s.cl.cfg.logger.Log(LogLevelWarn, "unable to load producer ID, bumping client's buffered record load errors by 1 and retrying")
return true // whatever caused our produce, we did nothing, so keep going
default:
s.cl.cfg.logger.Log(LogLevelError, "fatal InitProducerID error, failing all buffered records", "broker", s.nodeID, "err", err)
fallthrough
case errClientClosing:
s.cl.failBufferedRecords(err)
}
s.cl.failBufferedRecords(err)
return false
}

Expand All @@ -289,6 +299,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {

if txnReq != nil {
// txnReq can fail from:
// - retry failure
// - auth failure
// - producer id mapping / epoch errors
// The latter case can potentially recover with the kip logic
Expand All @@ -297,9 +308,16 @@ func (s *sink) produce(sem <-chan struct{}) bool {
// We do not need to clear the addedToTxn flag for any recBuf
// it was set on, since producer id recovery resets the flag.
if err := s.doTxnReq(req, txnReq); err != nil {
s.cl.failProducerID(id, epoch, err)
s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", s.nodeID, "err", err)
s.cl.failBufferedRecords(err)
switch {
case isRetriableBrokerErr(err):
s.cl.bumpRepeatedLoadErr(err)
s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retriable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err)
return moreToDrain || len(req.batches) > 0
default:
s.cl.failProducerID(id, epoch, err)
s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", s.nodeID, "err", err)
s.cl.failBufferedRecords(err)
}
return false
}
}
Expand All @@ -312,16 +330,17 @@ func (s *sink) produce(sem <-chan struct{}) bool {

// Add that we are issuing and then check if we are aborting: this
// order ensures that we will do not produce after aborting is set.
s.cl.producer.incDrains()
s.cl.producer.incIssues()
if s.cl.producer.isAborting() {
s.cl.producer.decDrains()
s.cl.producer.decIssues()
return false
}

produced = true
s.doSequenced(req, func(resp kmsg.Response, err error) {
s.lastRespSuccessful = err == nil
s.handleReqResp(req, resp, err)
s.cl.producer.decDrains()
s.cl.producer.decIssues()
<-sem
})
return moreToDrain
Expand Down Expand Up @@ -381,15 +400,9 @@ func (s *sink) doTxnReq(
req *produceRequest,
txnReq *kmsg.AddPartitionsToTxnRequest,
) error {
start:
err := s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
return s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
return s.issueTxnReq(req, txnReq)
})
if isRetriableBrokerErr(err) {
s.cl.cfg.logger.Log(LogLevelWarn, "AddPartitionsToTxn repeatedly failed, last failure is retriable broker err, retrying issue", "broker", s.nodeID, "err", err)
goto start
}
return err
}

func (s *sink) issueTxnReq(
Expand All @@ -398,10 +411,8 @@ func (s *sink) issueTxnReq(
) error {
resp, err := txnReq.RequestWith(s.cl.ctx, s.cl)
if err != nil {
s.lastRespSuccessful = false
return err
}
s.lastRespSuccessful = true

for _, topic := range resp.Topics {
topicBatches, ok := req.batches[topic.Topic]
Expand Down

0 comments on commit 0554ad5

Please sign in to comment.