client: add EndAndBeginTransaction
For situations where you just want to end a transaction and begin a new
one, without caring about what *exactly* is flushed, it can be
beneficial to allow concurrent producing while the end / begin is ongoing.
In particular, the requirement to flush first may add unexpected latency.

This adds EndAndBeginTransaction, which can be called concurrently with
producing. There are two safety options to choose from: one that does not
really increase throughput **that** much but does keep things safe, and
one that increases throughput at the expense of safety.

We rework some logic in txn_test to use the new function. Previously,
we knew that all buffered records were flushed when ending a
transaction. That is no longer true, so we can no longer count on the
transactional marker being where we expect. Instead, we ensure our
offsets are at least monotonically increasing and call it good.
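
A rough usage sketch follows. The txn.go changes themselves are not among the
diffs shown below, so the exact names used here (EndBeginTxnSafe,
EndBeginTxnUnsafe, TryCommit, and the onEnd callback) are assumptions about
the API shape rather than confirmed signatures.

	package example

	import (
		"context"

		"github.com/twmb/franz-go/pkg/kgo"
	)

	// endAndBegin shows the assumed call shape: commit the current transaction
	// and immediately begin a new one while other goroutines keep producing.
	// EndBeginTxnSafe pauses in-flight produce requests before ending, keeping
	// the old guarantees; EndBeginTxnUnsafe allows more concurrency at the cost
	// of weaker safety.
	func endAndBegin(ctx context.Context, cl *kgo.Client) error {
		return cl.EndAndBeginTransaction(ctx, kgo.EndBeginTxnSafe, kgo.TryCommit,
			func(ctx context.Context, endErr error) error {
				// Runs between ending the old transaction and beginning the
				// new one, e.g. to commit consumed offsets first.
				return endErr
			},
		)
	}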
twmb committed Mar 1, 2022
1 parent b7a6f5a commit 83dfa9d
Showing 5 changed files with 336 additions and 41 deletions.
8 changes: 8 additions & 0 deletions pkg/kgo/atomic_maybe_work.go
@@ -15,6 +15,14 @@ func (b *atomicBool) set(v bool) {

func (b *atomicBool) get() bool { return atomic.LoadUint32((*uint32)(b)) == 1 }

func (b *atomicBool) swap(v bool) bool {
var swap uint32
if v {
swap = 1
}
return atomic.SwapUint32((*uint32)(b), swap) == 1
}

const (
stateUnstarted = iota
stateWorking
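The new swap stores the given value and reports whether the bool was
previously set, mirroring atomic.SwapUint32 on the same uint32-backed
representation. That lets a caller claim a one-shot action exactly once,
which is how addedToTxn is used in the sink.go changes below. A minimal
standalone illustration:

	package main

	import (
		"fmt"
		"sync/atomic"
	)

	type atomicBool uint32

	func (b *atomicBool) swap(v bool) bool {
		var swap uint32
		if v {
			swap = 1
		}
		return atomic.SwapUint32((*uint32)(b), swap) == 1
	}

	func main() {
		var addedToTxn atomicBool
		fmt.Println(addedToTxn.swap(true)) // false: first caller wins and adds the partition
		fmt.Println(addedToTxn.swap(true)) // true: already added, later callers back off
	}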
84 changes: 71 additions & 13 deletions pkg/kgo/producer.go
@@ -48,11 +48,15 @@ type producer struct {
idVersion int16
waitBuffer chan struct{}

// notifyMu and notifyCond are used for flush and drain notifications.
notifyMu sync.Mutex
notifyCond *sync.Cond
// mu and c are used for flush and drain notifications; mu is used for
// a few other tight locks.
mu sync.Mutex
c *sync.Cond

inflight int64 // high 16: # waiters, low 48: # inflight

batchPromises ringBatchPromise
promisesMu sync.Mutex

txnMu sync.Mutex
inTxn bool
@@ -84,7 +88,7 @@ func (p *producer) init(cl *Client) {
epoch: -1,
err: errReloadProducerID,
})
p.notifyCond = sync.NewCond(&p.notifyMu)
p.c = sync.NewCond(&p.mu)

inithooks := func() {
if p.hooks == nil {
@@ -415,6 +419,7 @@ func (p *producer) finishPromises(b batchPromise) {
cl := p.cl
var more bool
start:
p.promisesMu.Lock()
for i, pr := range b.recs {
pr.Offset = b.baseOffset + int64(i)
pr.Partition = b.partition
@@ -424,6 +429,7 @@ start:
cl.finishRecordPromise(pr, b.err)
b.recs[i] = promisedRec{}
}
p.promisesMu.Unlock()
if cap(b.recs) > 4 {
cl.prsPool.put(b.recs)
}
@@ -452,9 +458,9 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error) {
if buffered >= cl.cfg.maxBufferedRecords {
p.waitBuffer <- struct{}{}
} else if buffered == 0 && atomic.LoadInt32(&p.flushing) > 0 {
p.notifyMu.Lock()
p.notifyMu.Unlock() // nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
p.notifyCond.Broadcast()
p.mu.Lock()
p.mu.Unlock() // nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
p.c.Broadcast()
}
}
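
The Lock/Unlock pair before Broadcast acts as a barrier: the flusher holds mu
while it checks the buffered count and calls Wait, so acquiring and releasing
mu here guarantees the flusher is either not yet waiting (and will observe the
zero count on its next check) or already parked in Wait and will receive the
Broadcast. A small self-contained sketch of the pattern, not the client code:

	package main

	import (
		"sync"
		"sync/atomic"
	)

	func main() {
		var (
			mu      sync.Mutex
			c       = sync.NewCond(&mu)
			pending = int64(1)
			done    = make(chan struct{})
		)

		go func() { // the waiter, like Flush
			mu.Lock()
			for atomic.LoadInt64(&pending) > 0 {
				c.Wait()
			}
			mu.Unlock()
			close(done)
		}()

		// The notifier, like finishRecordPromise once the last record finishes.
		atomic.AddInt64(&pending, -1)
		mu.Lock()
		mu.Unlock() // barrier: the waiter is either before Wait (and will see 0) or parked in Wait
		c.Broadcast()
		<-done
	}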

@@ -867,27 +873,79 @@ func (cl *Client) Flush(ctx context.Context) error {
quit := false
done := make(chan struct{})
go func() {
p.notifyMu.Lock()
defer p.notifyMu.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
defer close(done)

for !quit && atomic.LoadInt64(&p.bufferedRecords) > 0 {
p.notifyCond.Wait()
p.c.Wait()
}
}()

select {
case <-done:
return nil
case <-ctx.Done():
p.notifyMu.Lock()
p.mu.Lock()
quit = true
p.notifyMu.Unlock()
p.notifyCond.Broadcast()
p.mu.Unlock()
p.c.Broadcast()
return ctx.Err()
}
}

func (p *producer) pause(ctx context.Context) error {
atomic.AddInt64(&p.inflight, 1<<48)

quit := false
done := make(chan struct{})
go func() {
p.mu.Lock()
defer p.mu.Unlock()
defer close(done)
for !quit && atomic.LoadInt64(&p.inflight)&0x0000ffffffffffff != 0 {
p.c.Wait()
}
}()

select {
case <-done:
return nil
case <-ctx.Done():
p.mu.Lock()
quit = true
p.mu.Unlock()
p.c.Broadcast()
p.resume() // dec our inflight
return ctx.Err()
}
}

func (p *producer) resume() {
if atomic.AddInt64(&p.inflight, -1<<48) == 0 {
p.cl.allSinksAndSources(func(sns sinkAndSource) {
sns.sink.maybeDrain()
})
}
}

func (p *producer) maybeAddInflight() bool {
if atomic.LoadInt64(&p.inflight)>>48 > 0 {
return false
}
if atomic.AddInt64(&p.inflight, 1)>>48 > 0 {
p.decInflight()
return false
}
return true
}

func (p *producer) decInflight() {
if atomic.AddInt64(&p.inflight, -1)>>48 > 0 {
p.c.Broadcast()
}
}

// Bumps the tries for all buffered records in the client.
//
// This is called whenever there is a problematic error that would affect the
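The new pause/resume machinery packs two counters into the single inflight
int64: the high 16 bits count pausers (the end/begin path) and the low 48 bits
count produce requests currently in flight. pause registers itself by adding
1<<48 and waits for the low bits to drain to zero; maybeAddInflight refuses to
start a new request while any pauser is registered; decInflight wakes pausers
as requests finish. A simplified, self-contained sketch of the same counter
protocol follows; it is not the client code itself, and the wake in dec also
takes the mu barrier described above so the sketch stands on its own:

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
	)

	// gate mimics the producer's inflight bookkeeping.
	type gate struct {
		mu       sync.Mutex
		c        *sync.Cond
		inflight int64 // high 16 bits: # pausers, low 48 bits: # in-flight requests
	}

	// maybeAdd registers one in-flight request unless a pauser is waiting.
	func (g *gate) maybeAdd() bool {
		if atomic.LoadInt64(&g.inflight)>>48 > 0 {
			return false
		}
		if atomic.AddInt64(&g.inflight, 1)>>48 > 0 {
			g.dec() // lost a race with a pauser; back out
			return false
		}
		return true
	}

	// dec finishes one request and, if pausers are registered, wakes them.
	func (g *gate) dec() {
		if atomic.AddInt64(&g.inflight, -1)>>48 > 0 {
			g.mu.Lock()
			g.mu.Unlock() // barrier, as in the flush notification path
			g.c.Broadcast()
		}
	}

	// pause blocks new requests from starting and waits for in-flight ones to drain.
	func (g *gate) pause() {
		atomic.AddInt64(&g.inflight, 1<<48)
		g.mu.Lock()
		for atomic.LoadInt64(&g.inflight)&0x0000ffffffffffff != 0 {
			g.c.Wait()
		}
		g.mu.Unlock()
	}

	// resume removes one pauser; once all pausers are gone, requests may start again.
	func (g *gate) resume() { atomic.AddInt64(&g.inflight, -1<<48) }

	func main() {
		g := &gate{}
		g.c = sync.NewCond(&g.mu)

		if g.maybeAdd() { // a produce request starts
			go g.dec() // ...and completes concurrently
		}
		g.pause() // returns only after the request above has finished
		fmt.Println("drained; safe to end and begin the transaction")
		g.resume()
	}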
26 changes: 15 additions & 11 deletions pkg/kgo/sink.go
@@ -148,10 +148,9 @@ func (t *txnReqBuilder) add(rb *recBuf) {
if t.txnID == nil {
return
}
if rb.addedToTxn {
if rb.addedToTxn.swap(true) {
return
}
rb.addedToTxn = true
if t.req == nil {
req := kmsg.NewPtrAddPartitionsToTxnRequest()
req.TransactionalID = *t.txnID
@@ -282,6 +281,15 @@ func (s *sink) produce(sem <-chan struct{}) bool {
return false
}

if !s.cl.producer.maybeAddInflight() { // must do before marking recBufs on a txn
return false
}
defer func() {
if !produced {
s.cl.producer.decInflight()
}
}()

// NOTE: we create the req AFTER getting our producer ID!
//
// If a prior response caused errReloadProducerID, then calling
@@ -335,6 +343,7 @@

batches := req.batches.sliced()
s.doSequenced(req, func(br *broker, resp kmsg.Response, err error) {
s.cl.producer.decInflight()
s.handleReqResp(br, req, resp, err)
batches.eachOwnerLocked((*recBatch).decInflight)
<-sem
@@ -409,7 +418,7 @@ func (s *sink) doTxnReq(
// inflight, and that it was not added to the txn and that we need to reset the
// drain index.
func (b *recBatch) removeFromTxn() {
b.owner.addedToTxn = false
b.owner.addedToTxn.set(false)
b.owner.resetBatchDrainIdx()
b.decInflight()
}
@@ -426,7 +435,7 @@ func (s *sink) issueTxnReq(
for _, topic := range resp.Topics {
topicBatches, ok := req.batches[topic.Topic]
if !ok {
s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with topic in AddPartitionsToTxnResponse that was not in request", "broker", logID(s.nodeID), "topic", topic.Topic)
s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with topic in AddPartitionsToTxnResponse that was not in request", "topic", topic.Topic)
continue
}
for _, partition := range topic.Partitions {
@@ -440,7 +449,7 @@

batch, ok := topicBatches[partition.Partition]
if !ok {
s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with partition in AddPartitionsToTxnResponse that was not in request", "broker", logID(s.nodeID), "topic", topic.Topic, "partition", partition.Partition)
s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with partition in AddPartitionsToTxnResponse that was not in request", "topic", topic.Topic, "partition", partition.Partition)
continue
}

@@ -950,12 +959,7 @@ type recBuf struct {

// addedToTxn, for transactions only, signifies whether this partition
// has been added to the transaction yet or not.
//
// This does not need to be under the mu since it is updated either
// serially in building a req (the first time) or after failing to add
// the partition to a txn (again serially), or in EndTransaction after
// all buffered records are flushed (if the API is used correctly).
addedToTxn bool
addedToTxn atomicBool

// For LoadTopicPartitioner partitioning; atomically tracks the number
// of records buffered in total on this recBuf.
(diffs for the remaining two changed files are not shown)
