From 1e6f37076d299506f1d2d15f46a7fd4b2432b4bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 1 Apr 2020 17:01:59 +0200 Subject: [PATCH] Rework consensus instance communication with the network worker (#958) Up to now consensus instances used the main channel to communicate with the background network worker. This led to a race condition when sending a local collation and dropping the router before driving the send local collation future until it is finished. This PR changes the communication between worker and the instances to use their own channels. This has the advantage that we don't need an extra `DropConsensusNetworking` message as the network is dropped automatically when the last sender is dropped. --- polkadot/network/src/lib.rs | 2 + polkadot/network/src/protocol/mod.rs | 68 ++++++++++-------- polkadot/network/src/protocol/tests.rs | 97 +++++++++++++++++++++----- polkadot/primitives/src/parachain.rs | 7 +- 4 files changed, 123 insertions(+), 51 deletions(-) diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 77b2e30c8a..5048f09ada 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -19,6 +19,8 @@ //! This manages routing for parachain statements, parachain block and outgoing message //! data fetching, communication between collators and validators, and more. 
+#![recursion_limit="256"] + use polkadot_primitives::{Block, Hash, BlakeTwo256, HashT}; pub mod legacy; diff --git a/polkadot/network/src/protocol/mod.rs b/polkadot/network/src/protocol/mod.rs index 787429d88f..ac3b6edfc6 100644 --- a/polkadot/network/src/protocol/mod.rs +++ b/polkadot/network/src/protocol/mod.rs @@ -26,7 +26,8 @@ use codec::{Decode, Encode}; use futures::channel::{mpsc, oneshot}; use futures::future::Either; use futures::prelude::*; -use futures::task::{Spawn, SpawnExt}; +use futures::task::{Spawn, SpawnExt, Context, Poll}; +use futures::stream::{FuturesUnordered, StreamFuture}; use log::{debug, trace}; use polkadot_primitives::{ @@ -76,8 +77,7 @@ enum ServiceToWorkerMsg { PeerDisconnected(PeerId), // service messages. - BuildConsensusNetworking(Arc, Vec), - DropConsensusNetworking(Hash), + BuildConsensusNetworking(mpsc::Receiver, Arc, Vec), SubmitValidatedCollation( AbridgedCandidateReceipt, PoVBlock, @@ -782,6 +782,21 @@ fn send_peer_collations( } } +/// Receives messages associated to a certain consensus networking instance. +struct ConsensusNetworkingReceiver { + receiver: mpsc::Receiver, + /// The relay parent of this consensus network. + relay_parent: Hash, +} + +impl Stream for ConsensusNetworkingReceiver { + type Item = ServiceToWorkerMsg; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.receiver).poll_next(cx) + } +} + struct Worker { protocol_handler: ProtocolHandler, api: Arc, @@ -790,6 +805,7 @@ struct Worker { background_to_main_sender: mpsc::Sender, background_receiver: mpsc::Receiver, service_receiver: mpsc::Receiver, + consensus_networking_receivers: FuturesUnordered>, } impl Worker where @@ -801,6 +817,7 @@ impl Worker where // spawns a background task to spawn consensus networking. 
fn build_consensus_networking( &mut self, + receiver: mpsc::Receiver, table: Arc, authorities: Vec, ) { @@ -832,6 +849,9 @@ impl Worker where }, ); + let relay_parent = table.signing_context().parent_hash; + self.consensus_networking_receivers.push(ConsensusNetworkingReceiver { receiver, relay_parent }.into_future()); + // glue the incoming messages, shared table, and validation // work together. let _ = self.executor.spawn(statement_import_loop( @@ -855,12 +875,8 @@ impl Worker where ServiceToWorkerMsg::PeerMessage(remote, messages) => { self.protocol_handler.on_raw_messages(remote, messages) } - - ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) => { - self.build_consensus_networking(table, authorities); - } - ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => { - self.protocol_handler.drop_consensus_networking(&relay_parent); + ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities) => { + self.build_consensus_networking(receiver, table, authorities); } ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => { let relay_parent = receipt.relay_parent; @@ -985,6 +1001,16 @@ impl Worker where Some(msg) => self.handle_service_message(msg), None => return, }, + consensus_service_msg = self.consensus_networking_receivers.next() => match consensus_service_msg { + Some((Some(msg), receiver)) => { + self.handle_service_message(msg); + self.consensus_networking_receivers.push(receiver.into_future()); + }, + Some((None, receiver)) => { + self.protocol_handler.drop_consensus_networking(&receiver.relay_parent); + }, + None => {}, + }, background_msg = self.background_receiver.next() => match background_msg { Some(msg) => self.handle_background_message(msg), None => return, @@ -1017,6 +1043,7 @@ async fn worker_loop( background_to_main_sender: background_tx, background_receiver: background_rx, service_receiver: receiver, + consensus_networking_receivers: Default::default(), }; worker.main_loop().await @@ 
-1296,24 +1323,6 @@ struct RouterInner { sender: mpsc::Sender, } -impl Drop for RouterInner { - fn drop(&mut self) { - let res = self.sender.try_send( - ServiceToWorkerMsg::DropConsensusNetworking(self.relay_parent) - ); - - if let Err(e) = res { - assert!( - !e.is_full(), - "futures 0.3 guarantees at least one free slot in the capacity \ - per sender; this is the first message sent via this sender; \ - therefore we will not have to wait for capacity; qed" - ); - // other error variants (disconnection) are fine here. - } - } -} - impl Service { /// Register an availablility-store that the network can query. pub fn register_availability_store(&self, store: av_store::Store) { @@ -1379,14 +1388,15 @@ impl ParachainNetwork for Service { let relay_parent = table.signing_context().parent_hash.clone(); Box::pin(async move { + let (router_sender, receiver) = mpsc::channel(0); sender.send( - ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) + ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities) ).await?; Ok(Router { inner: Arc::new(RouterInner { relay_parent, - sender, + sender: router_sender, }) }) }) diff --git a/polkadot/network/src/protocol/tests.rs b/polkadot/network/src/protocol/tests.rs index 2b82dcb7f7..481b27cf72 100644 --- a/polkadot/network/src/protocol/tests.rs +++ b/polkadot/network/src/protocol/tests.rs @@ -54,6 +54,7 @@ type GossipStreamEntry = (mpsc::UnboundedReceiver, oneshot::S #[derive(Default, Clone)] struct MockGossip { inner: Arc>>, + gossip_messages: Arc>>, } impl MockGossip { @@ -102,8 +103,8 @@ impl crate::legacy::GossipService for MockGossip { }) } - fn gossip_message(&self, _topic: Hash, _message: GossipMessage) { - + fn gossip_message(&self, topic: Hash, message: GossipMessage) { + self.gossip_messages.lock().insert(topic, message); } fn send_message(&self, _who: PeerId, _message: GossipMessage) { @@ -250,22 +251,6 @@ fn test_setup(config: Config) -> ( (service, mock_gossip, pool, worker_task) } -#[test] -fn 
router_inner_drop_sends_worker_message() { - let parent = [1; 32].into(); - - let (sender, mut receiver) = mpsc::channel(0); - drop(RouterInner { - relay_parent: parent, - sender, - }); - - match receiver.try_next() { - Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x), - _ => panic!("message not sent"), - } -} - #[test] fn worker_task_shuts_down_when_sender_dropped() { let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); @@ -274,6 +259,30 @@ fn worker_task_shuts_down_when_sender_dropped() { let _ = pool.run_until(worker_task); } +/// Given the async nature of `select!` that is being used in the main loop of the worker +/// and that consensus instances use their own channels, we don't know when the synchronize message +/// is handled. This helper functions checks multiple times that the given instance is dropped. Even +/// if the first round fails, the second one should be successful as the consensus instance drop +/// should be already handled this time. 
+fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) { + let mut try_counter = 0; + let max_tries = 3; + + while try_counter < max_tries { + let dropped = pool.run_until(service.synchronize(move |proto| { + !proto.consensus_instances.contains_key(&instance) + })); + + if dropped { + return; + } + + try_counter += 1; + } + + panic!("Consensus instance `{}` wasn't dropped!", instance); +} + #[test] fn consensus_instances_cleaned_up() { let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); @@ -300,11 +309,61 @@ fn consensus_instances_cleaned_up() { drop(router); + wait_for_instance_drop(&mut service, &mut pool, relay_parent); +} + +#[test] +fn collation_is_received_with_dropped_router() { + let (mut service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); + let relay_parent = [0; 32].into(); + let topic = crate::legacy::gossip::attestation_topic(relay_parent); + + let signing_context = SigningContext { + session_index: Default::default(), + parent_hash: relay_parent, + }; + let table = Arc::new(SharedTable::new( + vec![Sr25519Keyring::Alice.public().into()], + HashMap::new(), + Some(Arc::new(Sr25519Keyring::Alice.pair().into())), + signing_context, + AvailabilityStore::new_in_memory(service.clone()), + None, + )); + + pool.spawner().spawn_local(worker_task).unwrap(); + + let router = pool.run_until( + service.build_table_router(table, &[]) + ).unwrap(); + + let receipt = AbridgedCandidateReceipt { relay_parent, ..Default::default() }; + let local_collation_future = router.local_collation( + receipt, + PoVBlock { block_data: BlockData(Vec::new()) }, + (0, &[]), + ); + + // Drop the router and make sure that the consensus instance is still alive + drop(router); + assert!(pool.run_until(service.synchronize(move |proto| { - !proto.consensus_instances.contains_key(&relay_parent) + proto.consensus_instances.contains_key(&relay_parent) + }))); + + // The gossip message 
should still be unknown + assert!(!gossip.gossip_messages.lock().contains_key(&topic)); + + pool.run_until(local_collation_future).unwrap(); + + // Make sure the instance is now dropped and the message was gossiped + wait_for_instance_drop(&mut service, &mut pool, relay_parent); + assert!(pool.run_until(service.synchronize(move |_| { + gossip.gossip_messages.lock().contains_key(&topic) }))); } + #[test] fn validator_peer_cleaned_up() { let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index 6b9e95f9a8..de05063387 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -75,9 +75,10 @@ pub type ValidatorId = validator_app::Public; /// Index of the validator is used as a lightweight replacement of the `ValidatorId` when appropriate. pub type ValidatorIndex = u32; -/// A Parachain validator keypair. -#[cfg(feature = "std")] -pub type ValidatorPair = validator_app::Pair; +application_crypto::with_pair! { + /// A Parachain validator keypair. + pub type ValidatorPair = validator_app::Pair; +} /// Signature with which parachain validators sign blocks. ///