Skip to content

Commit

Permalink
kgo: add PreCommitTxnFnContext
Browse files Browse the repository at this point in the history
Allows modifying TxnOffsetCommitRequest before it is issued --
particularly, to modify the metadata field.

Closes #559.
  • Loading branch information
twmb committed Sep 21, 2023
1 parent 5f5c721 commit 910e91d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2238,6 +2238,20 @@ func PreCommitFnContext(ctx context.Context, fn func(*kmsg.OffsetCommitRequest)
return context.WithValue(ctx, commitContextFn, fn)
}

type commitTxnContextFnT struct{}

var commitTxnContextFn commitTxnContextFnT

// PreCommitTxnFnContext attaches fn to the context through WithValue. Using
// the context while committing a transaction allows fn to be called just
// before the commit is issued. This can be used to modify the actual commit,
// such as by associating metadata with partitions (for transactions, the
// default internal metadata is the client's current member ID). If fn returns
// an error, the commit is not attempted.
func PreCommitTxnFnContext(ctx context.Context, fn func(*kmsg.TxnOffsetCommitRequest) error) context.Context {
return context.WithValue(ctx, commitTxnContextFn, fn)
}

// CommitRecords issues a synchronous offset commit for the offsets contained
// within rs. Retryable errors are retried up to the configured retry limit,
// and any unretryable error is returned.
Expand Down
7 changes: 7 additions & 0 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,13 @@ func (g *groupConsumer) commitTxn(
req.Topics = append(req.Topics, reqTopic)
}

if fn, ok := ctx.Value(commitTxnContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
if err := fn(req); err != nil {
onDone(req, nil, err)
return
}
}

var resp *kmsg.TxnOffsetCommitResponse
var err error
if len(req.Topics) > 0 {
Expand Down

0 comments on commit 910e91d

Please sign in to comment.