client: add ConnIdleTimeout option && connection reaping
A more thorough approach would be to reap each connection exactly at its
idle timeout; however, this much simpler approach needs only one
goroutine and one ticker.
twmb committed Apr 17, 2021
1 parent 26c1ea2 commit 46138f7
Showing 3 changed files with 132 additions and 13 deletions.
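
A note on the timing this buys: with one ticker firing every idle timeout, a connection that goes idle just after a tick is not yet over the threshold at the next tick, so it is reaped at the tick after that, up to 2x the timeout past its last use. A minimal sketch of the pattern, distilled from the diff below (the conn type and names are illustrative, not the real types):

import (
	"sync/atomic"
	"time"
)

// conn stands in for a tracked connection; lastUse is unix nanos,
// busy is 1 while an operation is in flight.
type conn struct {
	lastUse int64
	busy    uint32
}

// reapLoop runs one ticker for all connections. A connection idle
// for more than idleTimeout at tick time is closed, so actual idle
// lifetimes fall uniformly in (idleTimeout, 2*idleTimeout].
func reapLoop(conns []*conn, idleTimeout time.Duration, done <-chan struct{}, die func(*conn)) {
	ticker := time.NewTicker(idleTimeout)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			for _, c := range conns {
				last := time.Unix(0, atomic.LoadInt64(&c.lastUse))
				if time.Since(last) > idleTimeout && atomic.LoadUint32(&c.busy) == 0 {
					die(c)
				}
			}
		}
	}
}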
111 changes: 104 additions & 7 deletions pkg/kgo/broker.go
@@ -116,6 +116,8 @@ type broker struct {
cxnProduce *brokerCxn
cxnFetch *brokerCxn

reapMu sync.Mutex // held when modifying a brokerCxn

// dieMu guards sending to reqs in case the broker has been
// permanently stopped.
dieMu sync.RWMutex
@@ -406,10 +408,62 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn,
}
b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "broker", b.meta.NodeID)

b.reapMu.Lock()
defer b.reapMu.Unlock()
*pcxn = cxn
return cxn, nil
}

func (cl *Client) reapConnectionsLoop() {
idleTimeout := cl.cfg.connIdleTimeout
if idleTimeout < 0 { // impossible due to cfg.validate, but just in case
return
}

ticker := time.NewTicker(idleTimeout)
defer ticker.Stop()
for {
select {
case <-cl.ctx.Done():
return
case <-ticker.C:
cl.reapConnections(idleTimeout)
}
}
}

func (cl *Client) reapConnections(idleTimeout time.Duration) {
cl.brokersMu.Lock()
defer cl.brokersMu.Unlock()

for _, broker := range cl.brokers {
broker.reapConnections(idleTimeout)
}
}

func (b *broker) reapConnections(idleTimeout time.Duration) {
b.reapMu.Lock()
defer b.reapMu.Unlock()

for _, cxn := range []*brokerCxn{
b.cxnNormal,
b.cxnProduce,
b.cxnFetch,
} {
if cxn == nil || atomic.LoadInt32(&cxn.dead) == 1 {
continue
}
lastWrite := time.Unix(0, atomic.LoadInt64(&cxn.lastWrite))
if time.Since(lastWrite) > idleTimeout && atomic.LoadUint32(&cxn.writing) == 0 {
cxn.die()
continue
}
lastRead := time.Unix(0, atomic.LoadInt64(&cxn.lastRead))
if time.Since(lastRead) > idleTimeout && atomic.LoadUint32(&cxn.reading) == 0 {
cxn.die()
}
}
}

// connect connects to the broker's addr, returning the new connection.
func (b *broker) connect(ctx context.Context) (net.Conn, error) {
b.cl.cfg.logger.Log(LogLevelDebug, "opening connection to broker", "addr", b.addr, "broker", b.meta.NodeID)
@@ -448,6 +502,14 @@ type brokerCxn struct {

corrID int32

// The following four fields are used for connection reaping.
// Write is only updated in one location; read is updated in three
// due to readConn, readConnAsync, and discard.
lastWrite int64
lastRead int64
writing uint32
reading uint32

// dieMu guards appending to resps.
dieMu sync.Mutex
// resps manages reading kafka responses.
@@ -758,6 +820,12 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
}

func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Duration, enqueuedForWritingAt time.Time) (bytesWritten int, writeErr error, writeWait, timeToWrite time.Duration) {
atomic.SwapUint32(&cxn.writing, 1)
defer func() {
atomic.StoreInt64(&cxn.lastWrite, time.Now().UnixNano())
atomic.SwapUint32(&cxn.writing, 0)
}()

if ctx == nil {
ctx = context.Background()
}
@@ -827,6 +895,14 @@ func (cxn *brokerCxn) readConnAsync(prFn func() (*promisedResp, error)) <-chan *
return
}

// Once we have read a size, we officially track that we are
// reading for the purposes of connection reaping.
atomic.SwapUint32(&cxn.reading, 1)
defer func() {
atomic.StoreInt64(&cxn.lastRead, time.Now().UnixNano())
atomic.SwapUint32(&cxn.reading, 0)
}()

var size int32
if size, rr.err = cxn.parseReadSize(sizeBuf); rr.err != nil {
return
@@ -859,6 +935,12 @@
// differs from readConnAsync above because here, we have written a request
// already and know what we want.
func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enqueuedForReadingAt time.Time) (nread int, buf []byte, err error, readWait, timeToRead time.Duration) {
atomic.SwapUint32(&cxn.reading, 1)
defer func() {
atomic.StoreInt64(&cxn.lastRead, time.Now().UnixNano())
atomic.SwapUint32(&cxn.reading, 0)
}()

if ctx == nil {
ctx = context.Background()
}
@@ -1086,6 +1168,12 @@ func (cxn *brokerCxn) discard() {
}
deadlineMu.Unlock()

atomic.SwapUint32(&cxn.reading, 1)
defer func() {
atomic.StoreInt64(&cxn.lastRead, time.Now().UnixNano())
atomic.SwapUint32(&cxn.reading, 0)
}()

readStart := time.Now()
defer func() { timeToRead = time.Since(readStart) }()
var size int32
@@ -1196,7 +1284,10 @@ func (cxn *brokerCxn) handleResps() {
}
cxn.dieMu.Unlock()

-if err := cxn.handleReadResult(rr); err != nil {
+// We only log on error if we read any bytes or if our
+// connection is still alive. If our connection is dead, odds
+// are we killed ourselves and readConnAsync then errored.
+if err := cxn.handleReadResult(rr); err != nil && (rr.nread > 0 || atomic.LoadInt32(&cxn.dead) == 0) {
if successes > 0 || len(cxn.b.cl.cfg.sasls) > 0 {
cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "id", cxn.b.meta.NodeID, "successful_reads", successes, "err", err)
} else {
@@ -1255,13 +1346,19 @@ func (cxn *brokerCxn) handleReadResultHeader(
corrID int32,
flexible bool,
) ([]byte, error) {
-cxn.cl.cfg.hooks.each(func(h Hook) {
-	if h, ok := h.(BrokerReadHook); ok {
-		h.OnRead(cxn.b.meta, key, nread, readWait, timeToRead, readErr)
-	}
-})
-if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug {
-	logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", cxn.b.meta.NodeID, "bytes_read", nread, "read_wait", readWait, "time_to_read", timeToRead, "err", readErr)
-}
+// If we have a key, then we always hook / debug log.
+// If we have no key but read some bytes, same.
+// If we have no key and read nothing, then this is readConnAsync
+// quitting due to us killing our own connection.
+if key >= 0 || nread > 0 {
+	cxn.cl.cfg.hooks.each(func(h Hook) {
+		if h, ok := h.(BrokerReadHook); ok {
+			h.OnRead(cxn.b.meta, key, nread, readWait, timeToRead, readErr)
+		}
+	})
+	if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug {
+		logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", cxn.b.meta.NodeID, "bytes_read", nread, "read_wait", readWait, "time_to_read", timeToRead, "err", readErr)
+	}
+}
if readErr != nil {
return nil, readErr
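All four instrumentation sites above (writeConn, readConn, readConnAsync, and discard) follow the same ordering: mark the connection busy, do the I/O, then stamp the last-activity time before clearing the busy flag. A distilled sketch of that ordering, reusing the illustrative conn type from the sketch near the top (the diff uses atomic.SwapUint32 and discards the old value, so a plain store is equivalent here):

// instrument wraps an I/O operation with reaper-facing bookkeeping.
func (c *conn) instrument(op func() error) error {
	atomic.StoreUint32(&c.busy, 1)
	defer func() {
		// Stamp lastUse before clearing busy. If busy were cleared
		// first, the reaper could observe busy == 0 while lastUse
		// still pointed at the previous operation, and kill a
		// connection that finished work only moments ago.
		atomic.StoreInt64(&c.lastUse, time.Now().UnixNano())
		atomic.StoreUint32(&c.busy, 0)
	}()
	return op()
}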
9 changes: 5 additions & 4 deletions pkg/kgo/client.go
@@ -202,6 +202,7 @@ func NewClient(opts ...Opt) (*Client, error) {
cl.brokers[b.meta.NodeID] = b
}
go cl.updateMetadataLoop()
go cl.reapConnectionsLoop()

return cl, nil
}
@@ -1078,8 +1079,8 @@ func (cl *Client) Broker(id int) *Broker {
// it does include those brokers under their normal IDs as returned from a
// metadata response).
func (cl *Client) DiscoveredBrokers() []*Broker {
-cl.brokersMu.Lock()
-defer cl.brokersMu.Unlock()
+cl.brokersMu.RLock()
+defer cl.brokersMu.RUnlock()

var bs []*Broker
for _, broker := range cl.brokers {
@@ -1092,8 +1093,8 @@ func (cl *Client) DiscoveredBrokers() []*Broker {

// SeedBrokers returns all seed brokers.
func (cl *Client) SeedBrokers() []*Broker {
-cl.brokersMu.Lock()
-defer cl.brokersMu.Unlock()
+cl.brokersMu.RLock()
+defer cl.brokersMu.RUnlock()

var bs []*Broker
for i := 0; ; i++ {
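The accessor changes above are the flip side of the new reaper: DiscoveredBrokers and SeedBrokers only read the broker map, so they can take brokersMu's read side and run concurrently, while reapConnections takes the exclusive lock once per tick. A small sketch of the same split on a hypothetical registry type:

import "sync"

type registry struct {
	mu    sync.RWMutex
	items map[int32]string
}

// Snapshot only reads, so it takes the shared lock; any number of
// readers may proceed concurrently.
func (r *registry) Snapshot() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.items))
	for _, item := range r.items {
		out = append(out, item)
	}
	return out
}

// Sweep mutates, so it takes the exclusive lock; readers block only
// for the duration of the periodic sweep.
func (r *registry) Sweep(drop func(string) bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for id, item := range r.items {
		if drop(item) {
			delete(r.items, id)
		}
	}
}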
25 changes: 23 additions & 2 deletions pkg/kgo/config.go
@@ -48,6 +48,7 @@ type cfg struct {
id *string
dialFn func(context.Context, string, string) (net.Conn, error)
connTimeoutOverhead time.Duration
connIdleTimeout time.Duration

softwareName string // KIP-511
softwareVersion string // KIP-511
@@ -204,6 +205,10 @@ func (cfg *cfg) validate() error {
{name: "conn timeout max overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(15 * time.Minute), badcmp: i64gt, durs: true},
{name: "conn timeout min overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(time.Second), badcmp: i64lt, durs: true},

// 1s <= conn idle <= 15m
{name: "conn min idle timeout", v: int64(cfg.connIdleTimeout), allowed: int64(time.Second), badcmp: i64lt, durs: true},
{name: "conn max idle timeout", v: int64(cfg.connIdleTimeout), allowed: int64(15 * time.Minute), badcmp: i64gt, durs: true},

// 10ms <= metadata <= 1hr
{name: "metadata max age", v: int64(cfg.metadataMaxAge), allowed: int64(time.Hour), badcmp: i64gt, durs: true},
{name: "metadata min age", v: int64(cfg.metadataMinAge), allowed: int64(10 * time.Millisecond), badcmp: i64lt, durs: true},
@@ -250,6 +255,7 @@ func defaultCfg() cfg {
dialFn: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,

connTimeoutOverhead: 20 * time.Second,
connIdleTimeout: 20 * time.Second,

softwareName: "kgo",
softwareVersion: "0.1.0",
@@ -373,12 +379,27 @@ func WithLogger(l Logger) Opt {
//
// For writes, the timeout is always the overhead. We buffer writes in our
// client before one quick flush, so we always expect the write to be fast.
//
// Using 0 as the overhead disables timeouts.
func ConnTimeoutOverhead(overhead time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.connTimeoutOverhead = overhead }}
}

// ConnIdleTimeout is a rough amount of time to allow connections to idle
// before they are closed, overriding the default 20s.
//
// In the worst case, a connection can be allowed to idle for up to 2x this
// time, while the average is expected to be 1.5x (essentially, a uniform
// distribution from this interval to 2x the interval).
//
// It is possible that a connection can be reaped just as it is about to be
// written to, but the client internally retries in these cases.
//
// Connections are not reaped if they are actively being written to or read
// from; thus, a long-running request will not itself be reaped (it may,
// however, run into the ConnTimeoutOverhead).
func ConnIdleTimeout(timeout time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.connIdleTimeout = timeout }}
}

// Dialer uses fn to dial addresses, overriding the default dialer that uses a
// 10s dial timeout and no TLS.
//
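Putting the new option to use, a hedged usage sketch (the seed address is a placeholder, and the import path assumes the usual franz-go module layout):

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func newClient() (*kgo.Client, error) {
	return kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder seed broker
		// Connections idle for roughly 30s are reaped; per the docs
		// above, an idle connection may survive up to ~60s.
		kgo.ConnIdleTimeout(30*time.Second),
	)
}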
