From b2db7857a79834bd2091ccbd50cc654b5304ac72 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 29 Dec 2018 18:05:45 +0100 Subject: [PATCH] ensure all spawned futures are exit-guarded (#59) --- polkadot/collator/src/lib.rs | 5 +- polkadot/consensus/src/attestation_service.rs | 45 +++++++------ polkadot/network/src/consensus.rs | 67 ++++++++++++------- polkadot/network/src/router.rs | 20 +++--- polkadot/service/src/lib.rs | 6 +- 5 files changed, 85 insertions(+), 58 deletions(-) diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 490b17ba05..de0a36e096 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -246,7 +246,7 @@ impl IntoExit for CollationNode where impl Worker for CollationNode where P: ParachainContext + Send + 'static, - E: Future + Send + 'static + E: Future + Clone + Send + 'static { type Work = Box + Send>; @@ -267,6 +267,7 @@ impl Worker for CollationNode where let client = service.client(); let network = service.network(); + let inner_exit = exit.clone(); let work = client.import_notification_stream() .for_each(move |notification| { macro_rules! try_fr { @@ -325,7 +326,7 @@ impl Worker for CollationNode where } }); - tokio::spawn(silenced); + tokio::spawn(silenced.select(inner_exit.clone()).then(|_| Ok(()))); Ok(()) }); diff --git a/polkadot/consensus/src/attestation_service.rs b/polkadot/consensus/src/attestation_service.rs index afc47a92ae..fc87f83d9e 100644 --- a/polkadot/consensus/src/attestation_service.rs +++ b/polkadot/consensus/src/attestation_service.rs @@ -137,28 +137,31 @@ pub(crate) fn start( let consensus = parachain_consensus.clone(); let key = key.clone(); - client.import_notification_stream().for_each(move |notification| { - let parent_hash = notification.hash; - if notification.is_new_best { - let res = client - .runtime_api() - .authorities(&BlockId::hash(parent_hash)) - .map_err(Into::into) - .and_then(|authorities| { - consensus.get_or_instantiate( - parent_hash, - &authorities, - key.clone(), - ) - }); + client.import_notification_stream() + .for_each(move |notification| { + let parent_hash = notification.hash; + if notification.is_new_best { + let res = client + .runtime_api() + .authorities(&BlockId::hash(parent_hash)) + .map_err(Into::into) + .and_then(|authorities| { + consensus.get_or_instantiate( + parent_hash, + &authorities, + key.clone(), + ) + }); - if let Err(e) = res { - warn!("Unable to start parachain consensus on top of {:?}: {}", - parent_hash, e); + if let Err(e) = res { + warn!("Unable to start parachain consensus on top of {:?}: {}", + parent_hash, e); + } } - } - Ok(()) - }) + Ok(()) + }) + .select(exit.clone()) + .then(|_| Ok(())) }; let prune_old_sessions = { @@ -180,6 +183,8 @@ pub(crate) fn start( } }) .map_err(|e| warn!("Timer error {:?}", e)) + .select(exit.clone()) + .then(|_| Ok(())) }; runtime.spawn(notifications); diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index cd83f96250..04263afed0 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -41,14 +41,17 @@ use router::Router; // task that processes all gossipped consensus messages, // checking signatures -struct MessageProcessTask

{ +struct MessageProcessTask { inner_stream: mpsc::UnboundedReceiver, parent_hash: Hash, table_router: Router

, + exit: E, } -impl MessageProcessTask

- where P::Api: ParachainHost, +impl MessageProcessTask where + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost, + E: Future + Clone + Send + 'static, { fn process_message(&self, msg: ConsensusMessage) -> Option> { use polkadot_consensus::SignedStatement; @@ -61,7 +64,7 @@ impl MessageProcessTask

statement.sender, &self.parent_hash ) { - self.table_router.import_statement(statement); + self.table_router.import_statement(statement, self.exit.clone()); } } @@ -69,8 +72,10 @@ impl MessageProcessTask

} } -impl Future for MessageProcessTask

- where P::Api: ParachainHost, +impl Future for MessageProcessTask where + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost, + E: Future + Clone + Send + 'static, { type Item = (); type Error = (); @@ -90,30 +95,34 @@ impl Future for MessageProcessTask } /// Wrapper around the network service -pub struct ConsensusNetwork

{ +pub struct ConsensusNetwork { network: Arc, api: Arc

, + exit: E, } -impl

ConsensusNetwork

{ +impl ConsensusNetwork { /// Create a new consensus networking object. - pub fn new(network: Arc, api: Arc

) -> Self { - ConsensusNetwork { network, api } + pub fn new(network: Arc, exit: E, api: Arc

) -> Self { + ConsensusNetwork { network, exit, api } } } -impl

Clone for ConsensusNetwork

{ +impl Clone for ConsensusNetwork { fn clone(&self) -> Self { ConsensusNetwork { network: self.network.clone(), + exit: self.exit.clone(), api: self.api.clone(), } } } /// A long-lived network which can create parachain statement routing processes on demand. -impl Network for ConsensusNetwork

- where P::Api: ParachainHost, +impl Network for ConsensusNetwork where + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost, + E: Clone + Future + Send + 'static, { type TableRouter = Router

; @@ -122,7 +131,7 @@ impl Network for ConsensusNetwork< &self, _validators: &[SessionKey], table: Arc, - task_executor: TaskExecutor + task_executor: TaskExecutor, ) -> Self::TableRouter { let parent_hash = table.consensus_parent_hash().clone(); @@ -139,22 +148,28 @@ impl Network for ConsensusNetwork< ); let attestation_topic = table_router.gossip_topic(); + let exit = self.exit.clone(); // spin up a task in the background that processes all incoming statements // TODO: propagate statements on a timer? let inner_stream = self.network.consensus_gossip().write().messages_for(attestation_topic); - task_executor.spawn(self.network.with_spec(|spec, ctx| { - spec.new_consensus(ctx, parent_hash, CurrentConsensus { - knowledge, - local_session_key, - }); + let process_task = self.network + .with_spec(|spec, ctx| { + spec.new_consensus(ctx, parent_hash, CurrentConsensus { + knowledge, + local_session_key, + }); - MessageProcessTask { - inner_stream, - parent_hash, - table_router: table_router.clone(), - } - })); + MessageProcessTask { + inner_stream, + parent_hash, + table_router: table_router.clone(), + exit, + } + }) + .then(|_| Ok(())); + + task_executor.spawn(process_task); table_router } @@ -176,7 +191,7 @@ impl Future for AwaitingCollation { } } -impl Collators for ConsensusNetwork

+impl Collators for ConsensusNetwork where P::Api: ParachainHost, { type Error = NetworkDown; diff --git a/polkadot/network/src/router.rs b/polkadot/network/src/router.rs index e2828865af..205aff1e65 100644 --- a/polkadot/network/src/router.rs +++ b/polkadot/network/src/router.rs @@ -104,7 +104,9 @@ impl Router

where P::Api: ParachainHost { /// Import a statement whose signature has been checked already. - pub(crate) fn import_statement(&self, statement: SignedStatement) { + pub(crate) fn import_statement(&self, statement: SignedStatement, exit: Exit) + where Exit: Future + Clone + Send + 'static + { trace!(target: "p_net", "importing consensus statement {:?}", statement.statement); // defer any statements for which we haven't imported the candidate yet @@ -143,14 +145,16 @@ impl Router

for (producer, statement) in producers.into_iter().zip(statements) { self.knowledge.lock().note_statement(statement.sender, &statement.statement); - if let Some(producer) = producer { + if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) { trace!(target: "consensus", "driving statement work to completion"); - self.dispatch_work(c_hash, producer); + self.task_executor.spawn(work.select(exit.clone()).then(|_| Ok(()))); } } } - fn dispatch_work(&self, candidate_hash: Hash, producer: StatementProducer) where + fn create_work(&self, candidate_hash: Hash, producer: StatementProducer) + -> impl Future + where D: Future + Send + 'static, E: Future + Send + 'static, { @@ -173,13 +177,13 @@ impl Router

let knowledge = self.knowledge.clone(); let attestation_topic = self.attestation_topic.clone(); - let work = producer.prime(validate) + producer.prime(validate) .map(move |produced| { // store the data before broadcasting statements, so other peers can fetch. knowledge.lock().note_candidate( candidate_hash, produced.block_data, - produced.extrinsic + produced.extrinsic, ); if produced.validity.is_none() && produced.availability.is_none() { @@ -204,9 +208,7 @@ impl Router

); } }) - .map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e)); - - self.task_executor.spawn(work); + .map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e)) } } diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 05427b752c..82c641b26b 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -205,7 +205,11 @@ construct_service_factory! { let client = service.client(); // collator connections and consensus network both fulfilled by this - let consensus_network = ConsensusNetwork::new(service.network(), service.client()); + let consensus_network = ConsensusNetwork::new( + service.network(), + service.on_exit(), + service.client(), + ); let proposer_factory = ::consensus::ProposerFactory::new( client.clone(), consensus_network.clone(),