Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

make remote ext use batch ws-client #8916

Merged
11 commits merged into from
May 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions primitives/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ impl PrefixedStorageKey {

/// Storage data associated to a [`StorageKey`].
#[derive(PartialEq, Eq, RuntimeDebug)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Hash, PartialOrd, Ord, Clone, Encode, Decode))]
#[cfg_attr(
feature = "std",
derive(Serialize, Deserialize, Hash, PartialOrd, Ord, Clone, Encode, Decode, Default)
)]
pub struct StorageData(
#[cfg_attr(feature = "std", serde(with="impl_serde::serialize"))]
pub Vec<u8>,
#[cfg_attr(feature = "std", serde(with = "impl_serde::serialize"))] pub Vec<u8>,
);

/// Map of data to use in a storage, it is a collection of
Expand Down
6 changes: 5 additions & 1 deletion utils/frame/remote-externalities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ targets = ["x86_64-unknown-linux-gnu"]
jsonrpsee-ws-client = { version = "=0.2.0-alpha.6", default-features = false }
jsonrpsee-proc-macros = "=0.2.0-alpha.6"

hex-literal = "0.3.1"
hex = "0.4.0"
env_logger = "0.8.2"
log = "0.4.11"
codec = { package = "parity-scale-codec", version = "2.0.0" }

serde_json = "1.0"

sp-io = { version = "3.0.0", path = "../../../primitives/io" }
sp-core = { version = "3.0.0", path = "../../../primitives/core" }
sp-runtime = { version = "3.0.0", path = "../../../primitives/runtime" }

[dev-dependencies]
tokio = { version = "1.6.0", features = ["macros", "rt"] }
pallet-elections-phragmen = { path = "../../../frame/elections-phragmen", version = "4.0.0" }
frame-support = { path = "../../../frame/support", version = "3.0.0" }

[features]
remote-test = []
122 changes: 98 additions & 24 deletions utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,18 @@ use sp_core::{
};
use codec::{Encode, Decode};
use sp_runtime::traits::Block as BlockT;
use jsonrpsee_ws_client::{WsClientBuilder, WsClient};
use jsonrpsee_ws_client::{
WsClientBuilder, WsClient, v2::params::JsonRpcParams, traits::Client,
};

type KeyPair = (StorageKey, StorageData);

const LOG_TARGET: &str = "remote-ext";
const DEFAULT_TARGET: &str = "wss://rpc.polkadot.io";
const BATCH_SIZE: usize = 512;
emostov marked this conversation as resolved.
Show resolved Hide resolved

jsonrpsee_proc_macros::rpc_client_api! {
RpcApi<B: BlockT> {
#[rpc(method = "state_getStorage", positional_params)]
fn get_storage(prefix: StorageKey, hash: Option<B::Hash>) -> StorageData;
#[rpc(method = "state_getKeysPaged", positional_params)]
fn get_keys_paged(
prefix: Option<StorageKey>,
Expand Down Expand Up @@ -279,7 +280,7 @@ impl<B: BlockT> Builder<B> {
async fn get_keys_paged(
&self,
prefix: StorageKey,
hash: B::Hash,
at: B::Hash,
) -> Result<Vec<StorageKey>, &'static str> {
const PAGE: u32 = 512;
let mut last_key: Option<StorageKey> = None;
Expand All @@ -290,7 +291,7 @@ impl<B: BlockT> Builder<B> {
Some(prefix.clone()),
PAGE,
last_key.clone(),
Some(hash),
Some(at),
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -328,29 +329,53 @@ impl<B: BlockT> Builder<B> {
prefix: StorageKey,
at: B::Hash,
) -> Result<Vec<KeyPair>, &'static str> {
use serde_json::to_value;
let keys = self.get_keys_paged(prefix, at).await?;
let keys_count = keys.len();
info!(target: LOG_TARGET, "Querying a total of {} keys", keys.len());

let mut key_values: Vec<KeyPair> = vec![];
for key in keys {
let value =
RpcApi::<B>::get_storage(self.as_online().rpc_client(), key.clone(), Some(at))
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc get_storage failed"
})?;
key_values.push((key, value));
if key_values.len() % 1000 == 0 {
let ratio: f64 = key_values.len() as f64 / keys_count as f64;
debug!(
target: LOG_TARGET,
"progress = {:.2} [{} / {}]",
ratio,
key_values.len(),
keys_count,
);
let client = self.as_online().rpc_client();
for chunk_keys in keys.chunks(BATCH_SIZE) {
let batch = chunk_keys
.iter()
.cloned()
.map(|key| {
(
"state_getStorage",
JsonRpcParams::Array(
vec![
to_value(key).expect("json serialization will work; qed."),
to_value(at).expect("json serialization will work; qed."),
]
),
)
})
.collect::<Vec<_>>();
let values = client.batch_request::<Option<StorageData>>(batch)
.await
.map_err(|e| {
log::error!(target: LOG_TARGET, "failed to execute batch {:?} due to {:?}", chunk_keys, e);
"batch failed."
})?;
assert_eq!(chunk_keys.len(), values.len());
for (idx, key) in chunk_keys.into_iter().enumerate() {
let maybe_value = values[idx].clone();
let value = maybe_value.unwrap_or_else(|| {
log::warn!(target: LOG_TARGET, "key {:?} had none corresponding value.", &key);
StorageData(vec![])
});
key_values.push((key.clone(), value));
if key_values.len() % (10 * BATCH_SIZE) == 0 {
let ratio: f64 = key_values.len() as f64 / keys_count as f64;
debug!(
target: LOG_TARGET,
"progress = {:.2} [{} / {}]",
ratio,
key_values.len(),
keys_count,
);
}
}
}

Expand Down Expand Up @@ -529,7 +554,7 @@ mod remote_tests {
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
modules: vec!["Proxy".to_owned()],
modules: vec!["System".to_owned()],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add somewhere

/// Remote tests connect to `DEFAULT_TARGET` (`wss://rpc.polkadot.io`)

..Default::default()
}))
.build()
Expand All @@ -538,6 +563,55 @@ mod remote_tests {
.execute_with(|| {});
}

#[tokio::test]
async fn can_build_few_pallet() {
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
modules: vec!["Proxy".to_owned(), "Multisig".to_owned(), "PhragmenElection".to_owned()],
..Default::default()
}))
.build()
.await
.expect("Can't reach the remote node. Is it running?")
.execute_with(|| {});
}

#[tokio::test]
async fn sanity_check_decoding() {
use pallet_elections_phragmen::SeatHolder;
use sp_core::crypto::Ss58Codec;
type AccountId = sp_runtime::AccountId32;
type Balance = u128;
frame_support::generate_storage_alias!(
PhragmenElection,
Members =>
Value<Vec<SeatHolder<AccountId, Balance>>>
);

init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
modules: vec!["PhragmenElection".to_owned()],
..Default::default()
}))
.build()
.await
.expect("Can't reach the remote node. Is it running?")
.execute_with(|| {
// Gav's polkadot account. 99% this will be in the council.
let gav_polkadot =
AccountId::from_ss58check("13RDY9nrJpyTDBSUdBw12dGwhk19sGwsrVZ2bxkzYHBSagP2")
.unwrap();
let members = Members::get().unwrap();
assert!(members
.iter()
.map(|s| s.who.clone())
.find(|a| a == &gav_polkadot)
.is_some());
});
}

#[tokio::test]
async fn can_create_state_snapshot() {
init_logger();
Expand Down