diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 77b2e30c8a..5048f09ada 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -19,6 +19,8 @@ //! This manages routing for parachain statements, parachain block and outgoing message //! data fetching, communication between collators and validators, and more. +#![recursion_limit="256"] + use polkadot_primitives::{Block, Hash, BlakeTwo256, HashT}; pub mod legacy; diff --git a/polkadot/network/src/protocol/mod.rs b/polkadot/network/src/protocol/mod.rs index 787429d88f..ac3b6edfc6 100644 --- a/polkadot/network/src/protocol/mod.rs +++ b/polkadot/network/src/protocol/mod.rs @@ -26,7 +26,8 @@ use codec::{Decode, Encode}; use futures::channel::{mpsc, oneshot}; use futures::future::Either; use futures::prelude::*; -use futures::task::{Spawn, SpawnExt}; +use futures::task::{Spawn, SpawnExt, Context, Poll}; +use futures::stream::{FuturesUnordered, StreamFuture}; use log::{debug, trace}; use polkadot_primitives::{ @@ -76,8 +77,7 @@ enum ServiceToWorkerMsg { PeerDisconnected(PeerId), // service messages. - BuildConsensusNetworking(Arc, Vec), - DropConsensusNetworking(Hash), + BuildConsensusNetworking(mpsc::Receiver, Arc, Vec), SubmitValidatedCollation( AbridgedCandidateReceipt, PoVBlock, @@ -782,6 +782,21 @@ fn send_peer_collations( } } +/// Receives messages associated to a certain consensus networking instance. +struct ConsensusNetworkingReceiver { + receiver: mpsc::Receiver, + /// The relay parent of this consensus network. + relay_parent: Hash, +} + +impl Stream for ConsensusNetworkingReceiver { + type Item = ServiceToWorkerMsg; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.receiver).poll_next(cx) + } +} + struct Worker { protocol_handler: ProtocolHandler, api: Arc, @@ -790,6 +805,7 @@ struct Worker { background_to_main_sender: mpsc::Sender, background_receiver: mpsc::Receiver, service_receiver: mpsc::Receiver, + consensus_networking_receivers: FuturesUnordered>, } impl Worker where @@ -801,6 +817,7 @@ impl Worker where // spawns a background task to spawn consensus networking. fn build_consensus_networking( &mut self, + receiver: mpsc::Receiver, table: Arc, authorities: Vec, ) { @@ -832,6 +849,9 @@ impl Worker where }, ); + let relay_parent = table.signing_context().parent_hash; + self.consensus_networking_receivers.push(ConsensusNetworkingReceiver { receiver, relay_parent }.into_future()); + // glue the incoming messages, shared table, and validation // work together. let _ = self.executor.spawn(statement_import_loop( @@ -855,12 +875,8 @@ impl Worker where ServiceToWorkerMsg::PeerMessage(remote, messages) => { self.protocol_handler.on_raw_messages(remote, messages) } - - ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) => { - self.build_consensus_networking(table, authorities); - } - ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => { - self.protocol_handler.drop_consensus_networking(&relay_parent); + ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities) => { + self.build_consensus_networking(receiver, table, authorities); } ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => { let relay_parent = receipt.relay_parent; @@ -985,6 +1001,16 @@ impl Worker where Some(msg) => self.handle_service_message(msg), None => return, }, + consensus_service_msg = self.consensus_networking_receivers.next() => match consensus_service_msg { + Some((Some(msg), receiver)) => { + self.handle_service_message(msg); + self.consensus_networking_receivers.push(receiver.into_future()); + }, + Some((None, receiver)) => { + self.protocol_handler.drop_consensus_networking(&receiver.relay_parent); + }, + None => {}, + }, background_msg = self.background_receiver.next() => match background_msg { Some(msg) => self.handle_background_message(msg), None => return, @@ -1017,6 +1043,7 @@ async fn worker_loop( background_to_main_sender: background_tx, background_receiver: background_rx, service_receiver: receiver, + consensus_networking_receivers: Default::default(), }; worker.main_loop().await @@ -1296,24 +1323,6 @@ struct RouterInner { sender: mpsc::Sender, } -impl Drop for RouterInner { - fn drop(&mut self) { - let res = self.sender.try_send( - ServiceToWorkerMsg::DropConsensusNetworking(self.relay_parent) - ); - - if let Err(e) = res { - assert!( - !e.is_full(), - "futures 0.3 guarantees at least one free slot in the capacity \ - per sender; this is the first message sent via this sender; \ - therefore we will not have to wait for capacity; qed" - ); - // other error variants (disconnection) are fine here. - } - } -} - impl Service { /// Register an availablility-store that the network can query. pub fn register_availability_store(&self, store: av_store::Store) { @@ -1379,14 +1388,15 @@ impl ParachainNetwork for Service { let relay_parent = table.signing_context().parent_hash.clone(); Box::pin(async move { + let (router_sender, receiver) = mpsc::channel(0); sender.send( - ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) + ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities) ).await?; Ok(Router { inner: Arc::new(RouterInner { relay_parent, - sender, + sender: router_sender, }) }) }) diff --git a/polkadot/network/src/protocol/tests.rs b/polkadot/network/src/protocol/tests.rs index 2b82dcb7f7..481b27cf72 100644 --- a/polkadot/network/src/protocol/tests.rs +++ b/polkadot/network/src/protocol/tests.rs @@ -54,6 +54,7 @@ type GossipStreamEntry = (mpsc::UnboundedReceiver, oneshot::S #[derive(Default, Clone)] struct MockGossip { inner: Arc>>, + gossip_messages: Arc>>, } impl MockGossip { @@ -102,8 +103,8 @@ impl crate::legacy::GossipService for MockGossip { }) } - fn gossip_message(&self, _topic: Hash, _message: GossipMessage) { - + fn gossip_message(&self, topic: Hash, message: GossipMessage) { + self.gossip_messages.lock().insert(topic, message); } fn send_message(&self, _who: PeerId, _message: GossipMessage) { @@ -250,22 +251,6 @@ fn test_setup(config: Config) -> ( (service, mock_gossip, pool, worker_task) } -#[test] -fn router_inner_drop_sends_worker_message() { - let parent = [1; 32].into(); - - let (sender, mut receiver) = mpsc::channel(0); - drop(RouterInner { - relay_parent: parent, - sender, - }); - - match receiver.try_next() { - Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x), - _ => panic!("message not sent"), - } -} - #[test] fn worker_task_shuts_down_when_sender_dropped() { let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); @@ -274,6 +259,30 @@ fn worker_task_shuts_down_when_sender_dropped() { let _ = pool.run_until(worker_task); } +/// Given the async nature of `select!` that is being used in the main loop of the worker +/// and that consensus instances use their own channels, we don't know when the synchronize message +/// is handled. This helper functions checks multiple times that the given instance is dropped. Even +/// if the first round fails, the second one should be successful as the consensus instance drop +/// should be already handled this time. +fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) { + let mut try_counter = 0; + let max_tries = 3; + + while try_counter < max_tries { + let dropped = pool.run_until(service.synchronize(move |proto| { + !proto.consensus_instances.contains_key(&instance) + })); + + if dropped { + return; + } + + try_counter += 1; + } + + panic!("Consensus instance `{}` wasn't dropped!", instance); +} + #[test] fn consensus_instances_cleaned_up() { let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); @@ -300,11 +309,61 @@ fn consensus_instances_cleaned_up() { drop(router); + wait_for_instance_drop(&mut service, &mut pool, relay_parent); +} + +#[test] +fn collation_is_received_with_dropped_router() { + let (mut service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); + let relay_parent = [0; 32].into(); + let topic = crate::legacy::gossip::attestation_topic(relay_parent); + + let signing_context = SigningContext { + session_index: Default::default(), + parent_hash: relay_parent, + }; + let table = Arc::new(SharedTable::new( + vec![Sr25519Keyring::Alice.public().into()], + HashMap::new(), + Some(Arc::new(Sr25519Keyring::Alice.pair().into())), + signing_context, + AvailabilityStore::new_in_memory(service.clone()), + None, + )); + + pool.spawner().spawn_local(worker_task).unwrap(); + + let router = pool.run_until( + service.build_table_router(table, &[]) + ).unwrap(); + + let receipt = AbridgedCandidateReceipt { relay_parent, ..Default::default() }; + let local_collation_future = router.local_collation( + receipt, + PoVBlock { block_data: BlockData(Vec::new()) }, + (0, &[]), + ); + + // Drop the router and make sure that the consensus instance is still alive + drop(router); + assert!(pool.run_until(service.synchronize(move |proto| { - !proto.consensus_instances.contains_key(&relay_parent) + proto.consensus_instances.contains_key(&relay_parent) + }))); + + // The gossip message should still be unknown + assert!(!gossip.gossip_messages.lock().contains_key(&topic)); + + pool.run_until(local_collation_future).unwrap(); + + // Make sure the instance is now dropped and the message was gossiped + wait_for_instance_drop(&mut service, &mut pool, relay_parent); + assert!(pool.run_until(service.synchronize(move |_| { + gossip.gossip_messages.lock().contains_key(&topic) }))); } + #[test] fn validator_peer_cleaned_up() { let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index 6b9e95f9a8..de05063387 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -75,9 +75,10 @@ pub type ValidatorId = validator_app::Public; /// Index of the validator is used as a lightweight replacement of the `ValidatorId` when appropriate. pub type ValidatorIndex = u32; -/// A Parachain validator keypair. -#[cfg(feature = "std")] -pub type ValidatorPair = validator_app::Pair; +application_crypto::with_pair! { + /// A Parachain validator keypair. + pub type ValidatorPair = validator_app::Pair; +} /// Signature with which parachain validators sign blocks. ///