Introduce rpc client for relay chain full node (#963)

* Initial network interface preparations

* Implement get_storage_by_key

* Implement `validators` and `session_index_for_child`

* Implement persisted_validation_data and candidate_pending_availability

* Fix method name for persisted_validation_data and add encoded params

* Implement `retrieve_dmq_contents` and `retrieve_all_inbound_hrmp_channel_contents`

* Implement `prove_read`

* Introduce separate RPC client, expose JsonRpSee errors

* Simplify closure in call_remote_runtime_function

* Implement import stream, upgrade JsonRpSee

* Implement finality stream

* Remove unused method from interface

* Implement `is_major_syncing`

* Implement `wait_on_block`

* Fix tests

* Unify error handling `ApiError`

* Replace WaitError with RelayChainError

* Wrap BlockChainError in RelayChainError

* Unify error handling in relay chain intefaces

* Fix return type of proof method

* Improve error handling of new methods

* Improve error handling and move logging outside of interface

* Clean up

* Remove unwanted changes, clean up

* Remove unused import

* Add format for StatemachineError and remove nused From trait

* Use 'thiserror' crate to simplify error handling

* Expose error for overseer, further simplify error handling

* Reintroduce network interface

* Implement cli option

* Adjust call_state method to use hashes

* Disable PoV recovery when RPC is used

* Add integration test for network full node

* Use Hash instead of BlockId to ensure compatibility with RPC interface

* Fix cargo check warnings

* Implement retries

* Remove `expect` statements from code

* Update jsonrpsee to 0.8.0 and make collator keys optional

* Make cli arguments conflicting

* Remove unused `block_status` method

* Add clippy fixes

* Cargo fmt

* Validate relay chain rpc url

* Clean up dependencies and add one more integration test

* Clean up

* Clean up dependencies of relay-chain-network

* Use hash instead of blockid for rpc methods

* Fix tests

* Update client/cli/src/lib.rs

Co-authored-by: Koute <koute@users.noreply.github.com>

* Improve error message of cli validation

* Add rpc client constructor

* Do not use debug formatting for errors

* Improve logging for remote runtime methods

* Only retry on transport problems

* Use PHash by value, rename test

* Improve tracing, return error  on relay-chain-interface build

* Fix naming, use generics instead of deserializing manually

* Rename RelayChainLocal and RelayChainNetwork

* lock

* Format

* Use impl trait for encodable runtime payload

* Only instantiate full node in tests when we need it

* Upgrade scale-codec to 3.0.0

* Improve expect log

Co-authored-by: Koute <koute@users.noreply.github.com>
This commit is contained in:
Sebastian Kunert
2022-03-01 12:37:51 +01:00
committed by GitHub
parent 58e02c8497
commit 234c42c2df
34 changed files with 1109 additions and 271 deletions
+1
View File
@@ -10,3 +10,4 @@ clap = { version = "3.1", features = ["derive"] }
# Substrate dependencies
sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
url = "2.2.2"
+36 -1
View File
@@ -19,7 +19,6 @@
#![warn(missing_docs)]
use clap::Parser;
use sc_cli;
use sc_service::{
config::{PrometheusConfig, TelemetryEndpoints},
BasePath, TransactionPoolOptions,
@@ -29,6 +28,7 @@ use std::{
io::{self, Write},
net::SocketAddr,
};
use url::Url;
/// The `purge-chain` command used to remove the whole chain: the parachain and the relay chain.
#[derive(Debug, Parser)]
@@ -119,6 +119,19 @@ impl sc_cli::CliConfiguration for PurgeChainCmd {
}
}
fn validate_relay_chain_url(arg: &str) -> Result<(), String> {
let url = Url::parse(arg).map_err(|e| e.to_string())?;
if url.scheme() == "ws" {
Ok(())
} else {
Err(format!(
"'{}' URL scheme not supported. Only websocket RPC is currently supported",
url.scheme()
))
}
}
/// The `run` command used to run a node.
#[derive(Debug, Parser)]
pub struct RunCmd {
@@ -131,6 +144,23 @@ pub struct RunCmd {
/// Note that this is the same as running with `--validator`.
#[clap(long, conflicts_with = "validator")]
pub collator: bool,
/// EXPERIMENTAL: Specify an URL to a relay chain full node to communicate with.
#[clap(
long,
parse(try_from_str),
validator = validate_relay_chain_url,
conflicts_with = "collator",
conflicts_with = "validator"
)]
pub relay_chain_rpc_url: Option<Url>,
}
/// Options only relevant for collator nodes
#[derive(Clone, Debug)]
pub struct CollatorOptions {
/// Location of relay chain full node
pub relay_chain_rpc_url: Option<Url>,
}
/// A non-redundant version of the `RunCmd` that sets the `validator` field when the
@@ -150,6 +180,11 @@ impl RunCmd {
NormalizedRunCmd { base: new_base }
}
/// Create [`CollatorOptions`] representing options only relevant to parachain collator nodes
pub fn collator_options(&self) -> CollatorOptions {
CollatorOptions { relay_chain_rpc_url: self.relay_chain_rpc_url.clone() }
}
}
impl sc_cli::CliConfiguration for NormalizedRunCmd {
@@ -27,7 +27,7 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
};
use polkadot_primitives::v1::{Block as PBlock, Id as ParaId, OccupiedCoreAssumption};
use polkadot_primitives::v1::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};
use codec::Decode;
use futures::{select, FutureExt, Stream, StreamExt};
@@ -54,7 +54,7 @@ pub trait RelaychainClient: Clone + 'static {
/// Returns the parachain head for the given `para_id` at the given block id.
async fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
at: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<Vec<u8>>>;
}
@@ -402,13 +402,7 @@ where
.await?
.filter_map(move |n| {
let relay_chain = relay_chain.clone();
async move {
relay_chain
.parachain_head_at(&BlockId::hash(n.hash()), para_id)
.await
.ok()
.flatten()
}
async move { relay_chain.parachain_head_at(n.hash(), para_id).await.ok().flatten() }
})
.boxed();
Ok(new_best_notification_stream)
@@ -422,13 +416,7 @@ where
.await?
.filter_map(move |n| {
let relay_chain = relay_chain.clone();
async move {
relay_chain
.parachain_head_at(&BlockId::hash(n.hash()), para_id)
.await
.ok()
.flatten()
}
async move { relay_chain.parachain_head_at(n.hash(), para_id).await.ok().flatten() }
})
.boxed();
Ok(finality_notification_stream)
@@ -436,7 +424,7 @@ where
async fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
at: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<Vec<u8>>> {
self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
+2 -6
View File
@@ -25,7 +25,7 @@ use cumulus_test_client::{
};
use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt};
use futures_timer::Delay;
use polkadot_primitives::v1::{Block as PBlock, Id as ParaId};
use polkadot_primitives::v1::Id as ParaId;
use sc_client_api::UsageProvider;
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::Error as ClientError;
@@ -98,11 +98,7 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
Ok(Box::new(stream.map(|v| v.encode())))
}
async fn parachain_head_at(
&self,
_: &BlockId<PBlock>,
_: ParaId,
) -> RelayChainResult<Option<Vec<u8>>> {
async fn parachain_head_at(&self, _: PHash, _: ParaId) -> RelayChainResult<Option<Vec<u8>>> {
unimplemented!("Not required for tests")
}
}
+3 -1
View File
@@ -32,12 +32,14 @@ derive_more = "0.99.2"
async-trait = "0.1.52"
[dev-dependencies]
portpicker = "0.1.1"
url = "2.2.2"
tokio = { version = "1.17.0", features = ["macros"] }
# Cumulus deps
cumulus-test-service = { path = "../../test/service" }
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-local = { path = "../relay-chain-local" }
cumulus-relay-chain-inprocess-interface = { path = "../relay-chain-inprocess-interface" }
# Polkadot deps
polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
+13 -22
View File
@@ -24,17 +24,14 @@ use sp_consensus::block_validation::{
BlockAnnounceValidator as BlockAnnounceValidatorT, Validation,
};
use sp_core::traits::SpawnNamed;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT},
};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_node_primitives::{CollationSecondedSignal, Statement};
use polkadot_parachain::primitives::HeadData;
use polkadot_primitives::v1::{
Block as PBlock, CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId,
OccupiedCoreAssumption, SigningContext, UncheckedSigned,
CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId, OccupiedCoreAssumption,
SigningContext, UncheckedSigned,
};
use codec::{Decode, DecodeAll, Encode};
@@ -133,9 +130,8 @@ impl BlockAnnounceData {
{
let validator_index = self.statement.unchecked_validator_index();
let runtime_api_block_id = BlockId::Hash(self.relay_parent);
let session_index =
match relay_chain_client.session_index_for_child(&runtime_api_block_id).await {
match relay_chain_client.session_index_for_child(self.relay_parent).await {
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
@@ -143,7 +139,7 @@ impl BlockAnnounceData {
let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
// Check that the signer is a legit validator.
let authorities = match relay_chain_client.validators(&runtime_api_block_id).await {
let authorities = match relay_chain_client.validators(self.relay_parent).await {
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
@@ -160,7 +156,7 @@ impl BlockAnnounceData {
};
// Check statement is correctly signed.
if self.statement.try_into_checked(&signing_context, &signer).is_err() {
if self.statement.try_into_checked(&signing_context, signer).is_err() {
tracing::debug!(
target: LOG_TARGET,
"Block announcement justification signature is invalid.",
@@ -231,11 +227,7 @@ where
{
/// Create a new [`BlockAnnounceValidator`].
pub fn new(relay_chain_interface: RCInterface, para_id: ParaId) -> Self {
Self {
phantom: Default::default(),
relay_chain_interface: relay_chain_interface.clone(),
para_id,
}
Self { phantom: Default::default(), relay_chain_interface, para_id }
}
}
@@ -246,11 +238,11 @@ where
/// Get the included block of the given parachain in the relay chain.
async fn included_block(
relay_chain_interface: &RCInterface,
block_id: &BlockId<PBlock>,
hash: PHash,
para_id: ParaId,
) -> Result<Block::Header, BoxedError> {
let validation_data = relay_chain_interface
.persisted_validation_data(block_id, para_id, OccupiedCoreAssumption::TimedOut)
.persisted_validation_data(hash, para_id, OccupiedCoreAssumption::TimedOut)
.await
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
.ok_or_else(|| {
@@ -269,11 +261,11 @@ where
/// Get the backed block hash of the given parachain in the relay chain.
async fn backed_block_hash(
relay_chain_interface: &RCInterface,
block_id: &BlockId<PBlock>,
hash: PHash,
para_id: ParaId,
) -> Result<Option<PHash>, BoxedError> {
let candidate_receipt = relay_chain_interface
.candidate_pending_availability(block_id, para_id)
.candidate_pending_availability(hash, para_id)
.await
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
@@ -293,14 +285,13 @@ where
.best_block_hash()
.await
.map_err(|e| Box::new(e) as Box<_>)?;
let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash);
let block_number = header.number();
let best_head =
Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id).await?;
Self::included_block(&relay_chain_interface, relay_chain_best_hash, para_id).await?;
let known_best_number = best_head.number();
let backed_block = || async {
Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id).await
Self::backed_block_hash(&relay_chain_interface, relay_chain_best_hash, para_id).await
};
if best_head == header {
+9 -31
View File
@@ -16,8 +16,8 @@
use super::*;
use async_trait::async_trait;
use cumulus_relay_chain_inprocess_interface::{check_block_in_chain, BlockCheckStatus};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use cumulus_relay_chain_local::{check_block_in_chain, BlockCheckStatus};
use cumulus_test_service::runtime::{Block, Hash, Header};
use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
@@ -77,23 +77,10 @@ impl DummyRelayChainInterface {
#[async_trait]
impl RelayChainInterface for DummyRelayChainInterface {
async fn validators(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
) -> RelayChainResult<Vec<ValidatorId>> {
async fn validators(&self, _: PHash) -> RelayChainResult<Vec<ValidatorId>> {
Ok(self.data.lock().validators.clone())
}
async fn block_status(
&self,
block_id: cumulus_primitives_core::relay_chain::BlockId,
) -> RelayChainResult<sp_blockchain::BlockStatus> {
self.relay_backend
.blockchain()
.status(block_id)
.map_err(RelayChainError::BlockchainError)
}
async fn best_block_hash(&self) -> RelayChainResult<PHash> {
Ok(self.relay_backend.blockchain().info().best_hash)
}
@@ -116,7 +103,7 @@ impl RelayChainInterface for DummyRelayChainInterface {
async fn persisted_validation_data(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
_: PHash,
_: ParaId,
_: OccupiedCoreAssumption,
) -> RelayChainResult<Option<PersistedValidationData>> {
@@ -128,7 +115,7 @@ impl RelayChainInterface for DummyRelayChainInterface {
async fn candidate_pending_availability(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
_: PHash,
_: ParaId,
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
if self.data.lock().has_pending_availability {
@@ -159,10 +146,7 @@ impl RelayChainInterface for DummyRelayChainInterface {
}
}
async fn session_index_for_child(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
) -> RelayChainResult<SessionIndex> {
async fn session_index_for_child(&self, _: PHash) -> RelayChainResult<SessionIndex> {
Ok(0)
}
@@ -196,7 +180,7 @@ impl RelayChainInterface for DummyRelayChainInterface {
async fn get_storage_by_key(
&self,
_: &polkadot_service::BlockId,
_: PHash,
_: &[u8],
) -> RelayChainResult<Option<StorageValue>> {
unimplemented!("Not needed for test")
@@ -204,7 +188,7 @@ impl RelayChainInterface for DummyRelayChainInterface {
async fn prove_read(
&self,
_: &polkadot_service::BlockId,
_: PHash,
_: &Vec<Vec<u8>>,
) -> RelayChainResult<sc_client_api::StorageProof> {
unimplemented!("Not needed for test")
@@ -293,10 +277,7 @@ async fn make_gossip_message_and_header(
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let session_index = relay_chain_interface
.session_index_for_child(&BlockId::Hash(relay_parent))
.await
.unwrap();
let session_index = relay_chain_interface.session_index_for_child(relay_parent).await.unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
let header = default_header();
@@ -477,10 +458,7 @@ async fn check_statement_seconded() {
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let session_index = relay_chain_interface
.session_index_for_child(&BlockId::Hash(relay_parent))
.await
.unwrap();
let session_index = relay_chain_interface.session_index_for_child(relay_parent).await.unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
let statement = Statement::Valid(Default::default());
@@ -16,6 +16,7 @@
use cumulus_primitives_core::ParaId;
use cumulus_test_service::{initial_head_data, run_relay_chain_validator_node, Keyring::*};
use futures::join;
#[substrate_test_utils::test]
#[ignore]
@@ -27,12 +28,24 @@ async fn sync_blocks_from_tip_without_being_connected_to_a_collator() {
let para_id = ParaId::from(100);
let tokio_handle = tokio::runtime::Handle::current();
let ws_port = portpicker::pick_unused_port().expect("No free ports");
// start alice
let alice = run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new());
let alice = run_relay_chain_validator_node(
tokio_handle.clone(),
Alice,
|| {},
Vec::new(),
Some(ws_port),
);
// start bob
let bob =
run_relay_chain_validator_node(tokio_handle.clone(), Bob, || {}, vec![alice.addr.clone()]);
let bob = run_relay_chain_validator_node(
tokio_handle.clone(),
Bob,
|| {},
vec![alice.addr.clone()],
None,
);
// register parachain
alice
@@ -62,12 +75,21 @@ async fn sync_blocks_from_tip_without_being_connected_to_a_collator() {
.await;
// run eve as parachain full node that is only connected to dave
let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Eve)
let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Eve)
.connect_to_parachain_node(&dave)
.exclusively_connect_to_registered_parachain_nodes()
.connect_to_relay_chain_nodes(vec![&alice, &bob])
.build()
.await;
eve.wait_for_blocks(7).await;
// run eve as parachain full node that is only connected to dave
let ferdie = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Ferdie)
.connect_to_parachain_node(&dave)
.exclusively_connect_to_registered_parachain_nodes()
.connect_to_relay_chain_nodes(vec![&alice, &bob])
.use_external_relay_chain_node_at_port(ws_port)
.build()
.await;
join!(ferdie.wait_for_blocks(7), eve.wait_for_blocks(7));
}
+3 -3
View File
@@ -451,9 +451,9 @@ async fn pending_candidates(
let filtered_stream = import_notification_stream.filter_map(move |n| {
let client_for_closure = relay_chain_client.clone();
async move {
let block_id = BlockId::hash(n.hash());
let hash = n.hash();
let pending_availability_result = client_for_closure
.candidate_pending_availability(&block_id, para_id)
.candidate_pending_availability(hash, para_id)
.await
.map_err(|e| {
tracing::error!(
@@ -463,7 +463,7 @@ async fn pending_candidates(
)
});
let session_index_result =
client_for_closure.session_index_for_child(&block_id).await.map_err(|e| {
client_for_closure.session_index_for_child(hash).await.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
@@ -39,6 +39,7 @@ async fn pov_recovery() {
Alice,
|| {},
Vec::new(),
None,
);
// Start bob
@@ -47,6 +48,7 @@ async fn pov_recovery() {
Bob,
|| {},
vec![alice.addr.clone()],
None,
);
// Register parachain
@@ -1,6 +1,6 @@
[package]
authors = ["Parity Technologies <admin@parity.io>"]
name = "cumulus-relay-chain-local"
name = "cumulus-relay-chain-inprocess-interface"
version = "0.1.0"
edition = "2021"
@@ -28,7 +28,7 @@ sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "master
parking_lot = "0.12.0"
tracing = "0.1.31"
async-trait = "0.1.52"
futures = { version = "0.3.1", features = ["compat"] }
futures = "0.3.21"
futures-timer = "3.0.2"
[dev-dependencies]
@@ -46,15 +46,15 @@ use sp_state_machine::{Backend as StateBackend, StorageValue};
const TIMEOUT_IN_SECONDS: u64 = 6;
/// Provides an implementation of the [`RelayChainInterface`] using a local in-process relay chain node.
pub struct RelayChainLocal<Client> {
pub struct RelayChainInProcessInterface<Client> {
full_client: Arc<Client>,
backend: Arc<FullBackend>,
sync_oracle: Arc<Mutex<Box<dyn SyncOracle + Send + Sync>>>,
overseer_handle: Option<Handle>,
}
impl<Client> RelayChainLocal<Client> {
/// Create a new instance of [`RelayChainLocal`]
impl<Client> RelayChainInProcessInterface<Client> {
/// Create a new instance of [`RelayChainInProcessInterface`]
pub fn new(
full_client: Arc<Client>,
backend: Arc<FullBackend>,
@@ -65,7 +65,7 @@ impl<Client> RelayChainLocal<Client> {
}
}
impl<T> Clone for RelayChainLocal<T> {
impl<T> Clone for RelayChainInProcessInterface<T> {
fn clone(&self) -> Self {
Self {
full_client: self.full_client.clone(),
@@ -77,7 +77,7 @@ impl<T> Clone for RelayChainLocal<T> {
}
#[async_trait]
impl<Client> RelayChainInterface for RelayChainLocal<Client>
impl<Client> RelayChainInterface for RelayChainInProcessInterface<Client>
where
Client: ProvideRuntimeApi<PBlock>
+ BlockchainEvents<PBlock>
@@ -113,12 +113,12 @@ where
async fn persisted_validation_data(
&self,
block_id: &BlockId,
hash: PHash,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> RelayChainResult<Option<PersistedValidationData>> {
Ok(self.full_client.runtime_api().persisted_validation_data(
block_id,
&BlockId::Hash(hash),
para_id,
occupied_core_assumption,
)?)
@@ -126,21 +126,21 @@ where
async fn candidate_pending_availability(
&self,
block_id: &BlockId,
hash: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
Ok(self
.full_client
.runtime_api()
.candidate_pending_availability(block_id, para_id)?)
.candidate_pending_availability(&BlockId::Hash(hash), para_id)?)
}
async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex> {
Ok(self.full_client.runtime_api().session_index_for_child(block_id)?)
async fn session_index_for_child(&self, hash: PHash) -> RelayChainResult<SessionIndex> {
Ok(self.full_client.runtime_api().session_index_for_child(&BlockId::Hash(hash))?)
}
async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>> {
Ok(self.full_client.runtime_api().validators(block_id)?)
async fn validators(&self, hash: PHash) -> RelayChainResult<Vec<ValidatorId>> {
Ok(self.full_client.runtime_api().validators(&BlockId::Hash(hash))?)
}
async fn import_notification_stream(
@@ -167,10 +167,6 @@ where
Ok(self.backend.blockchain().info().best_hash)
}
async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus> {
Ok(self.backend.blockchain().status(block_id)?)
}
async fn is_major_syncing(&self) -> RelayChainResult<bool> {
let mut network = self.sync_oracle.lock();
Ok(network.is_major_syncing())
@@ -182,19 +178,21 @@ where
async fn get_storage_by_key(
&self,
block_id: &BlockId,
relay_parent: PHash,
key: &[u8],
) -> RelayChainResult<Option<StorageValue>> {
let state = self.backend.state_at(*block_id)?;
let block_id = BlockId::Hash(relay_parent);
let state = self.backend.state_at(block_id)?;
state.storage(key).map_err(RelayChainError::GenericError)
}
async fn prove_read(
&self,
block_id: &BlockId,
relay_parent: PHash,
relevant_keys: &Vec<Vec<u8>>,
) -> RelayChainResult<StorageProof> {
let state_backend = self.backend.state_at(*block_id)?;
let block_id = BlockId::Hash(relay_parent);
let state_backend = self.backend.state_at(block_id)?;
sp_state_machine::prove_read(state_backend, relevant_keys)
.map_err(RelayChainError::StateMachineError)
@@ -271,9 +269,9 @@ where
let _lock = backend.get_import_lock().read();
let block_id = BlockId::Hash(hash);
match backend.blockchain().status(block_id)? {
BlockStatus::InChain => return Ok(BlockCheckStatus::InChain),
_ => {},
if backend.blockchain().status(block_id)? == BlockStatus::InChain {
return Ok(BlockCheckStatus::InChain)
}
let listener = client.import_notification_stream();
@@ -282,25 +280,25 @@ where
}
/// Builder for a concrete relay chain interface, created from a full node. Builds
/// a [`RelayChainLocal`] to access relay chain data necessary for parachain operation.
/// a [`RelayChainInProcessInterface`] to access relay chain data necessary for parachain operation.
///
/// The builder takes a [`polkadot_client::Client`]
/// that wraps a concrete instance. By using [`polkadot_client::ExecuteWithClient`]
/// the builder gets access to this concrete instance and instantiates a [`RelayChainLocal`] with it.
struct RelayChainLocalBuilder {
/// the builder gets access to this concrete instance and instantiates a [`RelayChainInProcessInterface`] with it.
struct RelayChainInProcessInterfaceBuilder {
polkadot_client: polkadot_client::Client,
backend: Arc<FullBackend>,
sync_oracle: Arc<Mutex<Box<dyn SyncOracle + Send + Sync>>>,
overseer_handle: Option<Handle>,
}
impl RelayChainLocalBuilder {
impl RelayChainInProcessInterfaceBuilder {
pub fn build(self) -> Arc<dyn RelayChainInterface> {
self.polkadot_client.clone().execute_with(self)
}
}
impl ExecuteWithClient for RelayChainLocalBuilder {
impl ExecuteWithClient for RelayChainInProcessInterfaceBuilder {
type Output = Arc<dyn RelayChainInterface>;
fn execute_with_client<Client, Api, Backend>(self, client: Arc<Client>) -> Self::Output
@@ -314,7 +312,12 @@ impl ExecuteWithClient for RelayChainLocalBuilder {
+ Send,
Client::Api: ParachainHost<PBlock> + BabeApi<PBlock>,
{
Arc::new(RelayChainLocal::new(client, self.backend, self.sync_oracle, self.overseer_handle))
Arc::new(RelayChainInProcessInterface::new(
client,
self.backend,
self.sync_oracle,
self.overseer_handle,
))
}
}
@@ -346,7 +349,7 @@ fn build_polkadot_full_node(
}
/// Builds a relay chain interface by constructing a full relay chain node
pub fn build_relay_chain_interface(
pub fn build_inprocess_relay_chain(
polkadot_config: Configuration,
telemetry_worker_handle: Option<TelemetryWorkerHandle>,
task_manager: &mut TaskManager,
@@ -361,7 +364,7 @@ pub fn build_relay_chain_interface(
let sync_oracle: Box<dyn SyncOracle + Send + Sync> = Box::new(full_node.network.clone());
let sync_oracle = Arc::new(Mutex::new(sync_oracle));
let relay_chain_interface_builder = RelayChainLocalBuilder {
let relay_chain_interface_builder = RelayChainInProcessInterfaceBuilder {
polkadot_client: full_node.client.clone(),
backend: full_node.backend.clone(),
sync_oracle,
@@ -402,7 +405,8 @@ mod tests {
}
}
fn build_client_backend_and_block() -> (Arc<Client>, PBlock, RelayChainLocal<Client>) {
fn build_client_backend_and_block(
) -> (Arc<Client>, PBlock, RelayChainInProcessInterface<Client>) {
let builder =
TestClientBuilder::new().set_execution_strategy(ExecutionStrategy::NativeWhenPossible);
let backend = builder.backend();
@@ -415,7 +419,7 @@ mod tests {
(
client.clone(),
block,
RelayChainLocal::new(
RelayChainInProcessInterface::new(
client,
backend.clone(),
Arc::new(Mutex::new(dummy_network)),
+4 -1
View File
@@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
cumulus-primitives-core = { path = "../../primitives/core" }
@@ -17,8 +18,10 @@ sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = "0.3.1"
futures = "0.3.21"
parking_lot = "0.12.0"
derive_more = "0.99.2"
async-trait = "0.1.52"
thiserror = "1.0.30"
jsonrpsee-core = "0.8.0"
parity-scale-codec = "3.0.0"
+37 -28
View File
@@ -19,16 +19,18 @@ use std::{collections::BTreeMap, pin::Pin, sync::Arc};
use cumulus_primitives_core::{
relay_chain::{
v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
BlockId, Hash as PHash, Header as PHeader, InboundHrmpMessage,
Hash as PHash, Header as PHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
use polkadot_overseer::Handle as OverseerHandle;
use sc_client_api::{blockchain::BlockStatus, StorageProof};
use sc_client_api::StorageProof;
use futures::Stream;
use async_trait::async_trait;
use jsonrpsee_core::Error as JsonRPSeeError;
use parity_scale_codec::Error as CodecError;
use sp_api::ApiError;
use sp_state_machine::StorageValue;
@@ -36,37 +38,48 @@ pub type RelayChainResult<T> = Result<T, RelayChainError>;
#[derive(thiserror::Error, Debug)]
pub enum RelayChainError {
#[error("Error occurred while calling relay chain runtime: {0:?}")]
#[error("Error occured while calling relay chain runtime: {0}")]
ApiError(#[from] ApiError),
#[error("Timeout while waiting for relay-chain block `{0}` to be imported.")]
WaitTimeout(PHash),
#[error("Import listener closed while waiting for relay-chain block `{0}` to be imported.")]
ImportListenerClosed(PHash),
#[error("Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1:?}")]
#[error("Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1}")]
WaitBlockchainError(PHash, sp_blockchain::Error),
#[error("Blockchain returned an error: {0:?}")]
#[error("Blockchain returned an error: {0}")]
BlockchainError(#[from] sp_blockchain::Error),
#[error("State machine error occurred: {0:?}")]
#[error("State machine error occured: {0}")]
StateMachineError(Box<dyn sp_state_machine::Error>),
#[error("Unspecified error occurred: {0:?}")]
#[error("Unable to call RPC method '{0}' due to error: {1}")]
RPCCallError(String, JsonRPSeeError),
#[error("RPC Error: '{0}'")]
JsonRPCError(#[from] JsonRPSeeError),
#[error("Scale codec deserialization error: {0}")]
DeserializationError(CodecError),
#[error("Scale codec deserialization error: {0}")]
ServiceError(#[from] polkadot_service::Error),
#[error("Unspecified error occured: {0}")]
GenericError(String),
}
impl From<CodecError> for RelayChainError {
fn from(e: CodecError) -> Self {
RelayChainError::DeserializationError(e)
}
}
/// Trait that provides all necessary methods for interaction between collator and relay chain.
#[async_trait]
pub trait RelayChainInterface: Send + Sync {
/// Fetch a storage item by key.
async fn get_storage_by_key(
&self,
block_id: &BlockId,
relay_parent: PHash,
key: &[u8],
) -> RelayChainResult<Option<StorageValue>>;
/// Fetch a vector of current validators.
async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>>;
/// Get the status of a given block.
async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus>;
async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>>;
/// Get the hash of the current best block.
async fn best_block_hash(&self) -> RelayChainResult<PHash>;
@@ -98,7 +111,7 @@ pub trait RelayChainInterface: Send + Sync {
/// and the para already occupies a core.
async fn persisted_validation_data(
&self,
block_id: &BlockId,
block_id: PHash,
para_id: ParaId,
_: OccupiedCoreAssumption,
) -> RelayChainResult<Option<PersistedValidationData>>;
@@ -107,12 +120,12 @@ pub trait RelayChainInterface: Send + Sync {
/// assigned to occupied cores in `availability_cores` and `None` otherwise.
async fn candidate_pending_availability(
&self,
block_id: &BlockId,
block_id: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<CommittedCandidateReceipt>>;
/// Returns the session index expected at a child of the block.
async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex>;
async fn session_index_for_child(&self, block_id: PHash) -> RelayChainResult<SessionIndex>;
/// Get a stream of import block notifications.
async fn import_notification_stream(
@@ -145,7 +158,7 @@ pub trait RelayChainInterface: Send + Sync {
/// Generate a storage read proof.
async fn prove_read(
&self,
block_id: &BlockId,
relay_parent: PHash,
relevant_keys: &Vec<Vec<u8>>,
) -> RelayChainResult<StorageProof>;
}
@@ -173,7 +186,7 @@ where
async fn persisted_validation_data(
&self,
block_id: &BlockId,
block_id: PHash,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> RelayChainResult<Option<PersistedValidationData>> {
@@ -184,17 +197,17 @@ where
async fn candidate_pending_availability(
&self,
block_id: &BlockId,
block_id: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
(**self).candidate_pending_availability(block_id, para_id).await
}
async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex> {
async fn session_index_for_child(&self, block_id: PHash) -> RelayChainResult<SessionIndex> {
(**self).session_index_for_child(block_id).await
}
async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>> {
async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>> {
(**self).validators(block_id).await
}
@@ -214,10 +227,6 @@ where
(**self).best_block_hash().await
}
async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus> {
(**self).block_status(block_id).await
}
async fn is_major_syncing(&self) -> RelayChainResult<bool> {
(**self).is_major_syncing().await
}
@@ -228,18 +237,18 @@ where
async fn get_storage_by_key(
&self,
block_id: &BlockId,
relay_parent: PHash,
key: &[u8],
) -> RelayChainResult<Option<StorageValue>> {
(**self).get_storage_by_key(block_id, key).await
(**self).get_storage_by_key(relay_parent, key).await
}
async fn prove_read(
&self,
block_id: &BlockId,
relay_parent: PHash,
relevant_keys: &Vec<Vec<u8>>,
) -> RelayChainResult<StorageProof> {
(**self).prove_read(block_id, relevant_keys).await
(**self).prove_read(relay_parent, relevant_keys).await
}
async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
@@ -0,0 +1,30 @@
[package]
authors = ["Parity Technologies <admin@parity.io>"]
name = "cumulus-relay-chain-rpc-interface"
version = "0.1.0"
edition = "2021"
[dependencies]
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-rpc-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = "0.3.21"
futures-timer = "3.0.2"
parity-scale-codec = "3.0.0"
parking_lot = "0.11.1"
jsonrpsee = { version = "0.8.0", features = ["client"] }
tracing = "0.1.25"
async-trait = "0.1.52"
url = "2.2.2"
backoff = { version = "0.4.0", features = ["tokio"] }
+472
View File
@@ -0,0 +1,472 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use async_trait::async_trait;
use backoff::{future::retry_notify, ExponentialBackoff};
use core::time::Duration;
use cumulus_primitives_core::{
relay_chain::{
v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
Hash as PHash, Header as PHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
use futures::{FutureExt, Stream, StreamExt};
use jsonrpsee::{
core::{
client::{Client as JsonRPCClient, ClientT, Subscription, SubscriptionClientT},
Error as JsonRpseeError,
},
rpc_params,
types::ParamsSer,
ws_client::WsClientBuilder,
};
use parity_scale_codec::{Decode, Encode};
use polkadot_service::Handle;
use sc_client_api::{StorageData, StorageProof};
use sc_rpc_api::{state::ReadProof, system::Health};
use sp_core::sp_std::collections::btree_map::BTreeMap;
use sp_runtime::DeserializeOwned;
use sp_state_machine::StorageValue;
use sp_storage::StorageKey;
use std::{pin::Pin, sync::Arc};
pub use url::Url;
const LOG_TARGET: &str = "relay-chain-rpc-interface";
const TIMEOUT_IN_SECONDS: u64 = 6;
/// Client that maps RPC methods and deserializes results
#[derive(Clone)]
struct RelayChainRPCClient {
/// Websocket client to make calls
ws_client: Arc<JsonRPCClient>,
/// Retry strategy that should be used for requests and subscriptions
retry_strategy: ExponentialBackoff,
}
impl RelayChainRPCClient {
pub async fn new(url: Url) -> RelayChainResult<Self> {
tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client");
let ws_client = WsClientBuilder::default().build(url.as_str()).await?;
Ok(RelayChainRPCClient {
ws_client: Arc::new(ws_client),
retry_strategy: ExponentialBackoff::default(),
})
}
/// Call a call to `state_call` rpc method.
async fn call_remote_runtime_function<R: Decode>(
&self,
method_name: &str,
hash: PHash,
payload: Option<impl Encode>,
) -> RelayChainResult<R> {
let payload_bytes =
payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode()));
let params = rpc_params! {
method_name,
payload_bytes,
hash
};
let res = self
.request_tracing::<sp_core::Bytes, _>("state_call", params, |err| {
tracing::trace!(
target: LOG_TARGET,
%method_name,
%hash,
error = %err,
"Error during call to 'state_call'.",
);
})
.await?;
Decode::decode(&mut &*res.0).map_err(Into::into)
}
/// Subscribe to a notification stream via RPC
async fn subscribe<'a, R>(
&self,
sub_name: &'a str,
unsub_name: &'a str,
params: Option<ParamsSer<'a>>,
) -> RelayChainResult<Subscription<R>>
where
R: DeserializeOwned,
{
self.ws_client
.subscribe::<R>(sub_name, params, unsub_name)
.await
.map_err(|err| RelayChainError::RPCCallError(sub_name.to_string(), err))
}
/// Perform RPC request
async fn request<'a, R>(
&self,
method: &'a str,
params: Option<ParamsSer<'a>>,
) -> Result<R, RelayChainError>
where
R: DeserializeOwned + std::fmt::Debug,
{
self.request_tracing(
method,
params,
|e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"),
)
.await
}
/// Perform RPC request
async fn request_tracing<'a, R, OR>(
&self,
method: &'a str,
params: Option<ParamsSer<'a>>,
trace_error: OR,
) -> Result<R, RelayChainError>
where
R: DeserializeOwned + std::fmt::Debug,
OR: Fn(&jsonrpsee::core::Error),
{
retry_notify(
self.retry_strategy.clone(),
|| async {
self.ws_client.request(method, params.clone()).await.map_err(|err| match err {
JsonRpseeError::Transport(_) =>
backoff::Error::Transient { err, retry_after: None },
_ => backoff::Error::Permanent(err),
})
},
|error, dur| tracing::trace!(target: LOG_TARGET, %error, ?dur, "Encountered transport error, retrying."),
)
.await
.map_err(|err| {
trace_error(&err);
RelayChainError::RPCCallError(method.to_string(), err)})
}
async fn system_health(&self) -> Result<Health, RelayChainError> {
self.request("system_health", None).await
}
async fn state_get_read_proof(
&self,
storage_keys: Vec<StorageKey>,
at: Option<PHash>,
) -> Result<ReadProof<PHash>, RelayChainError> {
let params = rpc_params!(storage_keys, at);
self.request("state_getReadProof", params).await
}
async fn state_get_storage(
&self,
storage_key: StorageKey,
at: Option<PHash>,
) -> Result<Option<StorageData>, RelayChainError> {
let params = rpc_params!(storage_key, at);
self.request("state_getStorage", params).await
}
async fn chain_get_head(&self) -> Result<PHash, RelayChainError> {
self.request("chain_getHead", None).await
}
async fn chain_get_header(
&self,
hash: Option<PHash>,
) -> Result<Option<PHeader>, RelayChainError> {
let params = rpc_params!(hash);
self.request("chain_getHeader", params).await
}
async fn parachain_host_candidate_pending_availability(
&self,
at: PHash,
para_id: ParaId,
) -> Result<Option<CommittedCandidateReceipt>, RelayChainError> {
self.call_remote_runtime_function(
"ParachainHost_candidate_pending_availability",
at,
Some(para_id),
)
.await
}
async fn parachain_host_session_index_for_child(
&self,
at: PHash,
) -> Result<SessionIndex, RelayChainError> {
self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>)
.await
}
async fn parachain_host_validators(
&self,
at: PHash,
) -> Result<Vec<ValidatorId>, RelayChainError> {
self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>)
.await
}
async fn parachain_host_persisted_validation_data(
&self,
at: PHash,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> Result<Option<PersistedValidationData>, RelayChainError> {
self.call_remote_runtime_function(
"ParachainHost_persisted_validation_data",
at,
Some((para_id, occupied_core_assumption)),
)
.await
}
async fn parachain_host_inbound_hrmp_channels_contents(
&self,
para_id: ParaId,
at: PHash,
) -> Result<BTreeMap<ParaId, Vec<InboundHrmpMessage>>, RelayChainError> {
self.call_remote_runtime_function(
"ParachainHost_inbound_hrmp_channels_contents",
at,
Some(para_id),
)
.await
}
async fn parachain_host_dmq_contents(
&self,
para_id: ParaId,
at: PHash,
) -> Result<Vec<InboundDownwardMessage>, RelayChainError> {
self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id))
.await
}
async fn subscribe_all_heads(&self) -> Result<Subscription<PHeader>, RelayChainError> {
self.subscribe::<PHeader>("chain_subscribeAllHeads", "chain_unsubscribeAllHeads", None)
.await
}
async fn subscribe_new_best_heads(&self) -> Result<Subscription<PHeader>, RelayChainError> {
self.subscribe::<PHeader>("chain_subscribeNewHeads", "chain_unsubscribeNewHeads", None)
.await
}
async fn subscribe_finalized_heads(&self) -> Result<Subscription<PHeader>, RelayChainError> {
self.subscribe::<PHeader>(
"chain_subscribeFinalizedHeads",
"chain_unsubscribeFinalizedHeads",
None,
)
.await
}
}
/// RelayChainRPCInterface is used to interact with a full node that is running locally
/// in the same process.
#[derive(Clone)]
pub struct RelayChainRPCInterface {
rpc_client: RelayChainRPCClient,
}
impl RelayChainRPCInterface {
pub async fn new(url: Url) -> RelayChainResult<Self> {
Ok(Self { rpc_client: RelayChainRPCClient::new(url).await? })
}
}
#[async_trait]
impl RelayChainInterface for RelayChainRPCInterface {
async fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> RelayChainResult<Vec<InboundDownwardMessage>> {
self.rpc_client.parachain_host_dmq_contents(para_id, relay_parent).await
}
async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
self.rpc_client
.parachain_host_inbound_hrmp_channels_contents(para_id, relay_parent)
.await
}
async fn persisted_validation_data(
&self,
hash: PHash,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> RelayChainResult<Option<PersistedValidationData>> {
self.rpc_client
.parachain_host_persisted_validation_data(hash, para_id, occupied_core_assumption)
.await
}
async fn candidate_pending_availability(
&self,
hash: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
self.rpc_client
.parachain_host_candidate_pending_availability(hash, para_id)
.await
}
async fn session_index_for_child(&self, hash: PHash) -> RelayChainResult<SessionIndex> {
self.rpc_client.parachain_host_session_index_for_child(hash).await
}
async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>> {
self.rpc_client.parachain_host_validators(block_id).await
}
async fn import_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream =
self.rpc_client.subscribe_all_heads().await?.filter_map(|item| async move {
item.map_err(|err| {
tracing::error!(
target: LOG_TARGET,
"Encountered error in import notification stream: {}",
err
)
})
.ok()
});
Ok(imported_headers_stream.boxed())
}
async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream = self
.rpc_client
.subscribe_finalized_heads()
.await?
.filter_map(|item| async move {
item.map_err(|err| {
tracing::error!(
target: LOG_TARGET,
"Encountered error in finality notification stream: {}",
err
)
})
.ok()
});
Ok(imported_headers_stream.boxed())
}
async fn best_block_hash(&self) -> RelayChainResult<PHash> {
self.rpc_client.chain_get_head().await
}
async fn is_major_syncing(&self) -> RelayChainResult<bool> {
self.rpc_client.system_health().await.map(|h| h.is_syncing)
}
fn overseer_handle(&self) -> RelayChainResult<Option<Handle>> {
unimplemented!("Overseer handle is not available on relay-chain-rpc-interface");
}
async fn get_storage_by_key(
&self,
relay_parent: PHash,
key: &[u8],
) -> RelayChainResult<Option<StorageValue>> {
let storage_key = StorageKey(key.to_vec());
self.rpc_client
.state_get_storage(storage_key, Some(relay_parent))
.await
.map(|storage_data| storage_data.map(|sv| sv.0))
}
async fn prove_read(
&self,
relay_parent: PHash,
relevant_keys: &Vec<Vec<u8>>,
) -> RelayChainResult<StorageProof> {
let cloned = relevant_keys.clone();
let storage_keys: Vec<StorageKey> = cloned.into_iter().map(StorageKey).collect();
self.rpc_client
.state_get_read_proof(storage_keys, Some(relay_parent))
.await
.map(|read_proof| {
let bytes = read_proof.proof.into_iter().map(|bytes| bytes.to_vec()).collect();
StorageProof::new(bytes)
})
}
/// Wait for a given relay chain block
///
/// The hash of the block to wait for is passed. We wait for the block to arrive or return after a timeout.
///
/// Implementation:
/// 1. Register a listener to all new blocks.
/// 2. Check if the block is already in chain. If yes, succeed early.
/// 3. Wait for the block to be imported via subscription.
/// 4. If timeout is reached, we return an error.
async fn wait_for_block(&self, wait_for_hash: PHash) -> RelayChainResult<()> {
let mut head_stream = self.rpc_client.subscribe_all_heads().await?;
if self.rpc_client.chain_get_header(Some(wait_for_hash)).await?.is_some() {
return Ok(())
}
let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse();
loop {
futures::select! {
_ = timeout => return Err(RelayChainError::WaitTimeout(wait_for_hash)),
evt = head_stream.next().fuse() => match evt {
Some(Ok(evt)) if evt.hash() == wait_for_hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
None => return Err(RelayChainError::ImportListenerClosed(wait_for_hash)),
}
}
}
}
async fn new_best_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream =
self.rpc_client.subscribe_new_best_heads().await?.filter_map(|item| async move {
item.map_err(|err| {
tracing::error!(
target: LOG_TARGET,
"Error in best block notification stream: {}",
err
)
})
.ok()
});
Ok(imported_headers_stream.boxed())
}
}
+1
View File
@@ -9,6 +9,7 @@ edition = "2021"
cumulus-client-consensus-common = { path = "../consensus/common" }
cumulus-client-collator = { path = "../collator" }
cumulus-client-pov-recovery = { path = "../pov-recovery" }
cumulus-client-cli = { path = "../cli" }
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
cumulus-primitives-core = { path = "../../primitives/core" }
+11
View File
@@ -18,6 +18,7 @@
//!
//! Provides functions for starting a collator node or a normal full node.
use cumulus_client_cli::CollatorOptions;
use cumulus_client_consensus_common::ParachainConsensus;
use cumulus_primitives_core::{CollectCollationInfo, ParaId};
use cumulus_relay_chain_interface::RelayChainInterface;
@@ -151,6 +152,7 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface, IQ> {
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
pub relay_chain_slot_duration: Duration,
pub import_queue: IQ,
pub collator_options: CollatorOptions,
}
/// Start a full node for a parachain.
@@ -166,6 +168,7 @@ pub fn start_full_node<Block, Client, Backend, RCInterface, IQ>(
para_id,
relay_chain_slot_duration,
import_queue,
collator_options,
}: StartFullNodeParams<Block, Client, RCInterface, IQ>,
) -> sc_service::error::Result<()>
where
@@ -193,6 +196,14 @@ where
.spawn_essential_handle()
.spawn("cumulus-consensus", None, consensus);
// PoV Recovery is currently not supported when we connect to the
// relay chain via RPC, so we return early. The node will work, but not be able to recover PoVs from the
// relay chain if blocks are not announced on parachain. This will be enabled again once
// https://github.com/paritytech/cumulus/issues/545 is finished.
if collator_options.relay_chain_rpc_url.is_some() {
return Ok(())
}
let overseer_handle = relay_chain_interface
.overseer_handle()
.map_err(|e| sc_service::Error::Application(Box::new(e)))?