consuming: handle exact offset consuming better
Previously, if an exact offset was before the start offset or after the
end, the client would loop on OffsetOutOfRange errors. Now, we validate
the bounds and reset either to the start offset or the end offset if the
exact offset is too low or too high, respectively.

We also better document this behavior.
twmb committed Feb 28, 2022
1 parent 2ab1978 commit f9cd625
Showing 2 changed files with 176 additions and 21 deletions.
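
For context, here is the new rule from the commit message distilled into a standalone sketch. This is illustrative only and not code from this commit; "start" stands for the partition's log start offset, "end" for its high watermark, and "exact" for the requested offset plus any relative adjustment.

// Illustrative sketch only, not part of the diff: the bounds validation the
// commit message describes for exact reset offsets.
func clampExactOffset(exact, start, end int64) int64 {
	if exact < start {
		return start // too low: reset to the start offset
	}
	if exact > end {
		return end // too high: reset to the end offset
	}
	return exact // in bounds: obeyed as-is
}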
8 changes: 6 additions & 2 deletions pkg/kgo/config.go
@@ -1165,9 +1165,13 @@ func MaxConcurrentFetches(n int) ConsumerOpt {
// ConsumeResetOffset sets the offset to restart consuming from when a
// partition has no commits (for groups) or when beginning to consume a
// partition (for direct partition consuming), or when a fetch sees an
// OffsetOutOfRange error, overriding the default ConsumeStartOffset.
// OffsetOutOfRange error, overriding the default NewOffset().AtStart(), i.e.,
// the earliest offset.
//
// Defaults to: NewOffset().AtStart() / Earliest Offset
// If you choose an exact offset to reset to (NewOffset().At(#)) and the
// offset is before the partition's log start offset or after the high
// watermark, this will reset to the start offset or end offset, respectively.
// Relative offsets are only obeyed if they fall within bounds.
func ConsumeResetOffset(offset Offset) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.resetOffset = offset }}
}
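As a usage sketch of the documented behavior (the broker address, topic name, and offsets below are hypothetical), a client configured with an exact reset offset now starts at the clamped position instead of looping on OffsetOutOfRange:

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // hypothetical broker
		kgo.ConsumeTopics("events"),       // hypothetical topic
		// Exact offset: if 42 is below the log start offset or above the
		// high watermark, consuming resets to the start or end offset.
		kgo.ConsumeResetOffset(kgo.NewOffset().At(42)),
		// Alternatives:
		//   kgo.NewOffset().AtStart().Relative(2) // start+2, obeyed only if in bounds
		//   kgo.NewOffset().AtEnd().Relative(-3)  // end-3, obeyed only if in bounds
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Error handling on fetches is elided in this sketch.
	fetches := cl.PollFetches(context.Background())
	fetches.EachRecord(func(r *kgo.Record) {
		fmt.Println(r.Offset, string(r.Value))
	})
}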
189 changes: 170 additions & 19 deletions pkg/kgo/consumer.go
@@ -3,6 +3,7 @@ package kgo
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
@@ -1497,23 +1498,125 @@ func (l *loadedOffsets) addAll(as []loadedOffset) loadedOffsets {
func (cl *Client) listOffsetsForBrokerLoad(ctx context.Context, broker *broker, load offsetLoadMap, tps *topicsPartitions, results chan<- loadedOffsets) {
loaded := loadedOffsets{broker: broker.meta.NodeID, loadType: loadTypeList}

kresp, err := broker.waitResp(ctx, load.buildListReq(cl.cfg.isolationLevel))
if err != nil {
req1, req2 := load.buildListReq(cl.cfg.isolationLevel)
var (
wg sync.WaitGroup
kresp2 kmsg.Response
err2 error
)
if req2 != nil {
wg.Add(1)
go func() {
defer wg.Done()
kresp2, err2 = broker.waitResp(ctx, req2)
}()
}
kresp, err := broker.waitResp(ctx, req1)
wg.Wait()
if err != nil || err2 != nil {
results <- loaded.addAll(load.errToLoaded(err))
return
}

topics := tps.load()
resp := kresp.(*kmsg.ListOffsetsResponse)
for _, rTopic := range resp.Topics {

// If we issued a second req to check that an exact offset is in
// bounds, then regrettably for safety, we have to ensure that the
// shapes of both responses match, and the topic & partition at each
// index matches. Anything that does not match is skipped (and would be
// a bug from Kafka), and at the end we return UnknownTopicOrPartition.
var resp2 *kmsg.ListOffsetsResponse
if req2 != nil {
resp2 = kresp2.(*kmsg.ListOffsetsResponse)
for _, r := range []*kmsg.ListOffsetsResponse{
resp,
resp2,
} {
ts := r.Topics
sort.Slice(ts, func(i, j int) bool {
return ts[i].Topic < ts[j].Topic
})
for i := range ts {
ps := ts[i].Partitions
sort.Slice(ps, func(i, j int) bool {
return ps[i].Partition < ps[j].Partition
})
}
}

lt := resp.Topics
rt := resp2.Topics
lkeept := lt[:0]
rkeept := rt[:0]
// Over each response, we only keep the topic if the topics match.
for len(lt) > 0 && len(rt) > 0 {
if lt[0].Topic < rt[0].Topic {
lt = lt[1:]
continue
}
if rt[0].Topic < lt[0].Topic {
rt = rt[1:]
continue
}
// As well, for topics that match, we only keep
// partitions that match. In this case, we also want
// both partitions to be error free, otherwise we keep
// an error on both. If one has old style offsets,
// both must.
lp := lt[0].Partitions
rp := rt[0].Partitions
lkeepp := lp[:0]
rkeepp := rp[:0]
for len(lp) > 0 && len(rp) > 0 {
if lp[0].Partition < rp[0].Partition {
lp = lp[1:]
continue
}
if rp[0].Partition < lp[0].Partition {
rp = rp[1:]
continue
}
if len(lp[0].OldStyleOffsets) > 0 && len(rp[0].OldStyleOffsets) == 0 ||
len(lp[0].OldStyleOffsets) == 0 && len(rp[0].OldStyleOffsets) > 0 {
lp = lp[1:]
rp = rp[1:]
continue
}
if lp[0].ErrorCode != 0 {
rp[0].ErrorCode = lp[0].ErrorCode
} else if rp[0].ErrorCode != 0 {
lp[0].ErrorCode = rp[0].ErrorCode
}
lkeepp = append(lkeepp, lp[0])
rkeepp = append(rkeepp, rp[0])
lp = lp[1:]
rp = rp[1:]
}
// Now we update the partitions in the topic we are
// keeping, and keep our topic.
lt[0].Partitions = lkeepp
rt[0].Partitions = rkeepp
lkeept = append(lkeept, lt[0])
rkeept = append(rkeept, rt[0])
lt = lt[1:]
rt = rt[1:]
}
// Finally, update each response with the topics we kept. The
// shapes and indices are the same.
resp.Topics = lkeept
resp2.Topics = rkeept
}

for i, rTopic := range resp.Topics {
topic := rTopic.Topic
loadParts, ok := load[topic]
if !ok {
continue // should not happen: kafka replied with something we did not ask for
}

topicPartitions := topics.loadTopic(topic) // must be non-nil at this point
for _, rPartition := range rTopic.Partitions {
for j, rPartition := range rTopic.Partitions {
partition := rPartition.Partition
loadPart, ok := loadParts[partition]
if !ok {
@@ -1540,15 +1643,41 @@ func (cl *Client) listOffsetsForBrokerLoad(ctx context.Context, broker *broker,
delete(load, topic)
}

offset := rPartition.Offset + loadPart.relative
if len(rPartition.OldStyleOffsets) > 0 { // if we have any, we used list offsets v0
offset = rPartition.OldStyleOffsets[0] + loadPart.relative
offset := rPartition.Offset
if len(rPartition.OldStyleOffsets) > 0 {
offset = rPartition.OldStyleOffsets[0] // list offsets v0
}

// If the user requested an exact offset, we asked for
// both the start and end offsets. We validate the
// exact offset (delta any relative) is within bounds.
//
// For start & end, we only obey relative if it is
// positive to the start or negative to the end.
if loadPart.at >= 0 {
offset = loadPart.at + loadPart.relative // we obey exact requests, even if they end up past the end
// We ensured the resp2 shape is as we want, so
// these lookups are safe and the partition has
// no error.
pend := &resp2.Topics[i].Partitions[j]
start := offset
end := pend.Offset
if len(pend.OldStyleOffsets) > 0 {
end = pend.OldStyleOffsets[0]
}
rel := loadPart.at + loadPart.relative
if rel >= start {
offset = rel
}
if rel >= end {
offset = end
}
} else if loadPart.at == -2 && loadPart.relative > 0 {
offset += loadPart.relative
} else if loadPart.at == -1 && loadPart.relative < 0 {
offset += loadPart.relative
}
if offset < 0 {
offset = 0
offset = 0 // sanity
}

loaded.add(loadedOffset{
@@ -1638,21 +1767,27 @@ func (cl *Client) loadEpochsForBrokerLoad(ctx context.Context, broker *broker, l
results <- loaded.addAll(load.errToLoaded(kerr.UnknownTopicOrPartition))
}

func (o offsetLoadMap) buildListReq(isolationLevel int8) *kmsg.ListOffsetsRequest {
req := kmsg.NewPtrListOffsetsRequest()
req.ReplicaID = -1
req.IsolationLevel = isolationLevel
req.Topics = make([]kmsg.ListOffsetsRequestTopic, 0, len(o))
// In general this returns one request, but if the user is using exact offsets
// rather than start/end, then we issue both the start and end requests to
// ensure the user's requested offset is within bounds.
func (o offsetLoadMap) buildListReq(isolationLevel int8) (r1, r2 *kmsg.ListOffsetsRequest) {
r1 = kmsg.NewPtrListOffsetsRequest()
r1.ReplicaID = -1
r1.IsolationLevel = isolationLevel
r1.Topics = make([]kmsg.ListOffsetsRequestTopic, 0, len(o))
var createEnd bool
for topic, partitions := range o {
parts := make([]kmsg.ListOffsetsRequestTopicPartition, 0, len(partitions))
for partition, offset := range partitions {
// If this partition is using an exact offset request,
// then we are listing for a partition that was not yet
// loaded by the client (due to metadata). We use -1
// just to ensure the partition is loaded.
// loaded by the client (due to metadata). We use -2
// just to ensure things are loaded and to ensure we
// load the start offset to validate lower bounds.
timestamp := offset.at
if timestamp >= 0 {
timestamp = -1
timestamp = -2
createEnd = true
}
p := kmsg.NewListOffsetsRequestTopicPartition()
p.Partition = partition
@@ -1665,9 +1800,25 @@ func (o offsetLoadMap) buildListReq(isolationLevel int8) *kmsg.ListOffsetsReques
t := kmsg.NewListOffsetsRequestTopic()
t.Topic = topic
t.Partitions = parts
req.Topics = append(req.Topics, t)
r1.Topics = append(r1.Topics, t)
}

if createEnd {
r2 = kmsg.NewPtrListOffsetsRequest()
*r2 = *r1
r2.Topics = append([]kmsg.ListOffsetsRequestTopic(nil), r1.Topics...)
for i := range r1.Topics {
l := &r2.Topics[i]
r := &r1.Topics[i]
*l = *r
l.Partitions = append([]kmsg.ListOffsetsRequestTopicPartition(nil), r.Partitions...)
for i := range l.Partitions {
l.Partitions[i].Timestamp = -1
}
}
}
return req

return r1, r2
}

func (o offsetLoadMap) buildEpochReq() *kmsg.OffsetForLeaderEpochRequest {
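
The shape-matching block in listOffsetsForBrokerLoad above is, at its core, a lockstep walk over two sorted lists that keeps only entries present in both. Here it is distilled to plain ints; this is an illustrative sketch, not code from this commit.

// intersectSorted walks two sorted slices in lockstep and keeps only values
// present in both, reusing the input backing arrays just like the response
// topic/partition filtering above. The returned slices have equal length,
// with matching values at every index.
func intersectSorted(l, r []int) ([]int, []int) {
	lkeep, rkeep := l[:0], r[:0]
	for len(l) > 0 && len(r) > 0 {
		switch {
		case l[0] < r[0]:
			l = l[1:] // only in l: drop
		case r[0] < l[0]:
			r = r[1:] // only in r: drop
		default:
			lkeep = append(lkeep, l[0])
			rkeep = append(rkeep, r[0])
			l, r = l[1:], r[1:]
		}
	}
	return lkeep, rkeep
}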
