Merge pull request #224 from twmb/fixup-kraft
fixup kraft
twmb authored Oct 18, 2022
2 parents 76c13f7 + b4c041b commit 9387634
Showing 9 changed files with 199 additions and 85 deletions.
63 changes: 33 additions & 30 deletions .github/workflows/lint-and-test.yml
@@ -14,7 +14,7 @@ jobs:
golangci:
if: github.repository == 'twmb/franz-go'
runs-on: ubuntu-latest
name: 'golangci-lint on amd64'
name: "golangci-lint on amd64"
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
@@ -28,7 +28,7 @@ jobs:
if: github.repository == 'twmb/franz-go'
needs: golangci
runs-on: ubuntu-latest
name: 'vet on arm'
name: "vet on arm"
steps:
- uses: actions/checkout@v3
with:
@@ -57,31 +57,34 @@ jobs:
echo "staticcheck ./..."
staticcheck -checks 'all,-ST1003,-SA1012,-ST1016,-SA1019,-SA2001' ./... # actually contains atomicalign check
# TODO: fix
# integration-test:
# if: github.repository == 'twmb/franz-go'
# needs: golangci
# runs-on: ubuntu-latest
# name: 'integration test kafka'
# container: golang:1.19.2
# services:
# zk:
# image: bitnami/zookeeper:latest
# ports:
# - 2181:2181
# env:
# ALLOW_ANONYMOUS_LOGIN: yes
# kafka:
# image: bitnami/kafka:latest
# ports:
# - 9092:9092
# env:
# ALLOW_PLAINTEXT_LISTENER: yes
# KAFKA_CFG_ZOOKEEPER_CONNECT: zk:2181
# steps:
# - uses: actions/checkout@v3
# - run: go test ./...
# env:
# KGO_TEST_RF: 1
# KGO_SEEDS: kafka:9092
# KGO_TEST_RECORDS: 50000
integration-test:
if: github.repository == 'twmb/franz-go'
needs: golangci
runs-on: ubuntu-latest
name: "integration test kafka"
container: golang:1.19.2
services:
kafka:
image: bitnami/kafka:latest
ports:
- 9092:9092
env:
KAFKA_ENABLE_KRAFT: yes
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: [email protected]:9093
# Set this to "PLAINTEXT://127.0.0.1:9092" if you want to run this container on localhost via Docker
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CFG_BROKER_ID: 1
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded UUID
# BITNAMI_DEBUG: true # Enable this to get more info on startup failures
steps:
- uses: actions/checkout@v3
- run: go test ./...
env:
KGO_TEST_RF: 1
KGO_SEEDS: kafka:9092
KGO_TEST_RECORDS: 50000
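
A quick way to sanity-check this KRaft setup outside of CI is to point a minimal franz-go client at the advertised listener, produce one record, and read it back. The sketch below is illustrative only: the topic name is made up, it assumes the broker auto-creates topics, and it reads the broker address from KGO_SEEDS with the same 127.0.0.1:9092 fallback the tests use.

package main

import (
	"context"
	"fmt"
	"os"
	"strings"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	seeds := os.Getenv("KGO_SEEDS")
	if seeds == "" {
		seeds = "127.0.0.1:9092" // local fallback, mirroring the tests
	}

	cl, err := kgo.NewClient(
		kgo.SeedBrokers(strings.Split(seeds, ",")...),
		kgo.ConsumeTopics("kraft-smoke-test"),             // hypothetical topic name
		kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), // read from the beginning
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ctx := context.Background()

	// Produce a single record, then poll it back.
	rec := &kgo.Record{Topic: "kraft-smoke-test", Value: []byte("hello kraft")}
	if err := cl.ProduceSync(ctx, rec).FirstErr(); err != nil {
		panic(fmt.Sprintf("produce failed: %v", err))
	}

	fetches := cl.PollFetches(ctx)
	if errs := fetches.Errors(); len(errs) > 0 {
		panic(fmt.Sprintf("fetch errors: %v", errs))
	}
	fetches.EachRecord(func(r *kgo.Record) {
		fmt.Printf("consumed %q from %s[%d]@%d\n", r.Value, r.Topic, r.Partition, r.Offset)
	})
}
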
3 changes: 3 additions & 0 deletions pkg/kgo/config.go
@@ -801,6 +801,9 @@ func WithHooks(hooks ...Hook) Opt {
// are expected to backoff slightly and retry the operation. Lower backoffs may
// increase load on the brokers, while higher backoffs may increase transaction
// latency in clients.
//
// Note that if brokers are hanging in this concurrent transactions state for
// too long, the client progressively increases the backoff.
func ConcurrentTransactionsBackoff(backoff time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.txnBackoff = backoff }}
}
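
For reference, here is one way this option could be set on a transactional client. This is a hedged sketch, not code from this repository: the seed address and transactional ID are placeholders.

package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Lower the concurrent-transactions backoff so that operations hitting
	// CONCURRENT_TRANSACTIONS are retried sooner. Per the doc comment above,
	// the client still increases the backoff if brokers stay in that state
	// for too long.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("127.0.0.1:9092"),     // placeholder seed broker
		kgo.TransactionalID("example-txn-id"), // placeholder transactional ID
		kgo.ConcurrentTransactionsBackoff(50*time.Millisecond),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}
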
4 changes: 4 additions & 0 deletions pkg/kgo/group_test.go
@@ -37,8 +37,10 @@ func TestGroupETL(t *testing.T) {

go func() {
cl, _ := NewClient(
getSeedBrokers(),
WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)),
MaxBufferedRecords(10000),
UnknownTopicRetries(-1), // see txn_test comment
)
defer cl.Close()

@@ -116,6 +118,8 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
netls := 0 // for if etlsBeforeQuit is non-negative

opts := []Opt{
getSeedBrokers(),
UnknownTopicRetries(-1), // see txn_test comment
WithLogger(testLogger()),
ConsumerGroup(c.group),
ConsumeTopics(c.consumeFrom),
14 changes: 9 additions & 5 deletions pkg/kgo/helpers_test.go
@@ -25,12 +25,8 @@ var (
)

func init() {
seeds := os.Getenv("KGO_SEEDS")
if seeds == "" {
seeds = "127.0.0.1:9092"
}
var err error
adm, err = NewClient(SeedBrokers(strings.Split(seeds, ",")...))
adm, err = NewClient(getSeedBrokers())
if err != nil {
panic(fmt.Sprintf("unable to create admin client: %v", err))
}
@@ -43,6 +39,14 @@ func init() {
}
}

func getSeedBrokers() Opt {
seeds := os.Getenv("KGO_SEEDS")
if seeds == "" {
seeds = "127.0.0.1:9092"
}
return SeedBrokers(strings.Split(seeds, ",")...)
}

var loggerNum int64

var testLogLevel = func() LogLevel {
4 changes: 3 additions & 1 deletion pkg/kgo/producer.go
@@ -738,7 +738,9 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) (*producerID,
}

if err = kerr.ErrorForCode(resp.ErrorCode); err != nil {
if kerr.IsRetriable(err) { // this could return ConcurrentTransactions, but this is rare; ignore until a user report
// We could receive concurrent transactions; this is ignorable
// and we just want to re-init.
if kerr.IsRetriable(err) || errors.Is(err, kerr.ConcurrentTransactions) {
cl.cfg.logger.Log(LogLevelInfo, "producer id initialization resulted in retriable error, discarding initialization attempt", "err", err)
return &producerID{lastID, lastEpoch, err}, false
}
24 changes: 20 additions & 4 deletions pkg/kgo/sink.go
@@ -107,7 +107,7 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti
recBufsIdx = (recBufsIdx + 1) % len(s.recBufs)

recBuf.mu.Lock()
if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx || recBuf.inflightOnSink != nil && recBuf.inflightOnSink != s {
if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx || recBuf.inflightOnSink != nil && recBuf.inflightOnSink != s || recBuf.inflight != 0 && !recBuf.okOnSink {
recBuf.mu.Unlock()
continue
}
@@ -427,7 +427,7 @@ func (s *sink) doTxnReq(
req.batches.eachOwnerLocked(seqRecBatch.removeFromTxn)
}
}()
err = s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
err = s.cl.doWithConcurrentTransactions(s.cl.ctx, "AddPartitionsToTxn", func() error {
stripped, err = s.issueTxnReq(req, txnReq)
return err
})
@@ -828,6 +828,9 @@ func (s *sink) handleReqRespBatch(
"err_is_retriable", kerr.IsRetriable(err),
"max_retries_reached", !failUnknown && batch.tries >= s.cl.cfg.recordRetries,
)
batch.owner.okOnSink = false
} else {
batch.owner.okOnSink = true
}
s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, partition, baseOffset, err)
didProduce = err == nil
@@ -1045,6 +1048,20 @@ type recBuf struct {
// finishing, we would allow requests to finish out of order:
// handleSeqResps works per sink, not across sinks.
inflightOnSink *sink
// We only want to allow more than 1 inflight on a sink *if* we are
// currently receiving successful responses. Unimportantly, this allows
// us to save resources if the broker is having a problem or just
// recovered from one. Importantly, we work around an edge case in
// Kafka. Kafka will accept the first produce request for a pid/epoch
// with *any* sequence number. Say we sent two requests inflight. The
// first request Kafka replies to with NOT_LEADER_FOR_PARTITION, the
// second, the broker finished setting up and accepts. The broker now
// has the second request but not the first, we will retry both
// requests and receive OOOSN, and the broker has logs out of order.
// By only allowing more than one inflight if we have seen an ok
// response, we largely eliminate risk of this problem. See #223 for
// more details.
okOnSink bool
// Inflight tracks the number of requests inflight using batches from
// this recBuf. Every time this hits zero, if the batchDrainIdx is not
// at the end, we clear inflightOnSink and trigger the *current* sink
@@ -1463,9 +1480,8 @@ func (b *recBatch) decInflight() {
if recBuf.inflight != 0 {
return
}
oldSink := recBuf.inflightOnSink
recBuf.inflightOnSink = nil
if oldSink != recBuf.sink && recBuf.batchDrainIdx != len(recBuf.batches) {
if recBuf.batchDrainIdx != len(recBuf.batches) {
recBuf.sink.maybeDrain()
}
}
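
To make the new gate easier to follow, here is a stand-alone restatement of the okOnSink logic described in the recBuf comment above. This is a simplified sketch with hypothetical types, not the client's implementation: in the real code the equivalent condition is checked inline in createReq alongside the failing, batchDrainIdx, and inflightOnSink checks.

package sketch

// bufGate is an illustrative stand-in for the relevant recBuf fields.
type bufGate struct {
	inflight int  // produce requests currently inflight with this buffer's batches
	okOnSink bool // set once the current sink has returned a successful produce response
}

// mayAddToRequest reports whether the buffer may contribute batches to a new
// produce request. The first inflight request is always allowed; a second is
// allowed only after an OK response, which sidesteps the edge case where a
// broker accepts a later sequence number first and retries then hit
// OutOfOrderSequenceNumber (see #223).
func (b *bufGate) mayAddToRequest() bool {
	if b.inflight == 0 {
		return true
	}
	return b.okOnSink
}
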
68 changes: 47 additions & 21 deletions pkg/kgo/source.go
@@ -594,16 +594,15 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
return
}

// If we had an error, we backoff. Killing a fetch quits the backoff,
// but that is fine; we may just re-request too early and fall into
// another backoff.
if err != nil {
var didBackoff bool
backoff := func() {
// We preemptively allow more fetches (since we are not buffering)
// and reset our session because of the error (who knows if kafka
// processed the request but the client failed to receive it).
doneFetch <- struct{}{}
alreadySentToDoneFetch = true
s.session.reset()
didBackoff = true

s.cl.triggerUpdateMetadata(false, "opportunistic load during source backoff") // as good a time as any
s.consecutiveFailures++
@@ -613,19 +612,31 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
case <-after.C:
case <-ctx.Done():
}
}
defer func() {
if !didBackoff {
s.consecutiveFailures = 0
}
}()

// If we had an error, we backoff. Killing a fetch quits the backoff,
// but that is fine; we may just re-request too early and fall into
// another backoff.
if err != nil {
backoff()
return
}
s.consecutiveFailures = 0

resp := kresp.(*kmsg.FetchResponse)

var (
fetch Fetch
reloadOffsets listOrEpochLoads
preferreds cursorPreferreds
updateMeta bool
updateWhy string
handled = make(chan struct{})
fetch Fetch
reloadOffsets listOrEpochLoads
preferreds cursorPreferreds
allErrsStripped bool
updateMeta bool
updateWhy string
handled = make(chan struct{})
)

// Theoretically, handleReqResp could take a bit of CPU time due to
Expand All @@ -635,7 +646,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
// Processing the response only needs the source's nodeID and client.
go func() {
defer close(handled)
fetch, reloadOffsets, preferreds, updateMeta, updateWhy = s.handleReqResp(br, req, resp)
fetch, reloadOffsets, preferreds, allErrsStripped, updateMeta, updateWhy = s.handleReqResp(br, req, resp)
}()

select {
@@ -729,6 +740,12 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
s.sem = make(chan struct{})
s.hook(&fetch, true, false) // buffered, not polled
s.cl.consumer.addSourceReadyForDraining(s)
} else if allErrsStripped {
// If we stripped all errors from the response, we are likely
// fetching from topics that were deleted. We want to back off
// a bit rather than spin-loop immediately re-requesting
// deleted topics.
backoff()
}
return
}
@@ -740,15 +757,20 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
// the source mutex.
//
// This function, and everything it calls, is side effect free.
func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchResponse) (Fetch, listOrEpochLoads, cursorPreferreds, bool, string) {
func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchResponse) (
f Fetch,
reloadOffsets listOrEpochLoads,
preferreds cursorPreferreds,
allErrsStripped bool,
updateMeta bool,
why string,
) {
f = Fetch{Topics: make([]FetchTopic, 0, len(resp.Topics))}
var (
f = Fetch{
Topics: make([]FetchTopic, 0, len(resp.Topics)),
}
reloadOffsets listOrEpochLoads
preferreds []cursorOffsetPreferred
updateMeta bool
updateWhy multiUpdateWhy
updateWhy multiUpdateWhy

numParts int
numErrsStripped int

kip320 = s.cl.supportsOffsetForLeaderEpoch()
)
@@ -791,6 +813,8 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
continue
}

numParts++

// If we are fetching from the replica already, Kafka replies with a -1
// preferred read replica. If Kafka replies with a preferred replica,
// it sends no records.
@@ -827,6 +851,8 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
kerr.UnknownLeaderEpoch, // our meta is newer than broker we fetched from
kerr.OffsetNotAvailable: // fetched from out of sync replica or a behind in-sync one (KIP-392: case 1 and case 2)

numErrsStripped++

case kerr.OffsetOutOfRange:
// If we are out of range, we reset to what we can.
// With Kafka >= 2.1.0, we should only get offset out
@@ -919,7 +945,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
}
}

return f, reloadOffsets, preferreds, updateMeta, updateWhy.reason("fetch had inner topic errors")
return f, reloadOffsets, preferreds, numParts == numErrsStripped, updateMeta, updateWhy.reason("fetch had inner topic errors")
}

// processRespPartition processes all records in all potentially compressed
