consumer: add PollRecords to poll a limited number of records
This allows users to bound the amount of work they will do between
polls. On the client side, we have to account for records being taken
from a buffered fetch, updating our cursors appropriately as records
are taken. Overall, this requires a bit more bookkeeping but should
otherwise be non-impactful.
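
For a sense of the API this enables, here is a minimal poll-loop sketch (not part of this commit). It assumes an already-configured *kgo.Client named cl, a ctx, an imported log package, and a hypothetical process function, and it checks partition errors as the PollRecords documentation below requires:

for {
	// Bound the work done in each loop iteration to at most 1000 records.
	fetches := cl.PollRecords(ctx, 1000)

	// Check every partition error: a fatal error may arrive in a fake
	// fetch that carries no records.
	for _, fetch := range fetches {
		for i := range fetch.Topics {
			t := &fetch.Topics[i]
			for j := range t.Partitions {
				p := &t.Partitions[j]
				if p.Err != nil {
					log.Printf("topic %s partition %d: %v", t.Topic, p.Partition, p.Err)
					continue
				}
				for _, rec := range p.Records {
					process(rec) // hypothetical record handler
				}
			}
		}
	}
}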
twmb committed Mar 29, 2021
1 parent 2ce2def commit ca32e19
Showing 4 changed files with 115 additions and 6 deletions.
38 changes: 33 additions & 5 deletions pkg/kgo/consumer.go
@@ -238,9 +238,22 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er
 // It is important to check all partition errors in the returned fetches. If
 // any partition has a fatal error and actually had no records, fake fetch will
 // be injected with the error.
-//
-// It is invalid to call this multiple times concurrently.
 func (cl *Client) PollFetches(ctx context.Context) Fetches {
+	return cl.PollRecords(ctx, 0)
+}
+
+// PollRecords waits for records to be available, returning as soon as any
+// broker returns a record in a fetch. If the ctx quits, this function quits.
+// This returns a maximum of maxPollRecords total across all fetches, or
+// returns all buffered records if maxPollRecords is <= 0.
+//
+// It is important to check all partition errors in the returned fetches. If
+// any partition has a fatal error and actually had no records, fake fetch will
+// be injected with the error.
+func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
+	if maxPollRecords == 0 {
+		maxPollRecords = -1
+	}
 	c := &cl.consumer
 
 	var fetches Fetches
@@ -261,13 +274,28 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
 	defer c.mu.Unlock()
 
 	c.sourcesReadyMu.Lock()
-	for _, ready := range c.sourcesReadyForDraining {
-		fetches = append(fetches, ready.takeBuffered())
+	if maxPollRecords < 0 {
+		for _, ready := range c.sourcesReadyForDraining {
+			fetches = append(fetches, ready.takeBuffered())
+		}
+		c.sourcesReadyForDraining = nil
+	} else {
+		for len(c.sourcesReadyForDraining) > 0 && maxPollRecords > 0 {
+			source := c.sourcesReadyForDraining[0]
+			fetch, taken, drained := source.takeNBuffered(maxPollRecords)
+			if drained {
+				c.sourcesReadyForDraining = c.sourcesReadyForDraining[1:]
+			}
+			maxPollRecords -= taken
+			fetches = append(fetches, fetch)
+		}
 	}
-	c.sourcesReadyForDraining = nil
+
 	realFetches := fetches
+
 	fetches = append(fetches, c.fakeReadyForDraining...)
 	c.fakeReadyForDraining = nil
+
 	c.sourcesReadyMu.Unlock()
 
 	if len(realFetches) == 0 {
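
With this change an unbounded poll can be written either way, since PollFetches is now sugar for PollRecords with no limit:

// Equivalent after this commit: maxPollRecords <= 0 means "all buffered".
fetches := cl.PollFetches(ctx)
fetches = cl.PollRecords(ctx, 0)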
2 changes: 1 addition & 1 deletion pkg/kgo/group_test.go
@@ -164,7 +164,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
 		// We poll with a short timeout so that we do not hang waiting
 		// at the end if another consumer hit the limit.
 		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
-		fetches := cl.PollFetches(ctx)
+		fetches := cl.PollRecords(ctx, 100)
 		cancel()
 		if len(fetches) == 0 {
 			if consumed := atomic.LoadUint64(&c.consumed); consumed == testRecordLimit {
73 changes: 73 additions & 0 deletions pkg/kgo/source.go
@@ -310,6 +310,79 @@ func (s *source) discardBuffered() {
 	s.takeBufferedFn(usedOffsets.finishUsingAll)
 }
 
+// takeNBuffered takes a limited amount of records from a buffered fetch,
+// updating offsets in each partition per records taken.
+//
+// This only allows a new fetch once every buffered record has been taken.
+//
+// This returns the number of records taken and whether the source has been
+// completely drained.
+func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
+	var r Fetch
+	var taken int
+
+	b := &s.buffered
+	bf := &b.fetch
+	for len(bf.Topics) > 0 && n > 0 {
+		t := &bf.Topics[0]
+
+		r.Topics = append(r.Topics, *t)
+		rt := &r.Topics[len(r.Topics)-1]
+		rt.Partitions = nil
+
+		tCursors := b.usedOffsets[t.Topic]
+
+		for len(t.Partitions) > 0 && n > 0 {
+			p := &t.Partitions[0]
+
+			rt.Partitions = append(rt.Partitions, *p)
+			rp := &rt.Partitions[len(rt.Partitions)-1]
+			rp.Records = nil
+
+			take := n
+			if take > len(p.Records) {
+				take = len(p.Records)
+			}
+
+			rp.Records = p.Records[:take]
+			p.Records = p.Records[take:]
+
+			n -= take
+			taken += take
+
+			pCursor := tCursors[p.Partition]
+
+			if len(p.Records) == 0 {
+				t.Partitions = t.Partitions[1:]
+
+				pCursor.from.setOffset(pCursor.cursorOffset)
+				pCursor.from.allowUsable()
+				delete(tCursors, p.Partition)
+				if len(tCursors) == 0 {
+					delete(b.usedOffsets, t.Topic)
+				}
+				break
+			}
+
+			lastReturnedRecord := rp.Records[len(rp.Records)-1]
+			pCursor.from.setOffset(cursorOffset{
+				offset:            lastReturnedRecord.Offset + 1,
+				lastConsumedEpoch: lastReturnedRecord.LeaderEpoch,
+			})
+		}
+
+		if len(t.Partitions) == 0 {
+			bf.Topics = bf.Topics[1:]
+		}
+	}
+
+	drained := len(bf.Topics) == 0
+	if drained {
+		s.takeBuffered()
+	}
+	return r, taken, drained
+}
+
 func (s *source) takeBufferedFn(offsetFn func(usedOffsets)) Fetch {
 	r := s.buffered
 	s.buffered = bufferedFetch{}
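
The cursor handling above is the subtle part; the underlying take-from-the-front bookkeeping is easier to see in isolation. A toy, self-contained sketch of the same pattern follows (the types here are illustrative stand-ins, not franz-go's):

package main

import "fmt"

type record struct{ offset int64 }

type partitionBuf struct{ records []record }

type fetchBuf struct{ partitions []partitionBuf }

// takeN removes up to n records from the front of the buffer, returning the
// records taken, how many were taken, and whether the buffer is now drained.
func (b *fetchBuf) takeN(n int) (out []record, taken int, drained bool) {
	for len(b.partitions) > 0 && n > 0 {
		p := &b.partitions[0]
		take := n
		if take > len(p.records) {
			take = len(p.records)
		}
		out = append(out, p.records[:take]...)
		p.records = p.records[take:] // remaining records stay buffered
		n -= take
		taken += take
		if len(p.records) == 0 {
			b.partitions = b.partitions[1:] // this partition is drained
		}
	}
	return out, taken, len(b.partitions) == 0
}

func main() {
	b := fetchBuf{partitions: []partitionBuf{
		{records: []record{{0}, {1}, {2}}},
		{records: []record{{0}, {1}}},
	}}
	out, taken, drained := b.takeN(4)
	fmt.Println(len(out), taken, drained) // prints: 4 4 false
}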
8 changes: 8 additions & 0 deletions pkg/kgo/txn.go
@@ -100,6 +100,14 @@ func (s *GroupTransactSession) PollFetches(ctx context.Context) Fetches {
 	return s.cl.PollFetches(ctx)
 }
 
+// PollRecords is a wrapper around Client.PollRecords, with the exact same
+// semantics. Please refer to that function's documentation.
+//
+// It is invalid to call PollRecords concurrently with Begin or End.
+func (s *GroupTransactSession) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
+	return s.cl.PollRecords(ctx, maxPollRecords)
+}
+
 // Produce is a wrapper around Client.Produce, with the exact same semantics.
 // Please refer to that function's documentation.
 //
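
For context, a hedged sketch of bounding per-transaction work with this new wrapper. It assumes a configured *kgo.GroupTransactSession named sess, the session's Begin/End APIs with the kgo.TryCommit end signal, Fetches.RecordIter, and a hypothetical process function; none of these are changed by this commit:

// Sketch only: assumed session APIs, not part of this commit.
if err := sess.Begin(); err != nil {
	return err
}

// Cap how many records this transaction consumes.
fetches := sess.PollRecords(ctx, 500)
iter := fetches.RecordIter()
for !iter.Done() {
	process(iter.Next()) // e.g. transform and produce within the transaction
}

// On abort (or error), the group session rewinds so records are re-consumed.
committed, err := sess.End(ctx, kgo.TryCommit)
if err != nil || !committed {
	log.Printf("transaction not committed: committed=%v err=%v", committed, err)
}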
