client: add CommitRecords and CommitUncommittedOffsets
It has been brought up multiple times that it is difficult to understand
how to commit if disabling autocommitting, so these two new functions
will hopefully make it waaaay easier.

This updates a bunch of corresponding documentation around risks / how
to use this new easy stuff.
twmb committed Jun 10, 2021
1 parent 6808a55 commit 2092b4c
Showing 2 changed files with 142 additions and 33 deletions.
48 changes: 32 additions & 16 deletions pkg/kgo/config.go
@@ -1089,22 +1089,18 @@ func ConsumeRegex() ConsumerOpt {
// that you lose absolutely no data, you can disable autocommitting and
// manually commit, like so:
//
// cl.BlockingCommitOffsets(
// context.Background(),
// cl.UncommittedOffsets(),
// callback,
// )
//
// There are two downsides with manually committing offsets: you must define
// the callback yourself (and check per-partition errors), and it is possible
// that a group rebalance can happen and you will be trying to commit offsets
// for partitions that have moved to a different consumer. If you do this, you
// will actually rewind the other consumer, and if it crashes, it will
// reprocess additional data.
//
// Generally, if you can tolerate a little bit of data loss from crashes
// because you do not expect to ever crash, then relying on autocommitting is a
// fine option.
//     if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
//         // handle err; unable to commit
//     }
//
// The main downside with disabling autocommitting is that you run the risk of
// more duplicate processing of records than necessary. See the documentation
// on DisableAutoCommit for more details.
//
// If you can tolerate a little bit of data loss from crashes because you do
// not expect to ever crash, then relying on autocommitting is a fine option.
// However, if you can tolerate a little bit of duplicate processing, manually
// committing is very easy.
func ConsumerGroup(group string) GroupOpt {
	return groupOpt{func(cfg *cfg) { cfg.group = group }}
}
@@ -1236,6 +1232,26 @@ func OnLost(onLost func(context.Context, *Client, map[string][]int32)) GroupOpt
}

// DisableAutoCommit disables autocommitting.
//
// If you disable autocommitting, you may want to use a custom OnRevoked;
// otherwise, a rebalance can lead to duplicate processing of records.
// Consider the scenario: you, member A, are processing partition 0, and
// previously committed offset 4 and have now locally processed through offset
// 30. A rebalance happens, and partition 0 moves to member B. If you use
// OnRevoked, you can detect that you are losing this partition and commit
// your work through offset 30, so that member B can start processing at
// offset 30. If you do not commit (i.e. you do not use a custom OnRevoked),
// the other member will start processing at offset 4. It may process through
// offset 50, leading to double processing of offsets 4 through 29. Worse,
// you, member A, can rewind member B's commit: member B may commit offset 50,
// and you may only later commit offset 30. If another rebalance then happens,
// offsets 30 through 49 will be processed yet again.
//
// Again, OnRevoked is not necessary, and not using it just means double
// processing, which for most workloads is fine since a simple group consumer
// is not EOS / transactional, only at-least-once. But, this is something to be
// aware of.
func DisableAutoCommit() GroupOpt {
	return groupOpt{func(cfg *cfg) { cfg.autocommitDisable = true }}
}
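
The rebalance scenario in the DisableAutoCommit comment comes down to simple arithmetic. The following is a minimal, self-contained sketch and not part of the commit: the helper name `reprocessed` is hypothetical, and it assumes the comment's convention that both values are "next offset" positions.

```go
package main

import "fmt"

// reprocessed returns how many offsets a new partition owner processes again
// when it resumes from the committed offset although the previous owner had
// already processed everything below processedThrough. Hypothetical helper
// used only to illustrate the DisableAutoCommit comment.
func reprocessed(committed, processedThrough int64) int64 {
	if processedThrough <= committed {
		return 0
	}
	return processedThrough - committed
}

func main() {
	// No commit in OnRevoked: member B restarts at 4, member A had reached 30.
	fmt.Println(reprocessed(4, 30)) // prints 26 (offsets 4 through 29)

	// Commit in OnRevoked: member B restarts exactly where member A stopped.
	fmt.Println(reprocessed(30, 30)) // prints 0

	// A late commit of 30 from member A rewinds member B's commit of 50.
	fmt.Println(reprocessed(30, 50)) // prints 20 (offsets 30 through 49)
}
```

The second case is what a commit inside a custom OnRevoked buys you; the first and third are the duplicate processing the comment warns about.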
127 changes: 110 additions & 17 deletions pkg/kgo/consumer_group.go
@@ -1616,21 +1616,113 @@ func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]Epo
	return uncommitted
}

// CommitRecords issues a blocking offset commit for the offsets contained
// within rs. Retriable errors are retried up to the configured retry limit,
// and any unretriable error is returned.
//
// This function is useful as a simple way to commit offsets if you have
// disabled autocommitting. As an alternative if you always want to commit
// everything, see CommitUncommittedOffsets.
//
// Simple usage of this function may lead to duplicate records if a consumer
// group rebalance occurs before or while this function is being executed. You
// can avoid this scenario by calling CommitRecords in a custom OnRevoked, but
// for most workloads, a small bit of potential duplicate processing is fine.
// See the documentation on DisableAutoCommit for more details.
func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {
	// First build the offset commit map. We favor the latest epoch, then
	// offset, if any records map to the same topic / partition.
	offsets := make(map[string]map[int32]EpochOffset)
	for _, r := range rs {
		toffsets := offsets[r.Topic]
		if toffsets == nil {
			toffsets = make(map[int32]EpochOffset)
			offsets[r.Topic] = toffsets
		}

		if at, exists := toffsets[r.Partition]; exists {
			if at.Epoch > r.LeaderEpoch || (at.Epoch == r.LeaderEpoch && at.Offset > r.Offset) {
				continue
			}
		}
		toffsets[r.Partition] = EpochOffset{
			r.LeaderEpoch,
			r.Offset + 1, // a commit names the next offset to consume, not the record's own offset
		}
	}

	var rerr error // return error

	// Our client retries an OffsetCommitRequest as necessary if the first
	// response partition has a retriable group error (group coordinator
	// loading, etc), so any partition error is fatal.
	cl.BlockingCommitOffsets(ctx, offsets, func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
		if err != nil {
			rerr = err
			return
		}

		for _, topic := range resp.Topics {
			for _, partition := range topic.Partitions {
				if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
					rerr = err
					return
				}
			}
		}
	})

	return rerr
}
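
The "favor the latest epoch, then offset" rule in CommitRecords can be exercised in isolation. This is a minimal sketch, not the library's code: `record`, `epochOffset`, and `latestPerPartition` are hypothetical stand-ins that mirror only the fields the selection logic reads, and the result holds the winning record's own position (which offset is finally sent in the commit is a separate concern).

```go
package main

import "fmt"

// record is a hypothetical stand-in for the fields of a consumed record that
// the selection logic looks at.
type record struct {
	Topic       string
	Partition   int32
	LeaderEpoch int32
	Offset      int64
}

// epochOffset is a hypothetical stand-in for an (epoch, offset) pair.
type epochOffset struct {
	Epoch  int32
	Offset int64
}

// latestPerPartition keeps, for every topic and partition, the record with
// the highest leader epoch, breaking ties by the highest offset.
func latestPerPartition(rs []record) map[string]map[int32]epochOffset {
	offsets := make(map[string]map[int32]epochOffset)
	for _, r := range rs {
		toffsets := offsets[r.Topic]
		if toffsets == nil {
			toffsets = make(map[int32]epochOffset)
			offsets[r.Topic] = toffsets
		}
		if at, exists := toffsets[r.Partition]; exists {
			// Skip r if the stored entry is from a newer epoch, or from
			// the same epoch with a higher offset.
			if at.Epoch > r.LeaderEpoch || (at.Epoch == r.LeaderEpoch && at.Offset > r.Offset) {
				continue
			}
		}
		toffsets[r.Partition] = epochOffset{r.LeaderEpoch, r.Offset}
	}
	return offsets
}

func main() {
	got := latestPerPartition([]record{
		{"events", 0, 1, 10},
		{"events", 0, 1, 7}, // same epoch, lower offset: ignored
		{"events", 0, 2, 3}, // newer epoch wins despite the lower offset
		{"events", 1, 0, 5},
	})
	fmt.Println(got["events"][0], got["events"][1])
}
```

Because the epoch is compared first, records can be passed in any order and the result is the same.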

// CommitUncommittedOffsets issues a blocking offset commit for any partition
// that has been consumed from that has uncommitted offsets. Retriable errors
// are retried up to the configured retry limit, and any unretriable error is
// returned.
//
// This function is useful as a simple way to commit offsets if you have
// disabled autocommitting. As an alternative if you want to commit specific
// records, see CommitRecords.
//
// Simple usage of this function may lead to duplicate records if a consumer
// group rebalance occurs before or while this function is being executed. You
// can avoid this scenario by calling this function in a custom OnRevoked, but
// for most workloads, a small bit of potential duplicate processing is fine.
// See the documentation on DisableAutoCommit for more details.
func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
	// This function is just the tail end of CommitRecords just above.
	var rerr error
	cl.BlockingCommitOffsets(ctx, cl.UncommittedOffsets(), func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
		if err != nil {
			rerr = err
			return
		}

		for _, topic := range resp.Topics {
			for _, partition := range topic.Partitions {
				if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
					rerr = err
					return
				}
			}
		}
	})
	return rerr
}
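
Both new functions reduce a commit response to a single error in the same way: the request-level error wins, otherwise the first failing partition decides. A minimal sketch of that reduction follows, assuming hypothetical types (`partitionResult`, `topicResult`), a hypothetical sentinel `errPartition`, and a hypothetical helper `firstError`; it does not use the real kmsg or kerr packages.

```go
package main

import (
	"errors"
	"fmt"
)

// errPartition is a hypothetical per-partition failure used for the demo.
var errPartition = errors.New("hypothetical partition error")

// partitionResult and topicResult are hypothetical stand-ins for the
// per-partition entries of an offset commit response.
type partitionResult struct {
	Partition int32
	Err       error // nil on success
}

type topicResult struct {
	Topic      string
	Partitions []partitionResult
}

// firstError returns reqErr if it is set, otherwise the first per-partition
// error wrapped with its topic and partition, otherwise nil.
func firstError(reqErr error, topics []topicResult) error {
	if reqErr != nil {
		return reqErr
	}
	for _, t := range topics {
		for _, p := range t.Partitions {
			if p.Err != nil {
				return fmt.Errorf("topic %s partition %d: %w", t.Topic, p.Partition, p.Err)
			}
		}
	}
	return nil
}

func main() {
	ok := []topicResult{{"events", []partitionResult{{0, nil}, {1, nil}}}}
	bad := []topicResult{{"events", []partitionResult{{0, nil}, {1, errPartition}}}}

	fmt.Println(firstError(nil, ok))  // no error anywhere
	fmt.Println(firstError(nil, bad)) // first failing partition is reported
}
```

Wrapping the partition error with its topic and partition is a choice made for this sketch; the functions in the diff return the bare error.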

// BlockingCommitOffsets cancels any active CommitOffsets, begins a commit that
// cannot be canceled, and waits for that commit to complete. This function
// will not return until the commit is done and the onDone callback is
// complete.
//
// The purpose of this function is for use in OnRevoked or committing before
// leaving a group.
// leaving a group, because you do not want to have a commit issued in
// OnRevoked canceled.
//
// For OnRevoke, you do not want to have a commit in OnRevoke canceled, because
// once the commit is done, rebalancing will continue. If you cancel an
// OnRevoke commit and commit after the revoke, you will be committing for a
// stale session, the commit will be dropped, and you will likely doubly
// process records.
// This is an advanced function, and for simpler, more easily understandable
// committing, see CommitRecords and CommitUncommittedOffsets.
//
// For more information about committing, see the documentation for
// For more information about committing and committing asynchronously, see
// CommitOffsets.
func (cl *Client) BlockingCommitOffsets(
	ctx context.Context,
@@ -1698,11 +1790,16 @@ func (g *groupConsumer) blockingCommitOffsets(
// onDone is called with (nil, nil, nil) and this function returns immediately.
// It is OK if onDone is nil, but you will not know if your commit succeeded.
//
// If autocommitting is enabled, this function blocks autocommitting until this
// function is complete and the onDone has returned.
// This is an advanced function and is difficult to use correctly. For simpler,
// more easily understandable committing, see CommitRecords and
// CommitUncommittedOffsets.
//
// This function itself does not wait for the commit to finish. By default,
// this function is an asynchronous commit. You can use onDone to make it sync.
// If autocommitting is enabled, this function blocks autocommitting until this
// function is complete and the onDone has returned.
//
// It is invalid to use this function to commit offsets for a transaction.
//
// Note that this function ensures absolute ordering of commit requests by
// canceling prior requests and ensuring they are done before executing a new
@@ -1712,19 +1809,15 @@ func (g *groupConsumer) blockingCommitOffsets(
// differs from the Java async commit, which does not retry requests to avoid
// trampling on future commits.
//
// If using autocommitting, autocommitting will resume once this is complete.
//
// It is invalid to use this function to commit offsets for a transaction.
//
// It is highly recommended to check the response's partition's error codes if
// the response is non-nil. While unlikely, individual partitions can error.
// This is most likely to happen if a commit occurs too late in a rebalance
// event.
//
// If manually committing, you want to set OnRevoked to commit synchronously
// using BlockingCommitOffsets. Otherwise if committing async OnRevoked may
// return and a new group session may start before the commit is issued,
// leading to the commit being ignored and leading to duplicate messages.
// Do not use this async CommitOffsets in OnRevoked, instead use
// BlockingCommitOffsets. If you commit async, the rebalance will proceed
// before this function executes, and you will commit offsets for partitions
// that have moved to a different consumer.
func (cl *Client) CommitOffsets(
	ctx context.Context,
	uncommitted map[string]map[int32]EpochOffset,