diff --git a/Cargo.lock b/Cargo.lock index 6afcdb26ce..68077e1a24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1951,7 +1951,6 @@ dependencies = [ "polkadot-node-primitives", "polkadot-parachain", "polkadot-primitives", - "polkadot-service", "polkadot-test-client", "portpicker", "sc-cli", diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index 721f2240bc..37a2d87bb5 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -14,7 +14,6 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use async_trait::async_trait; use sc_client_api::{ Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider, }; @@ -29,9 +28,9 @@ use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; use codec::Decode; -use futures::{channel::mpsc::Sender, select, FutureExt, Stream, StreamExt}; +use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt}; -use std::{pin::Pin, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; const LOG_TARGET: &str = "cumulus-consensus"; @@ -42,29 +41,6 @@ const LOG_TARGET: &str = "cumulus-consensus"; const RECOVERY_DELAY: RecoveryDelay = RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) }; -/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`. -#[async_trait] -pub trait RelaychainClient: Clone + 'static { - /// The error type for interacting with the Polkadot client. - type Error: std::fmt::Debug + Send; - - /// A stream that yields head-data for a parachain. - type HeadStream: Stream> + Send + Unpin; - - /// Get a stream of new best heads for the given parachain. - async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult; - - /// Get a stream of finalized heads for the given parachain. - async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult; - - /// Returns the parachain head for the given `para_id` at the given block id. - async fn parachain_head_at( - &self, - at: PHash, - para_id: ParaId, - ) -> RelayChainResult>>; -} - /// Follow the finalized head of the given parachain. /// /// For every finalized block of the relay chain, it will get the included parachain header @@ -73,10 +49,10 @@ async fn follow_finalized_head(para_id: ParaId, parachain: Arc

+ UsageProvider, - R: RelaychainClient, + R: RelayChainInterface + Clone, B: Backend, { - let mut finalized_heads = match relay_chain.finalized_heads(para_id).await { + let finalized_heads = match finalized_heads(relay_chain, para_id).await { Ok(finalized_heads_stream) => finalized_heads_stream, Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); @@ -84,6 +60,8 @@ where }, }; + pin_mut!(finalized_heads); + loop { let finalized_head = if let Some(h) = finalized_heads.next().await { h @@ -152,7 +130,7 @@ pub async fn run_parachain_consensus( + BlockBackend + BlockchainEvents, for<'a> &'a P: BlockImport, - R: RelaychainClient, + R: RelayChainInterface + Clone, B: Backend, { let follow_new_best = follow_new_best( @@ -175,7 +153,7 @@ async fn follow_new_best( parachain: Arc

, relay_chain: R, announce_block: Arc>) + Send + Sync>, - recovery_chan_tx: Option>>, + mut recovery_chan_tx: Option>>, ) where Block: BlockT, P: Finalizer @@ -185,10 +163,10 @@ async fn follow_new_best( + BlockBackend + BlockchainEvents, for<'a> &'a P: BlockImport, - R: RelaychainClient, + R: RelayChainInterface + Clone, B: Backend, { - let mut new_best_heads = match relay_chain.new_best_heads(para_id).await { + let new_best_heads = match new_best_heads(relay_chain, para_id).await { Ok(best_heads_stream) => best_heads_stream.fuse(), Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream."); @@ -196,9 +174,11 @@ async fn follow_new_best( }, }; + pin_mut!(new_best_heads); + let mut imported_blocks = parachain.import_notification_stream().fuse(); // The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain - // block before the parachain block it included. In this case we need to wait for this block to + // block before the associated parachain block. In this case we need to wait for this block to // be imported to set it as new best. let mut unset_best_header = None; @@ -210,7 +190,7 @@ async fn follow_new_best( h, &*parachain, &mut unset_best_header, - recovery_chan_tx.clone(), + recovery_chan_tx.as_mut(), ).await, None => { tracing::debug!( @@ -304,7 +284,7 @@ async fn handle_new_best_parachain_head( head: Vec, parachain: &P, unset_best_header: &mut Option, - mut recovery_chan_tx: Option>>, + mut recovery_chan_tx: Option<&mut Sender>>, ) where Block: BlockT, P: UsageProvider + Send + Sync + BlockBackend, @@ -416,50 +396,42 @@ where } } -#[async_trait] -impl RelaychainClient for RCInterface -where - RCInterface: RelayChainInterface + Clone + 'static, -{ - type Error = ClientError; +/// Returns a stream that will yield best heads for the given `para_id`. +async fn new_best_heads( + relay_chain: impl RelayChainInterface + Clone, + para_id: ParaId, +) -> RelayChainResult>> { + let new_best_notification_stream = + relay_chain.new_best_notification_stream().await?.filter_map(move |n| { + let relay_chain = relay_chain.clone(); + async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() } + }); - type HeadStream = Pin> + Send>>; - - async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult { - let relay_chain = self.clone(); - - let new_best_notification_stream = self - .new_best_notification_stream() - .await? - .filter_map(move |n| { - let relay_chain = relay_chain.clone(); - async move { relay_chain.parachain_head_at(n.hash(), para_id).await.ok().flatten() } - }) - .boxed(); - Ok(new_best_notification_stream) - } - - async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult { - let relay_chain = self.clone(); - - let finality_notification_stream = self - .finality_notification_stream() - .await? - .filter_map(move |n| { - let relay_chain = relay_chain.clone(); - async move { relay_chain.parachain_head_at(n.hash(), para_id).await.ok().flatten() } - }) - .boxed(); - Ok(finality_notification_stream) - } - - async fn parachain_head_at( - &self, - at: PHash, - para_id: ParaId, - ) -> RelayChainResult>> { - self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut) - .await - .map(|s| s.map(|s| s.parent_head.0)) - } + Ok(new_best_notification_stream) +} + +/// Returns a stream that will yield finalized heads for the given `para_id`. +async fn finalized_heads( + relay_chain: impl RelayChainInterface + Clone, + para_id: ParaId, +) -> RelayChainResult>> { + let finality_notification_stream = + relay_chain.finality_notification_stream().await?.filter_map(move |n| { + let relay_chain = relay_chain.clone(); + async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() } + }); + + Ok(finality_notification_stream) +} + +/// Returns head of the parachain at the given relay chain block. +async fn parachain_head_at( + relay_chain: &impl RelayChainInterface, + at: PHash, + para_id: ParaId, +) -> RelayChainResult>> { + relay_chain + .persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut) + .await + .map(|s| s.map(|s| s.parent_head.0)) } diff --git a/client/consensus/common/src/tests.rs b/client/consensus/common/src/tests.rs index 9461b09d86..5cc4936a62 100644 --- a/client/consensus/common/src/tests.rs +++ b/client/consensus/common/src/tests.rs @@ -19,20 +19,24 @@ use crate::*; use async_trait::async_trait; use codec::Encode; use cumulus_client_pov_recovery::RecoveryKind; -use cumulus_relay_chain_interface::RelayChainResult; +use cumulus_primitives_core::{InboundDownwardMessage, InboundHrmpMessage}; +use cumulus_relay_chain_interface::{ + CommittedCandidateReceipt, OccupiedCoreAssumption, OverseerHandle, PHeader, ParaId, + RelayChainInterface, RelayChainResult, SessionIndex, StorageValue, ValidatorId, +}; use cumulus_test_client::{ runtime::{Block, Header}, Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt, }; use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt}; use futures_timer::Delay; -use polkadot_primitives::v2::Id as ParaId; use sc_client_api::{blockchain::Backend as _, Backend as _, UsageProvider}; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; -use sp_blockchain::Error as ClientError; use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::generic::BlockId; use std::{ + collections::{BTreeMap, HashMap}, + pin::Pin, sync::{Arc, Mutex}, time::Duration, }; @@ -42,6 +46,7 @@ struct RelaychainInner { finalized_heads: Option>, new_best_heads_sender: mpsc::UnboundedSender

, finalized_heads_sender: mpsc::UnboundedSender
, + relay_chain_hash_to_header: HashMap, } impl RelaychainInner { @@ -54,6 +59,7 @@ impl RelaychainInner { finalized_heads_sender, new_best_heads: Some(new_best_heads), finalized_heads: Some(finalized_heads), + relay_chain_hash_to_header: Default::default(), } } } @@ -70,37 +76,133 @@ impl Relaychain { } #[async_trait] -impl crate::parachain_consensus::RelaychainClient for Relaychain { - type Error = ClientError; - - type HeadStream = Box> + Send + Unpin>; - - async fn new_best_heads(&self, _: ParaId) -> RelayChainResult { - let stream = self - .inner - .lock() - .unwrap() - .new_best_heads - .take() - .expect("Should only be called once"); - - Ok(Box::new(stream.map(|v| v.encode()))) +impl RelayChainInterface for Relaychain { + async fn validators(&self, _: PHash) -> RelayChainResult> { + unimplemented!("Not needed for test") } - async fn finalized_heads(&self, _: ParaId) -> RelayChainResult { - let stream = self + async fn best_block_hash(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn retrieve_dmq_contents( + &self, + _: ParaId, + _: PHash, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn retrieve_all_inbound_hrmp_channel_contents( + &self, + _: ParaId, + _: PHash, + ) -> RelayChainResult>> { + unimplemented!("Not needed for test") + } + + async fn persisted_validation_data( + &self, + hash: PHash, + _: ParaId, + _: OccupiedCoreAssumption, + ) -> RelayChainResult> { + Ok(Some(PersistedValidationData { + parent_head: self + .inner + .lock() + .unwrap() + .relay_chain_hash_to_header + .get(&hash) + .unwrap() + .encode() + .into(), + ..Default::default() + })) + } + + async fn candidate_pending_availability( + &self, + _: PHash, + _: ParaId, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn session_index_for_child(&self, _: PHash) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn import_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + unimplemented!("Not needed for test") + } + + async fn finality_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + let inner = self.inner.clone(); + Ok(self .inner .lock() .unwrap() .finalized_heads .take() - .expect("Should only be called once"); - - Ok(Box::new(stream.map(|v| v.encode()))) + .unwrap() + .map(move |h| { + // Let's abuse the "parachain header" directly as relay chain header. + inner.lock().unwrap().relay_chain_hash_to_header.insert(h.hash(), h.clone()); + h + }) + .boxed()) } - async fn parachain_head_at(&self, _: PHash, _: ParaId) -> RelayChainResult>> { - unimplemented!("Not required for tests") + async fn is_major_syncing(&self) -> RelayChainResult { + Ok(false) + } + + fn overseer_handle(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn get_storage_by_key( + &self, + _: PHash, + _: &[u8], + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn prove_read( + &self, + _: PHash, + _: &Vec>, + ) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn wait_for_block(&self, _: PHash) -> RelayChainResult<()> { + unimplemented!("Not needed for test") + } + + async fn new_best_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + let inner = self.inner.clone(); + Ok(self + .inner + .lock() + .unwrap() + .new_best_heads + .take() + .unwrap() + .map(move |h| { + // Let's abuse the "parachain header" directly as relay chain header. + inner.lock().unwrap().relay_chain_hash_to_header.insert(h.hash(), h.clone()); + h + }) + .boxed()) } } @@ -121,7 +223,7 @@ fn build_block( let mut block = builder.build().unwrap().block; // Simulate some form of post activity (like a Seal or Other generic things). - // This is mostly used to excercise the `LevelMonitor` correct behavior. + // This is mostly used to exercise the `LevelMonitor` correct behavior. // (in practice we want that header post-hash != pre-hash) block.header.digest.push(sp_runtime::DigestItem::Other(vec![1, 2, 3])); diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index d524062815..717b30ad38 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -45,7 +45,6 @@ substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch # Polkadot polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "master" } -polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" } # Cumulus diff --git a/client/network/src/tests.rs b/client/network/src/tests.rs index cef327b876..0a2cd57a0a 100644 --- a/client/network/src/tests.rs +++ b/client/network/src/tests.rs @@ -17,18 +17,18 @@ use super::*; use async_trait::async_trait; use cumulus_relay_chain_inprocess_interface::{check_block_in_chain, BlockCheckStatus}; -use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; +use cumulus_relay_chain_interface::{ + OverseerHandle, PHeader, ParaId, RelayChainError, RelayChainResult, +}; use cumulus_test_service::runtime::{Block, Hash, Header}; use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt}; use parking_lot::Mutex; use polkadot_node_primitives::{SignedFullStatement, Statement}; use polkadot_primitives::v2::{ CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt, - Hash as PHash, HeadData, Header as PHeader, Id as ParaId, InboundDownwardMessage, - InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex, - SigningContext, ValidationCodeHash, ValidatorId, + Hash as PHash, HeadData, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, + PersistedValidationData, SessionIndex, SigningContext, ValidationCodeHash, ValidatorId, }; -use polkadot_service::Handle; use polkadot_test_client::{ Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend, InitPolkadotBlockBuilder, TestClientBuilder, TestClientBuilderExt, @@ -174,7 +174,7 @@ impl RelayChainInterface for DummyRelayChainInterface { Ok(false) } - fn overseer_handle(&self) -> RelayChainResult { + fn overseer_handle(&self) -> RelayChainResult { unimplemented!("Not needed for test") } diff --git a/client/relay-chain-interface/src/lib.rs b/client/relay-chain-interface/src/lib.rs index 56de750b68..7dba142ab8 100644 --- a/client/relay-chain-interface/src/lib.rs +++ b/client/relay-chain-interface/src/lib.rs @@ -16,14 +16,7 @@ use std::{collections::BTreeMap, pin::Pin, sync::Arc}; -use cumulus_primitives_core::{ - relay_chain::{ - v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId}, - Hash as PHash, Header as PHeader, InboundHrmpMessage, - }, - InboundDownwardMessage, ParaId, PersistedValidationData, -}; -use polkadot_overseer::{prometheus::PrometheusError, Handle as OverseerHandle}; +use polkadot_overseer::prometheus::PrometheusError; use polkadot_service::SubstrateServiceError; use sc_client_api::StorageProof; @@ -33,7 +26,16 @@ use async_trait::async_trait; use jsonrpsee_core::Error as JsonRpcError; use parity_scale_codec::Error as CodecError; use sp_api::ApiError; -use sp_state_machine::StorageValue; + +pub use cumulus_primitives_core::{ + relay_chain::{ + v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId}, + Hash as PHash, Header as PHeader, InboundHrmpMessage, + }, + InboundDownwardMessage, ParaId, PersistedValidationData, +}; +pub use polkadot_overseer::Handle as OverseerHandle; +pub use sp_state_machine::StorageValue; pub type RelayChainResult = Result;