From 10ecc32c55fd7883b848a039273f4041e8133d0a Mon Sep 17 00:00:00 2001 From: Ashley Date: Fri, 13 Dec 2019 18:37:34 +0100 Subject: [PATCH] Rewrite some Future structs as async functions (#679) * Squashed commit of the following: commit e97a17157ae0887320994661e2f816275fc75b76 Author: Ashley Date: Tue Dec 10 15:06:28 2019 +0100 Rewrite some functions as async commit 970e485179f1e087cf0a51c6a4e71f923e87df45 Merge: f98966ac df3ea965 Author: Ashley Date: Tue Dec 10 11:19:37 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-futures-update commit f98966ac188067158071d1e3e243c34ea5738f56 Author: Ashley Date: Mon Dec 9 23:40:20 2019 +0100 Add async blocks back in commit 7fa88af0271db659de9274c94cb8e7eead0e4289 Author: Ashley Date: Mon Dec 9 23:17:02 2019 +0100 Revert "Asyncify network functions" This reverts commit f20ae6548dc482cb1e75bc80641cfe55c6131a53. commit 82413550cdac40bd14a09f62df12de49dd7e55af Author: Ashley Date: Mon Dec 9 19:09:55 2019 +0100 Fix validation test again commit 47e002b08369c9c775b92aea9b6f6ed81b30241b Author: Ashley Date: Mon Dec 9 19:07:43 2019 +0100 Switch favicon commit 0c5c1409078fc57120a39e40ec5cb1763d67d593 Author: Ashley Date: Mon Dec 9 18:54:10 2019 +0100 Fix validation test commit 8bb6a0189fe824da09054cbf5b06f11a0f87072d Author: Ashley Date: Mon Dec 9 18:53:54 2019 +0100 Nits commit 33410f3a4910d3e688956cecfcca02cc2dfa6a7a Author: Ashley Date: Mon Dec 9 18:43:09 2019 +0100 Fix av store test commit f0c517eb240c42848cdb3305e0b554ef407bdfaa Merge: 938f411a 60e72111 Author: Ashley Date: Mon Dec 9 18:21:39 2019 +0100 Merge branch 'ashley-futures-updates' into ashley-futures-update commit 60e72111651f2b366592c1e56756c6bf5d8ce2f1 Author: Ashley Date: Mon Dec 9 18:19:40 2019 +0100 Clean up browser validation worker error commit f20ae6548dc482cb1e75bc80641cfe55c6131a53 Author: Ashley Date: Mon Dec 9 18:16:40 2019 +0100 Asyncify network functions commit b22758d0a3852d701923bd238484e1c9eabec5e2 Merge: 2e8b05ed ef562cd7 Author: Ashley Date: Mon Dec 9 17:47:26 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-futures-updates commit 2e8b05edf1a1fadd6943f967c27b6d34675ba06a Author: Ashley Date: Mon Dec 9 17:45:52 2019 +0100 Box pin changes commit 08bfdf7f2d27721abffee49221213304ebc4fd47 Author: Ashley Date: Mon Dec 9 17:15:38 2019 +0100 Update network/src/lib.rs Co-Authored-By: Pierre Krieger commit d8be456c508d5e5a03178db45d9f272b302a8a65 Author: Ashley Date: Mon Dec 9 17:15:32 2019 +0100 Update network/src/lib.rs Co-Authored-By: Pierre Krieger commit ec7367276fdd374b19f41555fd5985454c559600 Author: Ashley Date: Mon Dec 9 17:14:36 2019 +0100 Update availability-store/src/worker.rs Co-Authored-By: Pierre Krieger commit 938f411a9365e9c5fb16bfedb62aacac4403d063 Author: Ashley Date: Mon Dec 9 17:05:05 2019 +0100 Revert "Revert removal of tokio_executor that causes tokio version mismatch panic" This reverts commit cfeb50c01d8df5e209483406a711e64761b44ae9. commit f92f58044b4fe04bde73a60820d154080dd64b16 Author: Ashley Date: Mon Dec 9 15:47:35 2019 +0100 Fix adder test parachain commit cfeb50c01d8df5e209483406a711e64761b44ae9 Author: Ashley Date: Mon Dec 9 15:31:36 2019 +0100 Revert removal of tokio_executor that causes tokio version mismatch panic commit 5bcb83a122b9a30f240a238ca670c6b658f4ddf1 Author: Ashley Date: Mon Dec 9 15:17:55 2019 +0100 Fix typo commit fc02b1dc16e277649677396833a8d70e8588a56c Author: Ashley Date: Mon Dec 9 15:02:50 2019 +0100 Fix collator commit 6c4ff5b3bf1084a618ffec2d864090c9c8077f0f Author: Ashley Date: Mon Dec 9 14:35:37 2019 +0100 Small changes commit e1338cb4450df5377d8c911da56445914d667472 Author: Ashley Date: Mon Dec 9 14:24:42 2019 +0100 Fix network tests commit 4e458f7a91c1ed5c986795f40ed55e596d176c4b Author: Ashley Date: Mon Dec 9 12:25:26 2019 +0100 Remove futures01 from availability-store commit 5729f6cd6b53f061ff155320c815509feb02309e Author: Ashley Date: Mon Dec 9 12:22:33 2019 +0100 Fix validation tests commit a820612565b42780f8b6c09c9c1c30f06a9985ba Author: Ashley Date: Mon Dec 9 12:01:48 2019 +0100 Fix availability store tests commit 112344faeee5f8f03b3b87c6baf7036a7fcbe415 Author: Ashley Date: Mon Dec 9 11:36:03 2019 +0100 Update tokio version commit d2de6d8b3f0c3682679fe437d5459ac50a3c3895 Author: Ashley Date: Mon Dec 9 11:33:25 2019 +0100 Revert cli tokio version to avoid libp2p panic commit 0c5f24e0c1131ac58a947448456e7fb62c869702 Author: Ashley Date: Mon Dec 9 11:27:13 2019 +0100 Switch to polkadot-master commit 2e2311e33a4af87c2c545094ea8cb595cd6cfe2d Author: Ashley Date: Fri Dec 6 15:07:21 2019 +0100 Re-add release flag commit 6adc1b6114e154a590acf82acfaf0c1265409518 Merge: 9767f832 533c80ad Author: Ashley Date: Fri Dec 6 13:36:35 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit 9767f8325c33211065ef6830becdac0e3cf852de Merge: c528dc6d 0bf7d294 Author: Ashley Date: Wed Dec 4 17:11:39 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit c528dc6df8fc31cdcbc10889636355241398debd Author: Ashley Date: Wed Dec 4 17:07:00 2019 +0100 Fix wasm build commit da233a122c678dc7767dac7cc6e2564575b15cc8 Author: Ashley Date: Wed Dec 4 16:25:49 2019 +0100 tidy commit 832f8054df78afbcef1903e0f9e7e246b348c10d Merge: 4e1da888 121c917d Author: Ashley Date: Wed Dec 4 15:56:56 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit 4e1da8888dd2160064dd453782fb05513c65ade4 Author: Ashley Date: Tue Dec 3 16:47:02 2019 +0100 Temp switch back to substrate/master commit af88a87338688797bbc52315fdd0fc22cf23c6cf Merge: a03a980c 7832ad93 Author: Ashley Date: Mon Dec 2 19:33:14 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit a03a980ce417ec7b446bfcbe7a66ec0ed6458135 Merge: 31a88a93 0c1ef335 Author: Ashley Date: Mon Dec 2 13:52:37 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit 31a88a930ffdf5da72b3e587ec8c0e6b00922e3e Author: Ashley Date: Mon Dec 2 13:52:35 2019 +0100 Tidy commit 5b33b7a7af08d7a3aa3853b8e4995484fb640d52 Author: Ashley Date: Mon Dec 2 11:55:51 2019 +0100 Add browser-demo commit 868f6e51dfdc0a64252acd9adabe7b9ba436b1f4 Author: Ashley Date: Mon Dec 2 10:51:57 2019 +0100 Add initial browser file commit e5e399c20f1dc4e1023ee57773dcdd9ab2a0a14b Author: Ashley Date: Mon Dec 2 10:45:02 2019 +0100 Add browser-demo commit 408288b05292d952944a6b8e1f2bcf9cf259a040 Author: Ashley Date: Sun Dec 1 19:28:33 2019 +0100 Get polkadot to compile via wasm! commit 04ffe72e868be57841d31f01eec1b90423a595d6 Author: Ashley Date: Sun Dec 1 19:28:16 2019 +0100 Migrate service commit 119f0829a53b825a3ebc9efdefa76ae7eabb04aa Merge: 93fb6428 37fec553 Author: Ashley Date: Sun Dec 1 17:43:49 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit 93fb6428501bac612a1675cf3b6e3d26f5bbc7c2 Author: Ashley Date: Sun Dec 1 12:21:25 2019 +0100 Switch branch commit 0c4fe8331bdc9665ac2427eb8c795112ac728d70 Author: Ashley Date: Sat Nov 30 11:45:59 2019 +0100 Tidy up validation commit 73563253d95962657108820ae130a8d3f3093ee8 Author: Ashley Date: Sat Nov 30 11:39:09 2019 +0100 Tidy up network commit 1c9cf0427c0e2d15c4b6d52b91d67d4a3963e30d Author: Ashley Date: Sat Nov 30 01:16:35 2019 +0100 Final changes to validation commit 322cca5224fdca0a29d88ff91700ef704a9d0c2a Author: Ashley Date: Sat Nov 30 00:31:55 2019 +0100 Migrate network to std futures commit 96f1a99491f5ae2957effa58cc1e385014575a32 Author: Ashley Date: Fri Nov 29 23:31:04 2019 +0100 Migrate validation to std futures commit aaf5e55fffd1367c05687eb34f4365a24e3a34c0 Author: Ashley Date: Fri Nov 29 17:10:11 2019 +0100 Switch to Spawn trait commit 2ab282f57e8b9a55cf8d285b283cf009216511d2 Merge: cceb6b72 ed7ee572 Author: Ashley Date: Fri Nov 29 16:31:24 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit cceb6b72f5677a1c43d2cd61bd525539054f0c01 Author: Ashley Date: Fri Nov 29 15:47:14 2019 +0100 Make validation work on wasm! commit b45a95cf7d829a916bf2ad6936d1e7f4b6f3ef77 Merge: 4ec635ee db7eaa6b Author: Ashley Date: Fri Nov 29 13:57:23 2019 +0100 Merge remote-tracking branch 'tomaka/wasm-start' into HEAD commit db7eaa6bd5d3bbcea829570fb47ab4d06f3558ce Merge: 6f97dbb7 f826ce53 Author: Pierre Krieger Date: Thu Nov 28 13:58:15 2019 +0100 Merge branch 'master' into wasm-start commit 6f97dbb786750d854cf8f7a56c6a336ea5979228 Author: Pierre Krieger Date: Thu Nov 28 12:47:45 2019 +0100 Use --manifest-path instead commit 20104e98ff1713b6c81b0251b43d060d4e672d55 Author: Pierre Krieger Date: Thu Nov 28 10:44:51 2019 +0100 Make availability-store compile for WASM * Fix build * Fix futures blocking panic in validators (again) * Deindent --- polkadot/collator/src/lib.rs | 31 ++- polkadot/network/src/lib.rs | 34 ++- polkadot/network/src/router.rs | 5 +- polkadot/network/src/tests/validation.rs | 87 +++----- polkadot/network/src/validation.rs | 135 ++++-------- polkadot/validation/Cargo.toml | 2 +- polkadot/validation/src/collation.rs | 95 ++------ polkadot/validation/src/lib.rs | 208 ++++-------------- .../validation/src/shared_table/includable.rs | 25 +-- polkadot/validation/src/shared_table/mod.rs | 4 +- 10 files changed, 178 insertions(+), 448 deletions(-) diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index bfc5c346d4..20a17a2166 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -48,6 +48,7 @@ use std::collections::HashSet; use std::fmt; use std::sync::Arc; use std::time::Duration; +use std::pin::Pin; use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn}; use log::{warn, error}; @@ -242,20 +243,26 @@ impl RelayChainContext for ApiContext> + Unpin + Send>; + type FutureEgress = Pin> + Send>>; fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress { - // TODO: https://github.com/paritytech/polkadot/issues/253 - // - // Fetch ingress and accumulate all unrounted egress - let _session = self.network.instantiate_leaf_work(LeafWorkParams { - local_session_key: None, - parent_hash: self.parent_hash, - authorities: self.validators.clone(), - }) - .map_err(|e| format!("unable to instantiate validation session: {:?}", e)); + let network = self.network.clone(); + let parent_hash = self.parent_hash; + let authorities = self.validators.clone(); - Box::new(future::ok(ConsolidatedIngress(Vec::new()))) + async move { + // TODO: https://github.com/paritytech/polkadot/issues/253 + // + // Fetch ingress and accumulate all unrounted egress + let _session = network.instantiate_leaf_work(LeafWorkParams { + local_session_key: None, + parent_hash, + authorities, + }) + .map_err(|e| format!("unable to instantiate validation session: {:?}", e)); + + Ok(ConsolidatedIngress(Vec::new())) + }.boxed() } } @@ -425,7 +432,7 @@ impl Worker for CollationNode where ); let exit = inner_exit_2.clone(); - tokio::spawn(future::select(res, exit).map(drop)); + tokio::spawn(future::select(res.boxed(), exit).map(drop)); }) }); diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 312e0ed69d..65f7315b3e 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -28,7 +28,6 @@ pub mod gossip; use codec::{Decode, Encode}; use futures::channel::{oneshot, mpsc}; use futures::prelude::*; -use futures::future::Either; use polkadot_primitives::{Block, Hash, Header}; use polkadot_primitives::parachain::{ Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock, @@ -837,25 +836,6 @@ impl PolkadotProtocol { debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", relay_parent, collation.info.parachain_index); - let res = match self.availability_store { - Some(ref availability_store) => { - let availability_store_cloned = availability_store.clone(); - let collation_cloned = collation.clone(); - Either::Left((async move { - let _ = availability_store_cloned.make_available(av_store::Data { - relay_parent, - parachain_id: collation_cloned.info.parachain_index, - block_data: collation_cloned.pov.block_data.clone(), - outgoing_queues: Some(outgoing_targeted.clone().into()), - }).await; - } - ) - .boxed() - ) - } - None => Either::Right(futures::future::ready(())), - }; - for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { match self.validators.get(&primary) { Some(who) => { @@ -871,7 +851,19 @@ impl PolkadotProtocol { } } - res + let availability_store = self.availability_store.clone(); + let collation_cloned = collation.clone(); + + async move { + if let Some(availability_store) = availability_store { + let _ = availability_store.make_available(av_store::Data { + relay_parent, + parachain_id: collation_cloned.info.parachain_index, + block_data: collation_cloned.pov.block_data.clone(), + outgoing_queues: Some(outgoing_targeted.clone().into()), + }).await; + } + } } /// Give the network protocol a handle to an availability store, used for diff --git a/polkadot/network/src/router.rs b/polkadot/network/src/router.rs index 78922a1307..ae1667bab7 100644 --- a/polkadot/network/src/router.rs +++ b/polkadot/network/src/router.rs @@ -41,8 +41,9 @@ use log::{debug, trace}; use std::collections::{HashMap, HashSet}; use std::io; use std::sync::Arc; +use std::pin::Pin; -use crate::validation::{self, LeafWorkDataFetcher, Executor}; +use crate::validation::{LeafWorkDataFetcher, Executor}; use crate::NetworkService; /// Compute the gossip topic for attestations on the given parent hash. @@ -232,7 +233,7 @@ impl TableRouter for Router wh E: Future + Clone + Send + 'static, { type Error = io::Error; - type FetchValidationProof = validation::PoVReceiver; + type FetchValidationProof = Pin> + Send>>; // We have fetched from a collator and here the receipt should have been already formed. fn local_collation( diff --git a/polkadot/network/src/tests/validation.rs b/polkadot/network/src/tests/validation.rs index fc976f9bde..463ab50d59 100644 --- a/polkadot/network/src/tests/validation.rs +++ b/polkadot/network/src/tests/validation.rs @@ -41,7 +41,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::pin::Pin; use std::task::{Poll, Context}; -use futures::{prelude::*, channel::mpsc}; +use futures::{prelude::*, channel::mpsc, future::{select, Either}}; use codec::Encode; use super::{TestContext, TestChainContext}; @@ -66,77 +66,48 @@ fn clone_gossip(n: &TopicNotification) -> TopicNotification { } } -struct GossipRouter { - incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>, - incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender)>, - outgoing: Vec<(Hash, mpsc::UnboundedSender)>, - messages: Vec<(Hash, TopicNotification)>, -} +async fn gossip_router( + mut incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>, + mut incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender)> +) { + let mut outgoing: Vec<(Hash, mpsc::UnboundedSender)> = Vec::new(); + let mut messages = Vec::new(); -impl GossipRouter { - fn add_message(&mut self, topic: Hash, message: TopicNotification) { - self.outgoing.retain(|&(ref o_topic, ref sender)| { - o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok() - }); - self.messages.push((topic, message)); - } + loop { + match select(incoming_messages.next(), incoming_streams.next()).await { + Either::Left((Some((topic, message)), _)) => { + outgoing.retain(|&(ref o_topic, ref sender)| { + o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok() + }); + messages.push((topic, message)); + }, + Either::Right((Some((topic, sender)), _)) => { + for message in messages.iter() + .filter(|&&(ref t, _)| t == &topic) + .map(|&(_, ref msg)| clone_gossip(msg)) + { + if let Err(_) = sender.unbounded_send(message) { return } + } - fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender) { - for message in self.messages.iter() - .filter(|&&(ref t, _)| t == &topic) - .map(|&(_, ref msg)| clone_gossip(msg)) - { - if let Err(_) = sender.unbounded_send(message) { return } + outgoing.push((topic, sender)); + }, + Either::Left((None, _)) | Either::Right((None, _)) => panic!("ended early.") } - - self.outgoing.push((topic, sender)); } } -impl Future for GossipRouter { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - - loop { - match Pin::new(&mut this.incoming_messages).poll_next(cx) { - Poll::Ready(Some((topic, message))) => this.add_message(topic, message), - Poll::Ready(None) => panic!("ended early."), - Poll::Pending => break, - } - } - - loop { - match Pin::new(&mut this.incoming_streams).poll_next(cx) { - Poll::Ready(Some((topic, sender))) => this.add_outgoing(topic, sender), - Poll::Ready(None) => panic!("ended early."), - Poll::Pending => break, - } - } - - Poll::Pending - } -} - - #[derive(Clone)] struct GossipHandle { send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>, send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender)>, } -fn make_gossip() -> (GossipRouter, GossipHandle) { +fn make_gossip() -> (impl Future, GossipHandle) { let (message_tx, message_rx) = mpsc::unbounded(); let (listener_tx, listener_rx) = mpsc::unbounded(); ( - GossipRouter { - incoming_messages: message_rx, - incoming_streams: listener_rx, - outgoing: Vec::new(), - messages: Vec::new(), - }, + gossip_router(message_rx, listener_rx), GossipHandle { send_message: message_tx, send_listener: listener_tx }, ) } @@ -344,7 +315,7 @@ type TestValidationNetwork = crate::validation::ValidationNetwork< >; struct Built { - gossip: GossipRouter, + gossip: Pin>>, api_handle: Arc>, networks: Vec, } @@ -377,7 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built { let networks: Vec<_> = networks.collect(); Built { - gossip: gossip_router, + gossip: gossip_router.boxed(), api_handle, networks, } diff --git a/polkadot/network/src/validation.rs b/polkadot/network/src/validation.rs index f6e652fae2..1a7da20e64 100644 --- a/polkadot/network/src/validation.rs +++ b/polkadot/network/src/validation.rs @@ -33,14 +33,13 @@ use polkadot_primitives::parachain::{ use futures::prelude::*; use futures::task::SpawnExt; pub use futures::task::Spawn as Executor; -use futures::channel::oneshot::{self, Receiver}; +use futures::channel::oneshot; use futures::future::{ready, select}; use std::collections::hash_map::{HashMap, Entry}; use std::io; use std::sync::Arc; use std::pin::Pin; -use std::task::{Poll, Context}; use arrayvec::ArrayVec; use parking_lot::Mutex; @@ -242,47 +241,30 @@ impl ParachainNetwork for ValidationNetwork where #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct NetworkDown; -/// A future that resolves when a collation is received. -pub struct AwaitingCollation { - outer: oneshot::Receiver>, - inner: Option> -} - -impl Future for AwaitingCollation { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - - if let Some(ref mut inner) = this.inner { - return Pin::new(inner).poll(cx).map_err(|_| NetworkDown) - } - match Pin::new(&mut this.outer).poll(cx) { - Poll::Ready(Ok(inner)) => { - this.inner = Some(inner); - Pin::new(this).poll(cx) - }, - Poll::Ready(Err(_)) => Poll::Ready(Err(NetworkDown)), - Poll::Pending => Poll::Pending, - } - } -} - impl Collators for ValidationNetwork where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, N: NetworkService, { type Error = NetworkDown; - type Collation = AwaitingCollation; + type Collation = Pin> + Send>>; fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation { let (tx, rx) = oneshot::channel(); - self.network.with_spec(move |spec, _| { - let collation = spec.await_collation(relay_parent, parachain); - let _ = tx.send(collation); - }); - AwaitingCollation{outer: rx, inner: None} + let network = self.network.clone(); + + // A future that resolves when a collation is received. + async move { + network.with_spec(move |spec, _| { + let collation = spec.await_collation(relay_parent, parachain); + let _ = tx.send(collation); + }); + + rx.await + .map_err(|_| NetworkDown)? + .await + .map_err(|_| NetworkDown) + }.boxed() } @@ -348,27 +330,6 @@ impl Knowledge { } } -/// receiver for incoming data. -#[derive(Clone)] -pub struct IncomingReceiver { - inner: future::Shared> -} - -impl Future for IncomingReceiver { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - match Pin::new(&mut Pin::into_inner(self).inner).poll(cx) { - Poll::Ready(Ok(i)) => Poll::Ready(Ok(Incoming::clone(&i))), - Poll::Ready(Err(_)) => Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - "Sending end of channel hung up", - ))), - Poll::Pending => Poll::Pending, - } - } -} - /// A current validation leaf-work instance #[derive(Clone)] pub(crate) struct LiveValidationLeaf { @@ -564,36 +525,6 @@ impl LiveValidationLeaves { } } -/// Receiver for block data. -pub struct PoVReceiver { - outer: Receiver>, - inner: Option> -} - -impl Future for PoVReceiver { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - - let map_err = |_| io::Error::new( - io::ErrorKind::Other, - "Sending end of channel hung up", - ); - - if let Some(ref mut inner) = this.inner { - return Pin::new(inner).poll(cx).map_err(map_err); - } - match Pin::new(&mut this.outer).poll(cx).map_err(map_err)? { - Poll::Ready(inner) => { - this.inner = Some(inner); - Pin::new(this).poll(cx) - } - Poll::Pending => Poll::Pending, - } - } -} - /// Can fetch data for a given validation leaf-work instance. pub struct LeafWorkDataFetcher { network: Arc, @@ -658,9 +589,14 @@ impl LeafWorkDataFetcher where E: Future + Clone + Send + 'static, { /// Fetch PoV block for the given candidate receipt. - pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver { + pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) + -> Pin> + Send>> { + let parachain = candidate.parachain_index; let parent_hash = self.parent_hash; + let network = self.network.clone(); + let candidate = candidate.clone(); + let (tx, rx) = oneshot::channel(); let canon_roots = self.api.runtime_api().ingress( &BlockId::hash(parent_hash), @@ -676,15 +612,24 @@ impl LeafWorkDataFetcher where ) ); - let candidate = candidate.clone(); - let (tx, rx) = oneshot::channel(); - self.network.with_spec(move |spec, ctx| { - if let Ok(Some(canon_roots)) = canon_roots { - let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots); - let _ = tx.send(inner_rx); - } - }); - PoVReceiver { outer: rx, inner: None } + async move { + network.with_spec(move |spec, ctx| { + if let Ok(Some(canon_roots)) = canon_roots { + let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots); + let _ = tx.send(inner_rx); + } + }); + + let map_err = |_| io::Error::new( + io::ErrorKind::Other, + "Sending end of channel hung up", + ); + + rx.await + .map_err(map_err)? + .await + .map_err(map_err) + }.boxed() } } diff --git a/polkadot/validation/Cargo.toml b/polkadot/validation/Cargo.toml index f7c7636e3d..b4d1400fcf 100644 --- a/polkadot/validation/Cargo.toml +++ b/polkadot/validation/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" futures = "0.3.1" futures-timer = "2.0" parking_lot = "0.9.0" -tokio = { version = "0.2.4", features = ["rt-core", "blocking"] } +tokio = { version = "0.2.4", features = ["rt-core"] } derive_more = "0.14.1" log = "0.4.8" exit-future = "0.2.0" diff --git a/polkadot/validation/src/collation.rs b/polkadot/validation/src/collation.rs index cce4d50ff9..1acb23591e 100644 --- a/polkadot/validation/src/collation.rs +++ b/polkadot/validation/src/collation.rs @@ -32,8 +32,6 @@ use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, Messag use trie::TrieConfiguration; use futures::prelude::*; use log::debug; -use std::task::{Poll, Context}; -use std::pin::Pin; /// Encapsulates connections to collators and allows collation on any parachain. /// @@ -58,94 +56,41 @@ pub trait Collators: Clone { } /// A future which resolves when a collation is available. -/// -/// This future is fused. -pub struct CollationFetch { +pub async fn collation_fetch( parachain: ParaId, relay_parent_hash: Hash, - relay_parent: BlockId, collators: C, - live_fetch: Option, client: Arc

, max_block_data_size: Option, -} - -impl CollationFetch { - /// Create a new collation fetcher for the given chain. - pub fn new( - parachain: ParaId, - relay_parent_hash: Hash, - collators: C, - client: Arc

, - max_block_data_size: Option, - ) -> Self { - CollationFetch { - relay_parent: BlockId::hash(relay_parent_hash), - relay_parent_hash, - collators, - client, - parachain, - live_fetch: None, - max_block_data_size, - } - } - - /// Access the underlying relay parent hash. - pub fn relay_parent(&self) -> Hash { - self.relay_parent_hash - } - - /// Access the local parachain ID. - pub fn parachain(&self) -> ParaId { - self.parachain - } -} - -impl Future for CollationFetch +) -> Result<(Collation, OutgoingMessages, Balance),C::Error> where P::Api: ParachainHost, C: Collators + Unpin, P: ProvideRuntimeApi, ::Collation: Unpin, { - type Output = Result<(Collation, OutgoingMessages, Balance),C::Error>; + let relay_parent = BlockId::hash(relay_parent_hash); - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); + loop { + let collation = collators.collate(parachain, relay_parent_hash) + .await?; - loop { - let collation = { - let parachain = this.parachain.clone(); - let (r, c) = (this.relay_parent_hash, &this.collators); + let res = validate_collation( + &*client, + &relay_parent, + &collation, + max_block_data_size, + ); - let future = this.live_fetch - .get_or_insert_with(move || c.collate(parachain, r)); + match res { + Ok((messages, fees)) => { + return Ok((collation, messages, fees)) + } + Err(e) => { + debug!("Failed to validate parachain due to API error: {}", e); - match Pin::new(future).poll(cx) { - Poll::Ready(Ok(c)) => c, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending - } - }; - - let res = validate_collation( - &*this.client, - &this.relay_parent, - &collation, - this.max_block_data_size, - ); - - match res { - Ok((messages, fees)) => { - return Poll::Ready(Ok((collation, messages, fees))) - } - Err(e) => { - debug!("Failed to validate parachain due to API error: {}", e); - - // just continue if we got a bad collation or failed to validate - this.live_fetch = None; - this.collators.note_bad_collator(collation.info.collator) - } + // just continue if we got a bad collation or failed to validate + collators.note_bad_collator(collation.info.collator) } } } diff --git a/polkadot/validation/src/lib.rs b/polkadot/validation/src/lib.rs index b779ccbc98..3412dab26a 100644 --- a/polkadot/validation/src/lib.rs +++ b/polkadot/validation/src/lib.rs @@ -34,8 +34,6 @@ use std::{ pin::Pin, sync::Arc, time::{self, Duration, Instant}, - task::{Poll, Context}, - mem, }; use babe_primitives::BabeApi; @@ -60,8 +58,8 @@ use txpool_api::{TransactionPool, InPoolTransaction}; use attestation_service::ServiceHandle; use futures::prelude::*; -use futures::{future::{self, Either, select, ready}, stream::unfold, task::{Spawn, SpawnExt}}; -use collation::CollationFetch; +use futures::{future::{select, ready}, stream::unfold, task::{Spawn, SpawnExt}}; +use collation::collation_fetch; use dynamic_inclusion::DynamicInclusion; use inherents::InherentData; use sp_timestamp::TimestampInherentData; @@ -396,7 +394,7 @@ impl ParachainValidation where let with_router = move |router: N::TableRouter| { // fetch a local collation from connected collators. - let collation_work = CollationFetch::new( + let collation_work = collation_fetch( validation_para, relay_parent, collators, @@ -611,14 +609,13 @@ impl consensus::Proposer for Proposer where C::Api: ParachainHost + BlockBuilderApi + ApiExt, { type Error = Error; - type Create = Either, future::Ready>>; + type Create = Pin> + Send>>; fn propose(&mut self, inherent_data: InherentData, inherent_digests: DigestFor, max_duration: Duration, ) -> Self::Create { - const ATTEMPT_PROPOSE_EVERY: Duration = Duration::from_millis(100); const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates. let initial_included = self.tracker.table.includable_count(); @@ -630,56 +627,59 @@ impl consensus::Proposer for Proposer where Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR), ); - let enough_candidates = dynamic_inclusion.acceptable_in( - now, - initial_included, - ).unwrap_or_else(|| Duration::from_millis(1)); + let parent_hash = self.parent_hash.clone(); + let parent_number = self.parent_number.clone(); + let parent_id = self.parent_id.clone(); + let client = self.client.clone(); + let transaction_pool = self.transaction_pool.clone(); + let table = self.tracker.table.clone(); - let believed_timestamp = match inherent_data.timestamp_inherent_data() { - Ok(timestamp) => timestamp, - Err(e) => return Either::Right(future::err(Error::InherentError(e))), - }; + async move { + let enough_candidates = dynamic_inclusion.acceptable_in( + now, + initial_included, + ).unwrap_or_else(|| Duration::from_millis(1)); - // set up delay until next allowed timestamp. - let current_timestamp = current_timestamp(); - let delay_future = if current_timestamp >= believed_timestamp { - None - } else { - Some(Delay::new(Duration::from_millis (current_timestamp - believed_timestamp))) - }; + let believed_timestamp = match inherent_data.timestamp_inherent_data() { + Ok(timestamp) => timestamp, + Err(e) => return Err(Error::InherentError(e)), + }; - let timing = ProposalTiming { - minimum: delay_future, - attempt_propose: Box::new(interval(ATTEMPT_PROPOSE_EVERY)), - enough_candidates: Delay::new(enough_candidates), - dynamic_inclusion, - last_included: initial_included, - }; + let deadline_diff = max_duration - max_duration / 3; + let deadline = match Instant::now().checked_add(deadline_diff) { + None => return Err(Error::DeadlineComputeFailure(deadline_diff)), + Some(d) => d, + }; - let deadline_diff = max_duration - max_duration / 3; - let deadline = match Instant::now().checked_add(deadline_diff) { - None => return Either::Right( - future::err(Error::DeadlineComputeFailure(deadline_diff)), - ), - Some(d) => d, - }; - - Either::Left(CreateProposal { - state: CreateProposalState::Pending(CreateProposalData { - parent_hash: self.parent_hash.clone(), - parent_number: self.parent_number.clone(), - parent_id: self.parent_id.clone(), - client: self.client.clone(), - transaction_pool: self.transaction_pool.clone(), - table: self.tracker.table.clone(), + let data = CreateProposalData { + parent_hash, + parent_number, + parent_id, + client, + transaction_pool, + table, believed_minimum_timestamp: believed_timestamp, - timing, inherent_data: Some(inherent_data), inherent_digests, // leave some time for the proposal finalisation deadline, + }; + + // set up delay until next allowed timestamp. + let current_timestamp = current_timestamp(); + if current_timestamp < believed_timestamp { + Delay::new(Duration::from_millis(current_timestamp - believed_timestamp)) + .await; + } + + Delay::new(enough_candidates).await; + + tokio_executor::blocking::run(move || { + let proposed_candidates = data.table.proposed_set(); + data.propose_with(proposed_candidates) }) - }) + .await + }.boxed() } } @@ -689,67 +689,6 @@ fn current_timestamp() -> u64 { .as_millis() as u64 } -struct ProposalTiming { - minimum: Option, - attempt_propose: Box + Send + Unpin>, - dynamic_inclusion: DynamicInclusion, - enough_candidates: Delay, - last_included: usize, -} - -impl ProposalTiming { - // whether it's time to attempt a proposal. - // shouldn't be called outside of the context of a task. - fn poll(&mut self, cx: &mut Context, included: usize) -> Poll<()> { - // first drain from the interval so when the minimum delay is up - // we don't have any notifications built up. - // - // this interval is just meant to produce periodic task wakeups - // that lead to the `dynamic_inclusion` getting updated as necessary. - while let Poll::Ready(x) = self.attempt_propose.poll_next_unpin(cx) { - x.expect("timer still alive; intervals never end; qed"); - } - - // wait until the minimum time has passed. - if let Some(mut minimum) = self.minimum.take() { - if let Poll::Pending = minimum.poll_unpin(cx) { - self.minimum = Some(minimum); - return Poll::Pending; - } - } - - if included == self.last_included { - return self.enough_candidates.poll_unpin(cx); - } - - // the amount of includable candidates has changed. schedule a wakeup - // if it's not sufficient anymore. - match self.dynamic_inclusion.acceptable_in(Instant::now(), included) { - Some(instant) => { - self.last_included = included; - self.enough_candidates.reset(Instant::now() + instant); - self.enough_candidates.poll_unpin(cx) - } - None => Poll::Ready(()), - } - } -} - -/// Future which resolves upon the creation of a proposal. -pub struct CreateProposal { - state: CreateProposalState, -} - -/// Current status of the proposal future. -enum CreateProposalState { - /// Pending inclusion, with given proposal data. - Pending(CreateProposalData), - /// Represents the state when we switch from pending to fired. - Switching, - /// Block proposing has fired. - Fired(tokio_executor::blocking::Blocking>), -} - /// Inner data of the create proposal. struct CreateProposalData { parent_hash: Hash, @@ -758,7 +697,6 @@ struct CreateProposalData { client: Arc, transaction_pool: Arc, table: Arc, - timing: ProposalTiming, believed_minimum_timestamp: u64, inherent_data: Option, inherent_digests: DigestFor, @@ -859,58 +797,6 @@ impl CreateProposalData where } } -impl Future for CreateProposal where - TxPool: TransactionPool + 'static, - C: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, - C::Api: ParachainHost + BlockBuilderApi + ApiExt, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let mut state = CreateProposalState::Switching; - mem::swap(&mut state, &mut self.state); - - // 1. try to propose if we have enough includable candidates and other - // delays have concluded. - let data = match state { - CreateProposalState::Pending(mut data) => { - let included = data.table.includable_count(); - match data.timing.poll(cx, included) { - Poll::Pending => { - self.state = CreateProposalState::Pending(data); - return Poll::Pending - }, - Poll::Ready(()) => (), - } - - data - }, - CreateProposalState::Switching => - unreachable!( - "State Switching are only created on call, \ - and immediately swapped out; \ - the data being read is from state; \ - thus Switching will never be reachable here; qed" - ), - CreateProposalState::Fired(mut future) => { - let ret = Pin::new(&mut future).poll(cx); - self.state = CreateProposalState::Fired(future); - return ret - }, - }; - - // 2. propose - let mut future = tokio_executor::blocking::run(move || { - let proposed_candidates = data.table.proposed_set(); - data.propose_with(proposed_candidates) - }); - let polled = Pin::new(&mut future).poll(cx); - self.state = CreateProposalState::Fired(future); - - polled - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/polkadot/validation/src/shared_table/includable.rs b/polkadot/validation/src/shared_table/includable.rs index 1b74abcf43..fa94578adf 100644 --- a/polkadot/validation/src/shared_table/includable.rs +++ b/polkadot/validation/src/shared_table/includable.rs @@ -17,16 +17,13 @@ //! Implements a future which resolves when all of the candidates referenced are includable. use std::collections::HashMap; - -use futures::prelude::*; use futures::channel::oneshot; -use std::pin::Pin; -use std::task::{Poll, Context}; - use polkadot_primitives::Hash; /// Track includability of a set of candidates, -pub(super) fn track>(candidates: I) -> (IncludabilitySender, Includable) { +pub(super) fn track>(candidates: I) + -> (IncludabilitySender, oneshot::Receiver<()>) { + let (tx, rx) = oneshot::channel(); let tracking: HashMap<_, _> = candidates.into_iter().collect(); let includable_count = tracking.values().filter(|x| **x).count(); @@ -39,10 +36,7 @@ pub(super) fn track>(candidates: I) -> (Inclu sender.try_complete(); - ( - sender, - Includable(rx), - ) + (sender, rx) } /// The sending end of the includability sender. @@ -93,17 +87,6 @@ impl IncludabilitySender { } } -/// Future that resolves when all the candidates within are includable. -pub struct Includable(oneshot::Receiver<()>); - -impl Future for Includable { - type Output = Result<(), oneshot::Canceled>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - Pin::new(&mut Pin::into_inner(self).0).poll(cx) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/polkadot/validation/src/shared_table/mod.rs b/polkadot/validation/src/shared_table/mod.rs index e0ce64201b..3784aa7e1c 100644 --- a/polkadot/validation/src/shared_table/mod.rs +++ b/polkadot/validation/src/shared_table/mod.rs @@ -30,6 +30,7 @@ use polkadot_primitives::parachain::{ use parking_lot::Mutex; use futures::prelude::*; +use futures::channel::oneshot; use log::{warn, debug}; use bitvec::bitvec; @@ -40,7 +41,6 @@ use runtime_primitives::traits::ProvideRuntimeApi; mod includable; -pub use self::includable::Includable; pub use table::{SignedStatement, Statement}; pub use table::generic::Statement as GenericStatement; @@ -543,7 +543,7 @@ impl SharedTable { } /// Track includability of a given set of candidate hashes. - pub fn track_includability(&self, iterable: I) -> Includable + pub fn track_includability(&self, iterable: I) -> oneshot::Receiver<()> where I: IntoIterator { let mut inner = self.inner.lock();