Skip to content

Commit

Permalink
kgo.Client: add ConfigValue and ConfigValues
Browse files Browse the repository at this point in the history
This allows a way to interrogate the client for the value of any option,
whether the option was specified or not.

More type safe and explicit would be to add a getter for every option,
but that'd also bloat the client API unnecessarily.
  • Loading branch information
twmb committed Mar 13, 2023
1 parent 8b9c538 commit b0fa1a0
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 2 deletions.
241 changes: 241 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"math/rand"
"net"
"reflect"
"runtime"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -153,6 +154,246 @@ func validateCfg(opts ...Opt) (cfg, []hostport, *compressor, error) {
return cfg, seeds, compressor, nil
}

func namefn(fn any) string {
v := reflect.ValueOf(fn)
if v.Type().Kind() != reflect.Func {
return ""
}
name := runtime.FuncForPC(v.Pointer()).Name()
dot := strings.LastIndexByte(name, '.')
if dot >= 0 {
return name[dot+1:]
}
return name
}

// OptValue returns the value for the given configuration option. If the
// given option does not exist, this returns nil. This function takes either a
// raw Opt, or an Opt function name.
//
// If a configuration option has multiple inputs, this function returns only
// the first input. If the function is a boolean function (such as
// BlockRebalanceOnPoll), this function returns the value of the internal bool.
// Variadic option inputs are returned as a single slice. Options that are
// internally stored as a pointer (ClientID, TransactionalID, and InstanceID)
// are returned as their string input; you can see if the option is internally
// nil by looking at the second value returned from OptValues.
//
// var (
// cl, _ := NewClient(
// InstanceID("foo"),
// ConsumeTopics("foo", "bar"),
// )
// iid = cl.OptValue(InstanceID) // iid is "foo"
// gid = cl.OptValue(ConsumerGroup) // gid is "" since groups are not used
// topics = cl.OptValue("ConsumeTopics") // topics is []string{"foo", "bar"}; string lookup for the option works
// bpoll = cl.OptValue(BlockRebalanceOnPoll) // bpoll is false
// t = cl.OptValue(SessionTimeout) // t is 45s, the internal default
// td = t.(time.Duration) // safe conversion since SessionTimeout's input is a time.Duration
// unk = cl.OptValue("Unknown"), // unk is nil
// )
func (cl *Client) OptValue(opt any) any {
vs := cl.OptValues(opt)
if len(vs) > 0 {
return vs[0]
}
return nil
}

// OptValues returns all values for options. This method is useful for
// options that have multiple inputs (notably, SoftwareNameAndVersion). This is
// also useful for options that are internally stored as a pointer (ClientID,
// TransactionalID, and InstanceID) -- this function will return the string
// value of the option but also whether the option is non-nil. Boolean options
// are returned as a single-element slice with the bool value. Variadic inputs
// are returned as a signle slice. If the input option does not exist, this
// returns nil.
//
// var (
// cl, _ = NewClient(
// InstanceID("foo"),
// ConsumeTopics("foo", "bar"),
// )
// idValues = cl.OptValues(InstanceID) // idValues is []any{"foo", true}
// tValues = cl.OptValues(SessionTimeout) // tValues is []any{45 * time.Second}
// topics = cl.OptValues(ConsumeTopics) // topics is []any{[]string{"foo", "bar"}
// bpoll = cl.OptValues(BlockRebalanceOnPoll) // bpoll is []any{false}
// unknown = cl.OptValues("Unknown") // unknown is nil
// )
func (cl *Client) OptValues(opt any) []any {
name := namefn(opt)
if s, ok := opt.(string); ok {
name = s
}
cfg := &cl.cfg

switch name {
case namefn(ClientID):
if cfg.id != nil {
return []any{*cfg.id, true}
}
return []any{"", false}
case namefn(SoftwareNameAndVersion):
return []any{cfg.softwareName, cfg.softwareVersion}
case namefn(WithLogger):
if cfg.logger != nil {
return []any{cfg.logger.(*wrappedLogger).inner}
}
return []any{nil}
case namefn(RequestTimeoutOverhead):
return []any{cfg.requestTimeoutOverhead}
case namefn(ConnIdleTimeout):
return []any{cfg.connIdleTimeout}
case namefn(Dialer):
return []any{cfg.dialFn}
case namefn(DialTLSConfig):
return []any{cfg.dialTLS}
case namefn(SeedBrokers):
return []any{cfg.seedBrokers}
case namefn(MaxVersions):
return []any{cfg.maxVersions}
case namefn(MinVersions):
return []any{cfg.minVersions}
case namefn(RetryBackoffFn):
return []any{cfg.retryBackoff}
case namefn(RequestRetries):
return []any{cfg.retries}
case namefn(RetryTimeout):
return []any{cfg.retryTimeout(0)}
case namefn(RetryTimeoutFn):
return []any{cfg.retryTimeout}
case namefn(AllowAutoTopicCreation):
return []any{cfg.allowAutoTopicCreation}
case namefn(BrokerMaxWriteBytes):
return []any{cfg.maxBrokerWriteBytes}
case namefn(BrokerMaxReadBytes):
return []any{cfg.maxBrokerReadBytes}
case namefn(MetadataMaxAge):
return []any{cfg.metadataMaxAge}
case namefn(MetadataMinAge):
return []any{cfg.metadataMinAge}
case namefn(SASL):
return []any{cfg.sasls}
case namefn(WithHooks):
return []any{cfg.hooks}
case namefn(ConcurrentTransactionsBackoff):
return []any{cfg.txnBackoff}

case namefn(DefaultProduceTopic):
return []any{cfg.defaultProduceTopic}
case namefn(RequiredAcks):
return []any{cfg.acks}
case namefn(DisableIdempotentWrite):
return []any{cfg.disableIdempotency}
case namefn(MaxProduceRequestsInflightPerBroker):
return []any{cfg.maxProduceInflight}
case namefn(ProducerBatchCompression):
return []any{cfg.compression}
case namefn(ProducerBatchMaxBytes):
return []any{cfg.maxRecordBatchBytes}
case namefn(MaxBufferedRecords):
return []any{cfg.maxBufferedRecords}
case namefn(RecordPartitioner):
return []any{cfg.partitioner}
case namefn(ProduceRequestTimeout):
return []any{cfg.produceTimeout}
case namefn(RecordRetries):
return []any{cfg.recordRetries}
case namefn(UnknownTopicRetries):
return []any{cfg.maxUnknownFailures}
case namefn(StopProducerOnDataLossDetected):
return []any{cfg.stopOnDataLoss}
case namefn(ProducerOnDataLossDetected):
return []any{cfg.onDataLoss}
case namefn(ProducerLinger):
return []any{cfg.linger}
case namefn(ManualFlushing):
return []any{cfg.manualFlushing}
case namefn(RecordDeliveryTimeout):
return []any{cfg.recordTimeout}
case namefn(TransactionalID):
if cfg.txnID != nil {
return []any{cfg.txnID, true}
}
return []any{"", false}
case namefn(TransactionTimeout):
return []any{cfg.txnTimeout}

case namefn(ConsumePartitions):
return []any{cfg.partitions}
case namefn(ConsumePreferringLagFn):
return []any{cfg.preferLagFn}
case namefn(ConsumeRegex):
return []any{cfg.regex}
case namefn(ConsumeResetOffset):
return []any{cfg.resetOffset}
case namefn(ConsumeTopics):
return []any{cfg.topics}
case namefn(DisableFetchSessions):
return []any{cfg.disableFetchSessions}
case namefn(FetchIsolationLevel):
return []any{cfg.isolationLevel}
case namefn(FetchMaxBytes):
return []any{int32(cfg.maxBytes)}
case namefn(FetchMaxPartitionBytes):
return []any{int32(cfg.maxPartBytes)}
case namefn(FetchMaxWait):
return []any{time.Duration(cfg.maxWait) * time.Millisecond}
case namefn(FetchMinBytes):
return []any{cfg.minBytes}
case namefn(KeepControlRecords):
return []any{cfg.keepControl}
case namefn(MaxConcurrentFetches):
return []any{cfg.maxConcurrentFetches}
case namefn(Rack):
return []any{cfg.rack}

case namefn(AdjustFetchOffsetsFn):
return []any{cfg.adjustOffsetsBeforeAssign}
case namefn(AutoCommitCallback):
return []any{cfg.commitCallback}
case namefn(AutoCommitInterval):
return []any{cfg.autocommitInterval}
case namefn(AutoCommitMarks):
return []any{cfg.autocommitMarks}
case namefn(Balancers):
return []any{cfg.balancers}
case namefn(BlockRebalanceOnPoll):
return []any{cfg.blockRebalanceOnPoll}
case namefn(ConsumerGroup):
return []any{cfg.group}
case namefn(DisableAutoCommit):
return []any{cfg.autocommitDisable}
case namefn(GreedyAutoCommit):
return []any{cfg.autocommitGreedy}
case namefn(GroupProtocol):
return []any{cfg.protocol}
case namefn(HeartbeatInterval):
return []any{cfg.heartbeatInterval}
case namefn(InstanceID):
if cfg.instanceID != nil {
return []any{*cfg.instanceID, true}
}
return []any{"", false}
case namefn(OnOffsetsFetched):
return []any{cfg.onFetched}
case namefn(OnPartitionsAssigned):
return []any{cfg.onAssigned}
case namefn(OnPartitionsLost):
return []any{cfg.onLost}
case namefn(OnPartitionsRevoked):
return []any{cfg.onRevoked}
case namefn(RebalanceTimeout):
return []any{cfg.rebalanceTimeout}
case namefn(RequireStableFetchOffsets):
return []any{cfg.requireStable}
case namefn(SessionTimeout):
return []any{cfg.sessionTimeout}
default:
return nil
}
}

// NewClient returns a new Kafka client with the given options or an error if
// the options are invalid. Connections to brokers are lazily created only when
// requests are written to them.
Expand Down
5 changes: 3 additions & 2 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ func (consumerOpt) consumerOpt() {}
func (groupOpt) groupOpt() {}

// A cfg can be written to while initializing a client, and after that it is
// only ever read from. Some areas of initializing may follow options, but all
// initializing is done before NewClient returns.
// (mostly) only ever read from. Some areas can continue to be modified --
// particularly reconfiguring what to consume from -- but most areas are
// static.
type cfg struct {
/////////////////////
// GENERAL SECTION //
Expand Down

0 comments on commit b0fa1a0

Please sign in to comment.