Skip to content

Commit

Permalink
broker: additional patch to prior commit
Browse files Browse the repository at this point in the history
Rather than checking lastRead when acks == 0, we will just
unconditionally not reap produce connections based off reading if there
are no acks.

The problem was that initial api version negotiation set lastRead.
  • Loading branch information
twmb committed Apr 28, 2021
1 parent bf5b74c commit 3ad8fc7
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,16 +470,14 @@ func (b *broker) reapConnections(idleTimeout time.Duration) (total int) {
}

// If our connection is a produce connection and our client is
// configured with acks == 0, we only reap it if we have read
// on it (via the discard goroutine). We are not supposed to
// have any reads when configured with no acks; if we do, the
// broker implemented the Kafka protocol incorrectly, but since
// are receiving reads, then we can reap.
lastReadNano := atomic.LoadInt64(&reap.cxn.lastRead)
if b.cl.cfg.acks.val == 0 && reap.isProduce && lastReadNano == 0 {
// configured with acks == 0, we never try reaping it due to no
// reads. We are not supposed to have any reads past the
// initial api version negotiation. If we do, the broker
// implemented the Kafka protocol incorrectly, but that's fine.
if b.cl.cfg.acks.val == 0 && reap.isProduce {
continue
}
lastRead := time.Unix(0, lastReadNano)
lastRead := time.Unix(0, atomic.LoadInt64(&reap.cxn.lastRead))
if time.Since(lastRead) > idleTimeout && atomic.LoadUint32(&reap.cxn.reading) == 0 {
reap.cxn.die()
total++
Expand Down

0 comments on commit 3ad8fc7

Please sign in to comment.