Skip to content

Commit

Permalink
producing: only opt in to more than 1req if idempotent
Browse files Browse the repository at this point in the history
Previously, we relied on the produce response being v4 or higher to say
"ok we are talking to kafka v1+ and we can always be idempotent!".

We actually only want to opt in to more requests only if we ourselves
are idempotent.

To detect this, we only opt up if the request was issued with a producer
ID AND the response is v4 or higher. Ensuring we issued with a producer
ID ensures that we are idempotent.

This also simplifies some logic around where the idempotent fields are /
how they are used.
  • Loading branch information
twmb committed Jul 15, 2021
1 parent 5c0a2c7 commit 8770662
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti

producerID: id,
producerEpoch: epoch,
idempotent: s.cl.idempotent(),

compressor: s.cl.compressor,

Expand Down Expand Up @@ -479,10 +478,10 @@ func (s *sink) requeueUnattemptedReq(req *produceRequest, err error) {
// outside of a small window during the store, but some pages in the Kafka
// confluence basically show that more than two in flight has marginal benefit
// anyway (although that may be due to their Java API).
func (s *sink) firstRespCheck(version int16) {
func (s *sink) firstRespCheck(idempotent bool, version int16) {
if s.produceVersion < 0 { // this is the only place this can be checked non-atomically
atomic.StoreInt32(&s.produceVersion, int32(version))
if version >= 4 {
if idempotent && version >= 4 {
s.inflightSem.Store(make(chan struct{}, 4))
}
}
Expand Down Expand Up @@ -514,7 +513,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
s.handleReqClientErr(req, err)
return
}
s.firstRespCheck(req.version)
s.firstRespCheck(req.idempotent(), req.version)
atomic.StoreUint32(&s.consecutiveFailures, 0)

var b *bytes.Buffer
Expand Down Expand Up @@ -1350,7 +1349,6 @@ type produceRequest struct {

producerID int64
producerEpoch int16
idempotent bool

// Initialized in AppendTo, metrics tracks uncompressed & compressed
// sizes (in byteS) of each batch.
Expand All @@ -1368,6 +1366,8 @@ type produceRequest struct {
wireLengthLimit int32
}

func (r *produceRequest) idempotent() bool { return r.producerID >= 0 }

func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch *recBatch) bool {
batchWireLength, flexible := batch.wireLengthForProduceVersion(produceVersion)
batchWireLength += 4 // int32 partition prefix
Expand Down Expand Up @@ -1396,7 +1396,7 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch
}

if recBuf.batches[0] == batch {
if !r.idempotent || batch.canFailFromLoadErrs {
if !r.idempotent() || batch.canFailFromLoadErrs {
if err := batch.maybeFailErr(&batch.owner.cl.cfg); err != nil {
recBuf.failAllRecords(err)
return false
Expand Down Expand Up @@ -1715,7 +1715,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte {
if p.version < 3 {
dst, pmetrics = batch.appendToAsMessageSet(dst, uint8(p.version), p.compressor)
} else {
dst, pmetrics = batch.appendTo(dst, p.version, p.producerID, p.producerEpoch, p.idempotent, p.txnID != nil, p.compressor)
dst, pmetrics = batch.appendTo(dst, p.version, p.producerID, p.producerEpoch, p.txnID != nil, p.compressor)
}
batch.mu.Unlock()
tmetrics[partition] = pmetrics
Expand Down Expand Up @@ -1746,7 +1746,6 @@ func (r seqRecBatch) appendTo(
version int16,
producerID int64,
producerEpoch int16,
idempotent bool,
transactional bool,
compressor *compressor,
) (dst []byte, m ProduceBatchMetrics) { // named return so that our defer for flexible versions can modify it
Expand Down Expand Up @@ -1816,7 +1815,7 @@ func (r seqRecBatch) appendTo(
dst = kbin.AppendInt64(dst, r.firstTimestamp+int64(lastRecord.timestampDelta))

seq := r.seq
if !idempotent { // producerID and producerEpoch are already -1 if idempotent (due to producerID() itself returning -1)
if producerID < 0 { // a negative producer ID means we are not using idempotence
seq = 0
}
dst = kbin.AppendInt64(dst, producerID)
Expand Down

0 comments on commit 8770662

Please sign in to comment.