From 5a09802e57e7051aeb10c210eb9b24e6e28555b2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 18 Jul 2018 15:04:26 +0200 Subject: [PATCH] Collator-side of collator protocol (#351) * skeleton of collators object * awaiting and handling collations. rename `collators` to CollationPool * add some tests * add tests * implement Collators trait for ConsensusNetwork * plug collators into main polkadot-network * ignore collator role message * add a couple more tests * garbage collection for collations * extract session-key tracking from consensus * add local_collations.rs * finish polish of local_collations * integrate local_collations into network layer * introduce API for adding local collations * mostly finish collator implementation pending service fix * Specialized network() * push collations to the network * grumbles * substrate-service has custom configuration * initialize network in collator mode as necessary --- polkadot/cli/src/cli.yml | 4 - polkadot/cli/src/lib.rs | 24 +-- polkadot/collator/Cargo.toml | 1 + polkadot/collator/src/lib.rs | 129 ++++++++++---- polkadot/network/src/consensus.rs | 1 - polkadot/network/src/lib.rs | 216 ++++++++++++++--------- polkadot/network/src/local_collations.rs | 199 +++++++++++++++++++++ polkadot/network/src/tests.rs | 20 +-- polkadot/service/src/lib.rs | 32 +++- 9 files changed, 476 insertions(+), 150 deletions(-) create mode 100644 polkadot/network/src/local_collations.rs diff --git a/polkadot/cli/src/cli.yml b/polkadot/cli/src/cli.yml index 1d4e876d80..be0d0e501b 100644 --- a/polkadot/cli/src/cli.yml +++ b/polkadot/cli/src/cli.yml @@ -29,10 +29,6 @@ args: value_name: KEY help: Specify node secret key (64-character hex string) takes_value: true - - collator: - long: collator - help: Enable collator mode - takes_value: false - validator: long: validator help: Enable validator mode diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 352f513e9e..9ce9da8ea0 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -71,7 +71,7 @@ pub use client::error::Error as ClientError; pub use client::backend::Backend as ClientBackend; pub use state_machine::Backend as StateMachineBackend; pub use polkadot_primitives::Block as PolkadotBlock; -pub use service::{Components as ServiceComponents, Service}; +pub use service::{Components as ServiceComponents, Service, CustomConfiguration}; use std::io::{self, Write, Read, stdin, stdout}; use std::fs::File; @@ -134,11 +134,16 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf { pub trait Worker { /// A future that resolves when the work is done or the node should exit. /// This will be run on a tokio runtime. - type Work: Future; + type Work: Future + Send + 'static; /// An exit scheduled for the future. type Exit: Future + Send + 'static; + /// Return configuration for the polkadot node. + // TODO: make this the full configuration, so embedded nodes don't need + // string CLI args + fn configuration(&self) -> CustomConfiguration { Default::default() } + /// Don't work, but schedule an exit. fn exit_only(self) -> Self::Exit; @@ -217,13 +222,7 @@ pub fn run(args: I, worker: W) -> error::Result<()> where }; let role = - if matches.is_present("collator") { - info!("Starting collator"); - // TODO [rob]: collation node implementation - // This isn't a thing. Different parachains will have their own collator executables and - // maybe link to libpolkadot to get a light-client. - service::Roles::LIGHT - } else if matches.is_present("light") { + if matches.is_present("light") { info!("Starting (light)"); config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible; service::Roles::LIGHT @@ -262,9 +261,10 @@ pub fn run(args: I, worker: W) -> error::Result<()> where config.network.net_config_path = config.network.config_path.clone(); let port = match matches.value_of("port") { - Some(port) => port.parse().expect("Invalid p2p port value specified."), + Some(port) => port.parse().map_err(|_| "Invalid p2p port value specified.")?, None => 30333, }; + config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port)); config.network.public_address = None; config.network.client_version = format!("parity-polkadot/{}", crate_version!()); @@ -275,6 +275,8 @@ pub fn run(args: I, worker: W) -> error::Result<()> where }; } + config.custom = worker.configuration(); + config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect(); if matches.is_present("dev") { config.keys.push("Alice".into()); @@ -494,7 +496,7 @@ fn run_until_exit( ) }; - let _ = worker.work(&service).wait(); + let _ = runtime.block_on(worker.work(&service)); exit_send.fire(); Ok(()) } diff --git a/polkadot/collator/Cargo.toml b/polkadot/collator/Cargo.toml index c91b6a5d24..122261e3f6 100644 --- a/polkadot/collator/Cargo.toml +++ b/polkadot/collator/Cargo.toml @@ -15,3 +15,4 @@ polkadot-primitives = { path = "../primitives", version = "0.1" } polkadot-cli = { path = "../cli" } log = "0.4" ed25519 = { path = "../../substrate/ed25519" } +tokio = "0.1.7" diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 7da24c51b6..7e4b36cc48 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -49,6 +49,7 @@ extern crate substrate_client as client; extern crate substrate_codec as codec; extern crate substrate_primitives as primitives; extern crate ed25519; +extern crate tokio; extern crate polkadot_api; extern crate polkadot_cli; @@ -58,16 +59,20 @@ extern crate polkadot_primitives; #[macro_use] extern crate log; -use std::collections::{BTreeSet, BTreeMap}; +use std::collections::{BTreeSet, BTreeMap, HashSet}; use std::sync::Arc; +use std::time::{Duration, Instant}; use futures::{future, stream, Stream, Future, IntoFuture}; use client::BlockchainEvents; use polkadot_api::PolkadotApi; -use polkadot_primitives::BlockId; -use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId}; -use polkadot_cli::{ServiceComponents, Service}; +use polkadot_primitives::{AccountId, BlockId, SessionKey}; +use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId}; +use polkadot_cli::{ServiceComponents, Service, CustomConfiguration}; use polkadot_cli::Worker; +use tokio::timer::Deadline; + +const COLLATION_TIMEOUT: Duration = Duration::from_secs(30); /// Parachain context needed for collation. /// @@ -99,6 +104,11 @@ pub trait RelayChainContext { fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress; } +fn key_to_account_id(key: &ed25519::Pair) -> AccountId { + let pubkey_bytes: [u8; 32] = key.public().into(); + pubkey_bytes.into() +} + /// Collate the necessary ingress queue using the given context. pub fn collate_ingress<'a, R>(relay_context: R) -> impl Future + 'a @@ -159,11 +169,10 @@ pub fn collate<'a, R, P>( let block_data_hash = block_data.hash(); let signature = key.sign(&block_data_hash.0[..]).into(); - let pubkey_bytes: [u8; 32] = key.public().into(); let receipt = parachain::CandidateReceipt { parachain_index: local_id, - collator: pubkey_bytes.into(), + collator: key_to_account_id(&*key), signature, head_data, balance_uploads: Vec::new(), @@ -183,7 +192,7 @@ pub fn collate<'a, R, P>( struct ApiContext; impl RelayChainContext for ApiContext { - type Error = (); + type Error = ::polkadot_api::Error; type FutureEgress = Result>, Self::Error>; fn routing_parachains(&self) -> BTreeSet { @@ -203,12 +212,21 @@ struct CollationNode { } impl Worker for CollationNode where - P: ParachainContext + 'static, + P: ParachainContext + Send + 'static, E: Future + Send + 'static { - type Work = Box>; + type Work = Box + Send>; type Exit = E; + fn configuration(&self) -> CustomConfiguration { + let mut config = CustomConfiguration::default(); + config.collating_for = Some(( + key_to_account_id(&*self.key), + self.para_id.clone(), + )); + config + } + fn exit_only(self) -> Self::Exit { self.exit } @@ -217,35 +235,66 @@ impl Worker for CollationNode where let CollationNode { parachain_context, exit, para_id, key } = self; let client = service.client(); let api = service.api(); + let network = service.network(); let work = client.import_notification_stream() - .and_then(move |notification| { - let id = BlockId::hash(notification.hash); - - match api.parachain_head(&id, para_id) { - Ok(Some(last_head)) => { - let collation_work = collate( - para_id, - HeadData(last_head), - ApiContext, - parachain_context.clone(), - key.clone(), - ).map(Some); - - future::Either::A(collation_work) - } - Ok(None) => { - info!("Parachain {:?} appears to be inactive. Cannot collate.", id); - future::Either::B(future::ok(None)) - } - Err(e) => { - warn!("Could not collate for parachain {:?}: {:?}", id, e); - future::Either::B(future::ok(None)) // returning error would shut down the collation node + .for_each(move |notification| { + macro_rules! try_fr { + ($e:expr) => { + match $e { + Ok(x) => x, + Err(e) => return future::Either::A(future::err(e)), + } } } - }) - .for_each(|_collation: Option| { - // TODO: import into network. + + let relay_parent = notification.hash; + let id = BlockId::hash(relay_parent); + + let network = network.clone(); + let api = api.clone(); + let key = key.clone(); + let parachain_context = parachain_context.clone(); + + let work = future::lazy(move || { + let last_head = match try_fr!(api.parachain_head(&id, para_id)) { + Some(last_head) => last_head, + None => return future::Either::A(future::ok(())), + }; + + let targets = compute_targets( + para_id, + try_fr!(api.session_keys(&id)).as_slice(), + try_fr!(api.duty_roster(&id)), + ); + + let collation_work = collate( + para_id, + HeadData(last_head), + ApiContext, + parachain_context, + key, + ).map(move |collation| { + network.with_spec(|spec, ctx| spec.add_local_collation( + ctx, + relay_parent, + targets, + collation, + )); + }); + + future::Either::B(collation_work) + }); + let deadlined = Deadline::new(work, Instant::now() + COLLATION_TIMEOUT); + let silenced = deadlined.then(|res| match res { + Ok(()) => Ok(()), + Err(e) => { + warn!("Collation failure: {}", e); + Ok(()) + } + }); + + tokio::spawn(silenced); Ok(()) }); @@ -254,6 +303,16 @@ impl Worker for CollationNode where } } +fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet { + use polkadot_primitives::parachain::Chain; + + roster.validator_duty.iter().enumerate() + .filter(|&(_, c)| c == &Chain::Parachain(para_id)) + .filter_map(|(i, _)| session_keys.get(i)) + .cloned() + .collect() +} + /// Run a collator node with the given `RelayChainContext` and `ParachainContext` and /// arguments to the underlying polkadot node. /// @@ -266,7 +325,7 @@ pub fn run_collator( key: Arc, args: Vec<::std::ffi::OsString> ) -> polkadot_cli::error::Result<()> where - P: ParachainContext + 'static, + P: ParachainContext + Send + 'static, E: IntoFuture, E::Future: Send + 'static, { diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index 6b1b141eca..4f29899100 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -285,7 +285,6 @@ impl Network for ConsensusNetwork

, + collating_for: Option<(AccountId, ParaId)>, + validator_key: Option, + claimed_validator: bool, } #[derive(Default)] @@ -164,7 +166,6 @@ impl Knowledge { struct CurrentConsensus { knowledge: Arc>, parent_hash: Hash, - session_keys: HashMap, local_session_key: SessionKey, } @@ -174,12 +175,6 @@ impl CurrentConsensus { self.knowledge.lock().candidates.get(hash) .and_then(|entry| entry.block_data.clone()) } - - fn peer_disconnected(&mut self, peer: &PeerInfo) { - if let Some(key) = peer.session_keys.get(&self.parent_hash) { - self.session_keys.remove(key); - } - } } /// Polkadot-specific messages. @@ -187,9 +182,9 @@ impl CurrentConsensus { pub enum Message { /// signed statement and localized parent hash. Statement(Hash, SignedStatement), - /// Tell the peer your session key for the current block. - // TODO: do this with a random challenge protocol - SessionKey(Hash, SessionKey), + /// As a validator, tell the peer your current session key. + // TODO: do this with a cryptographic proof of some kind + SessionKey(SessionKey), /// Requesting parachain block data by candidate hash. RequestBlockData(RequestId, Hash), /// Provide block data by candidate hash or nothing if unknown. @@ -208,9 +203,8 @@ impl Encode for Message { dest.push(h); dest.push(s); } - Message::SessionKey(ref h, ref k) => { + Message::SessionKey(ref k) => { dest.push_byte(1); - dest.push(h); dest.push(k); } Message::RequestBlockData(ref id, ref d) => { @@ -240,7 +234,7 @@ impl Decode for Message { fn decode(input: &mut I) -> Option { match input.read_byte()? { 0 => Some(Message::Statement(Decode::decode(input)?, Decode::decode(input)?)), - 1 => Some(Message::SessionKey(Decode::decode(input)?, Decode::decode(input)?)), + 1 => Some(Message::SessionKey(Decode::decode(input)?)), 2 => Some(Message::RequestBlockData(Decode::decode(input)?, Decode::decode(input)?)), 3 => Some(Message::BlockData(Decode::decode(input)?, Decode::decode(input)?)), 4 => Some(Message::CollatorRole(Decode::decode(input)?)), @@ -259,27 +253,27 @@ fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) /// Polkadot protocol attachment for substrate. pub struct PolkadotProtocol { peers: HashMap, + collating_for: Option<(AccountId, ParaId)>, consensus_gossip: ConsensusGossip, collators: CollatorPool, + validators: HashMap, + local_collations: LocalCollations, live_consensus: Option, in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>, pending: Vec, next_req_id: u64, } -impl Default for PolkadotProtocol { - fn default() -> Self { - Self::new() - } -} - impl PolkadotProtocol { /// Instantiate a polkadot protocol handler. - pub fn new() -> Self { + pub fn new(collating_for: Option<(AccountId, ParaId)>) -> Self { PolkadotProtocol { peers: HashMap::new(), consensus_gossip: ConsensusGossip::new(), collators: CollatorPool::new(), + collating_for, + validators: HashMap::new(), + local_collations: LocalCollations::new(), live_consensus: None, in_flight: HashMap::new(), pending: Vec::new(), @@ -311,31 +305,23 @@ impl PolkadotProtocol { } /// Note new consensus session. - fn new_consensus(&mut self, ctx: &mut Context, mut consensus: CurrentConsensus) { - let parent_hash = consensus.parent_hash; - let old_parent = self.live_consensus.as_ref().map(|c| c.parent_hash); + fn new_consensus(&mut self, ctx: &mut Context, consensus: CurrentConsensus) { + let old_data = self.live_consensus.as_ref().map(|c| (c.parent_hash, c.local_session_key)); - // TODO: optimize for when session key changes and only send to collators who are relevant in next few blocks. - for (id, info) in self.peers.iter_mut() - .filter(|&(_, ref info)| info.validator || info.status.collating_for.is_some()) - { - send_polkadot_message( - ctx, - *id, - Message::SessionKey(parent_hash, consensus.local_session_key) - ); - - if let Some(key) = info.session_keys.get(&parent_hash) { - consensus.session_keys.insert(*key, *id); - } - - if let Some(ref old_parent) = old_parent { - info.session_keys.remove(old_parent); + if Some(&consensus.local_session_key) != old_data.as_ref().map(|&(_, ref key)| key) { + for (id, _) in self.peers.iter() + .filter(|&(_, ref info)| info.claimed_validator || info.collating_for.is_some()) + { + send_polkadot_message( + ctx, + *id, + Message::SessionKey(consensus.local_session_key) + ); } } self.live_consensus = Some(consensus); - self.consensus_gossip.collect_garbage(old_parent.as_ref()); + self.consensus_gossip.collect_garbage(old_data.as_ref().map(|&(ref hash, _)| hash)); } fn dispatch_pending_requests(&mut self, ctx: &mut Context) { @@ -359,8 +345,9 @@ impl PolkadotProtocol { continue; } + let validator_keys = &mut self.validators; let next_peer = entry.knows_block_data.iter() - .filter_map(|x| consensus.session_keys.get(x).map(|id| (*x, *id))) + .filter_map(|x| validator_keys.get(x).map(|id| (*x, *id))) .find(|&(ref key, _)| pending.attempted_peers.insert(*key)) .map(|(_, id)| id); @@ -392,29 +379,7 @@ impl PolkadotProtocol { match msg { Message::Statement(parent_hash, _statement) => self.consensus_gossip.on_chain_specific(ctx, peer_id, raw, parent_hash), - Message::SessionKey(parent_hash, key) => { - { - let info = match self.peers.get_mut(&peer_id) { - Some(peer) => peer, - None => return, - }; - - if !info.validator { - ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason"); - return; - } - - match self.live_consensus { - Some(ref mut consensus) if consensus.parent_hash == parent_hash => { - consensus.session_keys.insert(key, peer_id); - } - _ => {} - } - - info.session_keys.insert(parent_hash, key); - } - self.dispatch_pending_requests(ctx); - } + Message::SessionKey(key) => self.on_session_key(ctx, peer_id, key), Message::RequestBlockData(req_id, hash) => { let block_data = self.live_consensus.as_ref() .and_then(|c| c.block_data(&hash)); @@ -423,10 +388,43 @@ impl PolkadotProtocol { } Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data), Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation), - Message::CollatorRole(_) => {}, + Message::CollatorRole(role) => self.on_new_role(ctx, peer_id, role), } } + fn on_session_key(&mut self, ctx: &mut Context, peer_id: PeerId, key: SessionKey) { + { + let info = match self.peers.get_mut(&peer_id) { + Some(peer) => peer, + None => { + trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id); + return + } + }; + + if !info.claimed_validator { + ctx.disable_peer(peer_id, "Session key broadcasted without setting authority role"); + return; + } + + if let Some(old_key) = ::std::mem::replace(&mut info.validator_key, Some(key)) { + self.validators.remove(&old_key); + + for (relay_parent, collation) in self.local_collations.fresh_key(&old_key, &key) { + send_polkadot_message( + ctx, + peer_id, + Message::Collation(relay_parent, collation), + ) + } + + } + self.validators.insert(key, peer_id); + } + + self.dispatch_pending_requests(ctx); + } + fn on_block_data(&mut self, ctx: &mut Context, peer_id: PeerId, req_id: RequestId, data: Option) { match self.in_flight.remove(&(req_id, peer_id)) { Some(req) => { @@ -440,14 +438,39 @@ impl PolkadotProtocol { self.pending.push(req); self.dispatch_pending_requests(ctx); } - None => ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason"), + None => ctx.disable_peer(peer_id, "Unexpected block data response"), + } + } + + // when a validator sends us (a collator) a new role. + fn on_new_role(&mut self, ctx: &mut Context, peer_id: PeerId, role: Role) { + let info = match self.peers.get(&peer_id) { + Some(peer) => peer, + None => { + trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id); + return + } + }; + + match info.validator_key { + None => ctx.disable_peer( + peer_id, + "Sent collator role without registering first as validator", + ), + Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) { + send_polkadot_message( + ctx, + peer_id, + Message::Collation(relay_parent, collation), + ) + }, } } } impl Specialization for PolkadotProtocol { fn status(&self) -> Vec { - Status { collating_for: None }.encode() + Status { collating_for: self.collating_for.clone() }.encode() } fn on_connect(&mut self, ctx: &mut Context, peer_id: PeerId, status: FullStatus) { @@ -460,7 +483,7 @@ impl Specialization for PolkadotProtocol { if let Some((ref acc_id, ref para_id)) = local_status.collating_for { if self.collator_peer_id(acc_id.clone()).is_some() { - ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason"); + ctx.disconnect_peer(peer_id); return } @@ -476,9 +499,9 @@ impl Specialization for PolkadotProtocol { let send_key = validator || local_status.collating_for.is_some(); self.peers.insert(peer_id, PeerInfo { - status: local_status, - session_keys: Default::default(), - validator, + collating_for: local_status.collating_for, + validator_key: None, + claimed_validator: validator, }); self.consensus_gossip.new_peer(ctx, peer_id, status.roles); @@ -486,7 +509,7 @@ impl Specialization for PolkadotProtocol { send_polkadot_message( ctx, peer_id, - Message::SessionKey(consensus.parent_hash, consensus.local_session_key) + Message::SessionKey(consensus.local_session_key) ); } @@ -495,7 +518,7 @@ impl Specialization for PolkadotProtocol { fn on_disconnect(&mut self, ctx: &mut Context, peer_id: PeerId) { if let Some(info) = self.peers.remove(&peer_id) { - if let Some((acc_id, _)) = info.status.collating_for { + if let Some((acc_id, _)) = info.collating_for { let new_primary = self.collators.on_disconnect(acc_id) .and_then(|new_primary| self.collator_peer_id(new_primary)); @@ -508,8 +531,9 @@ impl Specialization for PolkadotProtocol { } } - if let (true, &mut Some(ref mut consensus)) = (info.validator, &mut self.live_consensus) { - consensus.peer_disconnected(&info); + if let Some(validator_key) = info.validator_key { + self.validators.remove(&validator_key); + self.local_collations.on_disconnect(&validator_key); } { @@ -562,6 +586,7 @@ impl Specialization for PolkadotProtocol { fn maintain_peers(&mut self, ctx: &mut Context) { self.consensus_gossip.collect_garbage(None); self.collators.collect_garbage(None); + self.local_collations.collect_garbage(None); self.dispatch_pending_requests(ctx); for collator_action in self.collators.maintain_peers() { @@ -578,8 +603,9 @@ impl Specialization for PolkadotProtocol { } } - fn on_block_imported(&mut self, _ctx: &mut Context, hash: Hash, _header: &Header) { + fn on_block_imported(&mut self, _ctx: &mut Context, hash: Hash, header: &Header) { self.collators.collect_garbage(Some(&hash)); + self.local_collations.collect_garbage(Some(&header.parent_hash)); } } @@ -591,14 +617,14 @@ impl PolkadotProtocol { match self.peers.get(&from) { None => ctx.disconnect_peer(from), - Some(peer_info) => match peer_info.status.collating_for { - None => ctx.disable_peer(from, "Unknown Polkadot-protocol reason"), + Some(peer_info) => match peer_info.collating_for { + None => ctx.disable_peer(from, "Sent collation without registering collator intent"), Some((ref acc_id, ref para_id)) => { let structurally_valid = para_id == &collation_para && acc_id == &collated_acc; if structurally_valid && collation.receipt.check_signature().is_ok() { self.collators.on_collation(acc_id.clone(), relay_parent, collation) } else { - ctx.disable_peer(from, "Unknown Polkadot-protocol reason") + ctx.disable_peer(from, "Sent malformed collation") }; } }, @@ -614,7 +640,6 @@ impl PolkadotProtocol { // get connected peer with given account ID for collation. fn collator_peer_id(&self, account_id: AccountId) -> Option { let check_info = |info: &PeerInfo| info - .status .collating_for .as_ref() .map_or(false, |&(ref acc_id, _)| acc_id == &account_id); @@ -629,7 +654,30 @@ impl PolkadotProtocol { // disconnect a collator by account-id. fn disconnect_bad_collator(&self, ctx: &mut Context, account_id: AccountId) { if let Some(peer_id) = self.collator_peer_id(account_id) { - ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason") + ctx.disable_peer(peer_id, "Consensus layer determined the given collator misbehaved") + } + } +} + +impl PolkadotProtocol { + /// Add a local collation and broadcast it to the necessary peers. + pub fn add_local_collation( + &mut self, + ctx: &mut Context, + relay_parent: Hash, + targets: HashSet, + collation: Collation, + ) { + for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { + match self.validators.get(&primary) { + Some(peer_id) => send_polkadot_message( + ctx, + *peer_id, + Message::Collation(relay_parent, cloned_collation), + ), + None => + warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary), + } } } } diff --git a/polkadot/network/src/local_collations.rs b/polkadot/network/src/local_collations.rs new file mode 100644 index 0000000000..2902ed5f0e --- /dev/null +++ b/polkadot/network/src/local_collations.rs @@ -0,0 +1,199 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Local collations to be circulated to validators. +//! +//! Collations are attempted to be repropagated when a new validator connects, +//! a validator changes his session key, or when they are generated. + +use polkadot_primitives::{Hash, SessionKey}; + +use collator_pool::Role; + +use std::collections::{HashMap, HashSet}; +use std::time::{Duration, Instant}; + +const LIVE_FOR: Duration = Duration::from_secs(60 * 5); + +struct LocalCollation { + targets: HashSet, + collation: C, + live_since: Instant, +} + +/// Tracker for locally collated values and which validators to send them to. +pub struct LocalCollations { + primary_for: HashSet, + local_collations: HashMap>, +} + +impl LocalCollations { + /// Create a new `LocalCollations` tracker. + pub fn new() -> Self { + LocalCollations { + primary_for: HashSet::new(), + local_collations: HashMap::new(), + } + } + + /// Validator gave us a new role. If the new role is "primary", this function might return + /// a set of collations to send to that validator. + pub fn note_validator_role(&mut self, key: SessionKey, role: Role) -> Vec<(Hash, C)> { + match role { + Role::Backup => { + self.primary_for.remove(&key); + Vec::new() + } + Role::Primary => { + let new_primary = self.primary_for.insert(key); + if new_primary { + self.collations_targeting(&key) + } else { + Vec::new() + } + } + } + } + + /// Fresh session key from a validator. Returns a vector of collations to send + /// to the validator. + pub fn fresh_key(&mut self, old_key: &SessionKey, new_key: &SessionKey) -> Vec<(Hash, C)> { + if self.primary_for.remove(old_key) { + self.primary_for.insert(*new_key); + + self.collations_targeting(new_key) + } else { + Vec::new() + } + } + + /// Validator disconnected. + pub fn on_disconnect(&mut self, key: &SessionKey) { + self.primary_for.remove(key); + } + + /// Mark collations relevant to the given parent hash as obsolete. + pub fn collect_garbage(&mut self, relay_parent: Option<&Hash>) { + if let Some(relay_parent) = relay_parent { + self.local_collations.remove(relay_parent); + } + + let now = Instant::now(); + self.local_collations.retain(|_, v| v.live_since + LIVE_FOR > now); + } + + /// Add a collation. Returns an iterator of session keys to send to and lazy copies of the collation. + pub fn add_collation<'a>( + &'a mut self, + relay_parent: Hash, + targets: HashSet, + collation: C + ) + -> impl Iterator + 'a + { + self.local_collations.insert(relay_parent, LocalCollation { + targets, + collation, + live_since: Instant::now(), + }); + + let local = self.local_collations.get(&relay_parent) + .expect("just inserted to this key; qed"); + + let borrowed_collation = &local.collation; + local.targets + .intersection(&self.primary_for) + .map(move |k| (*k, borrowed_collation.clone())) + } + + fn collations_targeting(&self, key: &SessionKey) -> Vec<(Hash, C)> { + self.local_collations.iter() + .filter(|&(_, ref v)| v.targets.contains(key)) + .map(|(h, v)| (*h, v.collation.clone())) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn add_validator_with_ready_collation() { + let key = [1; 32].into(); + let relay_parent = [2; 32].into(); + let targets = { + let mut set = HashSet::new(); + set.insert(key); + set + }; + + let mut tracker = LocalCollations::new(); + assert!(tracker.add_collation(relay_parent, targets, 5).next().is_none()); + assert_eq!(tracker.note_validator_role(key, Role::Primary), vec![(relay_parent, 5)]); + } + + #[test] + fn rename_with_ready() { + let orig_key = [1; 32].into(); + let new_key = [2; 32].into(); + let relay_parent = [255; 32].into(); + let targets = { + let mut set = HashSet::new(); + set.insert(new_key); + set + }; + + let mut tracker: LocalCollations = LocalCollations::new(); + assert!(tracker.add_collation(relay_parent, targets, 5).next().is_none()); + assert!(tracker.note_validator_role(orig_key, Role::Primary).is_empty()); + assert_eq!(tracker.fresh_key(&orig_key, &new_key), vec![(relay_parent, 5u8)]); + } + + #[test] + fn collecting_garbage() { + let relay_parent_a = [255; 32].into(); + let relay_parent_b = [222; 32].into(); + + let mut tracker: LocalCollations = LocalCollations::new(); + assert!(tracker.add_collation(relay_parent_a, HashSet::new(), 5).next().is_none()); + assert!(tracker.add_collation(relay_parent_b, HashSet::new(), 69).next().is_none()); + + let live_since = Instant::now() - LIVE_FOR - Duration::from_secs(10); + tracker.local_collations.get_mut(&relay_parent_b).unwrap().live_since = live_since; + + tracker.collect_garbage(Some(&relay_parent_a)); + + // first one pruned because of relay parent, other because of time. + assert!(tracker.local_collations.is_empty()); + } + + #[test] + fn add_collation_with_connected_target() { + let key = [1; 32].into(); + let relay_parent = [2; 32].into(); + let targets = { + let mut set = HashSet::new(); + set.insert(key); + set + }; + + let mut tracker = LocalCollations::new(); + assert!(tracker.note_validator_role(key, Role::Primary).is_empty()); + assert_eq!(tracker.add_collation(relay_parent, targets, 5).next(), Some((key, 5))); + + } +} diff --git a/polkadot/network/src/tests.rs b/polkadot/network/src/tests.rs index 06d679dcd0..021ea739b1 100644 --- a/polkadot/network/src/tests.rs +++ b/polkadot/network/src/tests.rs @@ -86,7 +86,6 @@ fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus let c = CurrentConsensus { knowledge: knowledge.clone(), parent_hash, - session_keys: Default::default(), local_session_key: local_key, }; @@ -100,7 +99,7 @@ fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: Peer #[test] fn sends_session_key() { - let mut protocol = PolkadotProtocol::new(); + let mut protocol = PolkadotProtocol::new(None); let peer_a = 1; let peer_b = 2; @@ -120,20 +119,19 @@ fn sends_session_key() { let mut ctx = TestContext::default(); let (consensus, _knowledge) = make_consensus(parent_hash, local_key); protocol.new_consensus(&mut ctx, consensus); - - assert!(ctx.has_message(peer_a, Message::SessionKey(parent_hash, local_key))); + assert!(ctx.has_message(peer_a, Message::SessionKey(local_key))); } { let mut ctx = TestContext::default(); protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, Roles::NONE)); - assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key))); + assert!(ctx.has_message(peer_b, Message::SessionKey(local_key))); } } #[test] fn fetches_from_those_with_knowledge() { - let mut protocol = PolkadotProtocol::new(); + let mut protocol = PolkadotProtocol::new(None); let peer_a = 1; let peer_b = 2; @@ -169,13 +167,14 @@ fn fetches_from_those_with_knowledge() { { let mut ctx = TestContext::default(); protocol.on_connect(&mut ctx, peer_a, make_status(&status, Roles::AUTHORITY)); - assert!(ctx.has_message(peer_a, Message::SessionKey(parent_hash, local_key))); + assert!(ctx.has_message(peer_a, Message::SessionKey(local_key))); } // peer A gives session key and gets asked for data. { let mut ctx = TestContext::default(); - on_message(&mut protocol, &mut ctx, peer_a, Message::SessionKey(parent_hash, a_key)); + on_message(&mut protocol, &mut ctx, peer_a, Message::SessionKey(a_key)); + assert!(protocol.validators.contains_key(&a_key)); assert!(ctx.has_message(peer_a, Message::RequestBlockData(1, candidate_hash))); } @@ -185,7 +184,7 @@ fn fetches_from_those_with_knowledge() { { let mut ctx = TestContext::default(); protocol.on_connect(&mut ctx, peer_b, make_status(&status, Roles::AUTHORITY)); - on_message(&mut protocol, &mut ctx, peer_b, Message::SessionKey(parent_hash, b_key)); + on_message(&mut protocol, &mut ctx, peer_b, Message::SessionKey(b_key)); assert!(!ctx.has_message(peer_b, Message::RequestBlockData(2, candidate_hash))); } @@ -194,6 +193,7 @@ fn fetches_from_those_with_knowledge() { { let mut ctx = TestContext::default(); protocol.on_disconnect(&mut ctx, peer_a); + assert!(!protocol.validators.contains_key(&a_key)); assert!(ctx.has_message(peer_b, Message::RequestBlockData(2, candidate_hash))); } @@ -208,7 +208,7 @@ fn fetches_from_those_with_knowledge() { #[test] fn remove_bad_collator() { - let mut protocol = PolkadotProtocol::new(); + let mut protocol = PolkadotProtocol::new(None); let peer_id = 1; let account_id = [2; 32].into(); diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 45be076dc1..6d2c4aee15 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -46,13 +46,14 @@ use std::collections::HashMap; use codec::Encode; use transaction_pool::TransactionPool; use polkadot_api::{PolkadotApi, light::RemotePolkadotApiWrapper}; -use polkadot_primitives::{Block, BlockId, Hash}; +use polkadot_primitives::{parachain, AccountId, Block, BlockId, Hash}; use polkadot_runtime::GenesisConfig; use client::Client; use polkadot_network::{PolkadotProtocol, consensus::ConsensusNetwork}; use tokio::runtime::TaskExecutor; +use service::FactoryFullConfiguration; -pub use service::{Configuration, Roles, PruningMode, ExtrinsicPoolOptions, +pub use service::{Roles, PruningMode, ExtrinsicPoolOptions, ErrorKind, Error, ComponentBlock, LightComponents, FullComponents}; pub use client::ExecutionStrategy; @@ -87,6 +88,17 @@ impl Components for service::FullComponents { type Backend = service::FullBackend; } +/// All configuration for the polkadot node. +pub type Configuration = FactoryFullConfiguration; + +/// Polkadot-specific configuration. +#[derive(Default)] +pub struct CustomConfiguration { + /// Set to `Some` with a collator `AccountId` and desired parachain + /// if the network protocol should be started in collator mode. + pub collating_for: Option<(AccountId, parachain::Id)>, +} + /// Polkadot config for the substrate service. pub struct Factory; @@ -105,6 +117,7 @@ impl service::ServiceFactory for Factory { RemotePolkadotApiWrapper, service::LightExecutor> >; type Genesis = GenesisConfig; + type Configuration = CustomConfiguration; const NETWORK_PROTOCOL_ID: network::ProtocolId = ::polkadot_network::DOT_PROTOCOL_ID; @@ -129,6 +142,15 @@ impl service::ServiceFactory for Factory { imports_external_transactions: false, }) } + + fn build_network_protocol(config: &Configuration) + -> Result + { + if let Some((_, ref para_id)) = config.custom.collating_for { + info!("Starting network in Collator mode for parachain {:?}", para_id); + } + Ok(PolkadotProtocol::new(config.custom.collating_for)) + } } /// Polkadot service. @@ -155,7 +177,7 @@ impl Service { } /// Creates light client and register protocol with the network service -pub fn new_light(config: Configuration, executor: TaskExecutor) +pub fn new_light(config: Configuration, executor: TaskExecutor) -> Result>, Error> { let service = service::Service::>::new(config, executor)?; @@ -170,7 +192,7 @@ pub fn new_light(config: Configuration, executor: TaskExecutor) } /// Creates full client and register protocol with the network service -pub fn new_full(config: Configuration, executor: TaskExecutor) +pub fn new_full(config: Configuration, executor: TaskExecutor) -> Result>, Error> { let is_validator = (config.roles & Roles::AUTHORITY) == Roles::AUTHORITY; @@ -207,7 +229,7 @@ pub fn new_full(config: Configuration, executor: TaskExecutor) } /// Creates bare client without any networking. -pub fn new_client(config: Configuration) +pub fn new_client(config: Configuration) -> Result>>, Error> { service::new_client::(config)