Skip to content

Commit

Permalink
client: Remove static connection cache, plumb it instead (backport #2…
Browse files Browse the repository at this point in the history
…5667) (#25873)

* client: Remove static connection cache, plumb it instead (#25667)

* client: Remove static connection cache, plumb it instead

* Add TpuClient::new_with_connection_cache to not break downstream

* Refactor get_connection and RwLock into ConnectionCache

* Fix merge conflicts from new async TpuClient

* Remove `ConnectionCache::set_use_quic`

* Move DEFAULT_TPU_USE_QUIC to client, use ConnectionCache::default()

(cherry picked from commit 79a8ecd)

# Conflicts:
#	banking-bench/Cargo.toml
#	banks-server/Cargo.toml
#	bench-tps/src/main.rs
#	bench-tps/tests/bench_tps.rs
#	client/src/connection_cache.rs
#	client/src/nonblocking/tpu_client.rs
#	client/src/thin_client.rs
#	client/src/tpu_client.rs
#	core/src/banking_stage.rs
#	core/src/tvu.rs
#	core/src/validator.rs
#	dos/src/cli.rs
#	dos/src/main.rs
#	gossip/src/gossip_service.rs
#	local-cluster/src/cluster_tests.rs
#	local-cluster/src/local_cluster.rs
#	local-cluster/tests/local_cluster.rs

* Fix merge conflicts

Co-authored-by: Jon Cinque <[email protected]>
  • Loading branch information
mergify[bot] and joncinque authored Jun 9, 2022
1 parent f4daece commit ef4f196
Show file tree
Hide file tree
Showing 34 changed files with 561 additions and 300 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions banking-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ crossbeam-channel = "0.5"
log = "0.4.14"
rand = "0.7.0"
rayon = "1.5.1"
solana-client = { path = "../client", version = "=1.10.25" }
solana-core = { path = "../core", version = "=1.10.25" }
solana-gossip = { path = "../gossip", version = "=1.10.25" }
solana-ledger = { path = "../ledger", version = "=1.10.25" }
Expand Down
9 changes: 9 additions & 0 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_client::connection_cache::ConnectionCache,
solana_core::banking_stage::BankingStage,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
Expand Down Expand Up @@ -212,6 +213,12 @@ fn main() {
.takes_value(true)
.help("Number of threads to use in the banking stage"),
)
.arg(
Arg::new("tpu_use_quic")
.long("tpu-use-quic")
.takes_value(false)
.help("Forward messages to TPU using QUIC"),
)
.get_matches();

let num_banking_threads = matches
Expand Down Expand Up @@ -334,6 +341,7 @@ fn main() {
SocketAddrSpace::Unspecified,
);
let cluster_info = Arc::new(cluster_info);
let tpu_use_quic = matches.is_present("tpu_use_quic");
let banking_stage = BankingStage::new_num_threads(
&cluster_info,
&poh_recorder,
Expand All @@ -344,6 +352,7 @@ fn main() {
None,
replay_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::new(tpu_use_quic)),
);
poh_recorder.lock().unwrap().set_bank(&bank);

Expand Down
1 change: 1 addition & 0 deletions banks-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bincode = "1.3.3"
crossbeam-channel = "0.5"
futures = "0.3"
solana-banks-interface = { path = "../banks-interface", version = "=1.10.25" }
solana-client = { path = "../client", version = "=1.10.25" }
solana-runtime = { path = "../runtime", version = "=1.10.25" }
solana-sdk = { path = "../sdk", version = "=1.10.25" }
solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.10.25" }
Expand Down
6 changes: 4 additions & 2 deletions banks-server/src/banks_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
Banks, BanksRequest, BanksResponse, BanksTransactionResultWithSimulation,
TransactionConfirmationStatus, TransactionSimulationDetails, TransactionStatus,
},
solana_client::connection_cache::ConnectionCache,
solana_runtime::{
bank::{Bank, TransactionSimulationResult},
bank_forks::BankForks,
Expand All @@ -24,7 +25,7 @@ use {
transaction::{self, SanitizedTransaction, Transaction},
},
solana_send_transaction_service::{
send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC},
send_transaction_service::{SendTransactionService, TransactionInfo},
tpu_info::NullTpuInfo,
},
std::{
Expand Down Expand Up @@ -375,6 +376,7 @@ pub async fn start_tcp_server(
tpu_addr: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
connection_cache: Arc<ConnectionCache>,
) -> io::Result<()> {
// Note: These settings are copied straight from the tarpc example.
let server = tcp::listen(listen_addr, Bincode::default)
Expand All @@ -399,9 +401,9 @@ pub async fn start_tcp_server(
&bank_forks,
None,
receiver,
&connection_cache,
5_000,
0,
DEFAULT_TPU_USE_QUIC,
);

let server = BanksServer::new(
Expand Down
18 changes: 17 additions & 1 deletion banks-server/src/rpc_banks_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use {
crate::banks_server::start_tcp_server,
futures::{future::FutureExt, pin_mut, prelude::stream::StreamExt, select},
solana_client::connection_cache::ConnectionCache,
solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache},
std::{
net::SocketAddr,
Expand All @@ -29,13 +30,15 @@ async fn start_abortable_tcp_server(
tpu_addr: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
connection_cache: Arc<ConnectionCache>,
exit: Arc<AtomicBool>,
) {
let server = start_tcp_server(
listen_addr,
tpu_addr,
bank_forks.clone(),
block_commitment_cache.clone(),
connection_cache,
)
.fuse();
let interval = IntervalStream::new(time::interval(Duration::from_millis(100))).fuse();
Expand All @@ -58,13 +61,15 @@ impl RpcBanksService {
tpu_addr: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
connection_cache: Arc<ConnectionCache>,
exit: Arc<AtomicBool>,
) {
let server = start_abortable_tcp_server(
listen_addr,
tpu_addr,
bank_forks,
block_commitment_cache,
connection_cache,
exit,
);
Runtime::new().unwrap().block_on(server);
Expand All @@ -75,10 +80,12 @@ impl RpcBanksService {
tpu_addr: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
connection_cache: &Arc<ConnectionCache>,
exit: &Arc<AtomicBool>,
) -> Self {
let bank_forks = bank_forks.clone();
let block_commitment_cache = block_commitment_cache.clone();
let connection_cache = connection_cache.clone();
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solana-rpc-banks".to_string())
Expand All @@ -88,6 +95,7 @@ impl RpcBanksService {
tpu_addr,
bank_forks,
block_commitment_cache,
connection_cache,
exit,
)
})
Expand All @@ -109,9 +117,17 @@ mod tests {
fn test_rpc_banks_server_exit() {
let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::default_for_tests())));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let connection_cache = Arc::new(ConnectionCache::default());
let exit = Arc::new(AtomicBool::new(false));
let addr = "127.0.0.1:0".parse().unwrap();
let service = RpcBanksService::new(addr, addr, &bank_forks, &block_commitment_cache, &exit);
let service = RpcBanksService::new(
addr,
addr,
&bank_forks,
&block_commitment_cache,
&connection_cache,
&exit,
);
exit.store(true, Ordering::Relaxed);
service.join().unwrap();
}
Expand Down
3 changes: 2 additions & 1 deletion bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use {
clap::{crate_description, crate_name, App, Arg, ArgMatches},
solana_clap_utils::input_validators::{is_url, is_url_or_moniker},
solana_cli_config::{ConfigInput, CONFIG_FILE},
solana_client::connection_cache::DEFAULT_TPU_USE_QUIC,
solana_sdk::{
fee_calculator::FeeRateGovernor,
pubkey::Pubkey,
Expand Down Expand Up @@ -77,7 +78,7 @@ impl Default for Config {
target_slots_per_epoch: 0,
target_node: None,
external_client_type: ExternalClientType::default(),
use_quic: false,
use_quic: DEFAULT_TPU_USE_QUIC,
}
}
}
Expand Down
41 changes: 25 additions & 16 deletions bench-tps/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
keypairs::get_keypairs,
},
solana_client::{
connection_cache,
connection_cache::ConnectionCache,
rpc_client::RpcClient,
tpu_client::{TpuClient, TpuClientConfig},
},
Expand Down Expand Up @@ -105,11 +105,10 @@ fn main() {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
exit(1);
});
if *use_quic {
connection_cache::set_use_quic(true);
}
let connection_cache = Arc::new(ConnectionCache::new(*use_quic));
let client = if *multi_client {
let (client, num_clients) = get_multi_client(&nodes, &SocketAddrSpace::Unspecified);
let (client, num_clients) =
get_multi_client(&nodes, &SocketAddrSpace::Unspecified, connection_cache);
if nodes.len() < num_clients {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
Expand All @@ -123,8 +122,11 @@ fn main() {
let mut target_client = None;
for node in nodes {
if node.id == *target_node {
target_client =
Some(Arc::new(get_client(&[node], &SocketAddrSpace::Unspecified)));
target_client = Some(Arc::new(get_client(
&[node],
&SocketAddrSpace::Unspecified,
connection_cache,
)));
break;
}
}
Expand All @@ -133,7 +135,11 @@ fn main() {
exit(1);
})
} else {
Arc::new(get_client(&nodes, &SocketAddrSpace::Unspecified))
Arc::new(get_client(
&nodes,
&SocketAddrSpace::Unspecified,
connection_cache,
))
};
let keypairs = get_keypairs(
client.clone(),
Expand All @@ -150,15 +156,18 @@ fn main() {
json_rpc_url.to_string(),
CommitmentConfig::confirmed(),
));
if *use_quic {
connection_cache::set_use_quic(true);
}
let connection_cache = Arc::new(ConnectionCache::new(*use_quic));
let client = Arc::new(
TpuClient::new(rpc_client, websocket_url, TpuClientConfig::default())
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {:?}", err);
exit(1);
}),
TpuClient::new_with_connection_cache(
rpc_client,
websocket_url,
TpuClientConfig::default(),
connection_cache,
)
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {:?}", err);
exit(1);
}),
);
let keypairs = get_keypairs(
client.clone(),
Expand Down
22 changes: 16 additions & 6 deletions bench-tps/tests/bench_tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use {
cli::Config,
},
solana_client::{
connection_cache::ConnectionCache,
rpc_client::RpcClient,
thin_client::create_client,
thin_client::ThinClient,
tpu_client::{TpuClient, TpuClientConfig},
},
solana_core::validator::ValidatorConfig,
Expand Down Expand Up @@ -58,10 +59,11 @@ fn test_bench_tps_local_cluster(config: Config) {

cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000);

let client = Arc::new(create_client((
let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc,
cluster.entry_point_info.tpu,
)));
cluster.connection_cache.clone(),
));

let lamports_per_account = 100;

Expand Down Expand Up @@ -96,9 +98,17 @@ fn test_bench_tps_test_validator(config: Config) {
CommitmentConfig::processed(),
));
let websocket_url = test_validator.rpc_pubsub_url();

let client =
Arc::new(TpuClient::new(rpc_client, &websocket_url, TpuClientConfig::default()).unwrap());
let connection_cache = Arc::new(ConnectionCache::default());

let client = Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
&websocket_url,
TpuClientConfig::default(),
connection_cache,
)
.unwrap(),
);

let lamports_per_account = 100;

Expand Down
5 changes: 4 additions & 1 deletion cli/src/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use {
},
solana_client::{
client_error::ClientErrorKind,
connection_cache::ConnectionCache,
rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSendTransactionConfig},
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
Expand Down Expand Up @@ -2215,10 +2216,12 @@ fn send_deploy_messages(
if let Some(write_messages) = write_messages {
if let Some(write_signer) = write_signer {
trace!("Writing program data");
let tpu_client = TpuClient::new(
let connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(),
&config.websocket_url,
TpuClientConfig::default(),
connection_cache,
)?;
let transaction_errors = tpu_client
.send_and_confirm_messages_with_spinner(
Expand Down
Loading

0 comments on commit ef4f196

Please sign in to comment.