Skip to content

Commit

Permalink
hooks: add HookNewClient
Browse files Browse the repository at this point in the history
This new hook can notify hook implementations _of_ the new client.
Previously, a hook had to be declared before the client, and it was
difficult for a hook to know of the client. Now, a hook can hook into
extra behavior because of the new client: specifically, a hook can be
notified of the client and install two extra metrics for the buffered
produce and fetch records.
  • Loading branch information
twmb committed Jul 15, 2021
1 parent 3256518 commit 1e74109
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
21 changes: 15 additions & 6 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,21 @@ func NewClient(opts ...Opt) (*Client, error) {
blockingMetadataFnCh: make(chan func()),
metadone: make(chan struct{}),
}

compressor, err := newCompressor(cl.cfg.compression...)
if err != nil {
return nil, err
}
cl.compressor = compressor

// Before we start any goroutines below, we must notify any interested
// hooks of our existence.
cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookNewClient); ok {
h.OnNewClient(cl)
}
})

cl.producer.init()
cl.consumer.init(cl)
cl.metawait.init()
Expand All @@ -183,12 +198,6 @@ func NewClient(opts ...Opt) (*Client, error) {
cl.reqFormatter = kmsg.NewRequestFormatter(kmsg.FormatterClientID(*cfg.id))
}

compressor, err := newCompressor(cl.cfg.compression...)
if err != nil {
return nil, err
}
cl.compressor = compressor

for i, seed := range seeds {
b := cl.newBroker(unknownSeedID(i), seed.host, seed.port, nil)
cl.brokers[b.meta.NodeID] = b
Expand Down
20 changes: 20 additions & 0 deletions pkg/kgo/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ func (hs hooks) each(fn func(Hook)) {
}
}

// HookNewClient is called in NewClient after a client is initialized. This
// hook can be used to perform final setup work in your hooks.
type HookNewClient interface {
// OnNewClient is passed the newly initialized client, before any
// client goroutines are started.
OnNewClient(*Client)
}

//////////////////
// BROKER HOOKS //
//////////////////

// HookBrokerConnect is called after a connection to a broker is opened.
type HookBrokerConnect interface {
// OnBrokerConnect is passed the broker metadata, how long it took to
Expand Down Expand Up @@ -159,6 +171,10 @@ type HookBrokerThrottle interface {
OnBrokerThrottle(meta BrokerMetadata, throttleInterval time.Duration, throttledAfterResponse bool)
}

//////////
// MISC //
//////////

// HookGroupManageError is called after every error that causes the client,
// operating as a group member, to break out of the group managing loop and
// backoff temporarily.
Expand All @@ -173,6 +189,10 @@ type HookGroupManageError interface {
OnGroupManageError(error)
}

///////////////////////////////
// PRODUCE & CONSUME BATCHES //
///////////////////////////////

// ProduceBatchMetrics tracks information about successful produces to
// partitions.
type ProduceBatchMetrics struct {
Expand Down

0 comments on commit 1e74109

Please sign in to comment.