From cbd4cdcc3a3a6ccdba82f5e44f51f2a2e7774a59 Mon Sep 17 00:00:00 2001
From: Alexey Zatelepin
Date: Tue, 3 Sep 2024 16:15:50 +0200
Subject: [PATCH] k/group_manager: return not_coordinator quickly in tx operations

group_manager::attached_partition::catchup_lock can get blocked for
extended periods of time. For example, in the following scenario:

1. consumer_offsets partition leader gets isolated
2. some group operation acquires a read lock and tries to replicate a
   batch to the consumer_offsets partition. This operation hangs for an
   indefinite period of time.
3. the consumer_offsets leader steps down
4. group state cleanup gets triggered, tries to acquire a write lock,
   and hangs until (2) finishes

Meanwhile, clients trying to perform any tx group operations will get
coordinator_load_in_progress errors and blindly retry, without even
trying to find the real coordinator. Check for leadership without the
read lock first to prevent that (this is basically a "double-check"
pattern, as we have to check a second time under the lock).

(cherry picked from commit 440ed2c3088ba1c83c969745acd1f032332e408a)
---
 src/v/kafka/server/group_manager.cc | 24 ++++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc
index 97bf501c8666..5e464d313563 100644
--- a/src/v/kafka/server/group_manager.cc
+++ b/src/v/kafka/server/group_manager.cc
@@ -1253,7 +1253,11 @@ group_manager::leave_group(leave_group_request&& r) {
 ss::future<txn_offset_commit_response>
 group_manager::txn_offset_commit(txn_offset_commit_request&& r) {
     auto p = get_attached_partition(r.ntp);
-    if (!p || !p->catchup_lock->try_read_lock()) {
+    if (!p || !p->partition->is_leader()) {
+        return ss::make_ready_future<txn_offset_commit_response>(
+          txn_offset_commit_response(r, error_code::not_coordinator));
+    }
+    if (!p->catchup_lock->try_read_lock()) {
         // transaction operations can't run in parallel with loading
         // state from the log (happens once per term change)
         vlog(
@@ -1304,7 +1308,11 @@ group_manager::txn_offset_commit(txn_offset_commit_request&& r) {
 ss::future<cluster::commit_group_tx_reply>
 group_manager::commit_tx(cluster::commit_group_tx_request&& r) {
     auto p = get_attached_partition(r.ntp);
-    if (!p || !p->catchup_lock->try_read_lock()) {
+    if (!p || !p->partition->is_leader()) {
+        return ss::make_ready_future<cluster::commit_group_tx_reply>(
+          make_commit_tx_reply(cluster::tx_errc::not_coordinator));
+    }
+    if (!p->catchup_lock->try_read_lock()) {
         // transaction operations can't run in parallel with loading
         // state from the log (happens once per term change)
         vlog(
@@ -1343,7 +1351,11 @@ group_manager::commit_tx(cluster::commit_group_tx_request&& r) {
 ss::future<cluster::begin_group_tx_reply>
 group_manager::begin_tx(cluster::begin_group_tx_request&& r) {
     auto p = get_attached_partition(r.ntp);
-    if (!p || !p->catchup_lock->try_read_lock()) {
+    if (!p || !p->partition->is_leader()) {
+        return ss::make_ready_future<cluster::begin_group_tx_reply>(
+          make_begin_tx_reply(cluster::tx_errc::not_coordinator));
+    }
+    if (!p->catchup_lock->try_read_lock()) {
         // transaction operations can't run in parallel with loading
         // state from the log (happens once per term change)
         vlog(
@@ -1391,7 +1403,11 @@ group_manager::begin_tx(cluster::begin_group_tx_request&& r) {
 ss::future<cluster::abort_group_tx_reply>
 group_manager::abort_tx(cluster::abort_group_tx_request&& r) {
     auto p = get_attached_partition(r.ntp);
-    if (!p || !p->catchup_lock->try_read_lock()) {
+    if (!p || !p->partition->is_leader()) {
+        return ss::make_ready_future<cluster::abort_group_tx_reply>(
+          make_abort_tx_reply(cluster::tx_errc::not_coordinator));
+    }
+    if (!p->catchup_lock->try_read_lock()) {
         // transaction operations can't run in parallel with loading
         // state from the log (happens once per term change)
         vlog(
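
Editor's note: the "double-check" idea the commit message refers to — fail fast on a cheap leadership check before the lock, then re-check once the lock is held — can be illustrated with a minimal, self-contained C++ sketch. This is not Redpanda code: coordinator_state, handle_tx_op, and the atomic leader flag are hypothetical stand-ins for the attached partition, the tx handlers, and partition->is_leader().

```cpp
// Sketch of the double-check pattern described in the commit message.
// All names here are illustrative, not taken from the Redpanda source.
#include <atomic>
#include <iostream>
#include <shared_mutex>

enum class error_code { none, not_coordinator, coordinator_load_in_progress };

struct coordinator_state {
    std::atomic<bool> leader{true};  // stands in for partition->is_leader()
    std::shared_mutex catchup_lock;  // stands in for the catchup read/write lock
};

error_code handle_tx_op(coordinator_state& st) {
    // 1) Cheap check without the lock: if we are clearly not the leader,
    //    return not_coordinator immediately so the client re-discovers the
    //    coordinator instead of retrying against a stale node.
    if (!st.leader.load(std::memory_order_relaxed)) {
        return error_code::not_coordinator;
    }

    // 2) Try to take the read lock; if recovery currently holds the write
    //    lock, report "load in progress" as before.
    std::shared_lock lock(st.catchup_lock, std::try_to_lock);
    if (!lock.owns_lock()) {
        return error_code::coordinator_load_in_progress;
    }

    // 3) Double-check under the lock: leadership may have been lost
    //    between step 1 and acquiring the lock.
    if (!st.leader.load(std::memory_order_relaxed)) {
        return error_code::not_coordinator;
    }

    // ... perform the actual transactional group operation here ...
    return error_code::none;
}

int main() {
    coordinator_state st;
    std::cout << (handle_tx_op(st) == error_code::none ? "ok\n" : "error\n");

    st.leader = false;
    std::cout << (handle_tx_op(st) == error_code::not_coordinator
                    ? "not_coordinator\n"
                    : "unexpected\n");
}
```

The point of the ordering is that the pre-lock check only ever produces a false positive (we might still lose leadership afterwards), never a false negative, so correctness still rests on the check performed under the lock; the early return just keeps misdirected clients from spinning on coordinator_load_in_progress while the lock is held for a long time.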