Introduce rpc client for relay chain full node (#963)

* Initial network interface preparations

* Implement get_storage_by_key

* Implement `validators` and `session_index_for_child`

* Implement persisted_validation_data and candidate_pending_availability

* Fix method name for persisted_validation_data and add encoded params

* Implement `retrieve_dmq_contents` and `retrieve_all_inbound_hrmp_channel_contents`

* Implement `prove_read`

* Introduce separate RPC client, expose JsonRpSee errors

* Simplify closure in call_remote_runtime_function

* Implement import stream, upgrade JsonRpSee

* Implement finality stream

* Remove unused method from interface

* Implement `is_major_syncing`

* Implement `wait_on_block`

* Fix tests

* Unify error handling `ApiError`

* Replace WaitError with RelayChainError

* Wrap BlockChainError in RelayChainError

* Unify error handling in relay chain intefaces

* Fix return type of proof method

* Improve error handling of new methods

* Improve error handling and move logging outside of interface

* Clean up

* Remove unwanted changes, clean up

* Remove unused import

* Add format for StatemachineError and remove nused From trait

* Use 'thiserror' crate to simplify error handling

* Expose error for overseer, further simplify error handling

* Reintroduce network interface

* Implement cli option

* Adjust call_state method to use hashes

* Disable PoV recovery when RPC is used

* Add integration test for network full node

* Use Hash instead of BlockId to ensure compatibility with RPC interface

* Fix cargo check warnings

* Implement retries

* Remove `expect` statements from code

* Update jsonrpsee to 0.8.0 and make collator keys optional

* Make cli arguments conflicting

* Remove unused `block_status` method

* Add clippy fixes

* Cargo fmt

* Validate relay chain rpc url

* Clean up dependencies and add one more integration test

* Clean up

* Clean up dependencies of relay-chain-network

* Use hash instead of blockid for rpc methods

* Fix tests

* Update client/cli/src/lib.rs

Co-authored-by: Koute <koute@users.noreply.github.com>

* Improve error message of cli validation

* Add rpc client constructor

* Do not use debug formatting for errors

* Improve logging for remote runtime methods

* Only retry on transport problems

* Use PHash by value, rename test

* Improve tracing, return error  on relay-chain-interface build

* Fix naming, use generics instead of deserializing manually

* Rename RelayChainLocal and RelayChainNetwork

* lock

* Format

* Use impl trait for encodable runtime payload

* Only instantiate full node in tests when we need it

* Upgrade scale-codec to 3.0.0

* Improve expect log

Co-authored-by: Koute <koute@users.noreply.github.com>
This commit is contained in:
Sebastian Kunert
2022-03-01 12:37:51 +01:00
committed by GitHub
parent 586071bada
commit bc532724b0
34 changed files with 1109 additions and 271 deletions
+6 -1
View File
@@ -47,12 +47,15 @@ polkadot-test-service = { git = "https://github.com/paritytech/polkadot", branch
cumulus-client-consensus-relay-chain = { path = "../../client/consensus/relay-chain" }
cumulus-client-network = { path = "../../client/network" }
cumulus-client-service = { path = "../../client/service" }
cumulus-client-cli = { path = "../../client/cli" }
cumulus-client-consensus-common = { path = "../../client/consensus/common" }
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent" }
cumulus-test-runtime = { path = "../runtime" }
cumulus-test-relay-validation-worker-provider = { path = "../relay-validation-worker-provider" }
cumulus-relay-chain-local = { path = "../../client/relay-chain-local" }
cumulus-relay-chain-inprocess-interface = { path = "../../client/relay-chain-inprocess-interface" }
cumulus-relay-chain-interface = { path = "../../client/relay-chain-interface" }
cumulus-relay-chain-rpc-interface = { path = "../../client/relay-chain-rpc-interface" }
criterion = { version = "0.3.5", features = [ "async_tokio" ] }
@@ -60,9 +63,11 @@ parking_lot = "0.12.0"
# RPC related dependencies
jsonrpc-core = "18.0.0"
url = "2.2.2"
[dev-dependencies]
futures = "0.3.5"
portpicker = "0.1.1"
# Polkadot dependencies
polkadot-test-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
@@ -151,6 +151,7 @@ fn transaction_throughput_benchmarks(c: &mut Criterion) {
Alice,
|| {},
vec![],
None,
);
// Start bob
@@ -159,6 +160,7 @@ fn transaction_throughput_benchmarks(c: &mut Criterion) {
Bob,
|| {},
vec![alice.addr.clone()],
None,
);
// Register parachain
+80 -24
View File
@@ -21,15 +21,23 @@
mod chain_spec;
mod genesis;
use std::{future::Future, time::Duration};
use std::{
future::Future,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use url::Url;
use cumulus_client_cli::CollatorOptions;
use cumulus_client_consensus_common::{ParachainCandidate, ParachainConsensus};
use cumulus_client_network::BlockAnnounceValidator;
use cumulus_client_service::{
prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams,
};
use cumulus_primitives_core::ParaId;
use cumulus_relay_chain_local::RelayChainLocal;
use cumulus_relay_chain_inprocess_interface::RelayChainInProcessInterface;
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_rpc_interface::RelayChainRPCInterface;
use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi};
use parking_lot::Mutex;
@@ -167,6 +175,35 @@ pub fn new_partial(
Ok(params)
}
async fn build_relay_chain_interface(
relay_chain_config: Configuration,
collator_key: Option<CollatorPair>,
collator_options: CollatorOptions,
task_manager: &mut TaskManager,
) -> RelayChainResult<Arc<dyn RelayChainInterface + 'static>> {
if let Some(relay_chain_url) = collator_options.relay_chain_rpc_url {
return Ok(Arc::new(RelayChainRPCInterface::new(relay_chain_url).await?) as Arc<_>)
}
let relay_chain_full_node = polkadot_test_service::new_full(
relay_chain_config,
if let Some(ref key) = collator_key {
polkadot_service::IsCollator::Yes(key.clone())
} else {
polkadot_service::IsCollator::Yes(CollatorPair::generate().0)
},
None,
)?;
task_manager.add_child(relay_chain_full_node.task_manager);
Ok(Arc::new(RelayChainInProcessInterface::new(
relay_chain_full_node.client.clone(),
relay_chain_full_node.backend.clone(),
Arc::new(Mutex::new(Box::new(relay_chain_full_node.network.clone()))),
relay_chain_full_node.overseer_handle.clone(),
)) as Arc<_>)
}
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstract over the executor and the runtime api.
@@ -179,6 +216,7 @@ async fn start_node_impl<RB>(
wrap_announce_block: Option<Box<dyn FnOnce(AnnounceBlockFn) -> AnnounceBlockFn>>,
rpc_ext_builder: RB,
consensus: Consensus,
collator_options: CollatorOptions,
) -> sc_service::error::Result<(
TaskManager,
Arc<Client>,
@@ -202,30 +240,20 @@ where
let transaction_pool = params.transaction_pool.clone();
let mut task_manager = params.task_manager;
let relay_chain_full_node = polkadot_test_service::new_full(
relay_chain_config,
if let Some(ref key) = collator_key {
polkadot_service::IsCollator::Yes(key.clone())
} else {
polkadot_service::IsCollator::Yes(CollatorPair::generate().0)
},
None,
)
.map_err(|e| match e {
polkadot_service::Error::Sub(x) => x,
s => s.to_string().into(),
})?;
let client = params.client.clone();
let backend = params.backend.clone();
let relay_chain_interface = Arc::new(RelayChainLocal::new(
relay_chain_full_node.client.clone(),
relay_chain_full_node.backend.clone(),
Arc::new(Mutex::new(Box::new(relay_chain_full_node.network.clone()))),
relay_chain_full_node.overseer_handle.clone(),
));
task_manager.add_child(relay_chain_full_node.task_manager);
let relay_chain_interface = build_relay_chain_interface(
relay_chain_config,
collator_key.clone(),
collator_options.clone(),
&mut task_manager,
)
.await
.map_err(|e| match e {
RelayChainError::ServiceError(polkadot_service::Error::Sub(x)) => x,
s => s.to_string().into(),
})?;
let block_announce_validator =
BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id);
@@ -342,6 +370,7 @@ where
// the recovery delay of pov-recovery. We don't want to wait for too
// long on the full node to recover, so we reduce this time here.
relay_chain_slot_duration: Duration::from_millis(6),
collator_options,
};
start_full_node(params)?;
@@ -389,6 +418,7 @@ pub struct TestNodeBuilder {
storage_update_func_parachain: Option<Box<dyn Fn()>>,
storage_update_func_relay_chain: Option<Box<dyn Fn()>>,
consensus: Consensus,
relay_chain_full_node_url: Option<Url>,
}
impl TestNodeBuilder {
@@ -410,6 +440,7 @@ impl TestNodeBuilder {
storage_update_func_parachain: None,
storage_update_func_relay_chain: None,
consensus: Consensus::RelayChain,
relay_chain_full_node_url: None,
}
}
@@ -501,6 +532,21 @@ impl TestNodeBuilder {
self
}
/// Connect to full node via RPC.
pub fn use_external_relay_chain_node_at_url(mut self, network_address: Url) -> Self {
self.relay_chain_full_node_url = Some(network_address);
self
}
/// Connect to full node via RPC.
pub fn use_external_relay_chain_node_at_port(mut self, port: u16) -> Self {
let mut localhost_url =
Url::parse("ws://localhost").expect("Should be able to parse localhost Url");
localhost_url.set_port(Some(port)).expect("Should be able to set port");
self.relay_chain_full_node_url = Some(localhost_url);
self
}
/// Build the [`TestNode`].
pub async fn build(self) -> TestNode {
let parachain_config = node_config(
@@ -513,6 +559,7 @@ impl TestNodeBuilder {
self.collator_key.is_some(),
)
.expect("could not generate Configuration");
let mut relay_chain_config = polkadot_test_service::node_config(
self.storage_update_func_relay_chain.unwrap_or_else(|| Box::new(|| ())),
self.tokio_handle,
@@ -521,6 +568,9 @@ impl TestNodeBuilder {
false,
);
let collator_options =
CollatorOptions { relay_chain_rpc_url: self.relay_chain_full_node_url };
relay_chain_config.network.node_name =
format!("{} (relay chain)", relay_chain_config.network.node_name);
@@ -533,6 +583,7 @@ impl TestNodeBuilder {
self.wrap_announce_block,
|_| Ok(Default::default()),
self.consensus,
collator_options,
)
.await
.expect("could not create Cumulus test service");
@@ -737,8 +788,9 @@ pub fn run_relay_chain_validator_node(
key: Sr25519Keyring,
storage_update_func: impl Fn(),
boot_nodes: Vec<MultiaddrWithPeerId>,
websocket_port: Option<u16>,
) -> polkadot_test_service::PolkadotTestNode {
let config = polkadot_test_service::node_config(
let mut config = polkadot_test_service::node_config(
storage_update_func,
tokio_handle,
key,
@@ -746,6 +798,10 @@ pub fn run_relay_chain_validator_node(
true,
);
if let Some(port) = websocket_port {
config.rpc_ws = Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port));
}
polkadot_test_service::run_validator_node(
config,
Some(cumulus_test_relay_validation_worker_provider::VALIDATION_WORKER.into()),
@@ -16,6 +16,7 @@
use cumulus_primitives_core::ParaId;
use cumulus_test_service::{initial_head_data, run_relay_chain_validator_node, Keyring::*};
use futures::join;
#[substrate_test_utils::test]
#[ignore]
@@ -28,12 +29,24 @@ async fn test_full_node_catching_up() {
let tokio_handle = tokio::runtime::Handle::current();
let ws_port = portpicker::pick_unused_port().expect("No free ports");
// start alice
let alice = run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new());
let alice = run_relay_chain_validator_node(
tokio_handle.clone(),
Alice,
|| {},
Vec::new(),
Some(ws_port),
);
// start bob
let bob =
run_relay_chain_validator_node(tokio_handle.clone(), Bob, || {}, vec![alice.addr.clone()]);
let bob = run_relay_chain_validator_node(
tokio_handle.clone(),
Bob,
|| {},
vec![alice.addr.clone()],
None,
);
// register parachain
alice
@@ -57,10 +70,19 @@ async fn test_full_node_catching_up() {
charlie.wait_for_blocks(5).await;
// run cumulus dave (a parachain full node) and wait for it to sync some blocks
let dave = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Dave)
let dave = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Dave)
.connect_to_parachain_node(&charlie)
.connect_to_relay_chain_nodes(vec![&alice, &bob])
.build()
.await;
dave.wait_for_blocks(7).await;
// run cumulus dave (a parachain full node) and wait for it to sync some blocks
let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Eve)
.connect_to_parachain_node(&charlie)
.connect_to_relay_chain_nodes(vec![&alice, &bob])
.use_external_relay_chain_node_at_port(ws_port)
.build()
.await;
join!(dave.wait_for_blocks(7), eve.wait_for_blocks(7));
}
@@ -42,11 +42,17 @@ async fn test_migrate_solo_to_para() {
let tokio_handle = tokio::runtime::Handle::current();
// start alice
let alice = run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new());
let alice =
run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new(), None);
// start bob
let bob =
run_relay_chain_validator_node(tokio_handle.clone(), Bob, || {}, vec![alice.addr.clone()]);
let bob = run_relay_chain_validator_node(
tokio_handle.clone(),
Bob,
|| {},
vec![alice.addr.clone()],
None,
);
// register parachain
alice
@@ -31,11 +31,17 @@ async fn test_runtime_upgrade() {
let tokio_handle = tokio::runtime::Handle::current();
// start alice
let alice = run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new());
let alice =
run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new(), None);
// start bob
let bob =
run_relay_chain_validator_node(tokio_handle.clone(), Bob, || {}, vec![alice.addr.clone()]);
let bob = run_relay_chain_validator_node(
tokio_handle.clone(),
Bob,
|| {},
vec![alice.addr.clone()],
None,
);
// register parachain
alice