metadata: allow leader epoch rewinds after 5 tries
See the embedded comments: if the entire cluster is in a bad state and
lost a leader epoch update, we should allow the client to recover. If we
"recover" into in accidental rewind, other aspects of the client will be
notified (produce may see NOT_LEADER_FOR_PARTITION, consume will see
FENCED_LEADER_EPOCH).

Closes #119.
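As a sketch of the recovery path described above (illustrative wiring, not part of this commit: kerr.NotLeaderForPartition and kerr.FencedLeaderEpoch are the kerr sentinels for the codes named in the message, and the client retries them internally, so a produce promise only sees them once retries are exhausted):

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	cl.Produce(context.Background(), kgo.StringRecord("hello"), func(_ *kgo.Record, err error) {
		if errors.Is(err, kerr.NotLeaderForPartition) {
			// An accidental rewind pointed us at a stale leader; the
			// client refreshes metadata and retries on its own.
			fmt.Println("produced to a stale leader; metadata will refresh")
		}
	})
	cl.Flush(context.Background())
}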
twmb committed Jan 6, 2022
1 parent 029e655 commit 12eaa1e
Showing 2 changed files with 52 additions and 20 deletions.
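Before the diff, a minimal runnable sketch of the decision this commit adds; the type and function names are stand-ins (the real logic lands in mergeTopicPartitions below): tolerate up to five leader epoch rewinds, then trust the rewind.

package main

import "fmt"

const maxEpochRewinds = 5

// partitionMeta stands in for the client's per-partition metadata.
type partitionMeta struct {
	leaderEpoch  int32
	epochRewinds uint8
}

// merge returns the metadata to keep and whether the caller should
// trigger another metadata refresh.
func merge(old, fresh partitionMeta) (keep partitionMeta, retry bool) {
	if fresh.leaderEpoch >= old.leaderEpoch {
		return fresh, false // normal forward (or equal) progress
	}
	if old.epochRewinds < maxEpochRewinds {
		// Assume the responding broker is stale: keep what we have,
		// count the rewind, and ask for another refresh.
		old.epochRewinds++
		return old, true
	}
	// Enough brokers agreed the epoch went backwards; accept the
	// rewind so the client regains forward progress.
	return fresh, false
}

func main() {
	have := partitionMeta{leaderEpoch: 9}
	stale := partitionMeta{leaderEpoch: 7}
	for try := 1; try <= 6; try++ {
		var retry bool
		have, retry = merge(have, stale)
		fmt.Printf("try %d: epoch=%d rewinds=%d retry=%v\n", try, have.leaderEpoch, have.epochRewinds, retry)
	}
}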
pkg/kgo/metadata.go (43 additions, 20 deletions)
@@ -151,8 +151,8 @@ func (cl *Client) updateMetadataLoop() {
 			}
 		}

-		again, err, why := cl.updateMetadata()
-		if again || err != nil {
+		retryWhy, err := cl.updateMetadata()
+		if retryWhy != nil || err != nil {
 			// If err is non-nil, the metadata request failed
 			// itself and already retried 3x; we do not loop more.
 			//
@@ -180,7 +180,7 @@ func (cl *Client) updateMetadataLoop() {
 			if err != nil {
 				cl.triggerUpdateMetadata(true, fmt.Sprintf("re-updating metadata due to err: %s", err))
 			} else {
-				cl.triggerUpdateMetadata(true, why.reason("re-updating due to inner errors"))
+				cl.triggerUpdateMetadata(true, retryWhy.reason("re-updating due to inner errors"))
 			}
 		}
 		if err == nil {
@@ -207,7 +207,7 @@ func (cl *Client) updateMetadataLoop() {
 // The producer and consumer use different topic maps and underlying
 // topicPartitionsData pointers, but we update those underlying pointers
 // equally.
-func (cl *Client) updateMetadata() (needsRetry bool, err error, why multiUpdateWhy) {
+func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 	defer cl.metawait.signal()
 	defer cl.consumer.doOnMetadataUpdate()

@@ -247,7 +247,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error, why multiUpdateWhy) {
 			tpsProducerLoad,
 			err,
 		)
-		return true, err, nil
+		return nil, err
 	}

 	// If we are consuming with regex and fetched all topics, the metadata
@@ -305,14 +305,14 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error, why multiUpdateWhy) {
 				}
 				continue
 			}
-			needsRetry = needsRetry || cl.mergeTopicPartitions(
+			cl.mergeTopicPartitions(
 				topic,
 				priorParts,
 				newParts,
 				m.isProduce,
 				&reloadOffsets,
 				stopConsumerSession,
-				&why,
+				&retryWhy,
 			)
 		}
 	}
@@ -324,7 +324,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error, why multiUpdateWhy) {
 		)
 	}

-	return needsRetry, nil, why
+	return retryWhy, nil
 }

 // fetchTopicMetadata fetches metadata for all reqTopics and returns new
@@ -481,8 +481,8 @@ func (cl *Client) mergeTopicPartitions(
 	isProduce bool,
 	reloadOffsets *listOrEpochLoads,
 	stopConsumerSession func(),
-	why *multiUpdateWhy,
-) (needsRetry bool) {
+	retryWhy *multiUpdateWhy,
+) {
 	lv := *l.load() // copy so our field writes do not collide with reads

 	// Producers must store the update through a special function that
@@ -508,8 +508,8 @@ func (cl *Client) mergeTopicPartitions(
 				topicPartition.records.bumpRepeatedLoadErr(lv.loadErr)
 			}
 		}
-		why.add(topic, -1, r.loadErr)
-		return true
+		retryWhy.add(topic, -1, r.loadErr)
+		return
 	}

 	// Before the atomic update, we keep the latest partitions / writable
@@ -572,25 +572,47 @@ func (cl *Client) mergeTopicPartitions(
 			if isProduce {
 				newTP.records.bumpRepeatedLoadErr(newTP.loadErr)
 			}
-			needsRetry = true
-			why.add(topic, int32(part), newTP.loadErr)
+			retryWhy.add(topic, int32(part), newTP.loadErr)
 			continue
 		}

 		// If the new partition has an older leader epoch, then we
 		// fetched from an out of date broker. We just keep the old
 		// information.
 		if newTP.leaderEpoch < oldTP.leaderEpoch {
-			*newTP = *oldTP
+			// If we repeatedly rewind, then perhaps the cluster
+			// entered some bad state and lost forward progress.
+			// We will log & allow the rewind to allow the client
+			// to continue; other requests may encounter fenced
+			// epoch errors (and respectively recover).
+			//
+			// Five is a pretty low amount of retries, but since
+			// we iterate through known brokers, this basically
+			// means we keep stale metadata if five brokers all
+			// agree things rewound.
+			const maxEpochRewinds = 5
+			if oldTP.epochRewinds < maxEpochRewinds {
+				*newTP = *oldTP
+				newTP.epochRewinds++
+
+				cl.cfg.logger.Log(LogLevelDebug, "metadata leader epoch went backwards, ignoring update",
+					"topic", topic,
+					"partition", part,
+					"old_leader_epoch", oldTP.leaderEpoch,
+					"new_leader_epoch", newTP.leaderEpoch,
+					"current_num_rewinds", newTP.epochRewinds,
+				)
+				retryWhy.add(topic, int32(part), errEpochRewind)
+				continue
+			}

-			cl.cfg.logger.Log(LogLevelDebug, "metadata leader epoch went backwards, ignoring update",
+			cl.cfg.logger.Log(LogLevelInfo, "metadata leader epoch went backwards repeatedly, we are now keeping the metadata to allow forward progress",
 				"topic", topic,
 				"partition", part,
-				"old_epoch", oldTP.leaderEpoch,
-				"new_epoch", newTP.leaderEpoch,
+				"old_leader_epoch", oldTP.leaderEpoch,
+				"new_leader_epoch", newTP.leaderEpoch,
 			)
-
 			continue
 		}

 		// If the tp data is the same, we simply copy over the records
@@ -641,9 +663,10 @@ func (cl *Client) mergeTopicPartitions(
 			newTP.cursor.source.addCursor(newTP.cursor)
 		}
 	}
-	return needsRetry
 }

+var errEpochRewind = errors.New("epoch rewind")
+
 type multiUpdateWhy map[kerrOrString]map[string]map[int32]struct{}

 type kerrOrString struct {
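An aside on the signature change in this file: the old needsRetry bool is redundant because a multiUpdateWhy map is nil until something is added to it, so retryWhy != nil alone signals a retry. A simplified, self-contained sketch of that nil-map pattern (the real type is keyed by kerrOrString, and these add/reason bodies are assumptions inferred from the calls in the diff):

package main

import (
	"errors"
	"fmt"
)

// multiUpdateWhy here is simplified; the real one is keyed by kerrOrString.
type multiUpdateWhy map[string]map[int32]error

// add lazily allocates, so calling it on a nil map "turns retry on".
func (m *multiUpdateWhy) add(topic string, part int32, err error) {
	if err == nil {
		return
	}
	if *m == nil {
		*m = make(multiUpdateWhy)
	}
	parts := (*m)[topic]
	if parts == nil {
		parts = make(map[int32]error)
		(*m)[topic] = parts
	}
	parts[part] = err
}

// reason mirrors how the update loop builds its retry-reason string.
func (m multiUpdateWhy) reason(prefix string) string {
	return fmt.Sprintf("%s: %v", prefix, m)
}

func main() {
	var retryWhy multiUpdateWhy
	fmt.Println(retryWhy != nil) // false: no inner errors, no retry

	retryWhy.add("foo", -1, errors.New("epoch rewind"))
	fmt.Println(retryWhy != nil) // true: the update loop will re-run
	fmt.Println(retryWhy.reason("re-updating due to inner errors"))
}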
pkg/kgo/topics_and_partitions.go (9 additions, 0 deletions)
@@ -354,6 +354,15 @@ type topicPartition struct {
 	// keep the old topicPartition data and the new error.
 	loadErr error

+	// If, on metadata refresh, the leader epoch for this partition goes
+	// backwards, we ignore the metadata refresh and signal the metadata
+	// should be reloaded: the broker we requested is stale. However, the
+	// broker could get into a bad state through some weird cluster failure
+	// scenarios. If we see the epoch rewind repeatedly, we eventually keep
+	// the metadata refresh. This is not detrimental and at worst will lead
+	// to the broker telling us to update our metadata.
+	epochRewinds uint8
+
 	// If we do not have a load error, we determine if the new
 	// topicPartition is the same or different from the old based on
 	// whether the data changed (leader or leader epoch, etc.).
