producer: work around KAFKA-12671
This is a racy way to work around the issue, but fundamentally, from a
client's perspective, we have limited options. Either we always flush
everything and wait for successful responses, or we work around this
with timing.

I choose timing here.

What we do is, when aborting, we wait for all sinks to be completely
done with their produce requests. This solves most of the problem to
begin with: before, we did not wait, so if we issued a produce request
we could immediately proceed to issuing EndTxn, and the EndTxn may be
handled first. By waiting, we _ensure_ that Kafka has handled our
produce requests.

However, if we wait and the request is cut off, it may be that our
connection died right after writing the produce request. In this case,
we now see that the last produce request had an issuing error, and we
wait 1s before sending EndTxn. The hope is that this 1s is enough time
for the ProduceRequest to be processed by Kafka itself.

We have effectively changed this issue from a slim chance to an
extremely slim chance under very bad conditions.
twmb committed Apr 17, 2021
1 parent 1d4c8bc commit 10b743e
Showing 3 changed files with 60 additions and 10 deletions.
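
Before the per-file changes, the sequencing the commit message describes can be condensed into a short sketch. This is not the library's code: the three function parameters are hypothetical stand-ins for the mechanisms added in the diff below (the drains/issues wait, the per-sink lastRespSuccessful check, and the EndTxn request itself).

```go
package sketch

import (
	"context"
	"time"
)

// endAbortedTxn sketches the ordering this commit aims for when aborting.
func endAbortedTxn(
	ctx context.Context,
	waitForInflight func(), // block until every sink has handled its produce requests
	allRespsSuccessful func() bool, // did every sink see a final successful response?
	issueEndTxn func(context.Context) error,
) error {
	// 1) Ensure Kafka has handled every produce request we issued.
	waitForInflight()

	// 2) If a connection died right after a produce request was written,
	//    Kafka may still be processing it; pause ~1s so EndTxn cannot be
	//    handled before that produce request (KAFKA-12671).
	if !allRespsSuccessful() {
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// 3) Only now issue EndTxn to abort the transaction.
	return issueEndTxn(ctx)
}
```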
pkg/kgo/producer.go: 13 changes, 8 additions & 5 deletions
@@ -26,6 +26,7 @@ type producer struct {

aborting uint32 // 1 means yes
drains int32 // number of sinks draining
issues int32 // number of in flight produce requests

idMu sync.Mutex
idVersion int16
@@ -55,12 +56,14 @@ func (p *producer) init() {
p.notifyCond = sync.NewCond(&p.notifyMu)
}

-func (p *producer) incDrains() {
-	atomic.AddInt32(&p.drains, 1)
-}
+func (p *producer) incDrains() { atomic.AddInt32(&p.drains, 1) }
+func (p *producer) incIssues() { atomic.AddInt32(&p.issues, 1) }
+
+func (p *producer) decDrains() { p.decAbortNotify(&p.drains) }
+func (p *producer) decIssues() { p.decAbortNotify(&p.issues) }

-func (p *producer) decDrains() {
-	if atomic.AddInt32(&p.drains, -1) != 0 || atomic.LoadUint32(&p.aborting) == 0 {
+func (p *producer) decAbortNotify(v *int32) {
+	if atomic.AddInt32(v, -1) != 0 || atomic.LoadUint32(&p.aborting) == 0 {
return
}
p.notifyMu.Lock()
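The new decAbortNotify helper pairs with the abort path added to pkg/kgo/txn.go further down: the aborter sets the aborting flag and parks on notifyCond until both the drains and issues counters reach zero. A minimal, self-contained sketch of that counter-plus-cond handshake (an assumed simplification, not the library's actual types):

```go
package sketch

import (
	"sync"
	"sync/atomic"
)

// notifier mimics the drains/issues bookkeeping: a decrement broadcasts on
// the cond only when the count hits zero while an abort is in progress.
type notifier struct {
	aborting uint32
	count    int32
	mu       sync.Mutex
	cond     *sync.Cond
}

func newNotifier() *notifier {
	n := &notifier{}
	n.cond = sync.NewCond(&n.mu)
	return n
}

func (n *notifier) inc() { atomic.AddInt32(&n.count, 1) }

func (n *notifier) dec() {
	if atomic.AddInt32(&n.count, -1) != 0 || atomic.LoadUint32(&n.aborting) == 0 {
		return // work remains, or nobody is waiting on an abort
	}
	n.mu.Lock()
	defer n.mu.Unlock()
	n.cond.Broadcast()
}

// waitAborted is the aborter side: set the flag, then sleep until the count
// drains to zero (the predicate is rechecked under the lock, as cond requires).
func (n *notifier) waitAborted() {
	atomic.StoreUint32(&n.aborting, 1)
	defer atomic.StoreUint32(&n.aborting, 0)
	n.mu.Lock()
	defer n.mu.Unlock()
	for atomic.LoadInt32(&n.count) > 0 {
		n.cond.Wait()
	}
}
```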
pkg/kgo/sink.go: 32 changes, 28 additions & 4 deletions
@@ -38,6 +38,13 @@ type sink struct {
needBackoff bool
backoffSeq uint32 // prevents pile on failures

// To work around KAFKA-12671, before we issue EndTxn, we check to see
// that all sinks had a final successful response. If not, then we risk
// running into KAFKA-12671 (out of order processing leading to
// orphaned begun "transaction" in ProducerStateManager), so rather
// than issuing EndTxn immediately, we wait a little bit.
lastRespSuccessful bool

// consecutiveFailures is incremented every backoff and cleared every
// successful response. For simplicity, if we have a good response
// following an error response before the error response's backoff
@@ -58,9 +65,10 @@ type seqResp struct {

func (cl *Client) newSink(nodeID int32) *sink {
s := &sink{
-cl:             cl,
-nodeID:         nodeID,
-produceVersion: -1,
+cl:                 cl,
+nodeID:             nodeID,
+produceVersion:     -1,
+lastRespSuccessful: true,
}
s.inflightSem.Store(make(chan struct{}, 1))
return s
@@ -236,7 +244,12 @@ func (s *sink) drain() {
s.maybeBackoff()

sem := s.inflightSem.Load().(chan struct{})
-sem <- struct{}{}
+select {
+case sem <- struct{}{}:
+case <-s.cl.ctx.Done():
+	s.drainState.hardFinish()
+	return
+}

again = s.drainState.maybeFinish(s.produce(sem))
}
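
Replacing the bare channel send with a select means a sink stuck waiting for an in-flight slot no longer blocks past client shutdown. A small standalone illustration of a channel-based semaphore that honors cancellation (an assumed example, not library code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// acquire takes a slot from a channel-based semaphore, but gives up if the
// context is cancelled first, mirroring the sink's new drain behavior.
func acquire(ctx context.Context, sem chan struct{}) bool {
	select {
	case sem <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	sem := make(chan struct{}, 1)
	sem <- struct{}{} // the only slot is taken, so acquire must wait

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	fmt.Println(acquire(ctx, sem)) // prints "false" once the context times out
}
```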
@@ -297,9 +310,18 @@ func (s *sink) produce(sem <-chan struct{}) bool {

req.backoffSeq = s.backoffSeq // safe to read outside mu since we are in drain loop

// Add that we are issuing and then check if we are aborting: this
// order ensures that we do not produce after aborting is set.
s.cl.producer.incIssues()
if s.cl.producer.isAborting() {
s.cl.producer.decIssues()
return false
}

produced = true
s.doSequenced(req, func(resp kmsg.Response, err error) {
s.handleReqResp(req, resp, err)
s.cl.producer.decIssues()
<-sem
})
return moreToDrain
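
The increment-then-check ordering above is the crux: a sink announces its in-flight request before it looks at the aborting flag, so any request that slips past the check is already counted and the aborter's wait will cover it. A standalone sketch of the two sides (hypothetical names, simplified from the surrounding code; the real client parks on a sync.Cond rather than spinning):

```go
package sketch

import "sync/atomic"

var (
	aborting uint32 // set to 1 by the aborter before it waits
	issues   int32  // number of in-flight produce requests
)

// producerSide increments first, then checks the flag. A request that passes
// the check has already been counted, so the aborter is guaranteed to wait
// for it; one that sees the flag backs out without issuing anything.
func producerSide(issue func()) bool {
	atomic.AddInt32(&issues, 1)
	if atomic.LoadUint32(&aborting) == 1 {
		atomic.AddInt32(&issues, -1)
		return false
	}
	issue()
	atomic.AddInt32(&issues, -1) // the real client decrements in the response callback
	return true
}

// aborterSide is the mirror image: set the flag first, then wait for zero.
func aborterSide() {
	atomic.StoreUint32(&aborting, 1)
	for atomic.LoadInt32(&issues) > 0 {
		// busy-wait for illustration only
	}
	// No produce request can now be issued before EndTxn: new callers will
	// observe aborting == 1 and back out.
}
```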
@@ -376,8 +398,10 @@ func (s *sink) issueTxnReq(
) error {
resp, err := txnReq.RequestWith(s.cl.ctx, s.cl)
if err != nil {
s.lastRespSuccessful = false
return err
}
s.lastRespSuccessful = true

for _, topic := range resp.Topics {
topicBatches, ok := req.batches[topic.Topic]
pkg/kgo/txn.go: 25 changes, 24 additions & 1 deletion
@@ -160,6 +160,27 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
return false, err // same
}
defer s.cl.ResetProducerID()

allOk := true
s.cl.sinksAndSourcesMu.Lock()
for _, sns := range s.cl.sinksAndSources {
allOk = allOk && sns.sink.lastRespSuccessful
}
s.cl.sinksAndSourcesMu.Unlock()

if !allOk {
s.cl.cfg.logger.Log(LogLevelWarn, "Buffered records were aborted, but some sink(s) did not have a final handled produce response. Kafka could still be handling these produce requests or have yet to handle them. We do not want to issue EndTxn before these produce requests are handled, because that would risk beginning a new transaction that we may not finish. Waiting 1s to give Kafka some time... (See KAFKA-12671)")
timer := time.NewTimer(time.Second)
select {
case <-timer.C:
case <-s.cl.ctx.Done():
timer.Stop()
return false, s.cl.ctx.Err()
case <-ctx.Done():
timer.Stop()
return false, ctx.Err()
}
}
}

wantCommit := bool(commit)
@@ -293,6 +314,8 @@ func (cl *Client) BeginTransaction() error {
func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
atomic.StoreUint32(&cl.producer.aborting, 1)
defer atomic.StoreUint32(&cl.producer.aborting, 0)
atomic.AddInt32(&cl.producer.flushing, 1) // disallow lingering to start
defer atomic.AddInt32(&cl.producer.flushing, -1)
// At this point, all drain loops that start will immediately stop,
// thus they will not begin any AddPartitionsToTxn request. We must
// now wait for any req currently built to be done being issued.
@@ -316,7 +339,7 @@ func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
defer cl.producer.notifyMu.Unlock()
defer close(done)

-for !quit && atomic.LoadInt32(&cl.producer.drains) > 0 {
+for !quit && (atomic.LoadInt32(&cl.producer.drains) > 0 || atomic.LoadInt32(&cl.producer.issues) > 0) {
cl.producer.notifyCond.Wait()
}
}()
