Skip to content

Commit

Permalink
kgo: support Kafka 3.8's kip-890 modifications
Browse files Browse the repository at this point in the history
STILL NOT ALL OF KIP-890, despite what I originally coded.
Kafka 3.8 only added support for TransactionAbortable.
Producers still need to send AddPartitionsToTxn.
  • Loading branch information
twmb committed Oct 15, 2024
1 parent 03ae400 commit 0fd1959
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 8 deletions.
20 changes: 20 additions & 0 deletions pkg/kerr/kerr.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,21 @@ var (
MismatchedEndpointType = &Error{"MISMATCHED_ENDPOINT_TYPE", 114, false, "The request was sent to an endpoint of the wrong type."}
UnsupportedEndpointType = &Error{"UNSUPPORTED_ENDPOINT_TYPE", 115, false, "This endpoint type is not supported yet."}
UnknownControllerID = &Error{"UNKNOWN_CONTROLLER_ID", 116, false, "This controller ID is not known"}

// UnknownSubscriptionID = &Error{"UNKNOWN_SUBSCRIPTION_ID", 117, false, "Client sent a push telemetry request with an invalid or outdated subscription ID."}
// TelemetryTooLarge = &Error{"TELEMETRY_TOO_LARGE", 118, false, "Client sent a push telemetry request larger than the maximum size the broker will accept."}
// InvalidRegistration = &Error{"INVALID_REGISTRATION", 119, false, "The controller has considered the broker registration to be invalid."}

TransactionAbortable = &Error{"TRANSACTION_ABORTABLE", 120, false, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID."}

// InvalidRecordState = &Error{"INVALID_RECORD_STATE", 121, false, "The record state is invalid. The acknowledgement of delivery could not be completed."}
// ShareSessionNowFound = &Error{"SHARE_SESSION_NOT_FOUND", 122, false, "The share session was not found."}
// InvalidShareSessionEpoch = &Error{"INVALID_SHARE_SESSION_EPOCH", 123, false, "The share session epoch is invalid."}
// FencedStateEpoch = &Error{"FENCED_STATE_EPOCH", 124, false, "The share coordinator rejected the request because the share-group state epoch did not match."}
// InvalidVoterKey = &Error{"INVALID_VOTER_KEY", 125, false, "The voter key doesn't match the receiving replica's key."}
// DuplicateVoter = &Error{"DUPLICATE_VOTER", 126, false, "The voter is already part of the set of voters."}
// VoterNotFound = &Error{"VOTER_NOT_FOUND", 127, false, "The voter is not part of the set of voters."}
// InvalidRegularExpression = &Error{"INVALID_REGULAR_EXPRESSION", 128, false, "The regular expression is not valid."}
)

var code2err = map[int16]error{
Expand Down Expand Up @@ -312,4 +327,9 @@ var code2err = map[int16]error{
115: UnsupportedEndpointType, // ""
116: UnknownControllerID, // ""

// 117: UnknownSubscriptionID, // KIP-714 f1819f448 KAFKA-15778 & KAFKA-15779
// 118: TelemetryTooLarge, // ""
// 119: InvalidRegistration, // KIP-858 f467f6bb4 KAFKA-15361

120: TransactionAbortable, // KIP-890 2e8d69b78 KAFKA-16314
}
11 changes: 8 additions & 3 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {

if txnReq != nil {
// txnReq can fail from:
// - TransactionAbortable
// - retry failure
// - auth failure
// - producer id mapping / epoch errors
Expand All @@ -417,6 +418,10 @@ func (s *sink) produce(sem <-chan struct{}) bool {
batchesStripped, err := s.doTxnReq(req, txnReq)
if err != nil {
switch {
case errors.Is(err, kerr.TransactionAbortable):
// If we get TransactionAbortable, we continue into producing.
// The produce will fail with the same error, and this is the
// only way to notify the user to abort the txn.
case isRetryableBrokerErr(err) || isDialNonTimeoutErr(err):
s.cl.bumpRepeatedLoadErr(err)
s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retryable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err)
Expand All @@ -431,8 +436,8 @@ func (s *sink) produce(sem <-chan struct{}) bool {
// with produce request vs. end txn (KAFKA-12671)
s.cl.failProducerID(id, epoch, err)
s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", logID(s.nodeID), "err", err)
return false
}
return false
}

// If we stripped everything, ensure we backoff to force a
Expand Down Expand Up @@ -563,7 +568,7 @@ func (s *sink) issueTxnReq(
continue
}
for _, partition := range topic.Partitions {
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil && err != kerr.TransactionAbortable { // see below for txn abortable
// OperationNotAttempted is set for all partitions that are authorized
// if any partition is unauthorized _or_ does not exist. We simply remove
// unattempted partitions and treat them as retryable.
Expand Down Expand Up @@ -2057,7 +2062,7 @@ func (b *recBatch) tryBuffer(pr promisedRec, produceVersion, maxBatchBytes int32
//////////////

func (*produceRequest) Key() int16 { return 0 }
func (*produceRequest) MaxVersion() int16 { return 10 }
func (*produceRequest) MaxVersion() int16 { return 11 }
func (p *produceRequest) SetVersion(v int16) { p.version = v }
func (p *produceRequest) GetVersion() int16 { return p.version }
func (p *produceRequest) IsFlexible() bool { return p.version >= 9 }
Expand Down
15 changes: 11 additions & 4 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
errors.Is(err, kerr.CoordinatorLoadInProgress),
errors.Is(err, kerr.NotCoordinator),
errors.Is(err, kerr.ConcurrentTransactions),
errors.Is(err, kerr.UnknownServerError):
errors.Is(err, kerr.UnknownServerError),
errors.Is(err, kerr.TransactionAbortable):
return true
}
return false
Expand Down Expand Up @@ -408,6 +409,11 @@ retry:
willTryCommit = false
goto retry

case errors.Is(endTxnErr, kerr.TransactionAbortable):
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction returned TransactionAbortable; retrying as abort")
willTryCommit = false
goto retry

case errors.Is(endTxnErr, kerr.UnknownServerError):
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit unknown server error; retrying")
after := time.NewTimer(s.cl.cfg.retryBackoff(tries))
Expand Down Expand Up @@ -517,7 +523,7 @@ const (
// Deprecated: Kafka 3.6 removed support for the hacky behavior that
// this option was abusing. Thus, as of Kafka 3.6, this option does not
// work against Kafka. This option also has never worked for Redpanda
// becuse Redpanda always strictly validated that partitions were a
// because Redpanda always strictly validated that partitions were a
// part of a transaction. Later versions of Kafka and Redpanda will
// remove the need for AddPartitionsToTxn at all and thus this option
// ultimately will be unnecessary anyway.
Expand Down Expand Up @@ -820,8 +826,9 @@ func (cl *Client) UnsafeAbortBufferedRecords() {
//
// If the producer ID has an error and you are trying to commit, this will
// return with kerr.OperationNotAttempted. If this happened, retry
// EndTransaction with TryAbort. Not other error is retryable, and you should
// not retry with TryAbort.
// EndTransaction with TryAbort. If this returns kerr.TransactionAbortable, you
// can retry with TryAbort. No other error is retryable, and you should not
// retry with TryAbort.
//
// If records failed with UnknownProducerID and your Kafka version is at least
// 2.5, then aborting here will potentially allow the client to recover for
Expand Down
18 changes: 17 additions & 1 deletion pkg/kversion/kversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var versions = []struct {
{"v3.5", V3_5_0()},
{"v3.6", V3_6_0()},
{"v3.7", V3_7_0()},
{"v3.8", V3_8_0()},
}

// VersionStrings returns all recognized versions, minus any patch, that can be
Expand Down Expand Up @@ -520,6 +521,7 @@ func V3_4_0() *Versions { return zkBrokerOf(max340) }
func V3_5_0() *Versions { return zkBrokerOf(max350) }
func V3_6_0() *Versions { return zkBrokerOf(max360) }
func V3_7_0() *Versions { return zkBrokerOf(max370) }
func V3_8_0() *Versions { return zkBrokerOf(max380) }

func zkBrokerOf(lks listenerKeys) *Versions {
return &Versions{lks.filter(zkBroker)}
Expand Down Expand Up @@ -1158,8 +1160,22 @@ var max370 = nextMax(max360, func(v listenerKeys) listenerKeys {
return v
})

var max380 = nextMax(max370, func(v listenerKeys) listenerKeys {
// KAFKA-16314 2e8d69b78ca52196decd851c8520798aa856c073 KIP-890
// Then error rename in cf1ba099c0723f9cf65dda4cd334d36b7ede6327
v[0].inc() // 11 produce
v[10].inc() // 5 find coordinator
v[22].inc() // 5 init producer id
v[24].inc() // 5 add partitions to txn
v[25].inc() // 4 add offsets to txn
v[26].inc() // 4 end txn
v[28].inc() // 4 txn offset commit

return v
})

var (
maxStable = max370
maxStable = max380
maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys {
return v
})
Expand Down

0 comments on commit 0fd1959

Please sign in to comment.