Skip to content

Commit

Permalink
Migrate to jsonrpsee v2 (paritytech#787)
Browse files Browse the repository at this point in the history
* POC jsonrpsee v2

* POC update ws client

* connect to eth nodes using ws

* fix for subscriptions

* reverted unncecessary changes

* reference jsonrpsee from crates.io

* fixed eth port in deployments

* fmt

* order deps

* remove unnecessary comment

* clone is no longer required for subscriptions

* treat RpcError::Internal as connection error

* resubscribe on terminate

* Update deployments/bridges/poa-rialto/entrypoints/poa-exchange-tx-generator-entrypoint.sh

Co-authored-by: Niklas Adolfsson <[email protected]>
Co-authored-by: Hernando Castano <[email protected]>
  • Loading branch information
3 people authored and serban300 committed Apr 9, 2024
1 parent 98b71ec commit 4f44216
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 131 deletions.
4 changes: 3 additions & 1 deletion bridges/relays/ethereum-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ bp-eth-poa = { path = "../../primitives/ethereum-poa" }
codec = { package = "parity-scale-codec", version = "2.0.0" }
headers-relay = { path = "../headers-relay" }
hex-literal = "0.3"
jsonrpsee = { git = "https:/svyatonik/jsonrpsee.git", branch = "shared-client-in-rpc-api", default-features = false, features = ["http"] }
jsonrpsee-proc-macros = "0.2.0-alpha"
jsonrpsee-types = "0.2.0-alpha"
jsonrpsee-ws-client = "0.2.0-alpha"
libsecp256k1 = { version = "0.3.4", default-features = false, features = ["hmac"] }
log = "0.4.11"
relay-utils = { path = "../utils" }
Expand Down
54 changes: 27 additions & 27 deletions bridges/relays/ethereum-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ use crate::types::{
};
use crate::{ConnectionParams, Error, Result};

use jsonrpsee::raw::RawClient;
use jsonrpsee::transport::http::HttpTransportClient;
use jsonrpsee::Client as RpcClient;
use jsonrpsee_ws_client::{WsClient as RpcClient, WsConfig as RpcConfig};
use std::sync::Arc;

/// Number of headers missing from the Ethereum node for us to consider node not synced.
const MAJOR_SYNC_BLOCKS: u64 = 5;
Expand All @@ -32,36 +31,36 @@ const MAJOR_SYNC_BLOCKS: u64 = 5;
#[derive(Clone)]
pub struct Client {
params: ConnectionParams,
client: RpcClient,
client: Arc<RpcClient>,
}

impl Client {
/// Create a new Ethereum RPC Client.
pub fn new(params: ConnectionParams) -> Self {
Self {
client: Self::build_client(&params),
pub async fn new(params: ConnectionParams) -> Result<Self> {
Ok(Self {
client: Self::build_client(&params).await?,
params,
}
})
}

/// Build client to use in connection.
fn build_client(params: &ConnectionParams) -> RpcClient {
let uri = format!("http://{}:{}", params.host, params.port);
let transport = HttpTransportClient::new(&uri);
let raw_client = RawClient::new(transport);
raw_client.into()
async fn build_client(params: &ConnectionParams) -> Result<Arc<RpcClient>> {
let uri = format!("ws://{}:{}", params.host, params.port);
let client = RpcClient::new(RpcConfig::with_url(&uri)).await?;
Ok(Arc::new(client))
}

/// Reopen client connection.
pub fn reconnect(&mut self) {
self.client = Self::build_client(&self.params);
pub async fn reconnect(&mut self) -> Result<()> {
self.client = Self::build_client(&self.params).await?;
Ok(())
}
}

impl Client {
/// Returns true if client is connected to at least one peer and is in synced state.
pub async fn ensure_synced(&self) -> Result<()> {
match Ethereum::syncing(&self.client).await? {
match Ethereum::syncing(&*self.client).await? {
SyncState::NotSyncing => Ok(()),
SyncState::Syncing(syncing) => {
let missing_headers = syncing.highest_block.saturating_sub(syncing.current_block);
Expand All @@ -76,18 +75,18 @@ impl Client {

/// Estimate gas usage for the given call.
pub async fn estimate_gas(&self, call_request: CallRequest) -> Result<U256> {
Ok(Ethereum::estimate_gas(&self.client, call_request).await?)
Ok(Ethereum::estimate_gas(&*self.client, call_request).await?)
}

/// Retrieve number of the best known block from the Ethereum node.
pub async fn best_block_number(&self) -> Result<u64> {
Ok(Ethereum::block_number(&self.client).await?.as_u64())
Ok(Ethereum::block_number(&*self.client).await?.as_u64())
}

/// Retrieve number of the best known block from the Ethereum node.
pub async fn header_by_number(&self, block_number: u64) -> Result<Header> {
let get_full_tx_objects = false;
let header = Ethereum::get_block_by_number(&self.client, block_number, get_full_tx_objects).await?;
let header = Ethereum::get_block_by_number(&*self.client, block_number, get_full_tx_objects).await?;
match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() {
true => Ok(header),
false => Err(Error::IncompleteHeader),
Expand All @@ -97,7 +96,7 @@ impl Client {
/// Retrieve block header by its hash from Ethereum node.
pub async fn header_by_hash(&self, hash: H256) -> Result<Header> {
let get_full_tx_objects = false;
let header = Ethereum::get_block_by_hash(&self.client, hash, get_full_tx_objects).await?;
let header = Ethereum::get_block_by_hash(&*self.client, hash, get_full_tx_objects).await?;
match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() {
true => Ok(header),
false => Err(Error::IncompleteHeader),
Expand All @@ -107,7 +106,8 @@ impl Client {
/// Retrieve block header and its transactions by its number from Ethereum node.
pub async fn header_by_number_with_transactions(&self, number: u64) -> Result<HeaderWithTransactions> {
let get_full_tx_objects = true;
let header = Ethereum::get_block_by_number_with_transactions(&self.client, number, get_full_tx_objects).await?;
let header =
Ethereum::get_block_by_number_with_transactions(&*self.client, number, get_full_tx_objects).await?;

let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some();
if !is_complete_header {
Expand All @@ -125,7 +125,7 @@ impl Client {
/// Retrieve block header and its transactions by its hash from Ethereum node.
pub async fn header_by_hash_with_transactions(&self, hash: H256) -> Result<HeaderWithTransactions> {
let get_full_tx_objects = true;
let header = Ethereum::get_block_by_hash_with_transactions(&self.client, hash, get_full_tx_objects).await?;
let header = Ethereum::get_block_by_hash_with_transactions(&*self.client, hash, get_full_tx_objects).await?;

let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some();
if !is_complete_header {
Expand All @@ -142,31 +142,31 @@ impl Client {

/// Retrieve transaction by its hash from Ethereum node.
pub async fn transaction_by_hash(&self, hash: H256) -> Result<Option<Transaction>> {
Ok(Ethereum::transaction_by_hash(&self.client, hash).await?)
Ok(Ethereum::transaction_by_hash(&*self.client, hash).await?)
}

/// Retrieve transaction receipt by transaction hash.
pub async fn transaction_receipt(&self, transaction_hash: H256) -> Result<Receipt> {
Ok(Ethereum::get_transaction_receipt(&self.client, transaction_hash).await?)
Ok(Ethereum::get_transaction_receipt(&*self.client, transaction_hash).await?)
}

/// Get the nonce of the given account.
pub async fn account_nonce(&self, address: Address) -> Result<U256> {
Ok(Ethereum::get_transaction_count(&self.client, address).await?)
Ok(Ethereum::get_transaction_count(&*self.client, address).await?)
}

/// Submit an Ethereum transaction.
///
/// The transaction must already be signed before sending it through this method.
pub async fn submit_transaction(&self, signed_raw_tx: SignedRawTx) -> Result<TransactionHash> {
let transaction = Bytes(signed_raw_tx);
let tx_hash = Ethereum::submit_transaction(&self.client, transaction).await?;
let tx_hash = Ethereum::submit_transaction(&*self.client, transaction).await?;
log::trace!(target: "bridge", "Sent transaction to Ethereum node: {:?}", tx_hash);
Ok(tx_hash)
}

/// Call Ethereum smart contract.
pub async fn eth_call(&self, call_transaction: CallRequest) -> Result<Bytes> {
Ok(Ethereum::call(&self.client, call_transaction).await?)
Ok(Ethereum::call(&*self.client, call_transaction).await?)
}
}
18 changes: 11 additions & 7 deletions bridges/relays/ethereum-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::types::U256;

use jsonrpsee::client::RequestError;
use jsonrpsee_types::error::Error as RpcError;
use relay_utils::MaybeConnectionError;

/// Result type used by Ethereum client.
Expand All @@ -30,7 +30,7 @@ pub type Result<T> = std::result::Result<T, Error>;
pub enum Error {
/// An error that can occur when making an HTTP request to
/// an JSON-RPC client.
Request(RequestError),
RpcError(RpcError),
/// Failed to parse response.
ResponseParseFailed(String),
/// We have received a header with missing fields.
Expand All @@ -47,25 +47,29 @@ pub enum Error {
ClientNotSynced(U256),
}

impl From<RequestError> for Error {
fn from(error: RequestError) -> Self {
Error::Request(error)
impl From<RpcError> for Error {
fn from(error: RpcError) -> Self {
Error::RpcError(error)
}
}

impl MaybeConnectionError for Error {
fn is_connection_error(&self) -> bool {
matches!(
*self,
Error::Request(RequestError::TransportError(_)) | Error::ClientNotSynced(_),
Error::RpcError(RpcError::TransportError(_))
// right now if connection to the ws server is dropped (after it is already established),
// we're getting this error
| Error::RpcError(RpcError::Internal(_))
| Error::ClientNotSynced(_),
)
}
}

impl ToString for Error {
fn to_string(&self) -> String {
match self {
Self::Request(e) => e.to_string(),
Self::RpcError(e) => e.to_string(),
Self::ResponseParseFailed(e) => e.to_string(),
Self::IncompleteHeader => {
"Incomplete Ethereum Header Received (missing some of required fields - hash, number, logs_bloom)"
Expand Down
8 changes: 4 additions & 4 deletions bridges/relays/ethereum-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,20 @@ pub use crate::sign::{sign_and_submit_transaction, SigningParams};

pub mod types;

/// Ethereum connection params.
/// Ethereum-over-websocket connection params.
#[derive(Debug, Clone)]
pub struct ConnectionParams {
/// Ethereum RPC host.
/// Websocket server hostname.
pub host: String,
/// Ethereum RPC port.
/// Websocket server TCP port.
pub port: u16,
}

impl Default for ConnectionParams {
fn default() -> Self {
ConnectionParams {
host: "localhost".into(),
port: 8545,
port: 8546,
}
}
}
2 changes: 1 addition & 1 deletion bridges/relays/ethereum-client/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::types::{
H256, U256, U64,
};

jsonrpsee::rpc_api! {
jsonrpsee_proc_macros::rpc_client_api! {
pub(crate) Ethereum {
#[rpc(method = "eth_syncing", positional_params)]
fn syncing() -> SyncState;
Expand Down
6 changes: 3 additions & 3 deletions bridges/relays/ethereum/src/cli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ subcommands:
- eth-host: &eth-host
long: eth-host
value_name: ETH_HOST
help: Connect to Ethereum node at given host.
help: Connect to Ethereum node websocket server at given host.
takes_value: true
- eth-port: &eth-port
long: eth-port
value_name: ETH_PORT
help: Connect to Ethereum node at given port.
help: Connect to Ethereum node websocket server at given port.
takes_value: true
- sub-host: &sub-host
long: sub-host
value_name: SUB_HOST
help: Connect to Substrate node at given host.
help: Connect to Substrate node websocket server at given host.
takes_value: true
- sub-port: &sub-port
long: sub-port
Expand Down
2 changes: 1 addition & 1 deletion bridges/relays/ethereum/src/ethereum_deploy_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub fn run(params: EthereumDeployContractParams) {
} = params;

let result = local_pool.run_until(async move {
let eth_client = EthereumClient::new(eth_params);
let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?;
let sub_client = SubstrateClient::<Rialto>::new(sub_params).await.map_err(RpcError::Substrate)?;

let (initial_header_id, initial_header) = prepare_initial_header(&sub_client, sub_initial_header).await?;
Expand Down
8 changes: 4 additions & 4 deletions bridges/relays/ethereum/src/ethereum_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ impl RelayClient for EthereumTransactionsSource {
type Error = RpcError;

async fn reconnect(&mut self) -> Result<(), RpcError> {
self.client.reconnect();
Ok(())
self.client.reconnect().await.map_err(Into::into)
}
}

Expand Down Expand Up @@ -305,7 +304,7 @@ fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H25
} = params;

let result = local_pool.run_until(async move {
let eth_client = EthereumClient::new(eth_params);
let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?;
let sub_client = SubstrateClient::<Rialto>::new(sub_params)
.await
.map_err(RpcError::Substrate)?;
Expand Down Expand Up @@ -351,7 +350,8 @@ fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_wi
} = params;

let do_run_loop = move || -> Result<(), String> {
let eth_client = EthereumClient::new(eth_params);
let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))
.map_err(|err| format!("Error starting Ethereum client: {:?}", err))?;
let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params))
.map_err(|err| format!("Error starting Substrate client: {:?}", err))?;

Expand Down
4 changes: 3 additions & 1 deletion bridges/relays/ethereum/src/ethereum_exchange_submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ pub fn run(params: EthereumExchangeSubmitParams) {
} = params;

let result: Result<_, String> = local_pool.run_until(async move {
let eth_client = EthereumClient::new(eth_params);
let eth_client = EthereumClient::new(eth_params)
.await
.map_err(|err| format!("error connecting to Ethereum node: {:?}", err))?;

let eth_signer_address = secret_to_address(&eth_sign.signer);
let sub_recipient_encoded = sub_recipient;
Expand Down
7 changes: 3 additions & 4 deletions bridges/relays/ethereum/src/ethereum_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ impl RelayClient for EthereumHeadersSource {
type Error = RpcError;

async fn reconnect(&mut self) -> Result<(), RpcError> {
self.client.reconnect();
Ok(())
self.client.reconnect().await.map_err(Into::into)
}
}

Expand Down Expand Up @@ -259,8 +258,8 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
instance,
} = params;

let eth_client = EthereumClient::new(eth_params);
let sub_client = async_std::task::block_on(async { SubstrateClient::<Rialto>::new(sub_params).await })?;
let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))?;
let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params))?;

let sign_sub_transactions = match sync_params.target_tx_mode {
TargetTransactionMode::Signed | TargetTransactionMode::Backup => true,
Expand Down
7 changes: 3 additions & 4 deletions bridges/relays/ethereum/src/substrate_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ impl RelayClient for EthereumHeadersTarget {
type Error = RpcError;

async fn reconnect(&mut self) -> Result<(), RpcError> {
self.client.reconnect();
Ok(())
self.client.reconnect().await.map_err(Into::into)
}
}

Expand Down Expand Up @@ -174,8 +173,8 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
metrics_params,
} = params;

let eth_client = EthereumClient::new(eth_params);
let sub_client = async_std::task::block_on(async { SubstrateClient::<Rialto>::new(sub_params).await })?;
let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))?;
let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params))?;

let target = EthereumHeadersTarget::new(eth_client, eth_contract_address, eth_sign);
let source = SubstrateHeadersSource::new(sub_client);
Expand Down
4 changes: 3 additions & 1 deletion bridges/relays/substrate-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
async-std = "1.6.5"
async-trait = "0.1.40"
codec = { package = "parity-scale-codec", version = "2.0.0" }
jsonrpsee = { git = "https:/svyatonik/jsonrpsee.git", branch = "shared-client-in-rpc-api", default-features = false, features = ["ws"] }
jsonrpsee-proc-macros = "0.2.0-alpha"
jsonrpsee-types = "0.2.0-alpha"
jsonrpsee-ws-client = "0.2.0-alpha"
log = "0.4.11"
num-traits = "0.2"
rand = "0.7"
Expand Down
2 changes: 1 addition & 1 deletion bridges/relays/substrate-client/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::client::Client;

use bp_runtime::Chain as ChainBase;
use frame_support::Parameter;
use jsonrpsee::common::{DeserializeOwned, Serialize};
use jsonrpsee_types::jsonrpc::{DeserializeOwned, Serialize};
use num_traits::{CheckedSub, Zero};
use sp_core::{storage::StorageKey, Pair};
use sp_runtime::{
Expand Down
Loading

0 comments on commit 4f44216

Please sign in to comment.