Add option to wait for a specific epoch length to bench-tps
sakridge committed May 16, 2020
1 parent 9222bc2 commit 2819669
Showing 13 changed files with 240 additions and 112 deletions.
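
In short, this commit threads a new target_slots_per_epoch value from the command line into the bench-tps Config, and do_bench_tps now waits until the cluster reports epochs of at least that many slots before starting the benchmark clock. A minimal sketch of setting the new field programmatically (the field and the Default impl are from this diff; the module path is the one the crate's integration tests use, and the value 64 is purely illustrative):

    use solana_bench_tps::cli::Config;

    fn main() {
        // Wait until the cluster's epochs are at least 64 slots long (value is illustrative).
        let config = Config {
            target_slots_per_epoch: 64,
            ..Config::default()
        };
        assert_eq!(config.target_slots_per_epoch, 64);
    }
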
239 changes: 163 additions & 76 deletions bench-tps/src/bench.rs
@@ -28,7 +28,7 @@ use std::{
atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
},
thread::{sleep, Builder},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
};

@@ -64,6 +64,144 @@ fn get_recent_blockhash<T: Client>(client: &T) -> (Hash, FeeCalculator) {
}
}

fn wait_for_target_slots_per_epoch<T>(target_slots_per_epoch: u64, client: &Arc<T>)
where
T: 'static + Client + Send + Sync,
{
if target_slots_per_epoch != 0 {
info!(
"Waiting until epochs are {} slots long..",
target_slots_per_epoch
);
loop {
if let Ok(epoch_info) = client.get_epoch_info() {
if epoch_info.slots_in_epoch >= target_slots_per_epoch {
info!("Done epoch_info: {:?}", epoch_info);
break;
}
info!(
"Waiting for epoch: {} now: {}",
target_slots_per_epoch, epoch_info.slots_in_epoch
);
}
sleep(Duration::from_secs(3));
}
}
}
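
A note on the helper above: a target of 0 (the default added in cli.rs below) disables the wait entirely; otherwise it polls get_epoch_info() every 3 seconds until slots_in_epoch reaches the target. This is useful, for example, on clusters started with epoch warmup, where the first epochs are shorter than the eventual epoch length.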

fn create_sampler_thread<T>(
client: &Arc<T>,
exit_signal: &Arc<AtomicBool>,
sample_period: u64,
maxes: &Arc<RwLock<Vec<(String, SampleStats)>>>,
) -> JoinHandle<()>
where
T: 'static + Client + Send + Sync,
{
info!("Sampling TPS every {} second...", sample_period);
let exit_signal = exit_signal.clone();
let maxes = maxes.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_txs(&exit_signal, &maxes, sample_period, &client);
})
.unwrap()
}

fn generate_chunked_transfers(
recent_blockhash: Arc<RwLock<Hash>>,
shared_txs: &SharedTransactions,
shared_tx_active_thread_count: Arc<AtomicIsize>,
source_keypair_chunks: Vec<Vec<&Keypair>>,
dest_keypair_chunks: &mut Vec<VecDeque<&Keypair>>,
threads: usize,
duration: Duration,
sustained: bool,
libra_args: Option<LibraKeys>,
) {
// generate and send transactions for the specified duration
let start = Instant::now();
let keypair_chunks = source_keypair_chunks.len();
let mut reclaim_lamports_back_to_source_account = false;
let mut chunk_index = 0;
while start.elapsed() < duration {
generate_txs(
shared_txs,
&recent_blockhash,
&source_keypair_chunks[chunk_index],
&dest_keypair_chunks[chunk_index],
threads,
reclaim_lamports_back_to_source_account,
&libra_args,
);

// In sustained mode, overlap the transfers with generation. This has higher average
// performance but lower peak performance in tested environments.
if sustained {
// Ensure that we don't generate more transactions than we can handle.
while shared_txs.read().unwrap().len() > 2 * threads {
sleep(Duration::from_millis(1));
}
} else {
while !shared_txs.read().unwrap().is_empty()
|| shared_tx_active_thread_count.load(Ordering::Relaxed) > 0
{
sleep(Duration::from_millis(1));
}
}

// Rotate destination keypairs so that the next round of transactions will have different
// transaction signatures even when blockhash is reused.
dest_keypair_chunks[chunk_index].rotate_left(1);

// Move on to next chunk
chunk_index = (chunk_index + 1) % keypair_chunks;

        // Switch directions after transferring for each "chunk"
if chunk_index == 0 {
reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account;
}
}
}
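
To make the chunk bookkeeping above easier to follow, here is a tiny self-contained walk-through of how the chunk index advances and the transfer direction flips; the numbers are made up and nothing in it comes from the bench-tps API:

    fn main() {
        // Illustrative only: with 3 source/destination chunks, the transfer direction
        // flips once per full pass over all chunks, mirroring the chunk_index == 0 check above.
        let keypair_chunks = 3;
        let mut reclaim = false;
        for step in 0..2 * keypair_chunks {
            let chunk_index = step % keypair_chunks;
            println!("send chunk {} (reclaim direction: {})", chunk_index, reclaim);
            if (step + 1) % keypair_chunks == 0 {
                reclaim = !reclaim;
            }
        }
    }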

fn create_sender_threads<T>(
client: &Arc<T>,
shared_txs: &SharedTransactions,
thread_batch_sleep_ms: usize,
total_tx_sent_count: &Arc<AtomicUsize>,
threads: usize,
exit_signal: &Arc<AtomicBool>,
shared_tx_active_thread_count: &Arc<AtomicIsize>,
) -> Vec<JoinHandle<()>>
where
T: 'static + Client + Send + Sync,
{
(0..threads)
.map(|_| {
let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
do_tx_transfers(
&exit_signal,
&shared_txs,
&shared_tx_active_thread_count,
&total_tx_sent_count,
thread_batch_sleep_ms,
&client,
);
})
.unwrap()
})
.collect()
}

pub fn do_bench_tps<T>(
client: Arc<T>,
config: Config,
@@ -80,6 +218,7 @@ where
duration,
tx_count,
sustained,
target_slots_per_epoch,
..
} = config;

@@ -108,18 +247,7 @@ where
// collect the max transaction rate and total tx count seen
let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
info!("Sampling TPS every {} second...", sample_period);
let sample_thread = {
let exit_signal = exit_signal.clone();
let maxes = maxes.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_txs(&exit_signal, &maxes, sample_period, &client);
})
.unwrap()
};
let sample_thread = create_sampler_thread(&client, &exit_signal, sample_period, &maxes);

let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));

@@ -140,72 +268,31 @@ where
.unwrap()
};

let s_threads: Vec<_> = (0..threads)
.map(|_| {
let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
do_tx_transfers(
&exit_signal,
&shared_txs,
&shared_tx_active_thread_count,
&total_tx_sent_count,
thread_batch_sleep_ms,
&client,
);
})
.unwrap()
})
.collect();

// generate and send transactions for the specified duration
let start = Instant::now();
let keypair_chunks = source_keypair_chunks.len();
let mut reclaim_lamports_back_to_source_account = false;
let mut chunk_index = 0;
while start.elapsed() < duration {
generate_txs(
&shared_txs,
&recent_blockhash,
&source_keypair_chunks[chunk_index],
&dest_keypair_chunks[chunk_index],
threads,
reclaim_lamports_back_to_source_account,
&libra_args,
);

// In sustained mode, overlap the transfers with generation. This has higher average
// performance but lower peak performance in tested environments.
if sustained {
// Ensure that we don't generate more transactions than we can handle.
while shared_txs.read().unwrap().len() > 2 * threads {
sleep(Duration::from_millis(1));
}
} else {
while !shared_txs.read().unwrap().is_empty()
|| shared_tx_active_thread_count.load(Ordering::Relaxed) > 0
{
sleep(Duration::from_millis(1));
}
}
let s_threads = create_sender_threads(
&client,
&shared_txs,
thread_batch_sleep_ms,
&total_tx_sent_count,
threads,
&exit_signal,
&shared_tx_active_thread_count,
);

// Rotate destination keypairs so that the next round of transactions will have different
// transaction signatures even when blockhash is reused.
dest_keypair_chunks[chunk_index].rotate_left(1);
wait_for_target_slots_per_epoch(target_slots_per_epoch, &client);

// Move on to next chunk
chunk_index = (chunk_index + 1) % keypair_chunks;
let start = Instant::now();

        // Switch directions after transferring for each "chunk"
if chunk_index == 0 {
reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account;
}
}
generate_chunked_transfers(
recent_blockhash,
&shared_txs,
shared_tx_active_thread_count,
source_keypair_chunks,
&mut dest_keypair_chunks,
threads,
duration,
sustained,
libra_args,
);

    // Stop the sampling thread so it will collect the stats
exit_signal.store(true, Ordering::Relaxed);
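
The net effect of the do_bench_tps changes above: the sampler and sender threads are now created through the new helpers, the function then waits for the target epoch length, and only after that is start captured, so any time spent waiting does not count against the configured benchmark duration.
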
18 changes: 18 additions & 0 deletions bench-tps/src/cli.rs
@@ -25,6 +25,7 @@ pub struct Config {
pub multi_client: bool,
pub use_move: bool,
pub num_lamports_per_account: u64,
pub target_slots_per_epoch: u64,
}

impl Default for Config {
@@ -47,6 +48,7 @@ impl Default for Config {
multi_client: true,
use_move: false,
num_lamports_per_account: NUM_LAMPORTS_PER_ACCOUNT_DEFAULT,
target_slots_per_epoch: 0,
}
}
}
@@ -172,6 +174,15 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
"Number of lamports per account.",
),
)
.arg(
Arg::with_name("target_slots_per_epoch")
.long("target-slots-per-epoch")
.value_name("SLOTS")
.takes_value(true)
.help(
"Wait until epochs are this many slots long.",
),
)
}

/// Parses a clap `ArgMatches` structure into a `Config`
@@ -259,5 +270,12 @@ pub fn extract_args<'a>(matches: &ArgMatches<'a>) -> Config {
args.num_lamports_per_account = v.to_string().parse().expect("can't parse lamports");
}

if let Some(t) = matches.value_of("target_slots_per_epoch") {
args.target_slots_per_epoch = t
.to_string()
.parse()
.expect("can't parse target slots per epoch");
}

args
}
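
With the argument wired up above, an invocation along the lines of solana-bench-tps --target-slots-per-epoch 432000 (binary name and value illustrative) holds off transaction generation until the cluster reports epochs of at least that many slots; omitting the flag keeps the previous behavior, since the default of 0 skips the wait.
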
9 changes: 5 additions & 4 deletions cli/src/cli_output.rs
@@ -5,10 +5,11 @@ use inflector::cases::titlecase::to_title_case;
use serde::Serialize;
use serde_json::{Map, Value};
use solana_client::rpc_response::{
RpcAccountBalance, RpcEpochInfo, RpcKeyedAccount, RpcSupply, RpcVoteAccountInfo,
RpcAccountBalance, RpcKeyedAccount, RpcSupply, RpcVoteAccountInfo,
};
use solana_sdk::{
clock::{self, Epoch, Slot, UnixTimestamp},
epoch_info::EpochInfo,
native_token::lamports_to_sol,
stake_history::StakeHistoryEntry,
};
@@ -186,11 +187,11 @@ pub struct CliSlotStatus {
#[serde(rename_all = "camelCase")]
pub struct CliEpochInfo {
#[serde(flatten)]
pub epoch_info: RpcEpochInfo,
pub epoch_info: EpochInfo,
}

impl From<RpcEpochInfo> for CliEpochInfo {
fn from(epoch_info: RpcEpochInfo) -> Self {
impl From<EpochInfo> for CliEpochInfo {
fn from(epoch_info: EpochInfo) -> Self {
Self { epoch_info }
}
}
5 changes: 3 additions & 2 deletions client/src/rpc_client.rs
@@ -18,6 +18,7 @@ use solana_sdk::{
MAX_HASH_AGE_IN_SECONDS,
},
commitment_config::CommitmentConfig,
epoch_info::EpochInfo,
epoch_schedule::EpochSchedule,
fee_calculator::{FeeCalculator, FeeRateGovernor},
hash::Hash,
@@ -296,14 +297,14 @@ impl RpcClient {
.map_err(|err| err.into_with_request(request))?
}

pub fn get_epoch_info(&self) -> ClientResult<RpcEpochInfo> {
pub fn get_epoch_info(&self) -> ClientResult<EpochInfo> {
self.get_epoch_info_with_commitment(CommitmentConfig::default())
}

pub fn get_epoch_info_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> ClientResult<RpcEpochInfo> {
) -> ClientResult<EpochInfo> {
self.send(RpcRequest::GetEpochInfo, json!([commitment_config]), 0)
}

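
Because get_epoch_info now returns the shared solana_sdk::epoch_info::EpochInfo (replacing the RpcEpochInfo struct removed below), a caller can read the epoch length straight off the RPC client. A rough sketch, with the endpoint URL purely illustrative:

    use solana_client::rpc_client::RpcClient;

    fn main() {
        let client = RpcClient::new("http://127.0.0.1:8899".to_string());
        // EpochInfo carries the same fields the removed RpcEpochInfo had
        // (epoch, slot_index, slots_in_epoch, absolute_slot).
        match client.get_epoch_info() {
            Ok(info) => println!("epoch {} has {} slots", info.epoch, info.slots_in_epoch),
            Err(err) => eprintln!("get_epoch_info failed: {}", err),
        }
    }
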
16 changes: 0 additions & 16 deletions client/src/rpc_response.rs
@@ -114,22 +114,6 @@ pub struct RpcContactInfo {
/// Map of leader base58 identity pubkeys to the slot indices relative to the first epoch slot
pub type RpcLeaderSchedule = HashMap<String, Vec<usize>>;

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RpcEpochInfo {
/// The current epoch
pub epoch: Epoch,

/// The current slot, relative to the start of the current epoch
pub slot_index: u64,

/// The number of slots in this epoch
pub slots_in_epoch: u64,

/// The absolute current slot
pub absolute_slot: Slot,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct RpcVersionInfo {