Skip to content

Commit

Permalink
producer: allow using the passed in context to cancel records
Browse files Browse the repository at this point in the history
Building off the prior commits, before we add a partition to a batch, we
will check if the first record is canceled. If so, we fail the entire
partition.

We fail an entire partition rather than just the first batch because it
is (a) easier, and (b) we really should not be taking so long, so if
producing the first record takes a while, there's a problem.
  • Loading branch information
twmb committed Apr 25, 2021
1 parent 2b9b8ca commit 6ae76d0
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 13 deletions.
14 changes: 7 additions & 7 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func RetryBackoff(backoff func(int) time.Duration) Opt {

// RequestRetries sets the number of tries that retriable requests are allowed,
// overriding the unlimited default. This option does not apply to produce
// requests.
// requests; to limit produce request retries, see ProduceRetries.
func RequestRetries(n int) Opt {
return clientOpt{func(cfg *cfg) { cfg.retries = int64(n) }}
}
Expand Down Expand Up @@ -682,9 +682,9 @@ func ProduceRequestTimeout(limit time.Duration) ProducerOpt {
// the unlimited default.
//
// If idempotency is enabled (as it is by default), this option is only
// enforced if it is safe to do so without messing up sequence numbers. It is
// safe to enforce if a record was never issued in a request to Kafka, or if it
// was requested and received a response.
// enforced if it is safe to do so without creating invalid sequence numbers.
// It is safe to enforce if a record was never issued in a request to Kafka, or
// if it was requested and received a response.
//
// This option is different from RequestRetries to allow finer grained control
// of when to fail when producing records.
Expand Down Expand Up @@ -745,9 +745,9 @@ func ManualFlushing() ProducerOpt {
// batch before timing out, overriding the ulimited default.
//
// If idempotency is enabled (as it is by default), this option is only
// enforced if it is safe to do so without messing up sequence numbers. It is
// safe to enforce if a record was never issued in a request to Kafka, or if it
// was requested and received a response.
// enforced if it is safe to do so without creating invalid sequence numbers.
// It is safe to enforce if a record was never issued in a request to Kafka, or
// if it was requested and received a response.
//
// The timeout for all records in a batch inherit the timeout of the first
// record in that batch. That is, once the first record's timeout expires, all
Expand Down
12 changes: 10 additions & 2 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ func noPromise(*Record, error) {}
// context or client to quit. If the context / client quits, this returns an
// error.
//
// The context is also used on a per-partition basis. If the context is done
// for the first record buffered in a partition, and if it is valid to abort
// records (to avoid invalid sequence numbers), all buffered records for a
// partition are aborted. The context checked for doneness is always the first
// buffered record's context. If that record is successfully produced, the
// context will then be the next first buffered record. The context is only
// evaluated before writing a produce request.
//
// The first buffered record for an unknown topic begins a timeout for the
// configured record timeout limit; all records buffered within the wait will
// expire with the same timeout if the topic does not load in time. For
Expand Down Expand Up @@ -134,7 +142,7 @@ func (cl *Client) Produce(
// waitBuffer as normal.
drainBuffered := func() {
go func() { <-p.waitBuffer }()
cl.finishRecordPromise(promisedRec{noPromise, nil}, nil)
cl.finishRecordPromise(promisedRec{ctx, noPromise, nil}, nil)
}
if cl.cfg.manualFlushing {
drainBuffered()
Expand All @@ -154,7 +162,7 @@ func (cl *Client) Produce(
if promise == nil {
promise = noPromise
}
cl.partitionRecord(promisedRec{promise, r})
cl.partitionRecord(promisedRec{ctx, promise, r})
return nil
}

Expand Down
21 changes: 17 additions & 4 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kgo

import (
"bytes"
"context"
"errors"
"fmt"
"hash/crc32"
Expand Down Expand Up @@ -1170,6 +1171,7 @@ func (recBuf *recBuf) resetBatchDrainIdx() {
// promisedRec ties a record with the callback that will be called once
// a batch is finally written and receives a response.
type promisedRec struct {
ctx context.Context
promise func(*Record, error)
*Record
}
Expand Down Expand Up @@ -1326,10 +1328,21 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch
return false
}

if recBuf.needSeqReset && recBuf.batches[0] == batch {
recBuf.needSeqReset = false
recBuf.seq = 0
recBuf.batch0Seq = 0
if recBuf.batches[0] == batch {
if batch.canFailFromLoadErrs {
ctx := batch.records[0].ctx
select {
case <-ctx.Done():
recBuf.failAllRecords(ctx.Err())
return false
default:
}
}
if recBuf.needSeqReset {
recBuf.needSeqReset = false
recBuf.seq = 0
recBuf.batch0Seq = 0
}
}

batch.tries++
Expand Down

0 comments on commit 6ae76d0

Please sign in to comment.