Skip to content

Commit

Permalink
pkg/kgo: inject fake fetches on metadata load errors
Browse files Browse the repository at this point in the history
A consumer should be notified when it cannot make progress due to
metadata load errors. We now:
* unconditionally inject a fake fetch if a load error is not retryable
* inject a fake fetch for retryable load errors if the user has configured KeepRetryableFetchErrors
  • Loading branch information
twmb committed Jan 21, 2024
1 parent a2f5ce5 commit a2340eb
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
8 changes: 4 additions & 4 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func NewErrFetch(err error) Fetches {
Topics: []FetchTopic{{
Topic: "",
Partitions: []FetchPartition{{
Partition: 0,
Partition: -1,
Err: err,
}},
}},
Expand Down Expand Up @@ -384,11 +384,11 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
}

// PollRecords waits for records to be available, returning as soon as any
// broker returns records in a fetch. If the context is nil, this function
// will return immediately with any currently buffered records.
// broker returns records in a fetch. If the context is nil, this function will
// return immediately with any currently buffered records.
//
// If the client is closed, a fake fetch will be injected that has no topic, a
// partition of 0, and a partition error of ErrClientClosed. If the context is
// partition of -1, and a partition error of ErrClientClosed. If the context is
// canceled, a fake fetch will be injected with ctx.Err. These injected errors
// can be used to break out of a poll loop.
//
Expand Down
28 changes: 28 additions & 0 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync/atomic"
"testing"
"time"

"github.com/twmb/franz-go/pkg/kerr"
)

// Allow adding a topic to consume after the client is initialized with nothing
Expand Down Expand Up @@ -541,3 +543,29 @@ func TestSetOffsetsForNewTopic(t *testing.T) {
cl.Close()
}
}

// TestIssue648 polls once on a client configured to consume the topic
// "bizbazbuz" with KeepRetryableFetchErrors enabled, and checks the errors
// carried by the returned fetches.
//
// The test passes only if at least one error is reported and every reported
// error matches kerr.UnknownTopicOrPartition. Any other error (for example,
// the context error if the four second poll deadline expires first) fails
// the test.
//
// NOTE(review): this presumably relies on "bizbazbuz" not existing on the
// test cluster, so that the metadata load error is surfaced through the
// poll; confirm against the test environment.
func TestIssue648(t *testing.T) {
	t.Parallel()

	cl, _ := newTestClient(
		MetadataMinAge(100*time.Millisecond),
		ConsumeTopics("bizbazbuz"),
		FetchMaxWait(time.Second),
		KeepRetryableFetchErrors(),
	)
	defer cl.Close()

	// Bound the poll so the test cannot hang if no fetch is ever returned.
	pollCtx, stop := context.WithTimeout(context.Background(), 4*time.Second)
	fetches := cl.PollFetches(pollCtx)
	stop()

	sawUnknownTopic := false
	fetches.EachError(func(_ string, _ int32, fetchErr error) {
		if errors.Is(fetchErr, kerr.UnknownTopicOrPartition) {
			sawUnknownTopic = true
			return
		}
		t.Errorf("expected ErrUnknownTopicOrPartition, got %v", fetchErr)
	})

	if !sawUnknownTopic {
		t.Errorf("did not see ErrUnknownTopicOrPartition")
	}
}
6 changes: 5 additions & 1 deletion pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,8 @@ func (cl *Client) mergeTopicPartitions(
for _, topicPartition := range lv.partitions {
topicPartition.records.bumpRepeatedLoadErr(lv.loadErr)
}
} else if !kerr.IsRetriable(r.loadErr) || cl.cfg.keepRetryableFetchErrors {
cl.consumer.addFakeReadyForDraining(topic, -1, r.loadErr, "metadata refresh has a load error on this entire topic")
}
retryWhy.add(topic, -1, r.loadErr)
return
Expand Down Expand Up @@ -753,7 +755,7 @@ func (cl *Client) mergeTopicPartitions(
}
newTP := r.partitions[part]

// Like above for the entire topic, an individual partittion
// Like above for the entire topic, an individual partition
// can have a load error. Unlike for the topic, individual
// partition errors are always retryable.
//
Expand All @@ -765,6 +767,8 @@ func (cl *Client) mergeTopicPartitions(
newTP.loadErr = err
if isProduce {
newTP.records.bumpRepeatedLoadErr(newTP.loadErr)
} else if !kerr.IsRetriable(newTP.loadErr) || cl.cfg.keepRetryableFetchErrors {
cl.consumer.addFakeReadyForDraining(topic, int32(part), newTP.loadErr, "metadata refresh has a load error on this partition")
}
retryWhy.add(topic, int32(part), newTP.loadErr)
continue
Expand Down

0 comments on commit a2340eb

Please sign in to comment.