Skip to content

Commit

Permalink
producer: add ProduceSync
Browse files Browse the repository at this point in the history
This has been requested a few times. It is not recommended, but it's an
easy add.
  • Loading branch information
twmb committed Apr 27, 2021
1 parent 7a78262 commit b983d63
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 8 deletions.
4 changes: 2 additions & 2 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1874,7 +1874,7 @@ func (cl *Client) BlockingCommitOffsets(
// function is complete and the onDone has returned.
//
// This function itself does not wait for the commit to finish. By default,
// this function is an asyncronous commit. You can use onDone to make it sync.
// this function is an asynchronous commit. You can use onDone to make it sync.
//
// Note that this function ensures absolute ordering of commit requests by
// canceling prior requests and ensuring they are done before executing a new
Expand All @@ -1893,7 +1893,7 @@ func (cl *Client) BlockingCommitOffsets(
// This is most likely to happen if a commit occurs too late in a rebalance
// event.
//
// If manually committing, you want to set OnRevoked to commit syncronously
// If manually committing, you want to set OnRevoked to commit synchronously
// using BlockingCommitOffsets. Otherwise if committing async OnRevoked may
// return and a new group session may start before the commit is issued,
// leading to the commit being ignored and leading to duplicate messages.
Expand Down
37 changes: 35 additions & 2 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,44 @@ func (p *producer) isAborting() bool { return atomic.LoadUint32(&p.aborting) ==

func noPromise(*Record, error) {}

// ProduceSync is a synchronous produce. Please see the Produce documentation
// for an in depth description of how producing works.
//
// Note that it is heavily recommended to not use ProduceSync. Producing
// buffers multiple records into a single request issued to Kafka. A
// synchronous produce implies you may be producing one record per request,
// which is inefficient, slower, and puts more load on Kafka itself.
//
// This function should only be used when producing infrequently enough that
// waiting for a single record to be produced is what would happen anyway with
// Produce.
//
// If the produce is successful, the record's attrs / offset / etc. fields are
// updated appropriately.
func (cl *Client) ProduceSync(ctx context.Context, r *Record) error {
var (
wg sync.WaitGroup
err error
promise = func(_ *Record, perr error) {
err = perr
wg.Done()
}
)
wg.Add(1)
if perr := cl.Produce(ctx, r, promise); perr != nil {
return perr
}
wg.Wait()
return err
}

// Produce sends a Kafka record to the topic in the record's Topic field,
// calling promise with the record or an error when Kafka replies.
// calling promise with the record or an error when Kafka replies. For a
// synchronous produce (which is not recommended), see ProduceSync.
//
// The promise is optional, but not using it means you will not know if Kafka
// recorded a record properly.
// recorded a record properly. If there was no produce error, the record's
// attrs / offset / etc. fields are updated appropriately.
//
// If the record is too large to fit in a batch on its own in a produce
// request, the promise is called immediately before this function returns with
Expand Down
6 changes: 2 additions & 4 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type sink struct {
drainState workLoop

// seqRespsMu, guarded by seqRespsMu, contains responses that must
// be handled sequentially. These responses are handled asyncronously,
// be handled sequentially. These responses are handled asynchronously,
// but sequentially.
seqRespsMu sync.Mutex
seqResps []*seqResp
Expand Down Expand Up @@ -1753,9 +1753,7 @@ func (r seqRecBatch) appendTo(
dst = kbin.AppendInt64(dst, r.firstTimestamp+int64(lastRecord.timestampDelta))

seq := r.seq
if !idempotent {
producerID = -1
producerEpoch = -1
if !idempotent { // producerID and producerEpoch are already -1 if idempotent (due to producerID() itself returning -1)
seq = 0
}
dst = kbin.AppendInt64(dst, producerID)
Expand Down

0 comments on commit b983d63

Please sign in to comment.