Implement basic tx throttling on high P2P load (kaspanet#377)
* Implement basic tx throttling on high P2P load

When P2P load is high:
1. Limit the number of peers to broadcast to
2. Limit the number of transactions requested when requesting missing
   transactions
3. Reduce the size of the invs channel and allow dropping invs when
   it's full

* Renames and small fixes

* Saturating sub for unknown tx limit

* Some fixes

* Remove redundant collect

* Fix lint

* restore collect

---------

Co-authored-by: Ori Newman <[email protected]>
Co-authored-by: Michael Sutton <[email protected]>
Co-authored-by: Michael Sutton <[email protected]>
4 people authored and D-Stacks committed Jan 8, 2024
1 parent 60598a2 commit 5bc2146
Showing 7 changed files with 99 additions and 18 deletions.
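At a high level, the relay flow now samples mempool counters about every ten seconds and flips a should_throttle flag with hysteresis: throttling starts once the measured low-priority rate exceeds a threshold and stops only after it drops below half of it. A minimal standalone sketch of that decision, under assumed names (next_throttle_state is illustrative and not part of the commit):

// Hysteresis threshold matching the constant introduced in flow.rs below.
const MAX_TPS_THRESHOLD: u64 = 3000;

// Decide the new throttling state from the previous state and the measured rate.
// Illustrative helper; the commit embeds this logic inline in the relay loop.
fn next_throttle_state(currently_throttling: bool, measured_tps: u64) -> bool {
    if !currently_throttling && measured_tps > MAX_TPS_THRESHOLD {
        true // load exceeded the threshold: start throttling
    } else if currently_throttling && measured_tps < MAX_TPS_THRESHOLD / 2 {
        false // load dropped well below the threshold: return to normal
    } else {
        currently_throttling // stay in the current state inside the hysteresis band
    }
}

fn main() {
    assert!(next_throttle_state(false, 4000)); // 4000 > 3000, start throttling
    assert!(next_throttle_state(true, 2000));  // still above 1500, keep throttling
    assert!(!next_throttle_state(true, 1000)); // below 1500, stop throttling
}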
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 5 additions & 1 deletion mining/src/manager.rs
@@ -17,7 +17,7 @@ use crate::{
topological_sort::IntoIterTopologically,
tx_query::TransactionQuery,
},
MiningCounters,
MempoolCountersSnapshot, MiningCounters,
};
use itertools::Itertools;
use kaspa_consensus_core::{
@@ -873,4 +873,8 @@ impl MiningManagerProxy {
pub async fn unknown_transactions(self, transactions: Vec<TransactionId>) -> Vec<TransactionId> {
spawn_blocking(move || self.inner.unknown_transactions(transactions)).await.unwrap()
}

pub fn snapshot(&self) -> MempoolCountersSnapshot {
self.inner.counters.snapshot()
}
}
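The new snapshot() accessor is what the relay flow uses to measure load: two MempoolCountersSnapshot values taken an interval apart are subtracted (as &next - &curr in flow.rs below) and the delta carries the counts and elapsed time for that window. A rough sketch of turning such a delta into a per-second rate, using an assumed minimal struct in place of the real snapshot type:

use std::time::Duration;

// Assumed stand-in for the snapshot delta, keeping only the two fields
// the throttling logic reads.
struct SnapshotDelta {
    low_priority_tx_counts: u64,
    elapsed_time: Duration,
}

// Low-priority transactions per second over the sampled window, guarding
// against a zero-length interval (the same .max(1) guard appears in flow.rs).
fn low_priority_tps(delta: &SnapshotDelta) -> u64 {
    1000 * delta.low_priority_tx_counts / (delta.elapsed_time.as_millis().max(1) as u64)
}

fn main() {
    let delta = SnapshotDelta { low_priority_tx_counts: 40_000, elapsed_time: Duration::from_secs(10) };
    assert_eq!(low_priority_tps(&delta), 4000); // 40,000 txs over 10 s is 4000 TPS
}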
20 changes: 15 additions & 5 deletions protocol/flows/src/flow_context.rs
@@ -560,7 +560,8 @@ impl FlowContext {
return;
}

self.broadcast_transactions(transactions_to_broadcast).await;
// TODO: Throttle these transactions as well if needed
self.broadcast_transactions(transactions_to_broadcast, false).await;

if self.should_run_mempool_scanning_task().await {
// Spawn a task executing the removal of expired low priority transactions and, if time has come too,
@@ -580,7 +581,12 @@ impl FlowContext {
mining_manager.revalidate_high_priority_transactions(&consensus_clone, tx).await;
});
while let Some(transactions) = rx.recv().await {
let _ = context.broadcast_transactions(transactions).await;
let _ = context
.broadcast_transactions(
transactions,
true, // Throttle high-priority transactions even when the network is not flooded, since they will be rebroadcast if not accepted within a reasonable time.
)
.await;
}
}
context.mempool_scanning_is_done().await;
@@ -614,7 +620,11 @@ impl FlowContext {
) -> Result<(), ProtocolError> {
let accepted_transactions =
self.mining_manager().clone().validate_and_insert_transaction(consensus, transaction, Priority::High, orphan).await?;
self.broadcast_transactions(accepted_transactions.iter().map(|x| x.id())).await;
self.broadcast_transactions(
accepted_transactions.iter().map(|x| x.id()),
false, // RPC transactions are considered high priority, so we don't want to throttle them
)
.await;
Ok(())
}

@@ -641,8 +651,8 @@
///
/// The broadcast itself may happen only during a subsequent call to this function since it is done at most
/// after a predefined interval or when the queue length is larger than the Inv message capacity.
pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&self, transaction_ids: I) {
self.transactions_spread.write().await.broadcast_transactions(transaction_ids).await
pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&self, transaction_ids: I, should_throttle: bool) {
self.transactions_spread.write().await.broadcast_transactions(transaction_ids, should_throttle).await
}
}

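As the doc comment above notes, broadcast_transactions only enqueues ids; the InvTransactions message itself is sent at most once per broadcast interval, or earlier once the queue can fill a full Inv message. A minimal sketch of that flush condition, with placeholder values standing in for the real constants in transactions.rs:

use std::time::{Duration, Instant};

// Placeholder values; the real interval and per-message capacity live in transactions.rs.
const BROADCAST_INTERVAL: Duration = Duration::from_secs(2);
const MAX_INV_PER_TX_INV_MSG: usize = 1 << 17;

// Flush the queued ids into an InvTransactions message when enough time has
// passed since the last broadcast, or when the queue alone already fills a message.
fn should_flush(last_broadcast: Instant, queued_ids: usize) -> bool {
    last_broadcast.elapsed() >= BROADCAST_INTERVAL || queued_ids >= MAX_INV_PER_TX_INV_MSG
}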
13 changes: 9 additions & 4 deletions protocol/flows/src/flowcontext/transactions.rs
@@ -77,7 +77,7 @@ impl TransactionsSpread {
/// capacity.
///
/// _GO-KASPAD: EnqueueTransactionIDsForPropagation_
pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&mut self, transaction_ids: I) {
pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&mut self, transaction_ids: I, should_throttle: bool) {
self.transaction_ids.enqueue_chunk(transaction_ids);

let now = Instant::now();
@@ -89,13 +89,18 @@
let ids = self.transaction_ids.dequeue_chunk(MAX_INV_PER_TX_INV_MSG).map(|x| x.into()).collect_vec();
debug!("Transaction propagation: broadcasting {} transactions", ids.len());
let msg = make_message!(Payload::InvTransactions, InvTransactionsMessage { ids });
self.broadcast(msg).await;
self.broadcast(msg, should_throttle).await;
}

self.last_broadcast_time = Instant::now();
}

async fn broadcast(&self, msg: KaspadMessage) {
self.hub.broadcast(msg).await
async fn broadcast(&self, msg: KaspadMessage, should_throttle: bool) {
if should_throttle {
// TODO: Figure out a better number
self.hub.broadcast_to_some_peers(msg, 8).await
} else {
self.hub.broadcast(msg).await
}
}
}
63 changes: 55 additions & 8 deletions protocol/flows/src/v5/txrelay/flow.rs
@@ -5,13 +5,15 @@ use crate::{
};
use kaspa_consensus_core::tx::{Transaction, TransactionId};
use kaspa_consensusmanager::ConsensusProxy;
use kaspa_core::{time::unix_now, warn};
use kaspa_mining::{
errors::MiningManagerError,
mempool::{
errors::RuleError,
tx::{Orphan, Priority},
},
model::tx_query::TransactionQuery,
MempoolCountersSnapshot,
};
use kaspa_p2p_lib::{
common::{ProtocolError, DEFAULT_TIMEOUT},
@@ -22,6 +24,8 @@ use kaspa_p2p_lib::{
use std::sync::Arc;
use tokio::time::timeout;

pub(crate) const MAX_TPS_THRESHOLD: u64 = 3000;

enum Response {
Transaction(Transaction),
NotFound(TransactionId),
@@ -69,7 +73,7 @@ impl RelayTransactionsFlow {
pub fn invs_channel_size() -> usize {
// TODO: reevaluate when the node is fully functional and later when the network tx rate increases
// Note: in go-kaspad we have 10,000 for this channel combined with tx channel.
8192
4096
}

pub fn txs_channel_size() -> usize {
@@ -80,7 +84,31 @@

async fn start_impl(&mut self) -> Result<(), ProtocolError> {
// trace!("Starting relay transactions flow with {}", self.router.identity());
let mut last_checked_time = unix_now();
let mut curr_snapshot = self.ctx.mining_manager().clone().snapshot();
let mut should_throttle = false;

loop {
// TODO: Extract should_throttle logic to a separate function
let now = unix_now();
if now - last_checked_time > 10000 {
let next_snapshot = self.ctx.mining_manager().clone().snapshot();
let snapshot_delta = &next_snapshot - &curr_snapshot;

last_checked_time = now;
curr_snapshot = next_snapshot;

if snapshot_delta.low_priority_tx_counts > 0 {
let tps = snapshot_delta.low_priority_tx_counts / self.ctx.config.params.bps();
if !should_throttle && tps > MAX_TPS_THRESHOLD {
warn!("P2P tx relay threshold exceeded. Throttling relay. Current: {}, Max: {}", tps, MAX_TPS_THRESHOLD);
should_throttle = true;
} else if should_throttle && tps < MAX_TPS_THRESHOLD / 2 {
warn!("P2P tx relay threshold back to normal. Current: {}, Max: {}", tps, MAX_TPS_THRESHOLD);
should_throttle = false;
}
}
}
// Loop over incoming block inv messages
let inv: Vec<TransactionId> = dequeue!(self.invs_route, Payload::InvTransactions)?.try_into()?;
// trace!("Receive an inv message from {} with {} transaction ids", self.router.identity(), inv.len());
@@ -96,28 +124,43 @@
continue;
}

let requests = self.request_transactions(inv).await?;
self.receive_transactions(session, requests).await?;
let requests = self.request_transactions(inv, should_throttle, &curr_snapshot).await?;
self.receive_transactions(session, requests, should_throttle).await?;
}
}

async fn request_transactions(
&self,
transaction_ids: Vec<TransactionId>,
should_throttle: bool,
curr_snapshot: &MempoolCountersSnapshot,
) -> Result<Vec<RequestScope<TransactionId>>, ProtocolError> {
// Build a vector with the transaction ids unknown in the mempool and not already requested
// by another peer
let transaction_ids = self.ctx.mining_manager().clone().unknown_transactions(transaction_ids).await;
let mut requests = Vec::new();
let snapshot_delta = curr_snapshot - &self.ctx.mining_manager().clone().snapshot();

// To bring the P2P TPS back below the threshold, we cap the number of requested
// transactions at whatever balances out the overage. If MAX_TPS_THRESHOLD is 3000 and the current TPS is 4000,
// then we can only request up to 2000 (MAX - (4000 - 3000)) so the rate averages back down to the threshold.
let curr_p2p_tps = 1000 * snapshot_delta.low_priority_tx_counts / (snapshot_delta.elapsed_time.as_millis().max(1) as u64);
let overage = if should_throttle && curr_p2p_tps > MAX_TPS_THRESHOLD { curr_p2p_tps - MAX_TPS_THRESHOLD } else { 0 };

let limit = MAX_TPS_THRESHOLD.saturating_sub(overage);

for transaction_id in transaction_ids {
if let Some(req) = self.ctx.try_adding_transaction_request(transaction_id) {
requests.push(req);
}

if should_throttle && requests.len() >= limit as usize {
break;
}
}

// Request the transactions
if !requests.is_empty() {
// TODO: determine if there should be a limit to the number of ids per message
// trace!("Send a request to {} with {} transaction ids", self.router.identity(), requests.len());
self.router
.enqueue(make_message!(
@@ -160,6 +203,7 @@
&mut self,
consensus: ConsensusProxy,
requests: Vec<RequestScope<TransactionId>>,
should_throttle: bool,
) -> Result<(), ProtocolError> {
let mut transactions: Vec<Transaction> = Vec::with_capacity(requests.len());
for request in requests {
@@ -200,10 +244,13 @@
}

self.ctx
.broadcast_transactions(insert_results.into_iter().filter_map(|res| match res {
Ok(x) => Some(x.id()),
Err(_) => None,
}))
.broadcast_transactions(
insert_results.into_iter().filter_map(|res| match res {
Ok(x) => Some(x.id()),
Err(_) => None,
}),
should_throttle,
)
.await;

Ok(())
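The request cap computed above reads as: the threshold minus however far the current P2P rate already overshoots it. Here is that arithmetic as a standalone function, with the worked example from the comment (threshold 3000, measured 4000, cap 2000); the function name is illustrative, not from the commit:

const MAX_TPS_THRESHOLD: u64 = 3000;

// Maximum number of missing transactions to request while throttling.
fn request_limit(should_throttle: bool, curr_p2p_tps: u64) -> u64 {
    let overage =
        if should_throttle && curr_p2p_tps > MAX_TPS_THRESHOLD { curr_p2p_tps - MAX_TPS_THRESHOLD } else { 0 };
    MAX_TPS_THRESHOLD.saturating_sub(overage)
}

fn main() {
    assert_eq!(request_limit(true, 4000), 2000);  // 3000 - (4000 - 3000)
    assert_eq!(request_limit(true, 7000), 0);     // saturating_sub keeps it from underflowing
    assert_eq!(request_limit(false, 4000), 3000); // not throttling: full threshold applies
}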
1 change: 1 addition & 0 deletions protocol/p2p/Cargo.toml
@@ -35,6 +35,7 @@ itertools.workspace = true
log.workspace = true
parking_lot.workspace = true
prost.workspace = true
rand.workspace = true
seqlock.workspace = true
serde.workspace = true
thiserror.workspace = true
13 changes: 13 additions & 0 deletions protocol/p2p/src/core/hub.rs
@@ -8,6 +8,7 @@ use std::{
use tokio::sync::mpsc::Receiver as MpscReceiver;

use super::peer::PeerKey;
use rand::seq::SliceRandom;

#[derive(Debug)]
pub(crate) enum HubEvent {
@@ -103,6 +104,18 @@ impl Hub {
}
}

/// Broadcast a message to a randomly selected subset of peers of the given size
pub async fn broadcast_to_some_peers(&self, msg: KaspadMessage, num_peers: usize) {
assert!(num_peers > 0);
let peers = self.peers.read().values().cloned().collect::<Vec<_>>();
// TODO: At least some of the peers should be outbound, because an attacker can gain less control
// over the set of outbound peers.
let peers = peers.choose_multiple(&mut rand::thread_rng(), num_peers).cloned().collect::<Vec<_>>();
for router in peers {
let _ = router.enqueue(msg.clone()).await;
}
}

/// Broadcast a vector of messages to all peers
pub async fn broadcast_many(&self, msgs: Vec<KaspadMessage>) {
if msgs.is_empty() {
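broadcast_to_some_peers relies on rand's SliceRandom::choose_multiple, which samples without replacement and simply yields fewer elements when the slice is shorter than the requested amount, so a node with fewer than 8 connected peers still relays to all of them. A small standalone illustration of that behavior (peer names are made up):

use rand::seq::SliceRandom;

fn main() {
    let peers = vec!["peer-a", "peer-b", "peer-c"]; // only 3 peers connected
    let chosen: Vec<&str> = peers.choose_multiple(&mut rand::thread_rng(), 8).cloned().collect();
    // choose_multiple caps the sample at the slice length, so all 3 peers are picked here.
    assert_eq!(chosen.len(), 3);
}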

