From 454ddf8921471d8f6eeb5acef0de767e02b9ea5a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 22 Mar 2019 00:48:36 +0100 Subject: [PATCH] Collators get incoming parachain messages (#149) * refactor out a consensus data fetcher from table router * move statement checking logic into router * refuse to start authority if collator * support building the table router asynchronously * instantiate_consensus does not overwrite old * update key in new consensus if there was none before * collator collects ingress from network * test produced egress roots * fix adder-collator compilation * address first grumbles * integrate new gossip with collator network launch * address review --- polkadot/Cargo.lock | 20 +- polkadot/cli/src/lib.rs | 5 +- polkadot/collator/Cargo.toml | 5 + polkadot/collator/src/lib.rs | 289 ++++--- polkadot/network/Cargo.toml | 1 + polkadot/network/src/lib.rs | 20 +- polkadot/network/src/router.rs | 371 ++------- polkadot/network/src/tests/mod.rs | 36 +- polkadot/network/src/tests/validation.rs | 9 +- polkadot/network/src/validation.rs | 720 +++++++++++++++--- polkadot/service/src/lib.rs | 7 + polkadot/src/main.rs | 4 +- .../adder/collator/src/main.rs | 6 +- .../validation/src/attestation_service.rs | 1 + polkadot/validation/src/collation.rs | 4 +- polkadot/validation/src/lib.rs | 119 +-- 16 files changed, 960 insertions(+), 657 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index ea917de7a4..9cf2fb5bd5 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -24,7 +24,7 @@ version = "0.1.0" dependencies = [ "adder 0.1.0", "ctrlc 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-collator 0.1.0", @@ -670,11 +670,11 @@ dependencies = [ [[package]] name = "exit-future" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2160,7 +2160,7 @@ dependencies = [ name = "polkadot-cli" version = "0.3.0" dependencies = [ - "exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-service 0.3.0", @@ -2176,9 +2176,12 @@ dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-cli 0.3.0", + "polkadot-network 0.1.0", "polkadot-primitives 0.1.0", "polkadot-runtime 0.1.0", + "polkadot-validation 0.1.0", "substrate-client 0.1.0 (git+https://github.com/paritytech/substrate)", + "substrate-keyring 0.1.0 (git+https://github.com/paritytech/substrate)", "substrate-primitives 0.1.0 (git+https://github.com/paritytech/substrate)", "tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2208,6 +2211,7 @@ name = "polkadot-network" version = "0.1.0" dependencies = [ "arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2348,7 +2352,7 @@ name = "polkadot-validation" version = "0.1.0" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3540,7 +3544,7 @@ dependencies = [ "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3929,7 +3933,7 @@ version = "0.3.0" source = "git+https://github.com/paritytech/substrate#45824913c980bb1ba3963f9bba67775a507d8624" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4789,7 +4793,7 @@ dependencies = [ "checksum env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "afb070faf94c85d17d50ca44f6ad076bce18ae92f0037d350947240a36e9d42e" "checksum environmental 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "db746025e3ea695bfa0ae744dbacd5fcfc8db51b9760cf8bd0ab69708bb93c49" "checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02" -"checksum exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "87559b08e99a81a92bbb867d237543e43495857749f688e0773390a20d56c61c" +"checksum exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d8013f441e38e31c670e7f34ec8f1d5d3a2bd9d303c1ff83976ca886005e8f48" "checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2" "checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 451f887c17..b3e40576aa 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -45,6 +45,7 @@ pub use service::{ pub use cli::{VersionInfo, IntoExit}; pub use cli::error; +pub use tokio::runtime::TaskExecutor; fn load_spec(id: &str) -> Result, String> { Ok(match ChainSpec::from(id) { @@ -68,7 +69,7 @@ pub trait Worker: IntoExit { fn configuration(&self) -> service::CustomConfiguration { Default::default() } /// Do work and schedule exit. - fn work(self, service: &S) -> Self::Work; + fn work(self, service: &S, executor: TaskExecutor) -> Self::Work; } /// Parse command line arguments into service configuration. @@ -129,7 +130,7 @@ fn run_until_exit( let executor = runtime.executor(); cli::informant::start(&service, exit.clone(), executor.clone()); - let _ = runtime.block_on(worker.work(&*service)); + let _ = runtime.block_on(worker.work(&*service, executor.clone())); exit_send.fire(); // we eagerly drop the service so that the internal exit future is fired, diff --git a/polkadot/collator/Cargo.toml b/polkadot/collator/Cargo.toml index 750bb4e8bf..7cb2eef8d8 100644 --- a/polkadot/collator/Cargo.toml +++ b/polkadot/collator/Cargo.toml @@ -12,5 +12,10 @@ substrate-primitives = { git = "https://github.com/paritytech/substrate" } polkadot-runtime = { path = "../runtime", version = "0.1" } polkadot-primitives = { path = "../primitives", version = "0.1" } polkadot-cli = { path = "../cli" } +polkadot-network = { path = "../network" } +polkadot-validation = { path = "../validation" } log = "0.4" tokio = "0.1.7" + +[dev-dependencies] +substrate-keyring = { git = "https://github.com/paritytech/substrate" } diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 5b0db8e865..9fc4c26671 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -53,25 +53,33 @@ extern crate tokio; extern crate polkadot_cli; extern crate polkadot_runtime; extern crate polkadot_primitives; +extern crate polkadot_network; +extern crate polkadot_validation; #[macro_use] extern crate log; -use std::collections::{BTreeSet, BTreeMap, HashSet}; +#[cfg(test)] +extern crate substrate_keyring as keyring; + +use std::collections::HashSet; use std::fmt; use std::sync::Arc; use std::time::Duration; -use futures::{future, stream, Stream, Future, IntoFuture}; +use futures::{future, Stream, Future, IntoFuture}; use client::BlockchainEvents; use primitives::{ed25519, Pair}; -use polkadot_primitives::{BlockId, SessionKey}; -use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId}; +use polkadot_primitives::{BlockId, SessionKey, Hash, Block}; +use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic}; use polkadot_cli::{PolkadotService, CustomConfiguration, CoreApi, ParachainHost}; -use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi}; +use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi, TaskExecutor}; +use polkadot_network::validation::{ValidationNetwork, SessionParams}; +use polkadot_network::NetworkService; use tokio::timer::Timeout; pub use polkadot_cli::VersionInfo; +pub use polkadot_network::validation::Incoming; const COLLATION_TIMEOUT: Duration = Duration::from_secs(30); @@ -107,63 +115,23 @@ pub trait ParachainContext: Clone { &self, last_head: HeadData, ingress: I, - ) -> Result<(BlockData, HeadData), InvalidHead>; + ) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead>; } /// Relay chain context needed to collate. /// This encapsulates a network and local database which may store /// some of the input. pub trait RelayChainContext { - type Error; + type Error: ::std::fmt::Debug; /// Future that resolves to the un-routed egress queues of a parachain. /// The first item is the oldest. - type FutureEgress: IntoFuture>, Error=Self::Error>; - - /// Provide a set of all parachains meant to be routed to at a block. - fn routing_parachains(&self) -> BTreeSet; + type FutureEgress: IntoFuture; /// Get un-routed egress queues from a parachain to the local parachain. fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress; } -/// Collate the necessary ingress queue using the given context. -pub fn collate_ingress<'a, R>(relay_context: R) - -> impl Future + 'a - where - R: RelayChainContext, - R::Error: 'a, - R::FutureEgress: 'a, -{ - let mut egress_fetch = Vec::new(); - - for routing_parachain in relay_context.routing_parachains() { - let fetch = relay_context - .unrouted_egress(routing_parachain) - .into_future() - .map(move |egresses| (routing_parachain, egresses)); - - egress_fetch.push(fetch); - } - - // create a map ordered first by the depth of the egress queue - // and then by the parachain ID. - // - // then transform that into the consolidated egress queue. - stream::futures_unordered(egress_fetch) - .fold(BTreeMap::new(), |mut map, (routing_id, egresses)| { - for (depth, egress) in egresses.into_iter().rev().enumerate() { - let depth = -(depth as i64); - map.insert((depth, routing_id), egress); - } - - Ok(map) - }) - .map(|ordered| ordered.into_iter().map(|((_, id), egress)| (id, egress))) - .map(|i| i.collect::>()) - .map(ConsolidatedIngress) -} - /// Produce a candidate for the parachain, with given contexts, parent head, and signing key. pub fn collate<'a, R, P>( local_id: ParaId, @@ -174,19 +142,22 @@ pub fn collate<'a, R, P>( ) -> impl Future> + 'a where - R: RelayChainContext + 'a, + R: RelayChainContext, R::Error: 'a, R::FutureEgress: 'a, P: ParachainContext + 'a, { - collate_ingress(relay_context).map_err(Error::Polkadot).and_then(move |ingress| { - let (block_data, head_data) = para_context.produce_candidate( + let ingress = relay_context.unrouted_egress(local_id).into_future().map_err(Error::Polkadot); + ingress.and_then(move |ConsolidatedIngress(ingress)| { + let (block_data, head_data, mut extrinsic) = para_context.produce_candidate( last_head, - ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg))) + ingress.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg))) ).map_err(Error::Collator)?; let block_data_hash = block_data.hash(); let signature = key.sign(block_data_hash.as_ref()).into(); + let egress_queue_roots = + ::polkadot_validation::egress_roots(&mut extrinsic.outgoing_messages); let receipt = parachain::CandidateReceipt { parachain_index: local_id, @@ -194,11 +165,12 @@ pub fn collate<'a, R, P>( signature, head_data, balance_uploads: Vec::new(), - egress_queue_roots: Vec::new(), + egress_queue_roots, fees: 0, block_data_hash, }; + // not necessary to send extrinsic because it is recomputed from execution. Ok(parachain::Collation { receipt, block_data, @@ -207,18 +179,34 @@ pub fn collate<'a, R, P>( } /// Polkadot-api context. -struct ApiContext; +struct ApiContext { + network: ValidationNetwork, + parent_hash: Hash, + authorities: Vec, +} -impl RelayChainContext for ApiContext { - type Error = client::error::Error; - type FutureEgress = Result>, Self::Error>; +impl RelayChainContext for ApiContext where + P: ProvideRuntimeApi + Send + Sync, + P::Api: ParachainHost, + E: Future + Clone + Send + Sync + 'static, +{ + type Error = String; + type FutureEgress = Box + Send>; - fn routing_parachains(&self) -> BTreeSet { - BTreeSet::new() - } + fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress { + let session = self.network.instantiate_session(SessionParams { + local_session_key: None, + parent_hash: self.parent_hash, + authorities: self.authorities.clone(), + }).map_err(|e| format!("unable to instantiate validation session: {:?}", e)); - fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress { - Ok(Vec::new()) + let fetch_incoming = session + .and_then(move |session| session.fetch_incoming(id).map_err(|e| + format!("unable to fetch incoming data: {:?}", e) + )) + .map(ConsolidatedIngress); + + Box::new(fetch_incoming) } } @@ -241,7 +229,7 @@ impl IntoExit for CollationNode where impl Worker for CollationNode where P: ParachainContext + Send + 'static, - E: Future + Clone + Send + 'static + E: Future + Clone + Send + Sync + 'static { type Work = Box + Send>; @@ -254,13 +242,42 @@ impl Worker for CollationNode where config } - fn work(self, service: &S) -> Self::Work + fn work(self, service: &S, task_executor: TaskExecutor) -> Self::Work where S: PolkadotService, { - let CollationNode { parachain_context, exit, para_id, key } = self; let client = service.client(); let network = service.network(); + let known_oracle = client.clone(); + + let message_validator = polkadot_network::gossip::register_validator( + &*network, + move |block_hash: &Hash| { + use client::{BlockStatus, ChainHead}; + use polkadot_network::gossip::Known; + + match known_oracle.block_status(&BlockId::hash(*block_hash)) { + Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None, + Ok(BlockStatus::KnownBad) => Some(Known::Bad), + Ok(BlockStatus::InChain) => match known_oracle.leaves() { + Err(_) => None, + Ok(leaves) => if leaves.contains(block_hash) { + Some(Known::Leaf) + } else { + Some(Known::Old) + }, + } + } + }, + ); + + let validation_network = ValidationNetwork::new( + network.clone(), + exit.clone(), + message_validator, + client.clone(), + task_executor, + ); let inner_exit = exit.clone(); let work = client.import_notification_stream() @@ -269,7 +286,9 @@ impl Worker for CollationNode where ($e:expr) => { match $e { Ok(x) => x, - Err(e) => return future::Either::A(future::err(Error::Polkadot(e))), + Err(e) => return future::Either::A(future::err(Error::Polkadot( + format!("{:?}", e) + ))), } } } @@ -281,6 +300,7 @@ impl Worker for CollationNode where let client = client.clone(); let key = key.clone(); let parachain_context = parachain_context.clone(); + let validation_network = validation_network.clone(); let work = future::lazy(move || { let api = client.runtime_api(); @@ -289,16 +309,24 @@ impl Worker for CollationNode where None => return future::Either::A(future::ok(())), }; + let authorities = try_fr!(api.authorities(&id)); + let targets = compute_targets( para_id, - try_fr!(api.authorities(&id)).as_slice(), + authorities.as_slice(), try_fr!(api.duty_roster(&id)), ); + let context = ApiContext { + network: validation_network, + parent_hash: relay_parent, + authorities, + }; + let collation_work = collate( para_id, HeadData(last_head), - ApiContext, + context, parachain_context, key, ).map(move |collation| { @@ -355,7 +383,7 @@ pub fn run_collator( ) -> polkadot_cli::error::Result<()> where P: ParachainContext + Send + 'static, E: IntoFuture, - E::Future: Send + Clone + 'static, + E::Future: Send + Clone + Sync + 'static, I: IntoIterator, ArgT: Into + Clone, { @@ -365,74 +393,91 @@ pub fn run_collator( #[cfg(test)] mod tests { + use std::collections::HashMap; + use polkadot_primitives::parachain::OutgoingMessage; + use keyring::AuthorityKeyring; use super::*; - use std::collections::{HashMap, BTreeSet}; - - use futures::Future; - use polkadot_primitives::parachain::{Message, Id as ParaId}; - - pub struct DummyRelayChainCtx { - egresses: HashMap>>, - currently_routing: BTreeSet, + #[derive(Default, Clone)] + struct DummyRelayChainContext { + ingress: HashMap } - impl RelayChainContext for DummyRelayChainCtx { + impl RelayChainContext for DummyRelayChainContext { type Error = (); - type FutureEgress = Result>, ()>; + type FutureEgress = Box>; - fn routing_parachains(&self) -> BTreeSet { - self.currently_routing.clone() + fn unrouted_egress(&self, para_id: ParaId) -> Self::FutureEgress { + match self.ingress.get(¶_id) { + Some(ingress) => Box::new(future::ok(ingress.clone())), + None => Box::new(future::empty()), + } } + } - fn unrouted_egress(&self, id: ParaId) -> Result>, ()> { - Ok(self.egresses.get(&id).cloned().unwrap_or_default()) + #[derive(Clone)] + struct DummyParachainContext; + + impl ParachainContext for DummyParachainContext { + fn produce_candidate>( + &self, + _last_head: HeadData, + ingress: I, + ) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> { + // send messages right back. + Ok(( + BlockData(vec![1, 2, 3, 4, 5,]), + HeadData(vec![9, 9, 9]), + Extrinsic { + outgoing_messages: ingress.into_iter().map(|(id, msg)| OutgoingMessage { + target: id, + data: msg.0, + }).collect(), + } + )) } } #[test] - fn collates_ingress() { - let route_from = |x: &[ParaId]| { - let mut set = BTreeSet::new(); - set.extend(x.iter().cloned()); - set - }; + fn collates_correct_queue_roots() { + let mut context = DummyRelayChainContext::default(); - let message = |x: Vec| vec![Message(x)]; + let id = ParaId::from(100); - let dummy_ctx = DummyRelayChainCtx { - currently_routing: route_from(&[2.into(), 3.into()]), - egresses: vec![ - // egresses for `2`: last routed successfully 5 blocks ago. - (2.into(), vec![ - message(vec![1, 2, 3]), - message(vec![4, 5, 6]), - message(vec![7, 8]), - message(vec![10]), - message(vec![12]), - ]), + let a = ParaId::from(123); + let b = ParaId::from(456); - // egresses for `3`: last routed successfully 3 blocks ago. - (3.into(), vec![ - message(vec![9]), - message(vec![11]), - message(vec![13]), - ]), - ].into_iter().collect(), - }; + let messages_from_a = vec![ + Message(vec![1, 1, 1]), + Message(b"helloworld".to_vec()), + ]; + let messages_from_b = vec![ + Message(b"dogglesworth".to_vec()), + Message(b"buy_1_chili_con_carne_here_is_my_cash".to_vec()), + ]; - assert_eq!( - collate_ingress(dummy_ctx).wait().unwrap(), - ConsolidatedIngress(vec![ - (2.into(), message(vec![1, 2, 3])), - (2.into(), message(vec![4, 5, 6])), - (2.into(), message(vec![7, 8])), - (3.into(), message(vec![9])), - (2.into(), message(vec![10])), - (3.into(), message(vec![11])), - (2.into(), message(vec![12])), - (3.into(), message(vec![13])), - ] - )) + let root_a = ::polkadot_validation::message_queue_root( + messages_from_a.iter().map(|msg| &msg.0) + ); + + let root_b = ::polkadot_validation::message_queue_root( + messages_from_b.iter().map(|msg| &msg.0) + ); + + context.ingress.insert(id, ConsolidatedIngress(vec![ + (b, messages_from_b), + (a, messages_from_a), + ])); + + let collation = collate( + id, + HeadData(vec![5]), + context.clone(), + DummyParachainContext, + AuthorityKeyring::Alice.pair().into(), + ).wait().unwrap(); + + // ascending order by root. + assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]); } } diff --git a/polkadot/network/Cargo.toml b/polkadot/network/Cargo.toml index 16a54b8592..4e28bcbfee 100644 --- a/polkadot/network/Cargo.toml +++ b/polkadot/network/Cargo.toml @@ -19,6 +19,7 @@ futures = "0.1" tokio = "0.1.7" log = "0.4" slice-group-by = "0.2.2" +exit-future = "0.1.4" [dev-dependencies] substrate-client = { git = "https://github.com/paritytech/substrate" } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 86b1f6ad2b..8b3bedd9a8 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -32,6 +32,7 @@ extern crate arrayvec; extern crate parking_lot; extern crate tokio; extern crate slice_group_by; +extern crate exit_future; #[macro_use] extern crate futures; @@ -66,7 +67,6 @@ use self::local_collations::LocalCollations; use std::collections::{HashMap, HashSet}; - #[cfg(test)] mod tests; @@ -213,10 +213,13 @@ impl PolkadotProtocol { fn new_validation_session( &mut self, ctx: &mut Context, - parent_hash: Hash, - session: validation::ValidationSession, - ) { - if let Some(new_local) = self.live_validation_sessions.new_validation_session(parent_hash, session) { + params: validation::SessionParams, + ) -> validation::ValidationSession { + + let (session, new_local) = self.live_validation_sessions + .new_validation_session(params); + + if let Some(new_local) = new_local { for (id, peer_data) in self.peers.iter_mut() .filter(|&(_, ref info)| info.should_send_key()) { @@ -227,10 +230,13 @@ impl PolkadotProtocol { )); } } + + session } - fn remove_validation_session(&mut self, parent_hash: &Hash) { - self.live_validation_sessions.remove(parent_hash); + // true indicates that it was removed actually. + fn remove_validation_session(&mut self, parent_hash: Hash) -> bool { + self.live_validation_sessions.remove(parent_hash) } fn dispatch_pending_requests(&mut self, ctx: &mut Context) { diff --git a/polkadot/network/src/router.rs b/polkadot/network/src/router.rs index 8170a65752..451a714529 100644 --- a/polkadot/network/src/router.rs +++ b/polkadot/network/src/router.rs @@ -14,10 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! Statement routing and consensus table router implementation. +//! Statement routing and validation statement table router implementation. //! -//! During the consensus process, validators exchange statements on validity and availability +//! During the attestation process, validators exchange statements on validity and availability //! of parachain candidates. +//! //! The `Router` in this file hooks into the underlying network to fulfill //! the `TableRouter` trait from `polkadot-validation`, which is expected to call into a shared statement table //! and dispatch evaluation work as necessary when new statements come in. @@ -33,18 +34,15 @@ use polkadot_primitives::parachain::{ }; use codec::{Encode, Decode}; -use futures::{future, prelude::*}; -use futures::sync::oneshot::{self, Receiver}; +use futures::prelude::*; use parking_lot::Mutex; -use std::collections::{hash_map::{Entry, HashMap}, HashSet}; -use std::{io, mem}; +use std::collections::{HashMap, HashSet}; +use std::io; use std::sync::Arc; -use gossip::{RegisteredMessageValidator}; -use validation::{NetworkService, Knowledge, Executor}; +use validation::{self, SessionDataFetcher, NetworkService, Executor}; -type IngressPair = (ParaId, Vec); type IngressPairRef<'a> = (ParaId, &'a [Message]); /// Compute the gossip topic for attestations on the given parent hash. @@ -55,109 +53,48 @@ pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash { BlakeTwo256::hash(&v[..]) } -fn incoming_message_topic(parent_hash: Hash, parachain: ParaId) -> Hash { - let mut v = parent_hash.as_ref().to_vec(); - parachain.using_encoded(|s| v.extend(s)); - v.extend(b"incoming"); - - BlakeTwo256::hash(&v[..]) -} - -/// Receiver for block data. -pub struct BlockDataReceiver { - outer: Receiver>, - inner: Option> -} - -impl Future for BlockDataReceiver { - type Item = BlockData; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - let map_err = |_| io::Error::new( - io::ErrorKind::Other, - "Sending end of channel hung up", - ); - - if let Some(ref mut inner) = self.inner { - return inner.poll().map_err(map_err); - } - match self.outer.poll().map_err(map_err)? { - Async::Ready(mut inner) => { - let poll_result = inner.poll(); - self.inner = Some(inner); - poll_result.map_err(map_err) - } - Async::NotReady => Ok(Async::NotReady), - } - } -} -/// receiver for incoming data. -#[derive(Clone)] -pub struct IncomingReceiver { - inner: future::Shared> -} - -impl Future for IncomingReceiver { - type Item = Incoming; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - match self.inner.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(i)) => Ok(Async::Ready(Incoming::clone(&*i))), - Err(_) => Err(io::Error::new( - io::ErrorKind::Other, - "Sending end of channel hung up", - )), - } - } -} - /// Table routing implementation. pub struct Router { table: Arc, - network: Arc, - api: Arc

, - exit: E, - task_executor: T, - parent_hash: Hash, attestation_topic: Hash, - knowledge: Arc>, - fetch_incoming: Arc>>, + fetcher: SessionDataFetcher, deferred_statements: Arc>, - message_validator: RegisteredMessageValidator, } impl Router { pub(crate) fn new( table: Arc, - network: Arc, - api: Arc

, - task_executor: T, - parent_hash: Hash, - knowledge: Arc>, - exit: E, - message_validator: RegisteredMessageValidator, + fetcher: SessionDataFetcher, ) -> Self { + let parent_hash = fetcher.parent_hash(); Router { table, - network, - api, - task_executor, - parent_hash, attestation_topic: attestation_topic(parent_hash), - knowledge, - fetch_incoming: Arc::new(Mutex::new(HashMap::new())), deferred_statements: Arc::new(Mutex::new(DeferredStatements::new())), - exit, - message_validator, + fetcher, } } - /// Get the attestation topic for gossip. - pub(crate) fn gossip_topic(&self) -> Hash { - self.attestation_topic + /// Return a future of checked messages. These should be imported into the router + /// with `import_statement`. + pub(crate) fn checked_statements(&self) -> impl Stream { + // spin up a task in the background that processes all incoming statements + // validation has been done already by the gossip validator. + // this will block internally until the gossip messages stream is obtained. + self.network().gossip_messages_for(self.attestation_topic) + .filter_map(|msg| { + debug!(target: "validation", "Processing statement for live validation session"); + crate::gossip::GossipMessage::decode(&mut &msg[..]) + }) + .map(|msg| msg.statement) + } + + fn parent_hash(&self) -> Hash { + self.fetcher.parent_hash() + } + + fn network(&self) -> &Arc { + self.fetcher.network() } } @@ -165,16 +102,9 @@ impl Clone for Router { fn clone(&self) -> Self { Router { table: self.table.clone(), - network: self.network.clone(), - api: self.api.clone(), - task_executor: self.task_executor.clone(), - parent_hash: self.parent_hash.clone(), + fetcher: self.fetcher.clone(), attestation_topic: self.attestation_topic.clone(), deferred_statements: self.deferred_statements.clone(), - fetch_incoming: self.fetch_incoming.clone(), - knowledge: self.knowledge.clone(), - exit: self.exit.clone(), - message_validator: self.message_validator.clone(), } } } @@ -187,7 +117,7 @@ impl Router w { /// Import a statement whose signature has been checked already. pub(crate) fn import_statement(&self, statement: SignedStatement) { - trace!(target: "p_net", "importing consensus statement {:?}", statement.statement); + trace!(target: "p_net", "importing validation statement {:?}", statement.statement); // defer any statements for which we haven't imported the candidate yet let c_hash = { @@ -214,7 +144,7 @@ impl Router w }; // prepend the candidate statement. - debug!(target: "consensus", "Importing statements about candidate {:?}", c_hash); + debug!(target: "validation", "Importing statements about candidate {:?}", c_hash); statements.insert(0, statement); let producers: Vec<_> = self.table.import_remote_statements( self, @@ -222,12 +152,12 @@ impl Router w ); // dispatch future work as necessary. for (producer, statement) in producers.into_iter().zip(statements) { - self.knowledge.lock().note_statement(statement.sender, &statement.statement); + self.fetcher.knowledge().lock().note_statement(statement.sender, &statement.statement); if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) { - trace!(target: "consensus", "driving statement work to completion"); - let work = work.select2(self.exit.clone()).then(|_| Ok(())); - self.task_executor.spawn(work); + trace!(target: "validation", "driving statement work to completion"); + let work = work.select2(self.fetcher.exit().clone()).then(|_| Ok(())); + self.fetcher.executor().spawn(work); } } } @@ -251,14 +181,15 @@ impl Router w group_messages.clear(); // reuse allocation from previous iterations. group_messages.extend(group.iter().map(|msg| msg.data.clone()).map(Message)); - debug!(target: "consensus", "Circulating messages from {:?} to {:?} at {}", - source, target, self.parent_hash); + debug!(target: "valdidation", "Circulating messages from {:?} to {:?} at {}", + source, target, self.parent_hash()); // this is the ingress from source to target, with given messages. - let target_incoming = incoming_message_topic(self.parent_hash, target); + let target_incoming = + validation::incoming_message_topic(self.parent_hash(), target); let ingress_for: IngressPairRef = (source, &group_messages[..]); - self.network.gossip_message(target_incoming, ingress_for.encode()); + self.network().gossip_message(target_incoming, ingress_for.encode()); } } } @@ -269,11 +200,11 @@ impl Router w D: Future + Send + 'static, { let table = self.table.clone(); - let network = self.network.clone(); - let knowledge = self.knowledge.clone(); + let network = self.network().clone(); + let knowledge = self.fetcher.knowledge().clone(); let attestation_topic = self.attestation_topic.clone(); - producer.prime(self.api.clone()) + producer.prime(self.fetcher.api().clone()) .map(move |validated| { // store the data before broadcasting statements, so other peers can fetch. knowledge.lock().note_candidate( @@ -289,61 +220,6 @@ impl Router w }) .map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e)) } - -} - -impl Router where - P::Api: ParachainHost, - N: NetworkService, - T: Executor, - E: Future + Clone + Send + 'static, -{ - fn do_fetch_incoming(&self, parachain: ParaId) -> IncomingReceiver { - use polkadot_primitives::BlockId; - let (tx, rx) = { - let mut fetching = self.fetch_incoming.lock(); - match fetching.entry(parachain) { - Entry::Occupied(entry) => return entry.get().clone(), - Entry::Vacant(entry) => { - // has not been requested yet. - let (tx, rx) = oneshot::channel(); - let rx = IncomingReceiver { inner: rx.shared() }; - entry.insert(rx.clone()); - - (tx, rx) - } - } - }; - - let parent_hash = self.parent_hash; - let topic = incoming_message_topic(parent_hash, parachain); - let gossip_messages = self.network.gossip_messages_for(topic) - .map_err(|()| panic!("unbounded receivers do not throw errors; qed")) - .filter_map(|msg| IngressPair::decode(&mut msg.as_slice())); - - let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain) - .map_err(|e| format!("Cannot fetch ingress for parachain {:?} at {:?}: {:?}", - parachain, parent_hash, e) - ); - - let work = canon_roots.into_future() - .and_then(move |ingress_roots| match ingress_roots { - None => Err(format!("No parachain {:?} registered at {}", parachain, parent_hash)), - Some(roots) => Ok(roots.into_iter().collect()) - }) - .and_then(move |ingress_roots| ComputeIngress { - inner: gossip_messages, - ingress_roots, - incoming: Vec::new(), - }) - .map(move |incoming| if let Some(i) = incoming { let _ = tx.send(i); }) - .select2(self.exit.clone()) - .then(|_| Ok(())); - - self.task_executor.spawn(work); - - rx - } } impl TableRouter for Router where @@ -353,8 +229,8 @@ impl TableRouter for Router wh E: Future + Clone + Send + 'static, { type Error = io::Error; - type FetchCandidate = BlockDataReceiver; - type FetchIncoming = IncomingReceiver; + type FetchCandidate = validation::BlockDataReceiver; + type FetchIncoming = validation::IncomingReceiver; fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) { // produce a signed statement @@ -363,42 +239,22 @@ impl TableRouter for Router wh let statement = self.table.import_validated(validated); // give to network to make available. - self.knowledge.lock().note_candidate(hash, Some(block_data), Some(extrinsic)); - self.network.gossip_message(self.attestation_topic, statement.encode()); + self.fetcher.knowledge().lock().note_candidate(hash, Some(block_data), Some(extrinsic)); + self.network().gossip_message(self.attestation_topic, statement.encode()); } - fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver { - let parent_hash = self.parent_hash.clone(); - let candidate = candidate.clone(); - let (tx, rx) = ::futures::sync::oneshot::channel(); - self.network.with_spec(move |spec, ctx| { - let inner_rx = spec.fetch_block_data(ctx, &candidate, parent_hash); - let _ = tx.send(inner_rx); - }); - BlockDataReceiver { outer: rx, inner: None } + fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate { + self.fetcher.fetch_block_data(candidate) } fn fetch_incoming(&self, parachain: ParaId) -> Self::FetchIncoming { - self.do_fetch_incoming(parachain) + self.fetcher.fetch_incoming(parachain) } } impl Drop for Router { fn drop(&mut self) { - let parent_hash = self.parent_hash.clone(); - self.network.with_spec(move |spec, _| spec.remove_validation_session(&parent_hash)); - self.network.drop_gossip(self.attestation_topic); - - { - let mut incoming_fetched = self.fetch_incoming.lock(); - for (para_id, _) in incoming_fetched.drain() { - self.network.drop_gossip(incoming_message_topic( - self.parent_hash, - para_id, - )); - } - } - self.message_validator.remove_session(&parent_hash); + self.fetcher.network().drop_gossip(self.attestation_topic); } } @@ -457,63 +313,10 @@ impl DeferredStatements { } } -// computes ingress from incoming stream of messages. -// returns `None` if the stream concludes too early. -#[must_use = "futures do nothing unless polled"] -struct ComputeIngress { - ingress_roots: HashMap, - incoming: Vec, - inner: S, -} - -impl Future for ComputeIngress where S: Stream { - type Item = Option; - type Error = S::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - loop { - if self.ingress_roots.is_empty() { - return Ok(Async::Ready( - Some(mem::replace(&mut self.incoming, Vec::new())) - )) - } - - let (para_id, messages) = match try_ready!(self.inner.poll()) { - None => return Ok(Async::Ready(None)), - Some(next) => next, - }; - - match self.ingress_roots.entry(para_id) { - Entry::Vacant(_) => continue, - Entry::Occupied(occupied) => { - let canon_root = occupied.get().clone(); - let messages = messages.iter().map(|m| &m.0[..]); - if ::polkadot_validation::message_queue_root(messages) != canon_root { - continue; - } - - occupied.remove(); - } - } - - let pos = self.incoming.binary_search_by_key( - ¶_id, - |&(id, _)| id, - ) - .err() - .expect("incoming starts empty and only inserted when \ - para_id not inserted before; qed"); - - self.incoming.insert(pos, (para_id, messages)); - } - } -} - #[cfg(test)] mod tests { use super::*; use substrate_primitives::crypto::UncheckedInto; - use futures::stream; use polkadot_primitives::parachain::ValidatorId; #[test] @@ -556,70 +359,4 @@ mod tests { assert!(traces.is_empty()); } } - - #[test] - fn compute_ingress_works() { - let actual_messages = [ - ( - ParaId::from(1), - vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])], - ), - ( - ParaId::from(2), - vec![ - Message(vec![1, 3, 7, 9, 1, 2, 3, 4, 5, 6]), - Message(b"hello world".to_vec()), - ], - ), - ( - ParaId::from(5), - vec![Message(vec![1, 2, 3, 4, 5]), Message(vec![6, 9, 6, 9])], - ), - ]; - - let roots: HashMap<_, _> = actual_messages.iter() - .map(|&(para_id, ref messages)| ( - para_id, - ::polkadot_validation::message_queue_root(messages.iter().map(|m| &m.0)), - )) - .collect(); - - let inputs = [ - ( - ParaId::from(1), // wrong message. - vec![Message(vec![1, 1, 2, 2]), Message(vec![3, 3, 4, 4])], - ), - ( - ParaId::from(1), - vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])], - ), - ( - ParaId::from(1), // duplicate - vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])], - ), - - ( - ParaId::from(5), // out of order - vec![Message(vec![1, 2, 3, 4, 5]), Message(vec![6, 9, 6, 9])], - ), - ( - ParaId::from(1234), // un-routed parachain. - vec![Message(vec![9, 9, 9, 9])], - ), - ( - ParaId::from(2), - vec![ - Message(vec![1, 3, 7, 9, 1, 2, 3, 4, 5, 6]), - Message(b"hello world".to_vec()), - ], - ), - ]; - let ingress = ComputeIngress { - ingress_roots: roots, - incoming: Vec::new(), - inner: stream::iter_ok::<_, ()>(inputs.iter().cloned()), - }; - - assert_eq!(ingress.wait().unwrap().unwrap(), actual_messages); - } } diff --git a/polkadot/network/src/tests/mod.rs b/polkadot/network/src/tests/mod.rs index dc07a844e4..25769da2b8 100644 --- a/polkadot/network/src/tests/mod.rs +++ b/polkadot/network/src/tests/mod.rs @@ -17,11 +17,10 @@ //! Tests for polkadot and validation network. use super::{PolkadotProtocol, Status, Message, FullStatus}; -use validation::{ValidationSession, Knowledge}; +use validation::SessionParams; -use parking_lot::Mutex; use polkadot_validation::GenericStatement; -use polkadot_primitives::{Block, SessionKey}; +use polkadot_primitives::{Block, Hash, SessionKey}; use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData, CollatorId, ValidatorId}; use substrate_primitives::crypto::UncheckedInto; use codec::Encode; @@ -31,7 +30,6 @@ use substrate_network::{ generic_message::Message as GenericMessage }; -use std::sync::Arc; use futures::Future; mod validation; @@ -88,11 +86,12 @@ fn make_status(status: &Status, roles: Roles) -> FullStatus { } } -fn make_validation_session(local_key: SessionKey) -> (ValidationSession, Arc>) { - let knowledge = Arc::new(Mutex::new(Knowledge::new())); - let c = ValidationSession::new(knowledge.clone(), local_key); - - (c, knowledge) +fn make_validation_session(parent_hash: Hash, local_key: SessionKey) -> SessionParams { + SessionParams { + local_session_key: Some(local_key), + parent_hash, + authorities: Vec::new(), + } } fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: NodeIndex, message: Message) { @@ -120,8 +119,8 @@ fn sends_session_key() { { let mut ctx = TestContext::default(); - let (session, _knowledge) = make_validation_session(local_key.clone()); - protocol.new_validation_session(&mut ctx, parent_hash, session); + let params = make_validation_session(parent_hash, local_key.clone()); + protocol.new_validation_session(&mut ctx, params); assert!(ctx.has_message(peer_a, Message::SessionKey(local_key.clone()))); } @@ -160,8 +159,9 @@ fn fetches_from_those_with_knowledge() { let status = Status { collating_for: None }; - let (session, knowledge) = make_validation_session(local_key.clone()); - protocol.new_validation_session(&mut TestContext::default(), parent_hash, session); + let params = make_validation_session(parent_hash, local_key.clone()); + let session = protocol.new_validation_session(&mut TestContext::default(), params); + let knowledge = session.knowledge(); knowledge.lock().note_statement(a_key.clone(), &GenericStatement::Valid(candidate_hash)); let recv = protocol.fetch_block_data(&mut TestContext::default(), &candidate_receipt, parent_hash); @@ -290,11 +290,11 @@ fn many_session_keys() { let local_key_a: ValidatorId = [3; 32].unchecked_into(); let local_key_b: ValidatorId = [4; 32].unchecked_into(); - let (session_a, _knowledge_a) = make_validation_session(local_key_a.clone()); - let (session_b, _knowledge_b) = make_validation_session(local_key_b.clone()); + let params_a = make_validation_session(parent_a, local_key_a.clone()); + let params_b = make_validation_session(parent_b, local_key_b.clone()); - protocol.new_validation_session(&mut TestContext::default(), parent_a, session_a); - protocol.new_validation_session(&mut TestContext::default(), parent_b, session_b); + protocol.new_validation_session(&mut TestContext::default(), params_a); + protocol.new_validation_session(&mut TestContext::default(), params_b); assert_eq!(protocol.live_validation_sessions.recent_keys(), &[local_key_a.clone(), local_key_b.clone()]); @@ -313,7 +313,7 @@ fn many_session_keys() { let peer_b = 2; - protocol.remove_validation_session(&parent_a); + assert!(protocol.remove_validation_session(parent_a)); { let mut ctx = TestContext::default(); diff --git a/polkadot/network/src/tests/validation.rs b/polkadot/network/src/tests/validation.rs index 47d501e1ab..d2219d1191 100644 --- a/polkadot/network/src/tests/validation.rs +++ b/polkadot/network/src/tests/validation.rs @@ -471,9 +471,12 @@ fn ingress_fetch_works() { }; // make sure everyone can get ingress for their own parachain. - let fetch_a = router_a.fetch_incoming(id_a).map_err(|_| format!("Could not fetch ingress_a")); - let fetch_b = router_b.fetch_incoming(id_b).map_err(|_| format!("Could not fetch ingress_b")); - let fetch_c = router_c.fetch_incoming(id_c).map_err(|_| format!("Could not fetch ingress_c")); + let fetch_a = router_a.then(move |r| r.unwrap() + .fetch_incoming(id_a).map_err(|_| format!("Could not fetch ingress_a"))); + let fetch_b = router_b.then(move |r| r.unwrap() + .fetch_incoming(id_b).map_err(|_| format!("Could not fetch ingress_b"))); + let fetch_c = router_c.then(move |r| r.unwrap() + .fetch_incoming(id_c).map_err(|_| format!("Could not fetch ingress_c"))); let work = fetch_a.join3(fetch_b, fetch_c); runtime.spawn(built.gossip.then(|_| Ok(()))); // in background. diff --git a/polkadot/network/src/validation.rs b/polkadot/network/src/validation.rs index a838d51a2b..51b8fd0289 100644 --- a/polkadot/network/src/validation.rs +++ b/polkadot/network/src/validation.rs @@ -19,19 +19,23 @@ //! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called //! each time a validation session begins on a new chain head. -use sr_primitives::traits::ProvideRuntimeApi; +use sr_primitives::traits::{BlakeTwo256, ProvideRuntimeApi, Hash as HashT}; use substrate_network::Context as NetContext; use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement}; -use polkadot_primitives::{Block, Hash}; -use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData, ValidatorId, - CollatorId}; -use codec::Decode; +use polkadot_primitives::{Block, Hash, SessionKey}; +use polkadot_primitives::parachain::{ + Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData, Message, CandidateReceipt, + CollatorId, ValidatorId, +}; +use codec::{Encode, Decode}; use futures::prelude::*; -use futures::future::Executor as FutureExecutor; +use futures::future::{self, Executor as FutureExecutor}; use futures::sync::mpsc; +use futures::sync::oneshot::{self, Receiver}; -use std::collections::HashMap; +use std::collections::hash_map::{HashMap, Entry}; +use std::io; use std::sync::Arc; use arrayvec::ArrayVec; @@ -39,10 +43,12 @@ use tokio::runtime::TaskExecutor; use parking_lot::Mutex; use router::Router; -use gossip::{POLKADOT_ENGINE_ID, GossipMessage, RegisteredMessageValidator, MessageValidationData}; +use gossip::{POLKADOT_ENGINE_ID, RegisteredMessageValidator, MessageValidationData}; use super::PolkadotProtocol; +pub use polkadot_validation::Incoming; + /// An executor suitable for dispatching async consensus tasks. pub trait Executor { fn spawn + Send + 'static>(&self, f: F); @@ -111,54 +117,14 @@ impl NetworkService for super::NetworkService { } } -// task that processes all gossipped consensus messages, -// checking signatures -struct MessageProcessTask { - inner_stream: mpsc::UnboundedReceiver>, - table_router: Router, -} - -impl MessageProcessTask where - P: ProvideRuntimeApi + Send + Sync + 'static, - P::Api: ParachainHost, - E: Future + Clone + Send + 'static, - N: NetworkService, - T: Clone + Executor + Send + 'static, -{ - fn process_message(&self, msg: Vec) -> Option> { - debug!(target: "validation", "Processing consensus statement for live consensus"); - - // statements are already checked by gossip validator. - if let Some(message) = GossipMessage::decode(&mut &msg[..]) { - self.table_router.import_statement(message.statement); - } - - None - } -} - -impl Future for MessageProcessTask where - P: ProvideRuntimeApi + Send + Sync + 'static, - P::Api: ParachainHost, - E: Future + Clone + Send + 'static, - N: NetworkService, - T: Clone + Executor + Send + 'static, -{ - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - loop { - match self.inner_stream.poll() { - Ok(Async::Ready(Some(val))) => if let Some(async) = self.process_message(val) { - return Ok(async); - }, - Ok(Async::Ready(None)) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(e) => debug!(target: "p_net", "Error getting consensus message: {:?}", e), - } - } - } +/// Params to a current validation session. +pub struct SessionParams { + /// The local session key. + pub local_session_key: Option, + /// The parent hash. + pub parent_hash: Hash, + /// The authorities. + pub authorities: Vec, } /// Wrapper around the network service @@ -195,70 +161,107 @@ impl Clone for ValidationNetwork { } } +impl ValidationNetwork where + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost, + E: Clone + Future + Send + Sync + 'static, + N: NetworkService, + T: Clone + Executor + Send + Sync + 'static, +{ + /// Instantiate session data fetcher at a parent hash. + /// + /// If the used session key is new, it will be broadcast to peers. + /// If a validation session was already instantiated at this parent hash, + /// the underlying instance will be shared. + /// + /// If there was already a validation session instantiated and a different + /// session key was set, then the new key will be ignored. + /// + /// This implies that there can be multiple services intantiating validation + /// session instances safely, but they should all be coordinated on which session keys + /// are being used. + pub fn instantiate_session(&self, params: SessionParams) + -> oneshot::Receiver> + { + let parent_hash = params.parent_hash; + let network = self.network.clone(); + let api = self.api.clone(); + let task_executor = self.executor.clone(); + let exit = self.exit.clone(); + let message_validator = self.message_validator.clone(); + + let (tx, rx) = oneshot::channel(); + self.network.with_spec(move |spec, ctx| { + // before requesting messages, note live consensus session. + message_validator.note_session( + parent_hash, + MessageValidationData { authorities: params.authorities.clone() }, + ); + + let session = spec.new_validation_session(ctx, params); + let _ = tx.send(SessionDataFetcher { + network, + api, + task_executor, + parent_hash, + knowledge: session.knowledge().clone(), + exit, + fetch_incoming: session.fetched_incoming().clone(), + message_validator, + }); + }); + + rx + } +} + /// A long-lived network which can create parachain statement routing processes on demand. impl ParachainNetwork for ValidationNetwork where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, - E: Clone + Future + Send + 'static, + E: Clone + Future + Send + Sync + 'static, N: NetworkService, - T: Clone + Executor + Send + 'static, + T: Clone + Executor + Send + Sync + 'static, { + type Error = String; type TableRouter = Router; + type BuildTableRouter = Box + Send>; fn communication_for( &self, table: Arc, outgoing: polkadot_validation::Outgoing, authorities: &[ValidatorId], - ) -> Self::TableRouter { + ) -> Self::BuildTableRouter { let parent_hash = table.consensus_parent_hash().clone(); - - let knowledge = Arc::new(Mutex::new(Knowledge::new())); - let local_session_key = table.session_key(); - let table_router = Router::new( - table, - self.network.clone(), - self.api.clone(), - self.executor.clone(), + + let build_fetcher = self.instantiate_session(SessionParams { + local_session_key: Some(local_session_key), parent_hash, - knowledge.clone(), - self.exit.clone(), - self.message_validator.clone(), - ); - - table_router.broadcast_egress(outgoing); - - let attestation_topic = table_router.gossip_topic(); - - let table_router_clone = table_router.clone(); - let executor = self.executor.clone(); - let exit = self.exit.clone(); - - // before requesting messages, note live consensus session. - self.message_validator.note_session( - parent_hash, - MessageValidationData { authorities: authorities.to_vec() }, - ); - - // spin up a task in the background that processes all incoming statements - // TODO: propagate statements on a timer? - let inner_stream = self.network.gossip_messages_for(attestation_topic); - self.network - .with_spec(move |spec, ctx| { - spec.new_validation_session(ctx, parent_hash, ValidationSession { - knowledge, - local_session_key, - }); - let process_task = MessageProcessTask { - inner_stream, - table_router: table_router_clone, - }; - - executor.spawn(process_task.select(exit).then(|_| Ok(()))); + authorities: authorities.to_vec(), }); - table_router + let executor = self.executor.clone(); + let work = build_fetcher + .map_err(|e| format!("{:?}", e)) + .map(move |fetcher| { + let table_router = Router::new( + table, + fetcher, + ); + + table_router.broadcast_egress(outgoing); + + let table_router_clone = table_router.clone(); + let work = table_router.checked_statements() + .for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) }); + executor.spawn(work); + + table_router + }); + + Box::new(work) } } @@ -368,21 +371,69 @@ impl Knowledge { } } +/// receiver for incoming data. +#[derive(Clone)] +pub struct IncomingReceiver { + inner: future::Shared> +} + +impl Future for IncomingReceiver { + type Item = Incoming; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(i)) => Ok(Async::Ready(Incoming::clone(&*i))), + Err(_) => Err(io::Error::new( + io::ErrorKind::Other, + "Sending end of channel hung up", + )), + } + } +} + +/// Incoming message gossip topic for a parachain at a given block hash. +pub(crate) fn incoming_message_topic(parent_hash: Hash, parachain: ParaId) -> Hash { + let mut v = parent_hash.as_ref().to_vec(); + parachain.using_encoded(|s| v.extend(s)); + v.extend(b"incoming"); + + BlakeTwo256::hash(&v[..]) +} + /// A current validation session instance. +#[derive(Clone)] pub(crate) struct ValidationSession { + parent_hash: Hash, knowledge: Arc>, - local_session_key: ValidatorId, + local_session_key: Option, + fetch_incoming: Arc>, } impl ValidationSession { - #[cfg(test)] - pub(crate) fn new(knowledge: Arc>, local_session_key: ValidatorId) -> Self { + /// Create a new validation session instance. Needs to be attached to the + /// nework. + pub(crate) fn new(params: SessionParams) -> Self { ValidationSession { - knowledge, - local_session_key + parent_hash: params.parent_hash, + knowledge: Arc::new(Mutex::new(Knowledge::new())), + local_session_key: params.local_session_key, + fetch_incoming: Arc::new(Mutex::new(FetchIncoming::new())), } } + /// Get a handle to the shared knowledge relative to this consensus + /// instance. + pub(crate) fn knowledge(&self) -> &Arc> { + &self.knowledge + } + + /// Get a handle to the shared list of parachains' incoming data fetch. + pub(crate) fn fetched_incoming(&self) -> &Arc> { + &self.fetch_incoming + } + // execute a closure with locally stored block data for a candidate, or a slice of session identities // we believe should have the data. fn with_block_data(&self, hash: &Hash, f: F) -> U @@ -447,8 +498,8 @@ impl RecentValidatorIds { pub(crate) struct LiveValidationSessions { // recent local session keys. recent: RecentValidatorIds, - // live validation session instances, on `parent_hash`. - live_instances: HashMap, + // live validation session instances, on `parent_hash`. refcount retained alongside. + live_instances: HashMap, } impl LiveValidationSessions { @@ -462,33 +513,74 @@ impl LiveValidationSessions { /// Note new validation session. If the used session key is new, /// it returns it to be broadcasted to peers. + /// + /// If there was already a validation session instantiated and a different + /// session key was set, then the new key will be ignored. pub(crate) fn new_validation_session( &mut self, - parent_hash: Hash, - session: ValidationSession, - ) -> Option { - let inserted_key = self.recent.insert(session.local_session_key.clone()); - let maybe_new = if let InsertedRecentKey::New(_) = inserted_key { - Some(session.local_session_key.clone()) + params: SessionParams, + ) -> (ValidationSession, Option) { + let parent_hash = params.parent_hash.clone(); + + let key = params.local_session_key.clone(); + let recent = &mut self.recent; + + let mut check_new_key = || { + let inserted_key = key.clone().map(|key| recent.insert(key)); + if let Some(InsertedRecentKey::New(_)) = inserted_key { + key.clone() + } else { + None + } + }; + + if let Some(&mut (ref mut rc, ref mut prev)) = self.live_instances.get_mut(&parent_hash) { + let maybe_new = if prev.local_session_key.is_none() { + prev.local_session_key = key.clone(); + check_new_key() + } else { + None + }; + + *rc += 1; + return (prev.clone(), maybe_new) + } + + let session = ValidationSession::new(params); + self.live_instances.insert(parent_hash, (1, session.clone())); + + (session, check_new_key()) + } + + /// Remove validation session. true indicates that it was actually removed. + pub(crate) fn remove(&mut self, parent_hash: Hash) -> bool { + let maybe_removed = if let Entry::Occupied(mut entry) = self.live_instances.entry(parent_hash) { + entry.get_mut().0 -= 1; + if entry.get().0 == 0 { + let (_, session) = entry.remove(); + Some(session) + } else { + None + } } else { None }; - self.live_instances.insert(parent_hash, session); + let session = match maybe_removed { + None => return false, + Some(s) => s, + }; - maybe_new - } - - /// Remove validation session. - pub(crate) fn remove(&mut self, parent_hash: &Hash) { - if let Some(validation_session) = self.live_instances.remove(parent_hash) { + if let Some(ref key) = session.local_session_key { let key_still_used = self.live_instances.values() - .any(|c| c.local_session_key == validation_session.local_session_key); + .any(|c| c.1.local_session_key.as_ref() == Some(key)); if !key_still_used { - self.recent.remove(&validation_session.local_session_key) + self.recent.remove(key) } } + + true } /// Recent session keys as a slice. @@ -505,15 +597,290 @@ impl LiveValidationSessions { where F: FnOnce(Result<&BlockData, Option<&[ValidatorId]>>) -> U { match self.live_instances.get(parent_hash) { - Some(c) => c.with_block_data(c_hash, |res| f(res.map_err(Some))), + Some(c) => c.1.with_block_data(c_hash, |res| f(res.map_err(Some))), None => f(Err(None)) } } } +/// Receiver for block data. +pub struct BlockDataReceiver { + outer: Receiver>, + inner: Option> +} + +impl Future for BlockDataReceiver { + type Item = BlockData; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + let map_err = |_| io::Error::new( + io::ErrorKind::Other, + "Sending end of channel hung up", + ); + + if let Some(ref mut inner) = self.inner { + return inner.poll().map_err(map_err); + } + match self.outer.poll().map_err(map_err)? { + Async::Ready(inner) => { + self.inner = Some(inner); + self.poll() + } + Async::NotReady => Ok(Async::NotReady), + } + } +} + +/// Wrapper around bookkeeping for tracking which parachains we're fetching incoming messages +/// for. +pub(crate) struct FetchIncoming { + exit_signal: ::exit_future::Signal, + parachains_fetching: HashMap, +} + +impl FetchIncoming { + fn new() -> Self { + FetchIncoming { + exit_signal: ::exit_future::signal_only(), + parachains_fetching: HashMap::new(), + } + } + + // registers intent to fetch incoming. returns an optional piece of work + // that, if some, is needed to be run to completion in order for the future to + // resolve. + // + // impl Future has a bug here where it wrongly assigns a `'static` bound to `M`. + fn fetch_with_work(&mut self, para_id: ParaId, make_work: M) + -> (IncomingReceiver, Option + Send>>) where + M: FnOnce() -> W, + W: Future> + Send + 'static, + { + let (tx, rx) = match self.parachains_fetching.entry(para_id) { + Entry::Occupied(entry) => return (entry.get().clone(), None), + Entry::Vacant(entry) => { + // has not been requested yet. + let (tx, rx) = oneshot::channel(); + let rx = IncomingReceiver { inner: rx.shared() }; + entry.insert(rx.clone()); + + (tx, rx) + } + }; + + let exit = self.exit_signal.make_exit(); + let work = make_work() + .map(move |incoming| if let Some(i) = incoming { let _ = tx.send(i); }) + .select2(exit) + .then(|_| Ok(())); + + (rx, Some(Box::new(work))) + } +} + +/// Can fetch data for a given validation session +pub struct SessionDataFetcher { + network: Arc, + api: Arc

, + fetch_incoming: Arc>, + exit: E, + task_executor: T, + knowledge: Arc>, + parent_hash: Hash, + message_validator: RegisteredMessageValidator, +} + +impl SessionDataFetcher { + /// Get the parent hash. + pub(crate) fn parent_hash(&self) -> Hash { + self.parent_hash.clone() + } + + /// Get the shared knowledge. + pub(crate) fn knowledge(&self) -> &Arc> { + &self.knowledge + } + + /// Get the exit future. + pub(crate) fn exit(&self) -> &E { + &self.exit + } + + /// Get the network service. + pub(crate) fn network(&self) -> &Arc { + &self.network + } + + /// Get the executor. + pub(crate) fn executor(&self) -> &T { + &self.task_executor + } + + /// Get the runtime API. + pub(crate) fn api(&self) -> &Arc

{ + &self.api + } +} + +impl Clone for SessionDataFetcher { + fn clone(&self) -> Self { + SessionDataFetcher { + network: self.network.clone(), + api: self.api.clone(), + task_executor: self.task_executor.clone(), + parent_hash: self.parent_hash.clone(), + fetch_incoming: self.fetch_incoming.clone(), + knowledge: self.knowledge.clone(), + exit: self.exit.clone(), + message_validator: self.message_validator.clone(), + } + } +} + +impl SessionDataFetcher where + P::Api: ParachainHost, + N: NetworkService, + T: Clone + Executor + Send + 'static, + E: Future + Clone + Send + 'static, +{ + /// Fetch block data for the given candidate receipt. + pub fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver { + let parent_hash = self.parent_hash; + let candidate = candidate.clone(); + let (tx, rx) = ::futures::sync::oneshot::channel(); + self.network.with_spec(move |spec, ctx| { + let inner_rx = spec.fetch_block_data(ctx, &candidate, parent_hash); + let _ = tx.send(inner_rx); + }); + BlockDataReceiver { outer: rx, inner: None } + } + + /// Fetch incoming messages for a parachain. + pub fn fetch_incoming(&self, parachain: ParaId) -> IncomingReceiver { + use polkadot_primitives::BlockId; + + let (rx, work) = self.fetch_incoming.lock().fetch_with_work(parachain.clone(), move || { + let parent_hash: Hash = self.parent_hash(); + let topic = incoming_message_topic(parent_hash, parachain); + + let gossip_messages = self.network().gossip_messages_for(topic) + .map_err(|()| panic!("unbounded receivers do not throw errors; qed")) + .filter_map(|msg| IngressPair::decode(&mut msg.as_slice())); + + let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain) + .map_err(|e| format!("Cannot fetch ingress for parachain {:?} at {:?}: {:?}", + parachain, parent_hash, e) + ); + + canon_roots.into_future() + .and_then(move |ingress_roots| match ingress_roots { + None => Err(format!("No parachain {:?} registered at {}", parachain, parent_hash)), + Some(roots) => Ok(roots.into_iter().collect()) + }) + .and_then(move |ingress_roots| ComputeIngress { + inner: gossip_messages, + ingress_roots, + incoming: Vec::new(), + }) + .select2(self.exit.clone()) + .map(|res| match res { + future::Either::A((incoming, _)) => incoming, + future::Either::B(_) => None, + }) + }); + + if let Some(work) = work { + self.task_executor.spawn(work); + } + + rx + } +} + +impl Drop for SessionDataFetcher { + fn drop(&mut self) { + // a bit of a hack... + let network = self.network.clone(); + let fetch_incoming = self.fetch_incoming.clone(); + let message_validator = self.message_validator.clone(); + + let parent_hash = self.parent_hash(); + + self.network.with_spec(move |spec, _| { + if !spec.remove_validation_session(parent_hash) { return } + + let mut incoming_fetched = fetch_incoming.lock(); + for (para_id, _) in incoming_fetched.parachains_fetching.drain() { + network.drop_gossip(incoming_message_topic( + parent_hash, + para_id, + )); + } + + message_validator.remove_session(&parent_hash); + }); + } +} + +type IngressPair = (ParaId, Vec); + +// computes ingress from incoming stream of messages. +// returns `None` if the stream concludes too early. +#[must_use = "futures do nothing unless polled"] +struct ComputeIngress { + ingress_roots: HashMap, + incoming: Vec, + inner: S, +} + +impl Future for ComputeIngress where S: Stream { + type Item = Option; + type Error = S::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + if self.ingress_roots.is_empty() { + return Ok(Async::Ready( + Some(::std::mem::replace(&mut self.incoming, Vec::new())) + )) + } + + let (para_id, messages) = match try_ready!(self.inner.poll()) { + None => return Ok(Async::Ready(None)), + Some(next) => next, + }; + + match self.ingress_roots.entry(para_id) { + Entry::Vacant(_) => continue, + Entry::Occupied(occupied) => { + let canon_root = occupied.get().clone(); + let messages = messages.iter().map(|m| &m.0[..]); + if ::polkadot_validation::message_queue_root(messages) != canon_root { + continue; + } + + occupied.remove(); + } + } + + let pos = self.incoming.binary_search_by_key( + ¶_id, + |&(id, _)| id, + ) + .err() + .expect("incoming starts empty and only inserted when \ + para_id not inserted before; qed"); + + self.incoming.insert(pos, (para_id, messages)); + } + } +} + #[cfg(test)] mod tests { use super::*; + use futures::stream; use substrate_primitives::crypto::UncheckedInto; #[test] @@ -565,4 +932,107 @@ mod tests { _ => panic!("not new"), } } + + #[test] + fn compute_ingress_works() { + let actual_messages = [ + ( + ParaId::from(1), + vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])], + ), + ( + ParaId::from(2), + vec![ + Message(vec![1, 3, 7, 9, 1, 2, 3, 4, 5, 6]), + Message(b"hello world".to_vec()), + ], + ), + ( + ParaId::from(5), + vec![Message(vec![1, 2, 3, 4, 5]), Message(vec![6, 9, 6, 9])], + ), + ]; + + let roots: HashMap<_, _> = actual_messages.iter() + .map(|&(para_id, ref messages)| ( + para_id, + ::polkadot_validation::message_queue_root(messages.iter().map(|m| &m.0)), + )) + .collect(); + + let inputs = [ + ( + ParaId::from(1), // wrong message. + vec![Message(vec![1, 1, 2, 2]), Message(vec![3, 3, 4, 4])], + ), + ( + ParaId::from(1), + vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])], + ), + ( + ParaId::from(1), // duplicate + vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])], + ), + + ( + ParaId::from(5), // out of order + vec![Message(vec![1, 2, 3, 4, 5]), Message(vec![6, 9, 6, 9])], + ), + ( + ParaId::from(1234), // un-routed parachain. + vec![Message(vec![9, 9, 9, 9])], + ), + ( + ParaId::from(2), + vec![ + Message(vec![1, 3, 7, 9, 1, 2, 3, 4, 5, 6]), + Message(b"hello world".to_vec()), + ], + ), + ]; + let ingress = ComputeIngress { + ingress_roots: roots, + incoming: Vec::new(), + inner: stream::iter_ok::<_, ()>(inputs.iter().cloned()), + }; + + assert_eq!(ingress.wait().unwrap().unwrap(), actual_messages); + } + + #[test] + fn add_new_sessions_works() { + let mut live_sessions = LiveValidationSessions::new(); + let key_a: ValidatorId = [0; 32].unchecked_into(); + let key_b: ValidatorId = [1; 32].unchecked_into(); + let parent_hash = [0xff; 32].into(); + + let (session, new_key) = live_sessions.new_validation_session(SessionParams { + parent_hash, + local_session_key: None, + authorities: Vec::new(), + }); + + let knowledge = session.knowledge().clone(); + + assert!(new_key.is_none()); + + let (session, new_key) = live_sessions.new_validation_session(SessionParams { + parent_hash, + local_session_key: Some(key_a.clone()), + authorities: Vec::new(), + }); + + // check that knowledge points to the same place. + assert_eq!(&**session.knowledge() as *const _, &*knowledge as *const _); + assert_eq!(new_key, Some(key_a.clone())); + + let (session, new_key) = live_sessions.new_validation_session(SessionParams { + parent_hash, + local_session_key: Some(key_b.clone()), + authorities: Vec::new(), + }); + + assert_eq!(&**session.knowledge() as *const _, &*knowledge as *const _); + assert!(new_key.is_none()); + } } diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index b71b460a06..1c76e5b180 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -212,6 +212,13 @@ construct_service_factory! { None => return Ok(service), }; + if service.config.custom.collating_for.is_some() { + info!("The node cannot start as an authority because it is also configured\ + to run as a collator."); + + return Ok(service); + } + let client = service.client(); let known_oracle = client.clone(); diff --git a/polkadot/src/main.rs b/polkadot/src/main.rs index d86478809d..bd6b6bd160 100644 --- a/polkadot/src/main.rs +++ b/polkadot/src/main.rs @@ -25,7 +25,7 @@ extern crate futures; #[macro_use] extern crate error_chain; -use cli::{PolkadotService, VersionInfo}; +use cli::{PolkadotService, VersionInfo, TaskExecutor}; use futures::sync::oneshot; use futures::{future, Future}; @@ -52,7 +52,7 @@ impl cli::IntoExit for Worker { impl cli::Worker for Worker { type Work = ::Exit; - fn work(self, _service: &S) -> Self::Work { + fn work(self, _service: &S, _: TaskExecutor) -> Self::Work { use cli::IntoExit; self.into_exit() } diff --git a/polkadot/test-parachains/adder/collator/src/main.rs b/polkadot/test-parachains/adder/collator/src/main.rs index b704bb4f3f..dcef6dc7e0 100644 --- a/polkadot/test-parachains/adder/collator/src/main.rs +++ b/polkadot/test-parachains/adder/collator/src/main.rs @@ -33,7 +33,7 @@ use std::sync::Arc; use adder::{HeadData as AdderHead, BlockData as AdderBody}; use substrate_primitives::{Pair as PairT, ed25519::Pair}; use parachain::codec::{Encode, Decode}; -use primitives::parachain::{HeadData, BlockData, Id as ParaId, Message}; +use primitives::parachain::{HeadData, BlockData, Id as ParaId, Message, Extrinsic}; use collator::{InvalidHead, ParachainContext, VersionInfo}; use parking_lot::Mutex; @@ -59,7 +59,7 @@ impl ParachainContext for AdderContext { &self, last_head: HeadData, ingress: I, - ) -> Result<(BlockData, HeadData), InvalidHead> + ) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> { let adder_head = AdderHead::decode(&mut &last_head.0[..]) .ok_or(InvalidHead)?; @@ -93,7 +93,7 @@ impl ParachainContext for AdderContext { next_head.number, next_body.state.overflowing_add(next_body.add).0); db.insert(next_head.clone(), next_body); - Ok((encoded_body, encoded_head)) + Ok((encoded_body, encoded_head, Extrinsic { outgoing_messages: Vec::new() })) } } diff --git a/polkadot/validation/src/attestation_service.rs b/polkadot/validation/src/attestation_service.rs index 74b67ed778..0df832921e 100644 --- a/polkadot/validation/src/attestation_service.rs +++ b/polkadot/validation/src/attestation_service.rs @@ -118,6 +118,7 @@ pub(crate) fn start( N: Network + Send + Sync + 'static, N::TableRouter: Send + 'static, <::FetchIncoming as IntoFuture>::Future: Send + 'static, + ::Future: Send + 'static, { const TIMER_DELAY: Duration = Duration::from_secs(5); const TIMER_INTERVAL: Duration = Duration::from_secs(30); diff --git a/polkadot/validation/src/collation.rs b/polkadot/validation/src/collation.rs index 7195db6d00..d9a84c0f46 100644 --- a/polkadot/validation/src/collation.rs +++ b/polkadot/validation/src/collation.rs @@ -170,7 +170,7 @@ pub fn message_queue_root>(messages: I) -> Hash } /// Compute the set of egress roots for all given outgoing messages. -pub fn egress_roots(mut outgoing: Vec) -> Vec<(ParaId, Hash)> { +pub fn egress_roots(outgoing: &mut [OutgoingMessage]) -> Vec<(ParaId, Hash)> { // stable sort messages by parachain ID. outgoing.sort_by_key(|msg| ParaId::from(msg.target)); @@ -357,7 +357,7 @@ mod tests { &[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3)], ).is_ok()); - let egress_roots = egress_roots(messages.clone()); + let egress_roots = egress_roots(&mut messages.clone()[..]); assert!(check_extrinsic( messages.clone(), diff --git a/polkadot/validation/src/lib.rs b/polkadot/validation/src/lib.rs index 3b00a71626..a94c5310e3 100644 --- a/polkadot/validation/src/lib.rs +++ b/polkadot/validation/src/lib.rs @@ -167,10 +167,17 @@ pub trait TableRouter: Clone { /// A long-lived network which can create parachain statement and BFT message routing processes on demand. pub trait Network { + /// The error type of asynchronously building the table router. + type Error: std::fmt::Debug; + /// The table router type. This should handle importing of any statements, /// routing statements to peers, and driving completion of any `StatementProducers`. type TableRouter: TableRouter; + /// The future used for asynchronously building the table router. + /// This should not fail. + type BuildTableRouter: IntoFuture; + /// Instantiate a table router using the given shared table. /// Also pass through any outgoing messages to be broadcast to peers. fn communication_for( @@ -178,7 +185,7 @@ pub trait Network { table: Arc, outgoing: Outgoing, authorities: &[SessionKey], - ) -> Self::TableRouter; + ) -> Self::BuildTableRouter; } /// Information about a specific group. @@ -284,6 +291,7 @@ impl ParachainValidation where ::Future: Send + 'static, N::TableRouter: Send + 'static, <::FetchIncoming as IntoFuture>::Future: Send + 'static, + ::Future: Send + 'static, { /// Get an attestation table for given parent hash. /// @@ -382,64 +390,77 @@ impl ParachainValidation where &self, relay_parent: Hash, validation_para: ParaId, - router: N::TableRouter, + build_router: N::BuildTableRouter ) -> exit_future::Signal { use extrinsic_store::Data; let (signal, exit) = exit_future::signal(); - - let fetch_incoming = router.fetch_incoming(validation_para) - .into_future() - .map_err(|e| format!("{:?}", e)); - - // fetch incoming messages to our parachain from network and - // then fetch a local collation. let (collators, client) = (self.collators.clone(), self.client.clone()); - let collation_work = fetch_incoming - .map_err(|e| String::clone(&e)) - .and_then(move |incoming| { - CollationFetch::new( - validation_para, - relay_parent, - collators, - client, - incoming, - ).map_err(|e| format!("{:?}", e)) - }); - let extrinsic_store = self.extrinsic_store.clone(); - let handled_work = collation_work.then(move |result| match result { - Ok((collation, extrinsic)) => { - let res = extrinsic_store.make_available(Data { - relay_parent, - parachain_id: collation.receipt.parachain_index, - candidate_hash: collation.receipt.hash(), - block_data: collation.block_data.clone(), - extrinsic: Some(extrinsic.clone()), + + let with_router = move |router: N::TableRouter| { + let fetch_incoming = router.fetch_incoming(validation_para) + .into_future() + .map_err(|e| format!("{:?}", e)); + + // fetch incoming messages to our parachain from network and + // then fetch a local collation. + let collation_work = fetch_incoming + .map_err(|e| String::clone(&e)) + .and_then(move |incoming| { + CollationFetch::new( + validation_para, + relay_parent, + collators, + client, + incoming, + ).map_err(|e| format!("{:?}", e)) }); - match res { - Ok(()) => { - // TODO: https://github.com/paritytech/polkadot/issues/51 - // Erasure-code and provide merkle branches. - router.local_candidate(collation.receipt, collation.block_data, extrinsic) + collation_work.then(move |result| match result { + Ok((collation, extrinsic)) => { + let res = extrinsic_store.make_available(Data { + relay_parent, + parachain_id: collation.receipt.parachain_index, + candidate_hash: collation.receipt.hash(), + block_data: collation.block_data.clone(), + extrinsic: Some(extrinsic.clone()), + }); + + match res { + Ok(()) => { + // TODO: https://github.com/paritytech/polkadot/issues/51 + // Erasure-code and provide merkle branches. + router.local_candidate( + collation.receipt, + collation.block_data, + extrinsic, + ) + } + Err(e) => warn!( + target: "validation", + "Failed to make collation data available: {:?}", + e, + ), } - Err(e) => warn!( - target: "validation", - "Failed to make collation data available: {:?}", - e, - ), + + Ok(()) } + Err(e) => { + warn!(target: "validation", "Failed to collate candidate: {}", e); + Ok(()) + } + }) + }; - Ok(()) - } - Err(e) => { - warn!(target: "validation", "Failed to collate candidate: {}", e); - Ok(()) - } - }); - - let cancellable_work = handled_work.select(exit).then(|_| Ok(())); + let cancellable_work = build_router + .into_future() + .map_err(|e| { + warn!(target: "validation" , "Failed to build table router: {:?}", e); + }) + .and_then(with_router) + .select(exit) + .then(|_| Ok(())); // spawn onto thread pool. self.handle.spawn(cancellable_work); @@ -472,6 +493,7 @@ impl ProposerFactory where N: Network + Send + Sync + 'static, N::TableRouter: Send + 'static, <::FetchIncoming as IntoFuture>::Future: Send + 'static, + ::Future: Send + 'static, TxApi: PoolChainApi, { /// Create a new proposer factory. @@ -521,6 +543,7 @@ impl consensus::Environment for ProposerFactory::Future: Send + 'static, N::TableRouter: Send + 'static, <::FetchIncoming as IntoFuture>::Future: Send + 'static, + ::Future: Send + 'static, { type Proposer = Proposer; type Error = Error;