diff --git a/cumulus/Cargo.lock b/cumulus/Cargo.lock index 33eada0167..e462792549 100644 --- a/cumulus/Cargo.lock +++ b/cumulus/Cargo.lock @@ -828,6 +828,7 @@ name = "cumulus-collator" version = "0.1.0" dependencies = [ "cumulus-consensus", + "cumulus-network", "cumulus-primitives", "cumulus-runtime", "cumulus-test-client", @@ -891,7 +892,10 @@ dependencies = [ name = "cumulus-network" version = "0.1.0" dependencies = [ + "futures 0.3.4", + "log 0.4.8", "parity-scale-codec", + "polkadot-collator", "polkadot-network", "polkadot-primitives", "polkadot-statement-table", diff --git a/cumulus/collator/Cargo.toml b/cumulus/collator/Cargo.toml index 4851cd0f36..27bee2a6df 100644 --- a/cumulus/collator/Cargo.toml +++ b/cumulus/collator/Cargo.toml @@ -24,8 +24,9 @@ polkadot-validation = { git = "https://github.com/paritytech/polkadot", branch = # Cumulus dependencies cumulus-consensus = { path = "../consensus" } -cumulus-runtime = { path = "../runtime" } +cumulus-network = { path = "../network" } cumulus-primitives = { path = "../primitives" } +cumulus-runtime = { path = "../runtime" } # Other dependencies log = "0.4.8" diff --git a/cumulus/collator/src/lib.rs b/cumulus/collator/src/lib.rs index e5f85622cc..c5fa6a7de2 100644 --- a/cumulus/collator/src/lib.rs +++ b/cumulus/collator/src/lib.rs @@ -16,6 +16,7 @@ //! Cumulus Collator implementation for Substrate. +use cumulus_network::WaitToAnnounce; use cumulus_primitives::{ inherents::VALIDATION_FUNCTION_PARAMS_IDENTIFIER as VFP_IDENT, validation_function_params::ValidationFunctionParams, @@ -30,6 +31,7 @@ use sp_inherents::{InherentData, InherentDataProviders}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, HashFor}; use sp_api::{ApiExt, ProvideRuntimeApi}; use sc_client_api::{StateBackend, UsageProvider, Finalizer, BlockchainEvents}; +use sc_service::Configuration; use polkadot_collator::{ BuildParachainContext, InvalidHead, Network as CollatorNetwork, ParachainContext, @@ -44,7 +46,8 @@ use codec::{Decode, Encode}; use log::{error, trace}; -use futures::{task::Spawn, Future, future, FutureExt}; +use futures::task::Spawn; +use futures::prelude::*; use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Duration, pin::Pin}; @@ -57,28 +60,39 @@ struct HeadData { } /// The implementation of the Cumulus `Collator`. -pub struct Collator { +pub struct Collator { proposer_factory: Arc>, _phantom: PhantomData, inherent_data_providers: InherentDataProviders, collator_network: Arc, block_import: Arc>, + wait_to_announce: Arc>>, } -impl Collator { +impl Collator { /// Create a new instance. fn new( proposer_factory: PF, inherent_data_providers: InherentDataProviders, collator_network: impl CollatorNetwork + Clone + 'static, block_import: BI, + spawner: Arc, + announce_block: Arc) + Send + Sync>, ) -> Self { + let collator_network = Arc::new(collator_network); + let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new( + spawner, + announce_block, + collator_network.clone(), + ))); + Self { proposer_factory: Arc::new(Mutex::new(proposer_factory)), inherent_data_providers, _phantom: PhantomData, - collator_network: Arc::new(collator_network), + collator_network, block_import: Arc::new(Mutex::new(block_import)), + wait_to_announce, } } @@ -115,7 +129,7 @@ impl Collator { } } -impl Clone for Collator { +impl Clone for Collator { fn clone(&self) -> Self { Self { proposer_factory: self.proposer_factory.clone(), @@ -123,6 +137,7 @@ impl Clone for Collator { _phantom: PhantomData, collator_network: self.collator_network.clone(), block_import: self.block_import.clone(), + wait_to_announce: self.wait_to_announce.clone(), } } } @@ -147,7 +162,7 @@ where fn produce_candidate( &mut self, - _relay_chain_parent: PHash, + relay_chain_parent: PHash, global_validation: GlobalValidationSchedule, local_validation: LocalValidationData, ) -> Self::ProduceCandidate { @@ -169,6 +184,8 @@ where .lock() .init(&last_head.header); + let wait_to_announce = self.wait_to_announce.clone(); + Box::pin(async move { let parent_state_root = *last_head.header.state_root(); @@ -245,8 +262,11 @@ where } let block_data = BlockData(b.encode()); + let header = b.into_header(); + let encoded_header = header.encode(); + let hash = header.hash(); let head_data = HeadData:: { - header: b.into_header(), + header, }; let candidate = ( @@ -254,6 +274,12 @@ where parachain::HeadData(head_data.encode()), ); + wait_to_announce.lock().wait_to_announce( + hash, + relay_chain_parent, + encoded_header, + ); + trace!(target: "cumulus-collator", "Produced candidate: {:?}", candidate); Ok(candidate) @@ -268,6 +294,7 @@ pub struct CollatorBuilder { block_import: BI, para_id: ParaId, client: Arc, + announce_block: Arc) + Send + Sync>, _marker: PhantomData<(Block, Backend)>, } @@ -281,6 +308,7 @@ impl block_import: BI, para_id: ParaId, client: Arc, + announce_block: Arc) + Send + Sync>, ) -> Self { Self { proposer_factory, @@ -288,6 +316,7 @@ impl block_import, para_id, client, + announce_block, _marker: PhantomData, } } @@ -313,7 +342,7 @@ where self, polkadot_client: Arc, spawner: Spawner, - network: impl CollatorNetwork + Clone + 'static, + polkadot_network: impl CollatorNetwork + Clone + 'static, ) -> Result where PClient: ProvideRuntimeApi + Send + Sync + BlockchainEvents + 'static, @@ -342,12 +371,24 @@ where Ok(Collator::new( self.proposer_factory, self.inherent_data_providers, - network, + polkadot_network, self.block_import, + Arc::new(spawner), + self.announce_block, )) } } +/// Prepare the collator's node condifugration +/// +/// This function will disable the default announcement of Substrate for the parachain in favor +/// of the one of Cumulus. +pub fn prepare_collator_config(mut parachain_config: Configuration) -> Configuration { + parachain_config.announce_block = false; + + parachain_config +} + #[cfg(test)] mod tests { use super::*; @@ -460,6 +501,7 @@ mod tests { let id = ParaId::from(100); let _ = env_logger::try_init(); let spawner = futures::executor::ThreadPool::new().unwrap(); + let announce_block = |_, _| (); let builder = CollatorBuilder::new( DummyFactory, @@ -467,6 +509,7 @@ mod tests { TestClientBuilder::new().build(), id, Arc::new(TestClientBuilder::new().build()), + Arc::new(announce_block), ); let context = builder .build( diff --git a/cumulus/network/Cargo.toml b/cumulus/network/Cargo.toml index 605d26145c..9f8920e069 100644 --- a/cumulus/network/Cargo.toml +++ b/cumulus/network/Cargo.toml @@ -13,6 +13,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "cumulu sp-api = { git = "https://github.com/paritytech/substrate", branch = "cumulus-branch" } # polkadot deps +polkadot-collator = { git = "https://github.com/paritytech/polkadot", branch = "cumulus-branch" } polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "cumulus-branch" } polkadot-statement-table = { git = "https://github.com/paritytech/polkadot", branch = "cumulus-branch" } polkadot-validation = { git = "https://github.com/paritytech/polkadot", branch = "cumulus-branch" } @@ -20,3 +21,5 @@ polkadot-network = { git = "https://github.com/paritytech/polkadot", branch = "c # other deps codec = { package = "parity-scale-codec", version = "1.3.0", features = [ "derive" ] } +futures = { version = "0.3.1", features = ["compat"] } +log = "0.4.8" diff --git a/cumulus/network/src/lib.rs b/cumulus/network/src/lib.rs index 683b77a74d..3229ea8fcb 100644 --- a/cumulus/network/src/lib.rs +++ b/cumulus/network/src/lib.rs @@ -23,15 +23,21 @@ use sp_blockchain::Error as ClientError; use sp_consensus::block_validation::{BlockAnnounceValidator, Validation}; use sp_runtime::{generic::BlockId, traits::Block as BlockT}; +use polkadot_collator::Network as CollatorNetwork; use polkadot_network::legacy::gossip::{GossipMessage, GossipStatement}; use polkadot_primitives::{ parachain::{ParachainHost, ValidatorId}, - Block as PBlock, + Block as PBlock, Hash as PHash, }; use polkadot_statement_table::{SignedStatement, Statement}; use polkadot_validation::check_statement; use codec::{Decode, Encode}; +use futures::{pin_mut, select, StreamExt}; +use futures::channel::oneshot; +use futures::future::FutureExt; +use futures::task::Spawn; +use log::{error, trace}; use std::{marker::PhantomData, sync::Arc}; @@ -141,3 +147,113 @@ where Ok(Validation::Success) } } + +/// Wait before announcing a block that a candidate message has been received for this block, then +/// add this message as justification for the block announcement. +/// +/// This object will spawn a new task every time the method `wait_to_announce` is called and cancel +/// the previous task running. +pub struct WaitToAnnounce { + spawner: Arc, + announce_block: Arc) + Send + Sync>, + collator_network: Arc, + current_trigger: oneshot::Sender<()>, +} + +impl WaitToAnnounce { + /// Create the `WaitToAnnounce` object + pub fn new( + spawner: Arc, + announce_block: Arc) + Send + Sync>, + collator_network: Arc, + ) -> WaitToAnnounce { + let (tx, _rx) = oneshot::channel(); + + WaitToAnnounce { + spawner, + announce_block, + collator_network, + current_trigger: tx, + } + } + + /// Wait for a candidate message for the block, then announce the block. The candidate + /// message will be added as justification to the block announcement. + pub fn wait_to_announce( + &mut self, + hash: ::Hash, + relay_chain_leaf: PHash, + head_data: Vec, + ) { + let (tx, rx) = oneshot::channel(); + let announce_block = self.announce_block.clone(); + let collator_network = self.collator_network.clone(); + + self.current_trigger = tx; + + if let Err(err) = self.spawner.spawn_obj(Box::pin(async move { + let t1 = wait_to_announce::( + hash, + relay_chain_leaf, + announce_block, + collator_network, + &head_data, + ).fuse(); + let t2 = rx.fuse(); + + pin_mut!(t1, t2); + + trace!( + target: "cumulus-network", + "waiting for announce block in a background task...", + ); + + select! { + _ = t1 => { + trace!( + target: "cumulus-network", + "block announcement finished", + ); + }, + _ = t2 => { + trace!( + target: "cumulus-network", + "previous task that waits for announce block has been canceled", + ); + } + } + }).into()) { + error!( + target: "cumulus-network", + "Could not spawn a new task to wait for the announce block: {:?}", + err, + ); + } + } +} + +async fn wait_to_announce( + hash: ::Hash, + relay_chain_leaf: PHash, + announce_block: Arc) + Send + Sync>, + collator_network: Arc, + head_data: &Vec, +) { + let mut checked_statements = collator_network.checked_statements(relay_chain_leaf); + + while let Some(statement) = checked_statements.next().await { + match &statement.statement { + Statement::Candidate(c) if &c.head_data.0 == head_data => { + let gossip_message: GossipMessage = GossipStatement { + relay_chain_leaf, + signed_statement: statement, + }.into(); + + announce_block(hash, gossip_message.encode()); + + break; + }, + _ => {}, + } + } +} diff --git a/cumulus/test/parachain/src/service.rs b/cumulus/test/parachain/src/service.rs index d6f8db67da..81627d24f1 100644 --- a/cumulus/test/parachain/src/service.rs +++ b/cumulus/test/parachain/src/service.rs @@ -15,17 +15,12 @@ // along with Cumulus. If not, see . use std::sync::Arc; - use sc_executor::native_executor_instance; use sc_service::{AbstractService, Configuration}; use sc_finality_grandpa::{FinalityProofProvider as GrandpaFinalityProofProvider, StorageAndProofProvider}; - use polkadot_primitives::parachain::CollatorPair; - -use cumulus_collator::CollatorBuilder; - +use cumulus_collator::{CollatorBuilder, prepare_collator_config}; use futures::FutureExt; - pub use sc_executor::NativeExecutor; // Our native executor instance. @@ -76,6 +71,8 @@ pub fn run_collator( key: Arc, polkadot_config: polkadot_collator::Configuration, ) -> sc_service::error::Result { + let parachain_config = prepare_collator_config(parachain_config); + let (builder, inherent_data_providers) = new_full_start!(parachain_config); inherent_data_providers .register_provider(sp_timestamp::InherentDataProvider) @@ -96,12 +93,17 @@ pub fn run_collator( let block_import = service.client(); let client = service.client(); + let network = service.network(); + let announce_block = Arc::new(move |hash, data| { + network.announce_block(hash, data) + }); let builder = CollatorBuilder::new( proposer_factory, inherent_data_providers, block_import, crate::PARA_ID, client, + announce_block, ); let polkadot_future = polkadot_collator::start_collator(