Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
make remote ext use batch ws-client (#8916)
Browse files Browse the repository at this point in the history
* make remote ext use batch ws-client

* Add debug log for key length

* better assertions

* new sanity_checl

* try and make it work with batch

* update test

* remove exctra uri

* add missing at

* remove unused rpc stuff

* improve

Co-authored-by: emostov <[email protected]>
  • Loading branch information
kianenigma and emostov committed Jul 4, 2021
1 parent 8252b41 commit 45c88b0
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 30 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions primitives/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ impl PrefixedStorageKey {

/// Storage data associated to a [`StorageKey`].
#[derive(PartialEq, Eq, RuntimeDebug)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Hash, PartialOrd, Ord, Clone, Encode, Decode))]
#[cfg_attr(
feature = "std",
derive(Serialize, Deserialize, Hash, PartialOrd, Ord, Clone, Encode, Decode, Default)
)]
pub struct StorageData(
#[cfg_attr(feature = "std", serde(with="impl_serde::serialize"))]
pub Vec<u8>,
#[cfg_attr(feature = "std", serde(with = "impl_serde::serialize"))] pub Vec<u8>,
);

/// Map of data to use in a storage, it is a collection of
Expand Down
6 changes: 5 additions & 1 deletion utils/frame/remote-externalities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ targets = ["x86_64-unknown-linux-gnu"]
jsonrpsee-ws-client = { version = "=0.2.0-alpha.6", default-features = false }
jsonrpsee-proc-macros = "=0.2.0-alpha.6"

hex-literal = "0.3.1"
hex = "0.4.0"
env_logger = "0.8.2"
log = "0.4.11"
codec = { package = "parity-scale-codec", version = "2.0.0" }

serde_json = "1.0"

sp-io = { version = "3.0.0", path = "../../../primitives/io" }
sp-core = { version = "3.0.0", path = "../../../primitives/core" }
sp-runtime = { version = "3.0.0", path = "../../../primitives/runtime" }

[dev-dependencies]
tokio = { version = "1.6.0", features = ["macros", "rt"] }
pallet-elections-phragmen = { path = "../../../frame/elections-phragmen", version = "4.0.0" }
frame-support = { path = "../../../frame/support", version = "3.0.0" }

[features]
remote-test = []
122 changes: 98 additions & 24 deletions utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,18 @@ use sp_core::{
};
use codec::{Encode, Decode};
use sp_runtime::traits::Block as BlockT;
use jsonrpsee_ws_client::{WsClientBuilder, WsClient};
use jsonrpsee_ws_client::{
WsClientBuilder, WsClient, v2::params::JsonRpcParams, traits::Client,
};

type KeyPair = (StorageKey, StorageData);

const LOG_TARGET: &str = "remote-ext";
const DEFAULT_TARGET: &str = "wss://rpc.polkadot.io";
const BATCH_SIZE: usize = 512;

jsonrpsee_proc_macros::rpc_client_api! {
RpcApi<B: BlockT> {
#[rpc(method = "state_getStorage", positional_params)]
fn get_storage(prefix: StorageKey, hash: Option<B::Hash>) -> StorageData;
#[rpc(method = "state_getKeysPaged", positional_params)]
fn get_keys_paged(
prefix: Option<StorageKey>,
Expand Down Expand Up @@ -279,7 +280,7 @@ impl<B: BlockT> Builder<B> {
async fn get_keys_paged(
&self,
prefix: StorageKey,
hash: B::Hash,
at: B::Hash,
) -> Result<Vec<StorageKey>, &'static str> {
const PAGE: u32 = 512;
let mut last_key: Option<StorageKey> = None;
Expand All @@ -290,7 +291,7 @@ impl<B: BlockT> Builder<B> {
Some(prefix.clone()),
PAGE,
last_key.clone(),
Some(hash),
Some(at),
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -328,29 +329,53 @@ impl<B: BlockT> Builder<B> {
prefix: StorageKey,
at: B::Hash,
) -> Result<Vec<KeyPair>, &'static str> {
use serde_json::to_value;
let keys = self.get_keys_paged(prefix, at).await?;
let keys_count = keys.len();
info!(target: LOG_TARGET, "Querying a total of {} keys", keys.len());

let mut key_values: Vec<KeyPair> = vec![];
for key in keys {
let value =
RpcApi::<B>::get_storage(self.as_online().rpc_client(), key.clone(), Some(at))
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc get_storage failed"
})?;
key_values.push((key, value));
if key_values.len() % 1000 == 0 {
let ratio: f64 = key_values.len() as f64 / keys_count as f64;
debug!(
target: LOG_TARGET,
"progress = {:.2} [{} / {}]",
ratio,
key_values.len(),
keys_count,
);
let client = self.as_online().rpc_client();
for chunk_keys in keys.chunks(BATCH_SIZE) {
let batch = chunk_keys
.iter()
.cloned()
.map(|key| {
(
"state_getStorage",
JsonRpcParams::Array(
vec![
to_value(key).expect("json serialization will work; qed."),
to_value(at).expect("json serialization will work; qed."),
]
),
)
})
.collect::<Vec<_>>();
let values = client.batch_request::<Option<StorageData>>(batch)
.await
.map_err(|e| {
log::error!(target: LOG_TARGET, "failed to execute batch {:?} due to {:?}", chunk_keys, e);
"batch failed."
})?;
assert_eq!(chunk_keys.len(), values.len());
for (idx, key) in chunk_keys.into_iter().enumerate() {
let maybe_value = values[idx].clone();
let value = maybe_value.unwrap_or_else(|| {
log::warn!(target: LOG_TARGET, "key {:?} had none corresponding value.", &key);
StorageData(vec![])
});
key_values.push((key.clone(), value));
if key_values.len() % (10 * BATCH_SIZE) == 0 {
let ratio: f64 = key_values.len() as f64 / keys_count as f64;
debug!(
target: LOG_TARGET,
"progress = {:.2} [{} / {}]",
ratio,
key_values.len(),
keys_count,
);
}
}
}

Expand Down Expand Up @@ -529,7 +554,7 @@ mod remote_tests {
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
modules: vec!["Proxy".to_owned()],
modules: vec!["System".to_owned()],
..Default::default()
}))
.build()
Expand All @@ -538,6 +563,55 @@ mod remote_tests {
.execute_with(|| {});
}

#[tokio::test]
async fn can_build_few_pallet() {
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
modules: vec!["Proxy".to_owned(), "Multisig".to_owned(), "PhragmenElection".to_owned()],
..Default::default()
}))
.build()
.await
.expect("Can't reach the remote node. Is it running?")
.execute_with(|| {});
}

#[tokio::test]
async fn sanity_check_decoding() {
use pallet_elections_phragmen::SeatHolder;
use sp_core::crypto::Ss58Codec;
type AccountId = sp_runtime::AccountId32;
type Balance = u128;
frame_support::generate_storage_alias!(
PhragmenElection,
Members =>
Value<Vec<SeatHolder<AccountId, Balance>>>
);

init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
modules: vec!["PhragmenElection".to_owned()],
..Default::default()
}))
.build()
.await
.expect("Can't reach the remote node. Is it running?")
.execute_with(|| {
// Gav's polkadot account. 99% this will be in the council.
let gav_polkadot =
AccountId::from_ss58check("13RDY9nrJpyTDBSUdBw12dGwhk19sGwsrVZ2bxkzYHBSagP2")
.unwrap();
let members = Members::get().unwrap();
assert!(members
.iter()
.map(|s| s.who.clone())
.find(|a| a == &gav_polkadot)
.is_some());
});
}

#[tokio::test]
async fn can_create_state_snapshot() {
init_logger();
Expand Down

0 comments on commit 45c88b0

Please sign in to comment.