Skip to content

Commit

Permalink
broker: permanently store the initial ApiVersions response
Browse files Browse the repository at this point in the history
Per-broker, there is no reason to re-issue an ApiVersions request every
time a connection is initialized. Instead, we can issue it once on the
first connection (or concurrently, if two connection loads see it
uninitialized) and store the result.

After the result is stored, we will use our stored version forever.

This speeds up reconnects by avoiding one round trip, and allows us to
probe brokers to see if they actually support something (see incoming
commits).

ApiVersions is the first thing we do when connecting, so if we load a
connection successfully, we know that we have loaded versions.
  • Loading branch information
twmb committed Aug 12, 2021
1 parent 8da4eaa commit f591593
Showing 1 changed file with 60 additions and 20 deletions.
80 changes: 60 additions & 20 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ type broker struct {
addr string // net.JoinHostPort(meta.Host, meta.Port)
meta BrokerMetadata

// versions tracks the first load of an ApiVersions. We store this
// after the first connect, which helps speed things up on future
// reconnects (across any of the three broker connections) because we
// will never look up API versions for this broker again.
versions atomic.Value // *brokerVersions

// The cxn fields each manage a single tcp connection to one broker.
// Each field is managed serially in handleReqs. This means that only
// one write can happen at a time, regardless of which connection the
Expand All @@ -132,6 +138,33 @@ type broker struct {
dead int32
}

// brokerVersions is loaded once (and potentially a few times concurrently if
// multiple connections are opening at once) and then forever stored for a
// broker.
type brokerVersions struct {
versions [kmsg.MaxKey + 1]int16
}

func newBrokerVersions() *brokerVersions {
var v brokerVersions
for i := range &v.versions {
v.versions[i] = -1
}
return &v
}

func (v *brokerVersions) len() int { return kmsg.MaxKey + 1 }

func (b *broker) loadVersions() *brokerVersions {
loaded := b.versions.Load()
if loaded == nil {
return nil
}
return loaded.(*brokerVersions)
}

func (b *broker) storeVersions(v *brokerVersions) { b.versions.Store(v) }

const unknownControllerID = -1

var unknownBrokerMetadata = BrokerMetadata{
Expand Down Expand Up @@ -159,6 +192,7 @@ func (cl *Client) newBroker(nodeID int32, host string, port int32, rack *string)

reqs: make(chan promisedReq, 10),
}

go br.handleReqs()

return br
Expand Down Expand Up @@ -248,16 +282,17 @@ func (b *broker) handleReqs() {
}
}

if int(req.Key()) > len(cxn.versions[:]) ||
b.cl.cfg.maxVersions != nil && !b.cl.cfg.maxVersions.HasKey(req.Key()) {
v := b.loadVersions()

if int(req.Key()) > v.len() || b.cl.cfg.maxVersions != nil && !b.cl.cfg.maxVersions.HasKey(req.Key()) {
pr.promise(nil, errUnknownRequestKey)
continue
}

// If cxn.versions[0] is non-negative, then we loaded API
// If v.versions[0] is non-negative, then we loaded API
// versions. If the version for this request is negative, we
// know the broker cannot handle this request.
if cxn.versions[0] >= 0 && cxn.versions[req.Key()] < 0 {
if v.versions[0] >= 0 && v.versions[req.Key()] < 0 {
pr.promise(nil, errBrokerTooOld)
continue
}
Expand All @@ -274,7 +309,7 @@ func (b *broker) handleReqs() {
// versions because the client is pinned pre 0.10.0 and we
// stick with our max.
version := ourMax
if brokerMax := cxn.versions[req.Key()]; brokerMax >= 0 && brokerMax < ourMax {
if brokerMax := v.versions[req.Key()]; brokerMax >= 0 && brokerMax < ourMax {
version = brokerMax
}

Expand Down Expand Up @@ -546,8 +581,7 @@ type brokerCxn struct {
cl *Client
b *broker

addr string
versions [kmsg.MaxKey + 1]int16
addr string

mechanism sasl.Mechanism
expiry time.Time
Expand Down Expand Up @@ -575,14 +609,17 @@ type brokerCxn struct {
}

func (cxn *brokerCxn) init(isProduceCxn bool) error {
for i := 0; i < len(cxn.versions[:]); i++ {
cxn.versions[i] = -1
}

if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) {
if err := cxn.requestAPIVersions(); err != nil {
cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err)
return err
hasVersions := cxn.b.loadVersions() != nil
if !hasVersions {
if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) {
if err := cxn.requestAPIVersions(); err != nil {
cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err)
return err
}
} else {
// We have a max versions, and it indicates no support
// for ApiVersions. We just store a default -1 set.
cxn.b.storeVersions(newBrokerVersions())
}
}

Expand Down Expand Up @@ -667,12 +704,14 @@ start:
return errors.New("ApiVersions response invalidly contained no ApiKeys")
}

v := newBrokerVersions()
for _, key := range resp.ApiKeys {
if key.ApiKey > kmsg.MaxKey {
if key.ApiKey > kmsg.MaxKey || key.ApiKey < 0 {
continue
}
cxn.versions[key.ApiKey] = key.MaxVersion
v.versions[key.ApiKey] = key.MaxVersion
}
cxn.b.storeVersions(v)
return nil
}

Expand All @@ -684,11 +723,12 @@ func (cxn *brokerCxn) sasl() error {
retried := false
authenticate := false

v := cxn.b.loadVersions()
req := new(kmsg.SASLHandshakeRequest)
start:
if mechanism.Name() != "GSSAPI" && cxn.versions[req.Key()] >= 0 {
if mechanism.Name() != "GSSAPI" && v.versions[req.Key()] >= 0 {
req.Mechanism = mechanism.Name()
req.Version = cxn.versions[req.Key()]
req.Version = v.versions[req.Key()]
cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", logID(cxn.b.meta.NodeID))
corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
if writeErr != nil {
Expand Down Expand Up @@ -776,7 +816,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
req := &kmsg.SASLAuthenticateRequest{
SASLAuthBytes: clientWrite,
}
req.Version = cxn.versions[req.Key()]
req.Version = cxn.b.loadVersions().versions[req.Key()]
cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", logID(cxn.b.meta.NodeID), "version", req.Version, "step", step)

corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
Expand Down

0 comments on commit f591593

Please sign in to comment.