Skip to content

Commit

Permalink
sink: log all aspects of wanting to / failing records
Browse files Browse the repository at this point in the history
For #239, this will make debugging easier.
  • Loading branch information
twmb committed Nov 29, 2022
1 parent 9ee5efa commit 56fcfb4
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,22 +1255,32 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
if len(recBuf.batches) == 0 {
return
}
recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", logID(recBuf.sink.nodeID), "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
batch0 := recBuf.batches[0]
batch0.tries++

canFail := !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs
if !canFail {
return
}

batch0Fail := batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting
var (
canFail = !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs // we can only fail if we are not idempotent or if we have no outstanding requests
batch0Fail = batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting
netErr = isRetriableBrokerErr(err) || isDialErr(err) // we can fail if this is *not* a network error
retriableKerr = kerr.IsRetriable(err) // we fail if this is not a retriable kerr,
isUnknownLimit = recBuf.checkUnknownFailLimit(err) // or if it is, but it is UnknownTopicOrPartition and we are at our limit

okNet := !isRetriableBrokerErr(err) && !isDialErr(err) // we can fail if this is *not* a network error
retriableKerr := kerr.IsRetriable(err) // we fail if this is not a retriable kerr,
isUnknownLimit := recBuf.checkUnknownFailLimit(err) // or if it is, but it is UnknownTopicOrPartition and we are at our limit
willFail = canFail && (batch0Fail || !netErr && (!retriableKerr || retriableKerr && isUnknownLimit))
)

if batch0Fail || okNet && (!retriableKerr || retriableKerr && isUnknownLimit) {
recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch",
"broker", logID(recBuf.sink.nodeID),
"topic", recBuf.topic,
"partition", recBuf.partition,
"err", err,
"can_fail", canFail,
"batch0_should_fail", batch0Fail,
"is_network_err", netErr,
"is_retriable_kerr", retriableKerr,
"is_unknown_limit", isUnknownLimit,
"will_fail", willFail,
)
if willFail {
recBuf.failAllRecords(err)
}
}
Expand Down

0 comments on commit 56fcfb4

Please sign in to comment.