Allow to specify multiple relay chain RPC urls for collator node (#1880)

* Allow specification of multiple urls for relay chain rpc nodes

* Add pooled RPC client basics

* Add list of clients to pooled client

* Improve

* Forward requests to dispatcher

* Switch clients on error

* Implement rotation logic

* Improve subscription handling

* Error handling cleanup

* Remove retry from rpc-client

* Improve naming

* Improve documentation

* Improve `ClientManager` abstraction

* Adjust zombienet test

* Add more comments

* fmt

* Apply reviewers comments

* Extract reconnection to extra method

* Add comment to reconnection method

* Clean up some dependencies

* Fix build

* fmt

* Provide alias for cli argument

* Apply review comments

* Rename P* to Relay*

* Improve zombienet test

* fmt

* Fix zombienet sleep

* Simplify zombienet test

* Reduce log clutter and fix starting position

* Do not distribute duplicated imported and finalized blocks

* fmt

* Apply code review suggestions

* Move building of relay chain interface to `cumulus-client-service`

* Refactoring to not push back into channel

* FMT
This commit is contained in:
Sebastian Kunert
2022-12-15 11:42:07 +01:00
committed by GitHub
parent 50f212d8c0
commit 7cab12e9d2
22 changed files with 720 additions and 428 deletions
@@ -19,7 +19,7 @@ use core::time::Duration;
use cumulus_primitives_core::{
relay_chain::{
v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
Hash as PHash, Header as PHeader, InboundHrmpMessage,
Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
@@ -34,6 +34,7 @@ use std::pin::Pin;
pub use url::Url;
mod reconnecting_ws_client;
mod rpc_client;
pub use rpc_client::{create_client_and_start_worker, RelayChainRpcClient};
@@ -58,7 +59,7 @@ impl RelayChainInterface for RelayChainRpcInterface {
async fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
relay_parent: RelayHash,
) -> RelayChainResult<Vec<InboundDownwardMessage>> {
self.rpc_client.parachain_host_dmq_contents(para_id, relay_parent).await
}
@@ -66,7 +67,7 @@ impl RelayChainInterface for RelayChainRpcInterface {
async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
relay_parent: RelayHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
self.rpc_client
.parachain_host_inbound_hrmp_channels_contents(para_id, relay_parent)
@@ -75,7 +76,7 @@ impl RelayChainInterface for RelayChainRpcInterface {
async fn persisted_validation_data(
&self,
hash: PHash,
hash: RelayHash,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> RelayChainResult<Option<PersistedValidationData>> {
@@ -86,7 +87,7 @@ impl RelayChainInterface for RelayChainRpcInterface {
async fn candidate_pending_availability(
&self,
hash: PHash,
hash: RelayHash,
para_id: ParaId,
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
self.rpc_client
@@ -94,31 +95,31 @@ impl RelayChainInterface for RelayChainRpcInterface {
.await
}
async fn session_index_for_child(&self, hash: PHash) -> RelayChainResult<SessionIndex> {
async fn session_index_for_child(&self, hash: RelayHash) -> RelayChainResult<SessionIndex> {
self.rpc_client.parachain_host_session_index_for_child(hash).await
}
async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>> {
async fn validators(&self, block_id: RelayHash) -> RelayChainResult<Vec<ValidatorId>> {
self.rpc_client.parachain_host_validators(block_id).await
}
async fn import_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_imported_heads_stream().await?;
) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_imported_heads_stream()?;
Ok(imported_headers_stream.boxed())
}
async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_finalized_heads_stream().await?;
) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_finalized_heads_stream()?;
Ok(imported_headers_stream.boxed())
}
async fn best_block_hash(&self) -> RelayChainResult<PHash> {
async fn best_block_hash(&self) -> RelayChainResult<RelayHash> {
self.rpc_client.chain_get_head(None).await
}
@@ -132,7 +133,7 @@ impl RelayChainInterface for RelayChainRpcInterface {
async fn get_storage_by_key(
&self,
relay_parent: PHash,
relay_parent: RelayHash,
key: &[u8],
) -> RelayChainResult<Option<StorageValue>> {
let storage_key = StorageKey(key.to_vec());
@@ -144,7 +145,7 @@ impl RelayChainInterface for RelayChainRpcInterface {
async fn prove_read(
&self,
relay_parent: PHash,
relay_parent: RelayHash,
relevant_keys: &Vec<Vec<u8>>,
) -> RelayChainResult<StorageProof> {
let cloned = relevant_keys.clone();
@@ -167,8 +168,8 @@ impl RelayChainInterface for RelayChainRpcInterface {
/// 2. Check if the block is already in chain. If yes, succeed early.
/// 3. Wait for the block to be imported via subscription.
/// 4. If timeout is reached, we return an error.
async fn wait_for_block(&self, wait_for_hash: PHash) -> RelayChainResult<()> {
let mut head_stream = self.rpc_client.get_imported_heads_stream().await?;
async fn wait_for_block(&self, wait_for_hash: RelayHash) -> RelayChainResult<()> {
let mut head_stream = self.rpc_client.get_imported_heads_stream()?;
if self.rpc_client.chain_get_header(Some(wait_for_hash)).await?.is_some() {
return Ok(())
@@ -191,8 +192,8 @@ impl RelayChainInterface for RelayChainRpcInterface {
async fn new_best_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_best_heads_stream().await?;
) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_best_heads_stream()?;
Ok(imported_headers_stream.boxed())
}
}