Skip to content

Commit

Permalink
kgo: add a debug log for stripping retryable errors from fetches
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Jul 8, 2023
1 parent b45d663 commit c5d0fc5
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,11 +796,20 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
) {
f = Fetch{Topics: make([]FetchTopic, 0, len(resp.Topics))}
var (
updateWhy multiUpdateWhy
numErrsStripped int
kip320 = s.cl.supportsOffsetForLeaderEpoch()
updateWhy multiUpdateWhy
debugWhyStripped multiUpdateWhy
numErrsStripped int
kip320 = s.cl.supportsOffsetForLeaderEpoch()
)

strip := func(t string, p int32, err error) {
numErrsStripped++
if s.cl.cfg.logger.Level() < LogLevelDebug {
return
}
debugWhyStripped.add(t, p, err)
}

for _, rt := range resp.Topics {
topic := rt.Topic
// v13 only uses topic IDs, so we have to map the response
Expand Down Expand Up @@ -866,7 +875,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
// OffsetNotAvailable: fetched from out of sync replica or a behind in-sync one (KIP-392 case 1 and case 2)
// UnknownTopicID: kafka has not synced the state on all brokers
// And other standard retryable errors.
numErrsStripped++
strip(topic, partition, fp.Err)
} else {
// - bad auth
// - unsupported compression
Expand Down Expand Up @@ -899,7 +908,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
} else if s.cl.cfg.keepRetryableFetchErrors {
keep = true
} else {
numErrsStripped++
strip(topic, partition, fp.Err)
}

case kerr.OffsetOutOfRange:
Expand Down Expand Up @@ -994,6 +1003,10 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
}
}

if s.cl.cfg.logger.Level() >= LogLevelDebug && len(debugWhyStripped) > 0 {
s.cl.cfg.logger.Log(LogLevelDebug, "fetch stripped partitions", "why", debugWhyStripped.reason(""))
}

return f, reloadOffsets, preferreds, req.numOffsets == numErrsStripped, updateMeta, updateWhy.reason("fetch had inner topic errors")
}

Expand Down

0 comments on commit c5d0fc5

Please sign in to comment.