From ef4f19693c797bc1b3a9d8495ce7d07e7aec4dfd Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 9 Jun 2022 21:53:00 +0000 Subject: [PATCH] client: Remove static connection cache, plumb it instead (backport #25667) (#25873) * client: Remove static connection cache, plumb it instead (#25667) * client: Remove static connection cache, plumb it instead * Add TpuClient::new_with_connection_cache to not break downstream * Refactor get_connection and RwLock into ConnectionCache * Fix merge conflicts from new async TpuClient * Remove `ConnectionCache::set_use_quic` * Move DEFAULT_TPU_USE_QUIC to client, use ConnectionCache::default() (cherry picked from commit 79a8ecd0ac3b31ed25fa355fdb77b2c535347f78) # Conflicts: # banking-bench/Cargo.toml # banks-server/Cargo.toml # bench-tps/src/main.rs # bench-tps/tests/bench_tps.rs # client/src/connection_cache.rs # client/src/nonblocking/tpu_client.rs # client/src/thin_client.rs # client/src/tpu_client.rs # core/src/banking_stage.rs # core/src/tvu.rs # core/src/validator.rs # dos/src/cli.rs # dos/src/main.rs # gossip/src/gossip_service.rs # local-cluster/src/cluster_tests.rs # local-cluster/src/local_cluster.rs # local-cluster/tests/local_cluster.rs * Fix merge conflicts Co-authored-by: Jon Cinque --- Cargo.lock | 3 + banking-bench/Cargo.toml | 1 + banking-bench/src/main.rs | 9 + banks-server/Cargo.toml | 1 + banks-server/src/banks_server.rs | 6 +- banks-server/src/rpc_banks_service.rs | 18 +- bench-tps/src/cli.rs | 3 +- bench-tps/src/main.rs | 41 ++- bench-tps/tests/bench_tps.rs | 22 +- cli/src/program.rs | 5 +- client/src/connection_cache.rs | 319 +++++++++--------- client/src/thin_client.rs | 47 +-- client/src/tpu_client.rs | 17 +- core/benches/banking_stage.rs | 2 + core/src/banking_stage.rs | 25 +- core/src/tpu.rs | 3 + core/src/tvu.rs | 8 +- core/src/validator.rs | 12 +- core/src/warm_quic_cache_service.rs | 5 +- gossip/src/gossip_service.rs | 26 +- local-cluster/src/cluster_tests.rs | 57 +++- local-cluster/src/local_cluster.rs | 42 ++- local-cluster/tests/common.rs | 1 + local-cluster/tests/local_cluster.rs | 32 +- local-cluster/tests/local_cluster_slow_2.rs | 3 + programs/bpf/Cargo.lock | 1 + replica-node/Cargo.toml | 1 + replica-node/src/replica_node.rs | 3 + rpc-test/tests/rpc.rs | 19 +- rpc/src/rpc.rs | 53 ++- rpc/src/rpc_service.rs | 6 +- .../src/send_transaction_service.rs | 65 +++- test-validator/src/lib.rs | 4 +- validator/src/main.rs | 1 - 34 files changed, 561 insertions(+), 300 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0cf620de3cb04d..104a67c4354ff0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4410,6 +4410,7 @@ dependencies = [ "log", "rand 0.7.3", "rayon", + "solana-client", "solana-core", "solana-gossip", "solana-ledger", @@ -4457,6 +4458,7 @@ dependencies = [ "crossbeam-channel", "futures 0.3.21", "solana-banks-interface", + "solana-client", "solana-runtime", "solana-sdk 1.10.25", "solana-send-transaction-service", @@ -5649,6 +5651,7 @@ dependencies = [ "rand 0.7.3", "serial_test", "solana-clap-utils", + "solana-client", "solana-core", "solana-download-utils", "solana-genesis-utils", diff --git a/banking-bench/Cargo.toml b/banking-bench/Cargo.toml index 0a3eb3bb63d251..7f7799d6d19493 100644 --- a/banking-bench/Cargo.toml +++ b/banking-bench/Cargo.toml @@ -14,6 +14,7 @@ crossbeam-channel = "0.5" log = "0.4.14" rand = "0.7.0" rayon = "1.5.1" +solana-client = { path = "../client", version = "=1.10.25" } solana-core = { path = "../core", version = "=1.10.25" } solana-gossip 
= { path = "../gossip", version = "=1.10.25" } solana-ledger = { path = "../ledger", version = "=1.10.25" } diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 23c29e9e6f3ad9..404a71ac12776e 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -5,6 +5,7 @@ use { log::*, rand::{thread_rng, Rng}, rayon::prelude::*, + solana_client::connection_cache::ConnectionCache, solana_core::banking_stage::BankingStage, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ @@ -212,6 +213,12 @@ fn main() { .takes_value(true) .help("Number of threads to use in the banking stage"), ) + .arg( + Arg::new("tpu_use_quic") + .long("tpu-use-quic") + .takes_value(false) + .help("Forward messages to TPU using QUIC"), + ) .get_matches(); let num_banking_threads = matches @@ -334,6 +341,7 @@ fn main() { SocketAddrSpace::Unspecified, ); let cluster_info = Arc::new(cluster_info); + let tpu_use_quic = matches.is_present("tpu_use_quic"); let banking_stage = BankingStage::new_num_threads( &cluster_info, &poh_recorder, @@ -344,6 +352,7 @@ fn main() { None, replay_vote_sender, Arc::new(RwLock::new(CostModel::default())), + Arc::new(ConnectionCache::new(tpu_use_quic)), ); poh_recorder.lock().unwrap().set_bank(&bank); diff --git a/banks-server/Cargo.toml b/banks-server/Cargo.toml index 88bd4faf27803a..fc2a87bb5ee19c 100644 --- a/banks-server/Cargo.toml +++ b/banks-server/Cargo.toml @@ -14,6 +14,7 @@ bincode = "1.3.3" crossbeam-channel = "0.5" futures = "0.3" solana-banks-interface = { path = "../banks-interface", version = "=1.10.25" } +solana-client = { path = "../client", version = "=1.10.25" } solana-runtime = { path = "../runtime", version = "=1.10.25" } solana-sdk = { path = "../sdk", version = "=1.10.25" } solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.10.25" } diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index 8a55677425f7e9..606cb50352aa66 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -6,6 +6,7 @@ use { Banks, BanksRequest, BanksResponse, BanksTransactionResultWithSimulation, TransactionConfirmationStatus, TransactionSimulationDetails, TransactionStatus, }, + solana_client::connection_cache::ConnectionCache, solana_runtime::{ bank::{Bank, TransactionSimulationResult}, bank_forks::BankForks, @@ -24,7 +25,7 @@ use { transaction::{self, SanitizedTransaction, Transaction}, }, solana_send_transaction_service::{ - send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC}, + send_transaction_service::{SendTransactionService, TransactionInfo}, tpu_info::NullTpuInfo, }, std::{ @@ -375,6 +376,7 @@ pub async fn start_tcp_server( tpu_addr: SocketAddr, bank_forks: Arc>, block_commitment_cache: Arc>, + connection_cache: Arc, ) -> io::Result<()> { // Note: These settings are copied straight from the tarpc example. 
let server = tcp::listen(listen_addr, Bincode::default) @@ -399,9 +401,9 @@ pub async fn start_tcp_server( &bank_forks, None, receiver, + &connection_cache, 5_000, 0, - DEFAULT_TPU_USE_QUIC, ); let server = BanksServer::new( diff --git a/banks-server/src/rpc_banks_service.rs b/banks-server/src/rpc_banks_service.rs index bc328e18a74fe8..822798dd1ffd62 100644 --- a/banks-server/src/rpc_banks_service.rs +++ b/banks-server/src/rpc_banks_service.rs @@ -3,6 +3,7 @@ use { crate::banks_server::start_tcp_server, futures::{future::FutureExt, pin_mut, prelude::stream::StreamExt, select}, + solana_client::connection_cache::ConnectionCache, solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache}, std::{ net::SocketAddr, @@ -29,6 +30,7 @@ async fn start_abortable_tcp_server( tpu_addr: SocketAddr, bank_forks: Arc>, block_commitment_cache: Arc>, + connection_cache: Arc, exit: Arc, ) { let server = start_tcp_server( @@ -36,6 +38,7 @@ async fn start_abortable_tcp_server( tpu_addr, bank_forks.clone(), block_commitment_cache.clone(), + connection_cache, ) .fuse(); let interval = IntervalStream::new(time::interval(Duration::from_millis(100))).fuse(); @@ -58,6 +61,7 @@ impl RpcBanksService { tpu_addr: SocketAddr, bank_forks: Arc>, block_commitment_cache: Arc>, + connection_cache: Arc, exit: Arc, ) { let server = start_abortable_tcp_server( @@ -65,6 +69,7 @@ impl RpcBanksService { tpu_addr, bank_forks, block_commitment_cache, + connection_cache, exit, ); Runtime::new().unwrap().block_on(server); @@ -75,10 +80,12 @@ impl RpcBanksService { tpu_addr: SocketAddr, bank_forks: &Arc>, block_commitment_cache: &Arc>, + connection_cache: &Arc, exit: &Arc, ) -> Self { let bank_forks = bank_forks.clone(); let block_commitment_cache = block_commitment_cache.clone(); + let connection_cache = connection_cache.clone(); let exit = exit.clone(); let thread_hdl = Builder::new() .name("solana-rpc-banks".to_string()) @@ -88,6 +95,7 @@ impl RpcBanksService { tpu_addr, bank_forks, block_commitment_cache, + connection_cache, exit, ) }) @@ -109,9 +117,17 @@ mod tests { fn test_rpc_banks_server_exit() { let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::default_for_tests()))); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let connection_cache = Arc::new(ConnectionCache::default()); let exit = Arc::new(AtomicBool::new(false)); let addr = "127.0.0.1:0".parse().unwrap(); - let service = RpcBanksService::new(addr, addr, &bank_forks, &block_commitment_cache, &exit); + let service = RpcBanksService::new( + addr, + addr, + &bank_forks, + &block_commitment_cache, + &connection_cache, + &exit, + ); exit.store(true, Ordering::Relaxed); service.join().unwrap(); } diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index 950640d49ad095..5103c9f1d2c501 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -2,6 +2,7 @@ use { clap::{crate_description, crate_name, App, Arg, ArgMatches}, solana_clap_utils::input_validators::{is_url, is_url_or_moniker}, solana_cli_config::{ConfigInput, CONFIG_FILE}, + solana_client::connection_cache::DEFAULT_TPU_USE_QUIC, solana_sdk::{ fee_calculator::FeeRateGovernor, pubkey::Pubkey, @@ -77,7 +78,7 @@ impl Default for Config { target_slots_per_epoch: 0, target_node: None, external_client_type: ExternalClientType::default(), - use_quic: false, + use_quic: DEFAULT_TPU_USE_QUIC, } } } diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index 0cf38e7e4021c6..3b0e2e1a21a08e 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ 
-7,7 +7,7 @@ use { keypairs::get_keypairs, }, solana_client::{ - connection_cache, + connection_cache::ConnectionCache, rpc_client::RpcClient, tpu_client::{TpuClient, TpuClientConfig}, }, @@ -105,11 +105,10 @@ fn main() { eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); exit(1); }); - if *use_quic { - connection_cache::set_use_quic(true); - } + let connection_cache = Arc::new(ConnectionCache::new(*use_quic)); let client = if *multi_client { - let (client, num_clients) = get_multi_client(&nodes, &SocketAddrSpace::Unspecified); + let (client, num_clients) = + get_multi_client(&nodes, &SocketAddrSpace::Unspecified, connection_cache); if nodes.len() < num_clients { eprintln!( "Error: Insufficient nodes discovered. Expecting {} or more", @@ -123,8 +122,11 @@ fn main() { let mut target_client = None; for node in nodes { if node.id == *target_node { - target_client = - Some(Arc::new(get_client(&[node], &SocketAddrSpace::Unspecified))); + target_client = Some(Arc::new(get_client( + &[node], + &SocketAddrSpace::Unspecified, + connection_cache, + ))); break; } } @@ -133,7 +135,11 @@ fn main() { exit(1); }) } else { - Arc::new(get_client(&nodes, &SocketAddrSpace::Unspecified)) + Arc::new(get_client( + &nodes, + &SocketAddrSpace::Unspecified, + connection_cache, + )) }; let keypairs = get_keypairs( client.clone(), @@ -150,15 +156,18 @@ fn main() { json_rpc_url.to_string(), CommitmentConfig::confirmed(), )); - if *use_quic { - connection_cache::set_use_quic(true); - } + let connection_cache = Arc::new(ConnectionCache::new(*use_quic)); let client = Arc::new( - TpuClient::new(rpc_client, websocket_url, TpuClientConfig::default()) - .unwrap_or_else(|err| { - eprintln!("Could not create TpuClient {:?}", err); - exit(1); - }), + TpuClient::new_with_connection_cache( + rpc_client, + websocket_url, + TpuClientConfig::default(), + connection_cache, + ) + .unwrap_or_else(|err| { + eprintln!("Could not create TpuClient {:?}", err); + exit(1); + }), ); let keypairs = get_keypairs( client.clone(), diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index 680aec6b231fbb..6d1c32b47a2629 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -6,8 +6,9 @@ use { cli::Config, }, solana_client::{ + connection_cache::ConnectionCache, rpc_client::RpcClient, - thin_client::create_client, + thin_client::ThinClient, tpu_client::{TpuClient, TpuClientConfig}, }, solana_core::validator::ValidatorConfig, @@ -58,10 +59,11 @@ fn test_bench_tps_local_cluster(config: Config) { cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000); - let client = Arc::new(create_client(( + let client = Arc::new(ThinClient::new( cluster.entry_point_info.rpc, cluster.entry_point_info.tpu, - ))); + cluster.connection_cache.clone(), + )); let lamports_per_account = 100; @@ -96,9 +98,17 @@ fn test_bench_tps_test_validator(config: Config) { CommitmentConfig::processed(), )); let websocket_url = test_validator.rpc_pubsub_url(); - - let client = - Arc::new(TpuClient::new(rpc_client, &websocket_url, TpuClientConfig::default()).unwrap()); + let connection_cache = Arc::new(ConnectionCache::default()); + + let client = Arc::new( + TpuClient::new_with_connection_cache( + rpc_client, + &websocket_url, + TpuClientConfig::default(), + connection_cache, + ) + .unwrap(), + ); let lamports_per_account = 100; diff --git a/cli/src/program.rs b/cli/src/program.rs index b8481c414bdf64..4c55f71c981656 100644 --- a/cli/src/program.rs +++ b/cli/src/program.rs @@ -19,6 +19,7 @@ use { }, 
solana_client::{ client_error::ClientErrorKind, + connection_cache::ConnectionCache, rpc_client::RpcClient, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSendTransactionConfig}, rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType}, @@ -2215,10 +2216,12 @@ fn send_deploy_messages( if let Some(write_messages) = write_messages { if let Some(write_signer) = write_signer { trace!("Writing program data"); - let tpu_client = TpuClient::new( + let connection_cache = Arc::new(ConnectionCache::default()); + let tpu_client = TpuClient::new_with_connection_cache( rpc_client.clone(), &config.websocket_url, TpuClientConfig::default(), + connection_cache, )?; let transaction_errors = tpu_client .send_and_confirm_messages_with_spinner( diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index a9090808d7e4ea..63d1a72424d391 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -4,14 +4,13 @@ use { }, enum_dispatch::enum_dispatch, indexmap::map::IndexMap, - lazy_static::lazy_static, rand::{thread_rng, Rng}, solana_measure::measure::Measure, solana_sdk::timing::AtomicInterval, std::{ net::SocketAddr, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, }, }, @@ -26,6 +25,10 @@ pub enum Connection { QuicTpuConnection, } +/// Used to decide whether the TPU and underlying connection cache should use +/// QUIC connections. +pub const DEFAULT_TPU_USE_QUIC: bool = false; + #[derive(Default)] pub struct ConnectionCacheStats { cache_hits: AtomicU64, @@ -215,35 +218,161 @@ impl ConnectionCacheStats { } } -struct ConnectionMap { - map: IndexMap>, +pub struct ConnectionCache { + map: RwLock>>, stats: Arc, last_stats: AtomicInterval, - use_quic: bool, + use_quic: AtomicBool, } -impl ConnectionMap { - pub fn new() -> Self { +impl ConnectionCache { + pub fn new(use_quic: bool) -> Self { Self { - map: IndexMap::with_capacity(MAX_CONNECTIONS), - stats: Arc::new(ConnectionCacheStats::default()), - last_stats: AtomicInterval::default(), - use_quic: false, + use_quic: AtomicBool::new(use_quic), + ..Self::default() } } - pub fn set_use_quic(&mut self, use_quic: bool) { - self.use_quic = use_quic; + pub fn get_use_quic(&self) -> bool { + self.use_quic.load(Ordering::Relaxed) } -} -lazy_static! 
{ - static ref CONNECTION_MAP: RwLock = RwLock::new(ConnectionMap::new()); -} + fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult { + let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); + let map = self.map.read().unwrap(); + get_connection_map_lock_measure.stop(); + + let mut lock_timing_ms = get_connection_map_lock_measure.as_ms(); + + let report_stats = self + .last_stats + .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL); + + let mut get_connection_map_measure = Measure::start("get_connection_hit_measure"); + let (connection, cache_hit, connection_cache_stats, num_evictions, eviction_timing_ms) = + match map.get(addr) { + Some(connection) => (connection.clone(), true, self.stats.clone(), 0, 0), + None => { + // Upgrade to write access by dropping read lock and acquire write lock + drop(map); + let mut get_connection_map_lock_measure = + Measure::start("get_connection_map_lock_measure"); + let mut map = self.map.write().unwrap(); + get_connection_map_lock_measure.stop(); + + lock_timing_ms = + lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms()); + + // Read again, as it is possible that between read lock dropped and the write lock acquired + // another thread could have setup the connection. + match map.get(addr) { + Some(connection) => (connection.clone(), true, self.stats.clone(), 0, 0), + None => { + let connection: Connection = if self.use_quic.load(Ordering::Relaxed) { + QuicTpuConnection::new(*addr, self.stats.clone()).into() + } else { + UdpTpuConnection::new(*addr, self.stats.clone()).into() + }; + + let connection = Arc::new(connection); + + // evict a connection if the cache is reaching upper bounds + let mut num_evictions = 0; + let mut get_connection_cache_eviction_measure = + Measure::start("get_connection_cache_eviction_measure"); + while map.len() >= MAX_CONNECTIONS { + let mut rng = thread_rng(); + let n = rng.gen_range(0, MAX_CONNECTIONS); + map.swap_remove_index(n); + num_evictions += 1; + } + get_connection_cache_eviction_measure.stop(); + + map.insert(*addr, connection.clone()); + ( + connection, + false, + self.stats.clone(), + num_evictions, + get_connection_cache_eviction_measure.as_ms(), + ) + } + } + } + }; + get_connection_map_measure.stop(); + + GetConnectionResult { + connection, + cache_hit, + report_stats, + map_timing_ms: get_connection_map_measure.as_ms(), + lock_timing_ms, + connection_cache_stats, + num_evictions, + eviction_timing_ms, + } + } + + pub fn get_connection(&self, addr: &SocketAddr) -> Arc { + let mut get_connection_measure = Measure::start("get_connection_measure"); + let GetConnectionResult { + connection, + cache_hit, + report_stats, + map_timing_ms, + lock_timing_ms, + connection_cache_stats, + num_evictions, + eviction_timing_ms, + } = self.get_or_add_connection(addr); + + if report_stats { + connection_cache_stats.report(); + } + + if cache_hit { + connection_cache_stats + .cache_hits + .fetch_add(1, Ordering::Relaxed); + connection_cache_stats + .get_connection_hit_ms + .fetch_add(map_timing_ms, Ordering::Relaxed); + } else { + connection_cache_stats + .cache_misses + .fetch_add(1, Ordering::Relaxed); + connection_cache_stats + .get_connection_miss_ms + .fetch_add(map_timing_ms, Ordering::Relaxed); + connection_cache_stats + .cache_evictions + .fetch_add(num_evictions, Ordering::Relaxed); + connection_cache_stats + .eviction_time_ms + .fetch_add(eviction_timing_ms, Ordering::Relaxed); + } -pub fn set_use_quic(use_quic: bool) { - let mut map = 
(*CONNECTION_MAP).write().unwrap(); - map.set_use_quic(use_quic); + get_connection_measure.stop(); + connection_cache_stats + .get_connection_lock_ms + .fetch_add(lock_timing_ms, Ordering::Relaxed); + connection_cache_stats + .get_connection_ms + .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed); + + connection + } +} +impl Default for ConnectionCache { + fn default() -> Self { + Self { + map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)), + stats: Arc::new(ConnectionCacheStats::default()), + last_stats: AtomicInterval::default(), + use_quic: AtomicBool::new(DEFAULT_TPU_USE_QUIC), + } + } } struct GetConnectionResult { @@ -257,142 +386,11 @@ struct GetConnectionResult { eviction_timing_ms: u64, } -fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { - let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); - let map = (*CONNECTION_MAP).read().unwrap(); - get_connection_map_lock_measure.stop(); - - let mut lock_timing_ms = get_connection_map_lock_measure.as_ms(); - - let report_stats = map - .last_stats - .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL); - - let mut get_connection_map_measure = Measure::start("get_connection_hit_measure"); - - let (connection, cache_hit, connection_cache_stats, num_evictions, eviction_timing_ms) = - match map.map.get(addr) { - Some(connection) => (connection.clone(), true, map.stats.clone(), 0, 0), - None => { - // Upgrade to write access by dropping read lock and acquire write lock - drop(map); - let mut get_connection_map_lock_measure = - Measure::start("get_connection_map_lock_measure"); - let mut map = (*CONNECTION_MAP).write().unwrap(); - get_connection_map_lock_measure.stop(); - - lock_timing_ms = - lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms()); - - // Read again, as it is possible that between read lock dropped and the write lock acquired - // another thread could have setup the connection. 
- match map.map.get(addr) { - Some(connection) => (connection.clone(), true, map.stats.clone(), 0, 0), - None => { - let connection: Connection = if map.use_quic { - QuicTpuConnection::new(*addr, map.stats.clone()).into() - } else { - UdpTpuConnection::new(*addr, map.stats.clone()).into() - }; - - let connection = Arc::new(connection); - - // evict a connection if the cache is reaching upper bounds - let mut num_evictions = 0; - let mut get_connection_cache_eviction_measure = - Measure::start("get_connection_cache_eviction_measure"); - while map.map.len() >= MAX_CONNECTIONS { - let mut rng = thread_rng(); - let n = rng.gen_range(0, MAX_CONNECTIONS); - map.map.swap_remove_index(n); - num_evictions += 1; - } - get_connection_cache_eviction_measure.stop(); - - map.map.insert(*addr, connection.clone()); - ( - connection, - false, - map.stats.clone(), - num_evictions, - get_connection_cache_eviction_measure.as_ms(), - ) - } - } - } - }; - - get_connection_map_measure.stop(); - - GetConnectionResult { - connection, - cache_hit, - report_stats, - map_timing_ms: get_connection_map_measure.as_ms(), - lock_timing_ms, - connection_cache_stats, - num_evictions, - eviction_timing_ms, - } -} - -// TODO: see https://github.com/solana-labs/solana/issues/23661 -// remove lazy_static and optimize and refactor this -pub fn get_connection(addr: &SocketAddr) -> Arc { - let mut get_connection_measure = Measure::start("get_connection_measure"); - let GetConnectionResult { - connection, - cache_hit, - report_stats, - map_timing_ms, - lock_timing_ms, - connection_cache_stats, - num_evictions, - eviction_timing_ms, - } = get_or_add_connection(addr); - - if report_stats { - connection_cache_stats.report(); - } - - if cache_hit { - connection_cache_stats - .cache_hits - .fetch_add(1, Ordering::Relaxed); - connection_cache_stats - .get_connection_hit_ms - .fetch_add(map_timing_ms, Ordering::Relaxed); - } else { - connection_cache_stats - .cache_misses - .fetch_add(1, Ordering::Relaxed); - connection_cache_stats - .get_connection_miss_ms - .fetch_add(map_timing_ms, Ordering::Relaxed); - connection_cache_stats - .cache_evictions - .fetch_add(num_evictions, Ordering::Relaxed); - connection_cache_stats - .eviction_time_ms - .fetch_add(eviction_timing_ms, Ordering::Relaxed); - } - - get_connection_measure.stop(); - connection_cache_stats - .get_connection_lock_ms - .fetch_add(lock_timing_ms, Ordering::Relaxed); - connection_cache_stats - .get_connection_ms - .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed); - - connection -} - #[cfg(test)] mod tests { use { crate::{ - connection_cache::{get_connection, CONNECTION_MAP, MAX_CONNECTIONS}, + connection_cache::{ConnectionCache, MAX_CONNECTIONS}, tpu_connection::TpuConnection, }, rand::{Rng, SeedableRng}, @@ -426,28 +424,29 @@ mod tests { // we can actually connect to those addresses - TPUConnection implementations should either // be lazy and not connect until first use or handle connection errors somehow // (without crashing, as would be required in a real practical validator) + let connection_cache = ConnectionCache::default(); let addrs = (0..MAX_CONNECTIONS) .into_iter() .map(|_| { let addr = get_addr(&mut rng); - get_connection(&addr); + connection_cache.get_connection(&addr); addr }) .collect::>(); { - let map = (*CONNECTION_MAP).read().unwrap(); - assert!(map.map.len() == MAX_CONNECTIONS); + let map = connection_cache.map.read().unwrap(); + assert!(map.len() == MAX_CONNECTIONS); addrs.iter().for_each(|a| { - let conn = map.map.get(a).expect("Address not 
found"); + let conn = map.get(a).expect("Address not found"); assert!(a.ip() == conn.tpu_addr().ip()); }); } let addr = get_addr(&mut rng); - get_connection(&addr); + connection_cache.get_connection(&addr); - let map = (*CONNECTION_MAP).read().unwrap(); - assert!(map.map.len() == MAX_CONNECTIONS); - let _conn = map.map.get(&addr).expect("Address not found"); + let map = connection_cache.map.read().unwrap(); + assert!(map.len() == MAX_CONNECTIONS); + let _conn = map.get(&addr).expect("Address not found"); } } diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 7d5e090d941ecf..d0807dc2d44dcf 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -5,7 +5,7 @@ use { crate::{ - connection_cache::get_connection, rpc_client::RpcClient, + connection_cache::ConnectionCache, rpc_client::RpcClient, rpc_config::RpcProgramAccountsConfig, rpc_response::Response, tpu_connection::TpuConnection, }, @@ -33,7 +33,7 @@ use { net::SocketAddr, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - RwLock, + Arc, RwLock, }, time::{Duration, Instant}, }, @@ -123,34 +123,49 @@ pub struct ThinClient { rpc_clients: Vec, tpu_addrs: Vec, optimizer: ClientOptimizer, + connection_cache: Arc, } impl ThinClient { /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP /// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP /// (currently hardcoded to UDP) - pub fn new(rpc_addr: SocketAddr, tpu_addr: SocketAddr) -> Self { - Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr) + pub fn new( + rpc_addr: SocketAddr, + tpu_addr: SocketAddr, + connection_cache: Arc, + ) -> Self { + Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache) } pub fn new_socket_with_timeout( rpc_addr: SocketAddr, tpu_addr: SocketAddr, timeout: Duration, + connection_cache: Arc, ) -> Self { let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout); - Self::new_from_client(rpc_client, tpu_addr) + Self::new_from_client(rpc_client, tpu_addr, connection_cache) } - fn new_from_client(rpc_client: RpcClient, tpu_addr: SocketAddr) -> Self { + fn new_from_client( + rpc_client: RpcClient, + tpu_addr: SocketAddr, + connection_cache: Arc, + ) -> Self { Self { rpc_clients: vec![rpc_client], tpu_addrs: vec![tpu_addr], optimizer: ClientOptimizer::new(0), + connection_cache, } } - pub fn new_from_addrs(rpc_addrs: Vec, tpu_addrs: Vec) -> Self { + pub fn new_from_addrs( + rpc_addrs: Vec, + tpu_addrs: Vec, + connection_cache: Arc, + ) -> Self { assert!(!rpc_addrs.is_empty()); assert_eq!(rpc_addrs.len(), tpu_addrs.len()); @@ -160,6 +175,7 @@ impl ThinClient { rpc_clients, tpu_addrs, optimizer, + connection_cache, } } @@ -208,7 +224,7 @@ impl ThinClient { bincode::serialize(&transaction).expect("transaction serialization failed"); while now.elapsed().as_secs() < wait_time as u64 { if num_confirmed == 0 { - let conn = get_connection(self.tpu_addr()); + let conn = self.connection_cache.get_connection(self.tpu_addr()); // Send the transaction if there has been no confirmation (e.g. 
the first time) conn.send_wire_transaction(&wire_transaction)?; } @@ -594,14 +610,14 @@ impl SyncClient for ThinClient { impl AsyncClient for ThinClient { fn async_send_transaction(&self, transaction: Transaction) -> TransportResult { let transaction = VersionedTransaction::from(transaction); - let conn = get_connection(self.tpu_addr()); + let conn = self.connection_cache.get_connection(self.tpu_addr()); conn.serialize_and_send_transaction(&transaction)?; Ok(transaction.signatures[0]) } fn async_send_batch(&self, batch: Vec) -> TransportResult<()> { let batch: Vec = batch.into_iter().map(Into::into).collect(); - let conn = get_connection(self.tpu_addr()); + let conn = self.connection_cache.get_connection(self.tpu_addr()); conn.par_serialize_and_send_transaction_batch(&batch[..])?; Ok(()) } @@ -637,17 +653,6 @@ impl AsyncClient for ThinClient { } } -pub fn create_client((rpc, tpu): (SocketAddr, SocketAddr)) -> ThinClient { - ThinClient::new(rpc, tpu) -} - -pub fn create_client_with_timeout( - (rpc, tpu): (SocketAddr, SocketAddr), - timeout: Duration, -) -> ThinClient { - ThinClient::new_socket_with_timeout(rpc, tpu, timeout) -} - #[cfg(test)] mod tests { use {super::*, rayon::prelude::*}; diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index f7ea12618fb325..9f47c4c522e0a2 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -1,7 +1,7 @@ use { crate::{ client_error::ClientError, - connection_cache::get_connection, + connection_cache::ConnectionCache, pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription}, rpc_client::RpcClient, rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, @@ -81,6 +81,7 @@ pub struct TpuClient { leader_tpu_service: LeaderTpuService, exit: Arc, rpc_client: Arc, + connection_cache: Arc, } impl TpuClient { @@ -114,7 +115,7 @@ impl TpuClient { .leader_tpu_service .leader_tpu_sockets(self.fanout_slots) { - let conn = get_connection(&tpu_address); + let conn = self.connection_cache.get_connection(&tpu_address); let result = conn.send_wire_transaction_async(wire_transaction.clone()); if let Err(err) = result { last_error = Some(err); @@ -138,6 +139,17 @@ impl TpuClient { rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, + ) -> Result { + let connection_cache = Arc::new(ConnectionCache::default()); + Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache) + } + + /// Create a new client that disconnects when dropped + pub fn new_with_connection_cache( + rpc_client: Arc, + websocket_url: &str, + config: TpuClientConfig, + connection_cache: Arc, ) -> Result { let exit = Arc::new(AtomicBool::new(false)); let leader_tpu_service = @@ -149,6 +161,7 @@ impl TpuClient { leader_tpu_service, exit, rpc_client, + connection_cache, }) } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index fd0babbe1f8e14..9ec70ad79cbf50 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -8,6 +8,7 @@ use { log::*, rand::{thread_rng, Rng}, rayon::prelude::*, + solana_client::connection_cache::ConnectionCache, solana_core::{ banking_stage::{BankingStage, BankingStageStats}, leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, @@ -230,6 +231,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { None, s, Arc::new(RwLock::new(CostModel::default())), + Arc::new(ConnectionCache::default()), ); poh_recorder.lock().unwrap().set_bank(&bank); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 
a4f011e342c896..9b37d28693908a 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -18,7 +18,7 @@ use { itertools::Itertools, min_max_heap::MinMaxHeap, solana_client::{ - connection_cache::get_connection, tpu_connection::TpuConnection, + connection_cache::ConnectionCache, tpu_connection::TpuConnection, udp_client::UdpTpuConnection, }, solana_entry::entry::hash_transactions, @@ -410,6 +410,7 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, + connection_cache: Arc, ) -> Self { Self::new_num_threads( cluster_info, @@ -421,6 +422,7 @@ impl BankingStage { transaction_status_sender, gossip_vote_sender, cost_model, + connection_cache, ) } @@ -435,6 +437,7 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, + connection_cache: Arc, ) -> Self { assert!(num_threads >= MIN_TOTAL_THREADS); // Single thread to generate entries from many banks. @@ -466,6 +469,7 @@ impl BankingStage { let gossip_vote_sender = gossip_vote_sender.clone(); let data_budget = data_budget.clone(); let cost_model = cost_model.clone(); + let connection_cache = connection_cache.clone(); Builder::new() .name(format!("solana-banking-stage-tx-{}", i)) .spawn(move || { @@ -481,6 +485,7 @@ impl BankingStage { gossip_vote_sender, &data_budget, cost_model, + connection_cache, ); }) .unwrap() @@ -506,6 +511,7 @@ impl BankingStage { /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns /// the number of successfully forwarded packets in second part of tuple fn forward_buffered_packets( + connection_cache: &ConnectionCache, forward_option: &ForwardOption, cluster_info: &ClusterInfo, poh_recorder: &Arc>, @@ -570,7 +576,7 @@ impl BankingStage { banking_stage_stats .forwarded_transaction_count .fetch_add(packet_vec_len, Ordering::Relaxed); - get_connection(&addr) + connection_cache.get_connection(&addr) }; let res = conn.send_wire_transaction_batch_async(packet_vec); @@ -882,6 +888,7 @@ impl BankingStage { data_budget: &DataBudget, qos_service: &QosService, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, + connection_cache: &ConnectionCache, ) { let (decision, make_decision_time) = Measure::this( |_| { @@ -955,6 +962,7 @@ impl BankingStage { data_budget, slot_metrics_tracker, banking_stage_stats, + connection_cache, ) }, (), @@ -974,6 +982,7 @@ impl BankingStage { data_budget, slot_metrics_tracker, banking_stage_stats, + connection_cache, ) }, (), @@ -994,6 +1003,7 @@ impl BankingStage { data_budget: &DataBudget, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, banking_stage_stats: &BankingStageStats, + connection_cache: &ConnectionCache, ) { if let ForwardOption::NotForward = forward_option { if !hold { @@ -1006,6 +1016,7 @@ impl BankingStage { Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); let forwardable_packets_len = forwardable_packets.len(); let (_forward_result, sucessful_forwarded_packets_count) = Self::forward_buffered_packets( + connection_cache, forward_option, cluster_info, poh_recorder, @@ -1052,6 +1063,7 @@ impl BankingStage { gossip_vote_sender: ReplayVoteSender, data_budget: &DataBudget, cost_model: Arc>, + connection_cache: Arc, ) { let recorder = poh_recorder.lock().unwrap().recorder(); let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); @@ -1079,6 +1091,7 @@ impl BankingStage { data_budget, &qos_service, &mut slot_metrics_tracker, + &connection_cache, ) }, (), @@ -2347,6 
+2360,7 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), + Arc::new(ConnectionCache::default()), ); drop(verified_sender); drop(gossip_verified_vote_sender); @@ -2396,6 +2410,7 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), + Arc::new(ConnectionCache::default()), ); trace!("sending bank"); drop(verified_sender); @@ -2470,6 +2485,7 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), + Arc::new(ConnectionCache::default()), ); // fund another account so we can send 2 good transactions in a single batch. @@ -2621,6 +2637,7 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), + Arc::new(ConnectionCache::default()), ); // wait for banking_stage to eat the packets @@ -4101,6 +4118,7 @@ mod tests { ("budget-available", DataBudget::default(), 1), ]; + let connection_cache = ConnectionCache::default(); for (name, data_budget, expected_num_forwarded) in test_cases { let mut unprocessed_packet_batches: UnprocessedPacketBatches = UnprocessedPacketBatches::from_iter( @@ -4117,6 +4135,7 @@ mod tests { &data_budget, &mut LeaderSlotMetricsTracker::new(0), &stats, + &connection_cache, ); recv_socket @@ -4189,6 +4208,7 @@ mod tests { let local_node = Node::new_localhost_with_pubkey(validator_pubkey); let cluster_info = new_test_cluster_info(local_node.info); let recv_socket = &local_node.sockets.tpu_forwards[0]; + let connection_cache = ConnectionCache::default(); let test_cases = vec![ ("not-forward", ForwardOption::NotForward, true, vec![], 2), @@ -4226,6 +4246,7 @@ mod tests { &DataBudget::default(), &mut LeaderSlotMetricsTracker::new(0), &stats, + &connection_cache, ); recv_socket diff --git a/core/src/tpu.rs b/core/src/tpu.rs index bd87542357308c..a3744c77a9a524 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -16,6 +16,7 @@ use { staked_nodes_updater_service::StakedNodesUpdaterService, }, crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError}, + solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender}, solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, @@ -96,6 +97,7 @@ impl Tpu { tpu_coalesce_ms: u64, cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, cost_model: &Arc>, + connection_cache: &Arc, keypair: &Keypair, ) -> Self { let TpuSockets { @@ -226,6 +228,7 @@ impl Tpu { transaction_status_sender, replay_vote_sender, cost_model.clone(), + connection_cache.clone(), ); let broadcast_stage = broadcast_type.new_broadcast_stage( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 3dcbf8bc413699..85801cd8b4231d 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -28,6 +28,7 @@ use { warm_quic_cache_service::WarmQuicCacheService, }, crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError}, + solana_client::connection_cache::ConnectionCache, solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ @@ -155,7 +156,7 @@ impl Tvu { block_metadata_notifier: Option, wait_to_vote_slot: Option, pruned_banks_receiver: DroppedSlotsReceiver, - use_quic: bool, + connection_cache: &Arc, ) -> Self { let TvuSockets { repair: repair_socket, @@ -295,8 +296,9 @@ impl Tvu { bank_forks.clone(), ); - let warm_quic_cache_service = if use_quic { + let warm_quic_cache_service = if connection_cache.get_use_quic() { 
Some(WarmQuicCacheService::new( + connection_cache.clone(), cluster_info.clone(), poh_recorder.clone(), exit.clone(), @@ -540,7 +542,7 @@ pub mod tests { None, None, pruned_banks_receiver, - false, // use_quic + &Arc::new(ConnectionCache::default()), ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index edf1aaf1c37179..5686ef05638296 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -22,6 +22,7 @@ use { }, crossbeam_channel::{bounded, unbounded, Receiver}, rand::{thread_rng, Rng}, + solana_client::connection_cache::ConnectionCache, solana_entry::poh::compute_hash_time_ns, solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService, solana_gossip::{ @@ -667,6 +668,8 @@ impl Validator { ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let connection_cache = Arc::new(ConnectionCache::new(use_quic)); + let rpc_override_health_check = Arc::new(AtomicBool::new(false)); let ( json_rpc_service, @@ -719,6 +722,7 @@ impl Validator { config.send_transaction_service_config.clone(), max_slots.clone(), leader_schedule_cache.clone(), + connection_cache.clone(), max_complete_transaction_status_slot, )), if !config.rpc_config.full_api { @@ -939,7 +943,7 @@ impl Validator { block_metadata_notifier, config.wait_to_vote_slot, pruned_banks_receiver, - use_quic, + &connection_cache, ); let tpu = Tpu::new( @@ -971,6 +975,7 @@ impl Validator { config.tpu_coalesce_ms, cluster_confirmed_slot_sender, &cost_model, + &connection_cache, &identity_keypair, ); @@ -1827,6 +1832,7 @@ mod tests { use { super::*, crossbeam_channel::{bounded, RecvTimeoutError}, + solana_client::connection_cache::DEFAULT_TPU_USE_QUIC, solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader}, solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, std::{fs::remove_dir_all, thread, time::Duration}, @@ -1862,7 +1868,7 @@ mod tests { true, // should_check_duplicate_instance start_progress.clone(), SocketAddrSpace::Unspecified, - false, // use_quic + DEFAULT_TPU_USE_QUIC, ); assert_eq!( *start_progress.read().unwrap(), @@ -1957,7 +1963,7 @@ mod tests { true, // should_check_duplicate_instance Arc::new(RwLock::new(ValidatorStartProgress::default())), SocketAddrSpace::Unspecified, - false, // use_quic + DEFAULT_TPU_USE_QUIC, ) }) .collect(); diff --git a/core/src/warm_quic_cache_service.rs b/core/src/warm_quic_cache_service.rs index 00be5be7ea45af..86fb9c80dd8b0a 100644 --- a/core/src/warm_quic_cache_service.rs +++ b/core/src/warm_quic_cache_service.rs @@ -3,7 +3,7 @@ use { rand::{thread_rng, Rng}, - solana_client::{connection_cache::get_connection, tpu_connection::TpuConnection}, + solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, solana_gossip::cluster_info::ClusterInfo, solana_poh::poh_recorder::PohRecorder, std::{ @@ -26,6 +26,7 @@ const CACHE_JITTER_SLOT: i64 = 20; impl WarmQuicCacheService { pub fn new( + connection_cache: Arc, cluster_info: Arc, poh_recorder: Arc>, exit: Arc, @@ -48,7 +49,7 @@ impl WarmQuicCacheService { if let Some(addr) = cluster_info .lookup_contact_info(&leader_pubkey, |leader| leader.tpu) { - let conn = get_connection(&addr); + let conn = connection_cache.get_connection(&addr); if let Err(err) = conn.send_wire_transaction(&[0u8]) { warn!( "Failed to warmup QUIC connection to the leader {:?}, Error {:?}", diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 4d86a784feec08..9bc911b405d49f 100644 --- 
a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -4,7 +4,7 @@ use { crate::{cluster_info::ClusterInfo, contact_info::ContactInfo}, crossbeam_channel::{unbounded, Sender}, rand::{thread_rng, Rng}, - solana_client::thin_client::{create_client, ThinClient}, + solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient}, solana_perf::recycler::Recycler, solana_runtime::bank_forks::BankForks, solana_sdk::{ @@ -194,28 +194,25 @@ pub fn discover( )) } -/// Creates a ThinClient per valid node -pub fn get_clients(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> Vec { - nodes - .iter() - .filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) - .map(create_client) - .collect() -} - /// Creates a ThinClient by selecting a valid node at random -pub fn get_client(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> ThinClient { +pub fn get_client( + nodes: &[ContactInfo], + socket_addr_space: &SocketAddrSpace, + connection_cache: Arc, +) -> ThinClient { let nodes: Vec<_> = nodes .iter() .filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) .collect(); let select = thread_rng().gen_range(0, nodes.len()); - create_client(nodes[select]) + let (rpc, tpu) = nodes[select]; + ThinClient::new(rpc, tpu, connection_cache) } pub fn get_multi_client( nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace, + connection_cache: Arc, ) -> (ThinClient, usize) { let addrs: Vec<_> = nodes .iter() @@ -225,7 +222,10 @@ pub fn get_multi_client( let tpu_addrs: Vec<_> = addrs.iter().map(|addr| addr.1).collect(); let num_nodes = tpu_addrs.len(); - (ThinClient::new_from_addrs(rpc_addrs, tpu_addrs), num_nodes) + ( + ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, connection_cache), + num_nodes, + ) } fn spy( diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index bf4766b00fe9af..e0ad36f3009549 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -6,7 +6,7 @@ use log::*; use { rand::{thread_rng, Rng}, rayon::prelude::*, - solana_client::thin_client::create_client, + solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient}, solana_core::consensus::VOTE_THRESHOLD_DEPTH, solana_entry::entry::{Entry, EntrySlice}, solana_gossip::{ @@ -50,6 +50,7 @@ pub fn spend_and_verify_all_nodes( nodes: usize, ignore_nodes: HashSet, socket_addr_space: SocketAddrSpace, + connection_cache: &Arc, ) { let cluster_nodes = discover_cluster(&entry_point_info.gossip, nodes, socket_addr_space).unwrap(); @@ -60,7 +61,8 @@ pub fn spend_and_verify_all_nodes( return; } let random_keypair = Keypair::new(); - let client = create_client(ingress_node.client_facing_addr()); + let (rpc, tpu) = ingress_node.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let bal = client .poll_get_balance_with_commitment( &funding_keypair.pubkey(), @@ -81,7 +83,8 @@ pub fn spend_and_verify_all_nodes( if ignore_nodes.contains(&validator.id) { continue; } - let client = create_client(validator.client_facing_addr()); + let (rpc, tpu) = validator.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache.clone()); client.poll_for_signature_confirmation(&sig, confs).unwrap(); } }); @@ -90,8 +93,10 @@ pub fn spend_and_verify_all_nodes( pub fn verify_balances( expected_balances: HashMap, node: &ContactInfo, + connection_cache: Arc, ) { - let client = create_client(node.client_facing_addr()); + let (rpc, tpu) = 
node.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache); for (pk, b) in expected_balances { let bal = client .poll_get_balance_with_commitment(&pk, CommitmentConfig::processed()) @@ -103,10 +108,12 @@ pub fn verify_balances( pub fn send_many_transactions( node: &ContactInfo, funding_keypair: &Keypair, + connection_cache: &Arc, max_tokens_per_transfer: u64, num_txs: u64, ) -> HashMap { - let client = create_client(node.client_facing_addr()); + let (rpc, tpu) = node.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let mut expected_balances = HashMap::new(); for _ in 0..num_txs { let random_keypair = Keypair::new(); @@ -189,6 +196,7 @@ pub fn kill_entry_and_spend_and_verify_rest( entry_point_info: &ContactInfo, entry_point_validator_exit: &Arc>, funding_keypair: &Keypair, + connection_cache: &Arc, nodes: usize, slot_millis: u64, socket_addr_space: SocketAddrSpace, @@ -197,7 +205,9 @@ pub fn kill_entry_and_spend_and_verify_rest( let cluster_nodes = discover_cluster(&entry_point_info.gossip, nodes, socket_addr_space).unwrap(); assert!(cluster_nodes.len() >= nodes); - let client = create_client(entry_point_info.client_facing_addr()); + let (rpc, tpu) = entry_point_info.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + // sleep long enough to make sure we are in epoch 3 let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1); @@ -225,7 +235,8 @@ pub fn kill_entry_and_spend_and_verify_rest( continue; } - let client = create_client(ingress_node.client_facing_addr()); + let (rpc, tpu) = ingress_node.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let balance = client .poll_get_balance_with_commitment( &funding_keypair.pubkey(), @@ -271,7 +282,13 @@ pub fn kill_entry_and_spend_and_verify_rest( } }; info!("poll_all_nodes_for_signature()"); - match poll_all_nodes_for_signature(entry_point_info, &cluster_nodes, &sig, confs) { + match poll_all_nodes_for_signature( + entry_point_info, + &cluster_nodes, + connection_cache, + &sig, + confs, + ) { Err(e) => { info!("poll_all_nodes_for_signature() failed {:?}", e); result = Err(e); @@ -285,7 +302,12 @@ pub fn kill_entry_and_spend_and_verify_rest( } } -pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo], test_name: &str) { +pub fn check_for_new_roots( + num_new_roots: usize, + contact_infos: &[ContactInfo], + connection_cache: &Arc, + test_name: &str, +) { let mut roots = vec![HashSet::new(); contact_infos.len()]; let mut done = false; let mut last_print = Instant::now(); @@ -296,7 +318,8 @@ pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo], assert!(loop_start.elapsed() < loop_timeout); for (i, ingress_node) in contact_infos.iter().enumerate() { - let client = create_client(ingress_node.client_facing_addr()); + let (rpc, tpu) = ingress_node.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let root_slot = client .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap_or(0); @@ -319,6 +342,7 @@ pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo], pub fn check_no_new_roots( num_slots_to_wait: usize, contact_infos: &[ContactInfo], + connection_cache: &Arc, test_name: &str, ) { assert!(!contact_infos.is_empty()); @@ -327,7 +351,8 @@ pub fn check_no_new_roots( .iter() .enumerate() .map(|(i, ingress_node)| { - let client = create_client(ingress_node.client_facing_addr()); + let 
(rpc, tpu) = ingress_node.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let initial_root = client .get_slot() .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.id)); @@ -345,7 +370,8 @@ pub fn check_no_new_roots( let mut reached_end_slot = false; loop { for contact_info in contact_infos { - let client = create_client(contact_info.client_facing_addr()); + let (rpc, tpu) = contact_info.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache.clone()); current_slot = client .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap_or_else(|_| panic!("get_slot for {} failed", contact_infos[0].id)); @@ -367,7 +393,8 @@ pub fn check_no_new_roots( } for (i, ingress_node) in contact_infos.iter().enumerate() { - let client = create_client(ingress_node.client_facing_addr()); + let (rpc, tpu) = ingress_node.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache.clone()); assert_eq!( client .get_slot() @@ -380,6 +407,7 @@ pub fn check_no_new_roots( fn poll_all_nodes_for_signature( entry_point_info: &ContactInfo, cluster_nodes: &[ContactInfo], + connection_cache: &Arc, sig: &Signature, confs: usize, ) -> Result<(), TransportError> { @@ -387,7 +415,8 @@ fn poll_all_nodes_for_signature( if validator.id == entry_point_info.id { continue; } - let client = create_client(validator.client_facing_addr()); + let (rpc, tpu) = validator.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, connection_cache.clone()); client.poll_for_signature_confirmation(sig, confs)?; } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index a30380290e43cb..2548356043a994 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -6,7 +6,10 @@ use { }, itertools::izip, log::*, - solana_client::thin_client::{create_client, ThinClient}, + solana_client::{ + connection_cache::{ConnectionCache, DEFAULT_TPU_USE_QUIC}, + thin_client::ThinClient, + }, solana_core::{ tower_storage::FileTowerStorage, validator::{Validator, ValidatorConfig, ValidatorStartProgress}, @@ -76,6 +79,7 @@ pub struct ClusterConfig { pub cluster_type: ClusterType, pub poh_config: PohConfig, pub additional_accounts: Vec<(Pubkey, AccountSharedData)>, + pub tpu_use_quic: bool, } impl Default for ClusterConfig { @@ -95,6 +99,7 @@ impl Default for ClusterConfig { poh_config: PohConfig::default(), skip_warmup_slots: false, additional_accounts: vec![], + tpu_use_quic: DEFAULT_TPU_USE_QUIC, } } } @@ -106,6 +111,7 @@ pub struct LocalCluster { pub entry_point_info: ContactInfo, pub validators: HashMap, pub genesis_config: GenesisConfig, + pub connection_cache: Arc, } impl LocalCluster { @@ -248,7 +254,7 @@ impl LocalCluster { true, // should_check_duplicate_instance Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, - false, // use_quic + DEFAULT_TPU_USE_QUIC, ); let mut validators = HashMap::new(); @@ -271,6 +277,7 @@ impl LocalCluster { entry_point_info: leader_contact_info, validators, genesis_config, + connection_cache: Arc::new(ConnectionCache::new(config.tpu_use_quic)), }; let node_pubkey_to_vote_key: HashMap> = keys_in_genesis @@ -389,7 +396,8 @@ impl LocalCluster { mut voting_keypair: Option>, socket_addr_space: SocketAddrSpace, ) -> Pubkey { - let client = create_client(self.entry_point_info.client_facing_addr()); + let (rpc, tpu) = self.entry_point_info.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); // 
Must have enough tokens to fund vote account and set delegate let should_create_vote_pubkey = voting_keypair.is_none(); @@ -441,7 +449,7 @@ impl LocalCluster { true, // should_check_duplicate_instance Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, - false, // use_quic + DEFAULT_TPU_USE_QUIC, ); let validator_pubkey = validator_keypair.pubkey(); @@ -474,7 +482,8 @@ impl LocalCluster { } pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 { - let client = create_client(self.entry_point_info.client_facing_addr()); + let (rpc, tpu) = self.entry_point_info.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports) } @@ -499,7 +508,12 @@ impl LocalCluster { .unwrap(); info!("{} discovered {} nodes", test_name, cluster_nodes.len()); info!("{} looking for new roots on all nodes", test_name); - cluster_tests::check_for_new_roots(num_new_roots, &alive_node_contact_infos, test_name); + cluster_tests::check_for_new_roots( + num_new_roots, + &alive_node_contact_infos, + &self.connection_cache, + test_name, + ); info!("{} done waiting for roots", test_name); } @@ -524,7 +538,12 @@ impl LocalCluster { .unwrap(); info!("{} discovered {} nodes", test_name, cluster_nodes.len()); info!("{} making sure no new roots on any nodes", test_name); - cluster_tests::check_no_new_roots(num_slots_to_wait, &alive_node_contact_infos, test_name); + cluster_tests::check_no_new_roots( + num_slots_to_wait, + &alive_node_contact_infos, + &self.connection_cache, + test_name, + ); info!("{} done waiting for roots", test_name); } @@ -696,9 +715,10 @@ impl Cluster for LocalCluster { } fn get_validator_client(&self, pubkey: &Pubkey) -> Option { - self.validators - .get(pubkey) - .map(|f| create_client(f.info.contact_info.client_facing_addr())) + self.validators.get(pubkey).map(|f| { + let (rpc, tpu) = f.info.contact_info.client_facing_addr(); + ThinClient::new(rpc, tpu, self.connection_cache.clone()) + }) } fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo { @@ -776,7 +796,7 @@ impl Cluster for LocalCluster { true, // should_check_duplicate_instance Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, - false, // use_quic + DEFAULT_TPU_USE_QUIC, ); cluster_validator_info.validator = Some(restarted_node); cluster_validator_info diff --git a/local-cluster/tests/common.rs b/local-cluster/tests/common.rs index 84e4d64eaa78e6..83c4b17511118f 100644 --- a/local-cluster/tests/common.rs +++ b/local-cluster/tests/common.rs @@ -332,6 +332,7 @@ pub fn run_cluster_partition( num_nodes, HashSet::new(), SocketAddrSpace::Unspecified, + &cluster.connection_cache, ); let cluster_nodes = discover_cluster( diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index a4d8e6a7c08ee2..5bfff4f051877a 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -11,7 +11,7 @@ use { rpc_client::RpcClient, rpc_config::{RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, rpc_response::RpcSignatureResult, - thin_client::{create_client, ThinClient}, + thin_client::ThinClient, }, solana_core::{ broadcast_stage::BroadcastStageType, @@ -109,6 +109,7 @@ fn test_spend_and_verify_all_nodes_1() { num_nodes, HashSet::new(), SocketAddrSpace::Unspecified, + &local.connection_cache, ); } @@ -126,6 +127,7 @@ fn test_spend_and_verify_all_nodes_2() { num_nodes, HashSet::new(), 
SocketAddrSpace::Unspecified, + &local.connection_cache, ); } @@ -143,6 +145,7 @@ fn test_spend_and_verify_all_nodes_3() { num_nodes, HashSet::new(), SocketAddrSpace::Unspecified, + &local.connection_cache, ); } @@ -162,7 +165,9 @@ fn test_local_cluster_signature_subscribe() { .unwrap(); let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); - let tx_client = create_client(non_bootstrap_info.client_facing_addr()); + let (rpc, tpu) = non_bootstrap_info.client_facing_addr(); + let tx_client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); + let (blockhash, _) = tx_client .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); @@ -234,6 +239,7 @@ fn test_spend_and_verify_all_nodes_env_num_nodes() { num_nodes, HashSet::new(), SocketAddrSpace::Unspecified, + &local.connection_cache, ); } @@ -327,7 +333,13 @@ fn test_forwarding() { .unwrap(); // Confirm that transactions were forwarded to and processed by the leader. - cluster_tests::send_many_transactions(validator_info, &cluster.funding_keypair, 10, 20); + cluster_tests::send_many_transactions( + validator_info, + &cluster.funding_keypair, + &cluster.connection_cache, + 10, + 20, + ); } #[test] @@ -367,6 +379,7 @@ fn test_restart_node() { cluster_tests::send_many_transactions( &cluster.entry_point_info, &cluster.funding_keypair, + &cluster.connection_cache, 10, 1, ); @@ -396,7 +409,8 @@ fn test_mainnet_beta_cluster_type() { .unwrap(); assert_eq!(cluster_nodes.len(), 1); - let client = create_client(cluster.entry_point_info.client_facing_addr()); + let (rpc, tpu) = cluster.entry_point_info.client_facing_addr(); + let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); // Programs that are available at epoch 0 for program_id in [ @@ -1138,6 +1152,7 @@ fn test_snapshot_restart_tower() { 1, HashSet::new(), SocketAddrSpace::Unspecified, + &cluster.connection_cache, ); } @@ -1295,6 +1310,7 @@ fn test_snapshots_restart_validity() { let new_balances = cluster_tests::send_many_transactions( &cluster.entry_point_info, &cluster.funding_keypair, + &cluster.connection_cache, 10, 10, ); @@ -1321,7 +1337,11 @@ fn test_snapshots_restart_validity() { // Verify account balances on validator trace!("Verifying balances"); - cluster_tests::verify_balances(expected_balances.clone(), &cluster.entry_point_info); + cluster_tests::verify_balances( + expected_balances.clone(), + &cluster.entry_point_info, + cluster.connection_cache.clone(), + ); // Check that we can still push transactions trace!("Spending and verifying"); @@ -1331,6 +1351,7 @@ fn test_snapshots_restart_validity() { 1, HashSet::new(), SocketAddrSpace::Unspecified, + &cluster.connection_cache, ); } } @@ -1557,6 +1578,7 @@ fn test_optimistic_confirmation_violation_detection() { cluster_tests::check_for_new_roots( 16, &[cluster.get_contact_info(&entry_point_id).unwrap().clone()], + &cluster.connection_cache, "test_optimistic_confirmation_violation", ); } diff --git a/local-cluster/tests/local_cluster_slow_2.rs b/local-cluster/tests/local_cluster_slow_2.rs index 8cb99b3545e1b3..9afbca3755f5c7 100644 --- a/local-cluster/tests/local_cluster_slow_2.rs +++ b/local-cluster/tests/local_cluster_slow_2.rs @@ -194,6 +194,7 @@ fn test_leader_failure_4() { .config .validator_exit, &local.funding_keypair, + &local.connection_cache, num_nodes, config.ticks_per_slot * config.poh_config.target_tick_duration.as_millis() as u64, SocketAddrSpace::Unspecified, @@ -227,6 +228,7 @@ fn test_ledger_cleanup_service() { num_nodes, HashSet::new(), 
         SocketAddrSpace::Unspecified,
+        &cluster.connection_cache,
     );
     cluster.close_preserve_ledgers();
     //check everyone's ledgers and make sure only ~100 slots are stored
@@ -437,6 +439,7 @@ fn test_slot_hash_expiry() {
     cluster_tests::check_for_new_roots(
         16,
         &[cluster.get_contact_info(&a_pubkey).unwrap().clone()],
+        &cluster.connection_cache,
         "test_slot_hashes_expiry",
     );
 }
diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock
index 0ecd496a057c19..969688420d0c64 100644
--- a/programs/bpf/Cargo.lock
+++ b/programs/bpf/Cargo.lock
@@ -3851,6 +3851,7 @@ dependencies = [
 "crossbeam-channel",
 "futures 0.3.21",
 "solana-banks-interface",
+ "solana-client",
 "solana-runtime",
 "solana-sdk 1.10.25",
 "solana-send-transaction-service",
diff --git a/replica-node/Cargo.toml b/replica-node/Cargo.toml
index 667d50cc6ae5df..2e59efff64ce80 100644
--- a/replica-node/Cargo.toml
+++ b/replica-node/Cargo.toml
@@ -15,6 +15,7 @@ crossbeam-channel = "0.5"
 log = "0.4.14"
 rand = "0.7.0"
 solana-clap-utils = { path = "../clap-utils", version = "=1.10.25" }
+solana-client = { path = "../client", version = "=1.10.25" }
 solana-download-utils = { path = "../download-utils", version = "=1.10.25" }
 solana-genesis-utils = { path = "../genesis-utils", version = "=1.10.25" }
 solana-gossip = { path = "../gossip", version = "=1.10.25" }
diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs
index 7f2a55f0488a77..29ba56cf81f1fb 100644
--- a/replica-node/src/replica_node.rs
+++ b/replica-node/src/replica_node.rs
@@ -2,6 +2,7 @@ use {
     crate::accountsdb_repl_service::AccountsDbReplService,
     crossbeam_channel::unbounded,
     log::*,
+    solana_client::connection_cache::ConnectionCache,
     solana_download_utils::download_snapshot_archive,
     solana_genesis_utils::download_then_check_genesis_hash,
     solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
@@ -222,6 +223,7 @@ fn start_client_rpc_services(
         .write()
         .unwrap()
         .register_exit(Box::new(move || trigger.cancel()));
+    let connection_cache = Arc::new(ConnectionCache::default());
 
     let (_bank_notification_sender, bank_notification_receiver) = unbounded();
     (
@@ -247,6 +249,7 @@ fn start_client_rpc_services(
             },
             max_slots,
             leader_schedule_cache.clone(),
+            connection_cache,
             max_complete_transaction_status_slot,
         )),
         Some(pubsub_service),
diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs
index defa51f2e959c8..cd50ccf60578ae 100644
--- a/rpc-test/tests/rpc.rs
+++ b/rpc-test/tests/rpc.rs
@@ -8,6 +8,7 @@ use {
     solana_account_decoder::UiAccount,
     solana_client::{
         client_error::{ClientErrorKind, Result as ClientResult},
+        connection_cache::ConnectionCache,
         nonblocking::pubsub_client::PubsubClient,
         rpc_client::RpcClient,
         rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig},
@@ -410,8 +411,7 @@ fn test_rpc_subscriptions() {
     }
 }
 
-#[test]
-fn test_tpu_send_transaction() {
+fn run_tpu_send_transaction(tpu_use_quic: bool) {
     let mint_keypair = Keypair::new();
     let mint_pubkey = mint_keypair.pubkey();
     let test_validator =
@@ -420,11 +420,12 @@ fn test_tpu_send_transaction() {
         test_validator.rpc_url(),
         CommitmentConfig::processed(),
     ));
-
-    let tpu_client = TpuClient::new(
+    let connection_cache = Arc::new(ConnectionCache::new(tpu_use_quic));
+    let tpu_client = TpuClient::new_with_connection_cache(
         rpc_client.clone(),
         &test_validator.rpc_pubsub_url(),
         TpuClientConfig::default(),
+        connection_cache,
     )
     .unwrap();
 
@@ -445,6 +446,16 @@ fn test_tpu_send_transaction() {
     }
 }
 
+#[test]
+fn test_tpu_send_transaction() {
+    run_tpu_send_transaction(/*tpu_use_quic*/ false)
+}
+
+#[test]
+fn test_tpu_send_transaction_with_quic() {
+    run_tpu_send_transaction(/*tpu_use_quic*/ true)
+}
+
 #[test]
 fn deserialize_rpc_error() -> ClientResult<()> {
     solana_logger::setup();
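The rpc-test refactor above also shows the downstream-compatible API split: TpuClient::new keeps its old signature, while TpuClient::new_with_connection_cache takes the cache explicitly so the caller controls QUIC versus UDP. A minimal sketch with placeholder URLs:

    use std::sync::Arc;
    use solana_client::{
        connection_cache::ConnectionCache,
        rpc_client::RpcClient,
        tpu_client::{TpuClient, TpuClientConfig},
    };

    // `true` selects QUIC, `false` the legacy UDP path.
    let connection_cache = Arc::new(ConnectionCache::new(true));
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    let tpu_client = TpuClient::new_with_connection_cache(
        rpc_client,
        "ws://127.0.0.1:8900", // pubsub URL
        TpuClientConfig::default(),
        connection_cache,
    )
    .unwrap();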
diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs
index cf3bcd0c042349..3628118970a4eb 100644
--- a/rpc/src/rpc.rs
+++ b/rpc/src/rpc.rs
@@ -15,6 +15,7 @@ use {
         UiAccount, UiAccountEncoding, UiDataSliceConfig, MAX_BASE58_BYTES,
     },
     solana_client::{
+        connection_cache::ConnectionCache,
         rpc_cache::LargestAccountsCache,
         rpc_config::*,
         rpc_custom_error::RpcCustomError,
@@ -75,7 +76,7 @@ use {
         },
     },
     solana_send_transaction_service::{
-        send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC},
+        send_transaction_service::{SendTransactionService, TransactionInfo},
         tpu_info::NullTpuInfo,
     },
     solana_storage_bigtable::Error as StorageError,
@@ -335,7 +336,11 @@ impl JsonRpcRequestProcessor {
     }
 
     // Useful for unit testing
-    pub fn new_from_bank(bank: &Arc<Bank>, socket_addr_space: SocketAddrSpace) -> Self {
+    pub fn new_from_bank(
+        bank: &Arc<Bank>,
+        socket_addr_space: SocketAddrSpace,
+        connection_cache: Arc<ConnectionCache>,
+    ) -> Self {
         let genesis_hash = bank.hash();
         let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(
             &[bank.clone()],
@@ -355,9 +360,9 @@ impl JsonRpcRequestProcessor {
             &bank_forks,
             None,
             receiver,
+            &connection_cache,
             1000,
             1,
-            DEFAULT_TPU_USE_QUIC,
         );
 
         Self {
@@ -4904,8 +4909,12 @@ pub mod tests {
         let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
         bank.transfer(20, &genesis.mint_keypair, &bob_pubkey)
             .unwrap();
-        let request_processor =
-            JsonRpcRequestProcessor::new_from_bank(&bank, SocketAddrSpace::Unspecified);
+        let connection_cache = Arc::new(ConnectionCache::default());
+        let request_processor = JsonRpcRequestProcessor::new_from_bank(
+            &bank,
+            SocketAddrSpace::Unspecified,
+            connection_cache,
+        );
         assert_eq!(
             request_processor
                 .get_transaction_count(RpcContextConfig::default())
@@ -4919,7 +4928,12 @@ pub mod tests {
         let genesis = create_genesis_config(20);
         let mint_pubkey = genesis.mint_keypair.pubkey();
         let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
-        let meta = JsonRpcRequestProcessor::new_from_bank(&bank, SocketAddrSpace::Unspecified);
+        let connection_cache = Arc::new(ConnectionCache::default());
+        let meta = JsonRpcRequestProcessor::new_from_bank(
+            &bank,
+            SocketAddrSpace::Unspecified,
+            connection_cache,
+        );
 
         let mut io = MetaIoHandler::default();
         io.extend_with(rpc_minimal::MinimalImpl.to_delegate());
@@ -4947,7 +4961,12 @@ pub mod tests {
         let genesis = create_genesis_config(20);
         let mint_pubkey = genesis.mint_keypair.pubkey();
         let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
-        let meta = JsonRpcRequestProcessor::new_from_bank(&bank, SocketAddrSpace::Unspecified);
+        let connection_cache = Arc::new(ConnectionCache::default());
+        let meta = JsonRpcRequestProcessor::new_from_bank(
+            &bank,
+            SocketAddrSpace::Unspecified,
+            connection_cache,
+        );
 
         let mut io = MetaIoHandler::default();
         io.extend_with(rpc_minimal::MinimalImpl.to_delegate());
@@ -5054,7 +5073,12 @@ pub mod tests {
         bank.transfer(4, &genesis.mint_keypair, &bob_pubkey)
             .unwrap();
 
-        let meta = JsonRpcRequestProcessor::new_from_bank(&bank, SocketAddrSpace::Unspecified);
+        let connection_cache = Arc::new(ConnectionCache::default());
+        let meta = JsonRpcRequestProcessor::new_from_bank(
+            &bank,
+            SocketAddrSpace::Unspecified,
+            connection_cache,
+        );
 
         let mut io = MetaIoHandler::default();
         io.extend_with(rpc_minimal::MinimalImpl.to_delegate());
@@ -6171,7 +6195,12 @@ pub mod tests {
     fn test_rpc_send_bad_tx() {
         let genesis = create_genesis_config(100);
         let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
-        let meta = JsonRpcRequestProcessor::new_from_bank(&bank, SocketAddrSpace::Unspecified);
+        let connection_cache = Arc::new(ConnectionCache::default());
+        let meta = JsonRpcRequestProcessor::new_from_bank(
+            &bank,
+            SocketAddrSpace::Unspecified,
+            connection_cache,
+        );
 
         let mut io = MetaIoHandler::default();
         io.extend_with(rpc_full::FullImpl.to_delegate());
@@ -6221,14 +6250,15 @@ pub mod tests {
             Arc::new(LeaderScheduleCache::default()),
             Arc::new(AtomicU64::default()),
         );
+        let connection_cache = Arc::new(ConnectionCache::default());
         SendTransactionService::new::<NullTpuInfo>(
             tpu_address,
             &bank_forks,
             None,
             receiver,
+            &connection_cache,
             1000,
             1,
-            DEFAULT_TPU_USE_QUIC,
         );
 
         let mut bad_transaction = system_transaction::transfer(
@@ -6487,14 +6517,15 @@ pub mod tests {
             Arc::new(LeaderScheduleCache::default()),
             Arc::new(AtomicU64::default()),
         );
+        let connection_cache = Arc::new(ConnectionCache::default());
         SendTransactionService::new::<NullTpuInfo>(
             tpu_address,
             &bank_forks,
             None,
             receiver,
+            &connection_cache,
             1000,
             1,
-            DEFAULT_TPU_USE_QUIC,
         );
         assert_eq!(
             request_processor.get_block_commitment(0),
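Two construction paths recur in the rpc.rs tests above: ConnectionCache::default() keeps the previous behavior, while ConnectionCache::new(tpu_use_quic) makes the transport explicit. Given the relocated DEFAULT_TPU_USE_QUIC = false, the first two lines below should be equivalent (a sketch, not normative):

    use solana_client::connection_cache::{ConnectionCache, DEFAULT_TPU_USE_QUIC};

    let default_cache = ConnectionCache::default();              // UDP today
    let same_thing = ConnectionCache::new(DEFAULT_TPU_USE_QUIC); // spelled out
    let quic_cache = ConnectionCache::new(true);                 // opt in to QUIC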
diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs
index 77e1c8891d9681..3ec783ce0332d7 100644
--- a/rpc/src/rpc_service.rs
+++ b/rpc/src/rpc_service.rs
@@ -18,7 +18,7 @@ use {
         RequestMiddlewareAction, ServerBuilder,
     },
     regex::Regex,
-    solana_client::rpc_cache::LargestAccountsCache,
+    solana_client::{connection_cache::ConnectionCache, rpc_cache::LargestAccountsCache},
     solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::{
         bigtable_upload::ConfirmedBlockUploadConfig,
@@ -337,6 +337,7 @@ impl JsonRpcService {
         send_transaction_service_config: send_transaction_service::Config,
         max_slots: Arc<MaxSlots>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
+        connection_cache: Arc<ConnectionCache>,
         current_transaction_status_slot: Arc<AtomicU64>,
     ) -> Self {
         info!("rpc bound to {:?}", rpc_addr);
@@ -449,6 +450,7 @@ impl JsonRpcService {
             &bank_forks,
             leader_info,
             receiver,
+            &connection_cache,
             send_transaction_service_config,
         ));
 
@@ -594,6 +596,7 @@ mod tests {
         let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
         let optimistically_confirmed_bank =
             OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
+        let connection_cache = Arc::new(ConnectionCache::default());
         let mut rpc_service = JsonRpcService::new(
             rpc_addr,
             JsonRpcConfig::default(),
@@ -616,6 +619,7 @@ mod tests {
             },
             Arc::new(MaxSlots::default()),
             Arc::new(LeaderScheduleCache::default()),
+            connection_cache,
             Arc::new(AtomicU64::default()),
         );
         let thread = rpc_service.thread_hdl.thread();
diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs
index bb5f7a0b42f5b0..f12b3752e6e6b2 100644
--- a/send-transaction-service/src/send_transaction_service.rs
+++ b/send-transaction-service/src/send_transaction_service.rs
@@ -2,7 +2,7 @@ use {
     crate::tpu_info::TpuInfo,
     crossbeam_channel::{Receiver, RecvTimeoutError},
     log::*,
-    solana_client::{connection_cache, tpu_connection::TpuConnection},
+    solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
     solana_measure::measure::Measure,
     solana_metrics::datapoint_warn,
     solana_runtime::{bank::Bank, bank_forks::BankForks},
@@ -101,16 +101,12 @@ struct ProcessTransactionsResult {
     retained: u64,
 }
 
-pub const DEFAULT_TPU_USE_QUIC: bool = false;
-
 #[derive(Clone, Debug)]
 pub struct Config {
     pub retry_rate_ms: u64,
     pub leader_forward_count: u64,
     pub default_max_retries: Option<usize>,
     pub service_max_retries: usize,
-    /// Whether to use Quic protocol to send transactions
-    pub use_quic: bool,
     /// The batch size for sending transactions in batches
     pub batch_size: usize,
     /// How frequently batches are sent
@@ -124,7 +120,6 @@ impl Default for Config {
             leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT,
             default_max_retries: None,
             service_max_retries: DEFAULT_SERVICE_MAX_RETRIES,
-            use_quic: DEFAULT_TPU_USE_QUIC,
             batch_size: DEFAULT_TRANSACTION_BATCH_SIZE,
             batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS,
         }
     }
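With use_quic gone from Config, the struct now carries only retry and batching policy; transport selection lives entirely in whichever ConnectionCache the caller passes alongside it. A sketch of the post-change shape (field values are illustrative):

    use solana_send_transaction_service::send_transaction_service::Config;

    let config = Config {
        retry_rate_ms: 1000,
        leader_forward_count: 1,
        ..Config::default() // no use_quic field to set anymore
    };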
@@ -334,17 +329,23 @@ impl SendTransactionService {
         bank_forks: &Arc<RwLock<BankForks>>,
         leader_info: Option<T>,
         receiver: Receiver<TransactionInfo>,
+        connection_cache: &Arc<ConnectionCache>,
         retry_rate_ms: u64,
         leader_forward_count: u64,
-        use_quic: bool,
     ) -> Self {
         let config = Config {
             retry_rate_ms,
             leader_forward_count,
-            use_quic,
             ..Config::default()
         };
-        Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config)
+        Self::new_with_config(
+            tpu_address,
+            bank_forks,
+            leader_info,
+            receiver,
+            connection_cache,
+            config,
+        )
     }
 
     pub fn new_with_config(
@@ -352,6 +353,7 @@ impl SendTransactionService {
         bank_forks: &Arc<RwLock<BankForks>>,
         leader_info: Option<T>,
         receiver: Receiver<TransactionInfo>,
+        connection_cache: &Arc<ConnectionCache>,
         config: Config,
     ) -> Self {
         let stats_report = Arc::new(SendTransactionServiceStatsReport::default());
@@ -365,6 +367,7 @@ impl SendTransactionService {
             tpu_address,
             receiver,
             leader_info_provider.clone(),
+            connection_cache.clone(),
             config.clone(),
             retry_transactions.clone(),
             stats_report.clone(),
@@ -375,6 +378,7 @@ impl SendTransactionService {
             tpu_address,
             bank_forks.clone(),
             leader_info_provider,
+            connection_cache.clone(),
             config,
             retry_transactions,
             stats_report,
@@ -392,6 +396,7 @@ impl SendTransactionService {
         tpu_address: SocketAddr,
         receiver: Receiver<TransactionInfo>,
         leader_info_provider: Arc<Mutex<CurrentLeaderInfo<T>>>,
+        connection_cache: Arc<ConnectionCache>,
         config: Config,
         retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
         stats_report: Arc<SendTransactionServiceStatsReport>,
@@ -404,7 +409,6 @@ impl SendTransactionService {
             "Starting send-transaction-service::receive_txn_thread with config {:?}",
             config
         );
-        connection_cache::set_use_quic(config.use_quic);
         Builder::new()
             .name("send-tx-receive".to_string())
             .spawn(move || loop {
@@ -450,6 +454,7 @@ impl SendTransactionService {
                         &tpu_address,
                         &mut transactions,
                         leader_info_provider.lock().unwrap().get_leader_info(),
+                        &connection_cache,
                         &config,
                         stats,
                     );
@@ -494,6 +499,7 @@ impl SendTransactionService {
         tpu_address: SocketAddr,
         bank_forks: Arc<RwLock<BankForks>>,
         leader_info_provider: Arc<Mutex<CurrentLeaderInfo<T>>>,
+        connection_cache: Arc<ConnectionCache>,
         config: Config,
         retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
         stats_report: Arc<SendTransactionServiceStatsReport>,
@@ -503,7 +509,6 @@ impl SendTransactionService {
             "Starting send-transaction-service::retry_thread with config {:?}",
             config
         );
-        connection_cache::set_use_quic(config.use_quic);
         Builder::new()
             .name("send-tx-retry".to_string())
             .spawn(move || loop {
@@ -534,6 +539,7 @@ impl SendTransactionService {
                         &tpu_address,
                         &mut transactions,
                         &leader_info_provider,
+                        &connection_cache,
                         &config,
                         stats,
                     );
@@ -548,6 +554,7 @@ impl SendTransactionService {
         tpu_address: &SocketAddr,
         transactions: &mut HashMap<Signature, TransactionInfo>,
         leader_info: Option<&T>,
+        connection_cache: &Arc<ConnectionCache>,
         config: &Config,
         stats: &SendTransactionServiceStats,
     ) {
@@ -560,7 +567,7 @@ impl SendTransactionService {
             .collect::<Vec<&[u8]>>();
 
         for address in &addresses {
-            Self::send_transactions(address, &wire_transactions, stats);
+            Self::send_transactions(address, &wire_transactions, connection_cache, stats);
         }
     }
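Putting the reworked constructor together: the service borrows the shared cache once and clones it into its receive and retry threads, and the per-thread set_use_quic calls disappear. A minimal sketch of the call shape introduced above, assuming tpu_address, bank_forks, and receiver already exist in scope:

    use std::sync::Arc;
    use solana_client::connection_cache::ConnectionCache;
    use solana_send_transaction_service::{
        send_transaction_service::SendTransactionService,
        tpu_info::NullTpuInfo,
    };

    let connection_cache = Arc::new(ConnectionCache::default());
    let service = SendTransactionService::new::<NullTpuInfo>(
        tpu_address,       // SocketAddr of the TPU to send to
        &bank_forks,       // &Arc<RwLock<BankForks>>
        None,              // no leader info source
        receiver,          // Receiver<TransactionInfo>
        &connection_cache, // shared transport cache
        1000,              // retry_rate_ms
        1,                 // leader_forward_count
    );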
@@ -571,6 +578,7 @@ impl SendTransactionService {
         tpu_address: &SocketAddr,
         transactions: &mut HashMap<Signature, TransactionInfo>,
         leader_info_provider: &Arc<Mutex<CurrentLeaderInfo<T>>>,
+        connection_cache: &Arc<ConnectionCache>,
         config: &Config,
         stats: &SendTransactionServiceStats,
     ) -> ProcessTransactionsResult {
@@ -684,7 +692,7 @@ impl SendTransactionService {
                 let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config);
 
                 for address in &addresses {
-                    Self::send_transactions(address, chunk, stats);
+                    Self::send_transactions(address, chunk, connection_cache, stats);
                 }
             }
         }
@@ -694,30 +702,33 @@ impl SendTransactionService {
     fn send_transaction(
         tpu_address: &SocketAddr,
         wire_transaction: &[u8],
+        connection_cache: &Arc<ConnectionCache>,
     ) -> Result<(), TransportError> {
-        let conn = connection_cache::get_connection(tpu_address);
+        let conn = connection_cache.get_connection(tpu_address);
         conn.send_wire_transaction_async(wire_transaction.to_vec())
     }
 
     fn send_transactions_with_metrics(
         tpu_address: &SocketAddr,
         wire_transactions: &[&[u8]],
+        connection_cache: &Arc<ConnectionCache>,
     ) -> Result<(), TransportError> {
         let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect();
-        let conn = connection_cache::get_connection(tpu_address);
+        let conn = connection_cache.get_connection(tpu_address);
         conn.send_wire_transaction_batch_async(wire_transactions)
     }
 
     fn send_transactions(
         tpu_address: &SocketAddr,
         wire_transactions: &[&[u8]],
+        connection_cache: &Arc<ConnectionCache>,
         stats: &SendTransactionServiceStats,
     ) {
         let mut measure = Measure::start("send-us");
         let result = if wire_transactions.len() == 1 {
-            Self::send_transaction(tpu_address, wire_transactions[0])
+            Self::send_transaction(tpu_address, wire_transactions[0], connection_cache)
         } else {
-            Self::send_transactions_with_metrics(tpu_address, wire_transactions)
+            Self::send_transactions_with_metrics(tpu_address, wire_transactions, connection_cache)
         };
 
         if let Err(err) = result {
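This hunk is the heart of the patch: get_connection is now a method on a cache instance rather than a free function over a process-wide static. A minimal sketch of sending one pre-serialized transaction through it (the address and bytes are placeholders):

    use std::{net::SocketAddr, sync::Arc};
    use solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection};

    let connection_cache = Arc::new(ConnectionCache::default());
    let tpu_address: SocketAddr = "127.0.0.1:1027".parse().unwrap();
    let wire_transaction: Vec<u8> = vec![/* serialized, signed transaction bytes */];

    // The returned connection is UDP or QUIC depending on how the cache was built.
    let conn = connection_cache.get_connection(&tpu_address);
    conn.send_wire_transaction_async(wire_transaction).unwrap();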
@@ -783,14 +794,15 @@ mod test {
         let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
         let (sender, receiver) = unbounded();
 
+        let connection_cache = Arc::new(ConnectionCache::default());
         let send_tranaction_service = SendTransactionService::new::<NullTpuInfo>(
             tpu_address,
             &bank_forks,
             None,
             receiver,
+            &connection_cache,
             1000,
             1,
-            DEFAULT_TPU_USE_QUIC,
         );
 
         drop(sender);
@@ -850,12 +862,14 @@ mod test {
                 Some(Instant::now()),
             ),
         );
+        let connection_cache = Arc::new(ConnectionCache::default());
         let result = SendTransactionService::process_transactions::<NullTpuInfo>(
             &working_bank,
             &root_bank,
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -886,6 +900,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -916,6 +931,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -946,6 +962,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -978,6 +995,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -1020,6 +1038,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -1038,6 +1057,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -1120,12 +1140,14 @@ mod test {
         );
         let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None)));
         let stats = SendTransactionServiceStats::default();
+        let connection_cache = Arc::new(ConnectionCache::default());
         let result = SendTransactionService::process_transactions::<NullTpuInfo>(
             &working_bank,
             &root_bank,
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -1155,6 +1177,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -1186,6 +1209,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -1215,6 +1239,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -1245,6 +1270,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -1275,6 +1301,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -1307,6 +1334,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
@@ -1342,6 +1370,7 @@ mod test {
             &tpu_address,
             &mut transactions,
             &leader_info_provider,
+            &connection_cache,
             &config,
             &stats,
         );
diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs
index c5ede9c3c78649..6f1cb6110ad21e 100644
--- a/test-validator/src/lib.rs
+++ b/test-validator/src/lib.rs
@@ -2,7 +2,7 @@
 use {
     log::*,
     solana_cli_output::CliAccount,
-    solana_client::{nonblocking, rpc_client::RpcClient},
+    solana_client::{connection_cache::DEFAULT_TPU_USE_QUIC, nonblocking, rpc_client::RpcClient},
     solana_core::{
         tower_storage::TowerStorage,
         validator::{Validator, ValidatorConfig, ValidatorStartProgress},
@@ -722,7 +722,7 @@ impl TestValidator {
             true, // should_check_duplicate_instance
             config.start_progress.clone(),
             socket_addr_space,
-            false, // use_quic
+            DEFAULT_TPU_USE_QUIC,
         ));
 
         // Needed to avoid panics in `solana-responder-gossip` in tests that create a number of
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 97aa40fe423039..c5713e80a6f7cf 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -2459,7 +2459,6 @@ pub fn main() {
                 "rpc_send_transaction_service_max_retries",
                 usize
             ),
-            use_quic: tpu_use_quic,
             batch_send_rate_ms: rpc_send_batch_send_rate_ms,
             batch_size: rpc_send_batch_size,
         },
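Finally, dropping use_quic: tpu_use_quic from the validator's service config closes the loop: the --tpu-use-quic flag is consumed where the validator constructs its ConnectionCache, and everything downstream just clones the Arc. A hedged sketch of that wiring (the flag lookup shown is illustrative, not the exact main.rs code):

    use std::sync::Arc;
    use solana_client::connection_cache::ConnectionCache;

    // `tpu_use_quic` comes from the `--tpu-use-quic` CLI flag (clap).
    let tpu_use_quic = matches.is_present("tpu_use_quic");
    let connection_cache = Arc::new(ConnectionCache::new(tpu_use_quic));
    // The cache is then cloned into the RPC service, send-transaction
    // service, and local clients, replacing the old global toggle.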