Actually clone client data by reference when cloning the client (#1941)

* actually clone client data by reference when clonning the client

* spelling

* clippy
This commit is contained in:
Svyatoslav Nikolsky
2023-03-07 17:37:20 +03:00
committed by Bastian Köcher
parent e0c0861bc7
commit f4ca16b98d
+32 -24
View File
@@ -26,7 +26,7 @@ use crate::{
Result, SignParam, TransactionTracker, UnsignedTransaction, Result, SignParam, TransactionTracker, UnsignedTransaction,
}; };
use async_std::sync::{Arc, Mutex}; use async_std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait; use async_trait::async_trait;
use bp_runtime::{HeaderIdProvider, StorageDoubleMapKeyProvider, StorageMapKeyProvider}; use bp_runtime::{HeaderIdProvider, StorageDoubleMapKeyProvider, StorageMapKeyProvider};
use codec::{Decode, Encode}; use codec::{Decode, Encode};
@@ -111,23 +111,31 @@ pub enum ChainRuntimeVersion {
/// Substrate client type. /// Substrate client type.
/// ///
/// Cloning `Client` is a cheap operation. /// Cloning `Client` is a cheap operation that only clones internal references. Different
/// clones of the same client are guaranteed to use the same references.
pub struct Client<C: Chain> { pub struct Client<C: Chain> {
/// Tokio runtime handle. // Lock order: `submit_signed_extrinsic_lock`, `data`
tokio: Arc<tokio::runtime::Runtime>,
/// Client connection params. /// Client connection params.
params: Arc<ConnectionParams>, params: Arc<ConnectionParams>,
/// Substrate RPC client. /// Saved chain runtime version.
client: Arc<RpcClient>, chain_runtime_version: ChainRuntimeVersion,
/// Genesis block hash.
genesis_hash: HashOf<C>,
/// If several tasks are submitting their transactions simultaneously using /// If several tasks are submitting their transactions simultaneously using
/// `submit_signed_extrinsic` method, they may get the same transaction nonce. So one of /// `submit_signed_extrinsic` method, they may get the same transaction nonce. So one of
/// transactions will be rejected from the pool. This lock is here to prevent situations like /// transactions will be rejected from the pool. This lock is here to prevent situations like
/// that. /// that.
submit_signed_extrinsic_lock: Arc<Mutex<()>>, submit_signed_extrinsic_lock: Arc<Mutex<()>>,
/// Saved chain runtime version /// Genesis block hash.
chain_runtime_version: ChainRuntimeVersion, genesis_hash: HashOf<C>,
/// Shared dynamic data.
data: Arc<RwLock<ClientData>>,
}
/// Client data, shared by all `Client` clones.
struct ClientData {
/// Tokio runtime handle.
tokio: Arc<tokio::runtime::Runtime>,
/// Substrate RPC client.
client: Arc<RpcClient>,
} }
#[async_trait] #[async_trait]
@@ -135,9 +143,10 @@ impl<C: Chain> relay_utils::relay_loop::Client for Client<C> {
type Error = Error; type Error = Error;
async fn reconnect(&mut self) -> Result<()> { async fn reconnect(&mut self) -> Result<()> {
let mut data = self.data.write().await;
let (tokio, client) = Self::build_client(&self.params).await?; let (tokio, client) = Self::build_client(&self.params).await?;
self.tokio = tokio; data.tokio = tokio;
self.client = client; data.client = client;
Ok(()) Ok(())
} }
} }
@@ -145,12 +154,11 @@ impl<C: Chain> relay_utils::relay_loop::Client for Client<C> {
impl<C: Chain> Clone for Client<C> { impl<C: Chain> Clone for Client<C> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Client { Client {
tokio: self.tokio.clone(),
params: self.params.clone(), params: self.params.clone(),
client: self.client.clone(),
genesis_hash: self.genesis_hash,
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
chain_runtime_version: self.chain_runtime_version.clone(), chain_runtime_version: self.chain_runtime_version.clone(),
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
genesis_hash: self.genesis_hash,
data: self.data.clone(),
} }
} }
} }
@@ -199,12 +207,11 @@ impl<C: Chain> Client<C> {
let chain_runtime_version = params.chain_runtime_version.clone(); let chain_runtime_version = params.chain_runtime_version.clone();
Ok(Self { Ok(Self {
tokio,
params, params,
client,
genesis_hash,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
chain_runtime_version, chain_runtime_version,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
genesis_hash,
data: Arc::new(RwLock::new(ClientData { tokio, client })),
}) })
} }
@@ -572,7 +579,7 @@ impl<C: Chain> Client<C> {
Ok((tracker, subscription)) Ok((tracker, subscription))
}) })
.await?; .await?;
self.tokio.spawn(Subscription::background_worker( self.data.read().await.tokio.spawn(Subscription::background_worker(
C::NAME.into(), C::NAME.into(),
"extrinsic".into(), "extrinsic".into(),
subscription, subscription,
@@ -719,7 +726,7 @@ impl<C: Chain> Client<C> {
}) })
.await?; .await?;
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.tokio.spawn(Subscription::background_worker( self.data.read().await.tokio.spawn(Subscription::background_worker(
C::NAME.into(), C::NAME.into(),
"justification".into(), "justification".into(),
subscription, subscription,
@@ -735,8 +742,9 @@ impl<C: Chain> Client<C> {
F: Future<Output = Result<T>> + Send, F: Future<Output = Result<T>> + Send,
T: Send + 'static, T: Send + 'static,
{ {
let client = self.client.clone(); let data = self.data.read().await;
self.tokio.spawn(async move { make_jsonrpsee_future(client).await }).await? let client = data.client.clone();
data.tokio.spawn(async move { make_jsonrpsee_future(client).await }).await?
} }
/// Returns `true` if version guard can be started. /// Returns `true` if version guard can be started.