Skip to content

Commit

Permalink
fetches: add CollectRecords convenience function
Browse files Browse the repository at this point in the history
This is yet another alternative, complementing EachRecord and
RecordIter; this time, we just want all records.
  • Loading branch information
twmb committed Sep 1, 2021
1 parent 0eaf03a commit c973268
Showing 1 changed file with 19 additions and 2 deletions.
21 changes: 19 additions & 2 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,10 @@ func (fs Fetches) IsClientClosed() bool {
// documentation on that function for what types of errors are possible.
func (fs Fetches) EachError(fn func(string, int32, error)) {
for _, f := range fs {
for _, ft := range f.Topics {
for _, fp := range ft.Partitions {
for i := range f.Topics {
ft := &f.Topics[i]
for j := range ft.Partitions {
fp := &ft.Partitions[j]
if fp.Err != nil {
fn(ft.Topic, fp.Partition, fp.Err)
}
Expand Down Expand Up @@ -442,6 +444,21 @@ func (fs Fetches) EachRecord(fn func(*Record)) {
}
}

// CollectRecords returns all records in all fetches.
//
// This is a convenience function that does a single slice allocation.
func (fs Fetches) CollectRecords() []*Record {
var n int
fs.EachPartition(func(p FetchTopicPartition) {
n += len(p.Records)
})
rs := make([]*Record, 0, n)
fs.EachPartition(func(p FetchTopicPartition) {
rs = append(rs, p.Records...)
})
return rs
}

// FetchTopicPartition is similar to FetchTopic, but for an individual
// partition.
type FetchTopicPartition struct {
Expand Down

0 comments on commit c973268

Please sign in to comment.