diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 031f60f6eb..6449e80917 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -1981,6 +1981,7 @@ dependencies = [ name = "polkadot-network" version = "0.1.0" dependencies = [ + "arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 2.1.5 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/polkadot/network/Cargo.toml b/polkadot/network/Cargo.toml index df3b413a5e..4dfc8a1238 100644 --- a/polkadot/network/Cargo.toml +++ b/polkadot/network/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Parity Technologies "] description = "Polkadot-specific networking protocol" [dependencies] +arrayvec = "0.4" parking_lot = "0.4" polkadot-availability-store = { path = "../availability-store" } polkadot-consensus = { path = "../consensus" } diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index 0ff324c72d..cd83f96250 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -21,20 +21,22 @@ use sr_primitives::traits::ProvideRuntimeApi; use substrate_network::consensus_gossip::ConsensusMessage; -use polkadot_consensus::{Network, SharedTable, Collators}; +use polkadot_consensus::{Network, SharedTable, Collators, Statement, GenericStatement}; use polkadot_primitives::{AccountId, Block, Hash, SessionKey}; -use polkadot_primitives::parachain::{Id as ParaId, Collation, ParachainHost}; +use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData}; use codec::Decode; use futures::prelude::*; use futures::sync::mpsc; +use std::collections::HashMap; use std::sync::Arc; +use arrayvec::ArrayVec; use tokio::runtime::TaskExecutor; use parking_lot::Mutex; -use super::{NetworkService, Knowledge, CurrentConsensus}; +use super::NetworkService; use router::Router; // task that processes all gossipped consensus messages, @@ -142,9 +144,8 @@ impl Network for ConsensusNetwork< // TODO: propagate statements on a timer? let inner_stream = self.network.consensus_gossip().write().messages_for(attestation_topic); task_executor.spawn(self.network.with_spec(|spec, ctx| { - spec.new_consensus(ctx, CurrentConsensus { + spec.new_consensus(ctx, parent_hash, CurrentConsensus { knowledge, - parent_hash, local_session_key, }); @@ -175,7 +176,6 @@ impl Future for AwaitingCollation { } } - impl Collators for ConsensusNetwork

where P::Api: ParachainHost, { @@ -192,3 +192,250 @@ impl Collators for ConsensusNetwor self.network.with_spec(|spec, ctx| spec.disconnect_bad_collator(ctx, collator)); } } + +#[derive(Default)] +struct KnowledgeEntry { + knows_block_data: Vec, + knows_extrinsic: Vec, + block_data: Option, + extrinsic: Option, +} + +/// Tracks knowledge of peers. +pub(crate) struct Knowledge { + candidates: HashMap, +} + +impl Knowledge { + /// Create a new knowledge instance. + pub(crate) fn new() -> Self { + Knowledge { + candidates: HashMap::new(), + } + } + + /// Note a statement seen from another validator. + pub(crate) fn note_statement(&mut self, from: SessionKey, statement: &Statement) { + match *statement { + GenericStatement::Candidate(ref c) => { + let mut entry = self.candidates.entry(c.hash()).or_insert_with(Default::default); + entry.knows_block_data.push(from); + entry.knows_extrinsic.push(from); + } + GenericStatement::Available(ref hash) => { + let mut entry = self.candidates.entry(*hash).or_insert_with(Default::default); + entry.knows_block_data.push(from); + entry.knows_extrinsic.push(from); + } + GenericStatement::Valid(ref hash) | GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash) + .or_insert_with(Default::default) + .knows_block_data + .push(from), + } + } + + /// Note a candidate collated or seen locally. + pub(crate) fn note_candidate(&mut self, hash: Hash, block_data: Option, extrinsic: Option) { + let entry = self.candidates.entry(hash).or_insert_with(Default::default); + entry.block_data = entry.block_data.take().or(block_data); + entry.extrinsic = entry.extrinsic.take().or(extrinsic); + } +} + +/// A current consensus instance. +pub(crate) struct CurrentConsensus { + knowledge: Arc>, + local_session_key: SessionKey, +} + +impl CurrentConsensus { + #[cfg(test)] + pub(crate) fn new(knowledge: Arc>, local_session_key: SessionKey) -> Self { + CurrentConsensus { + knowledge, + local_session_key + } + } + + // execute a closure with locally stored block data for a candidate, or a slice of session identities + // we believe should have the data. + fn with_block_data(&self, hash: &Hash, f: F) -> U + where F: FnOnce(Result<&BlockData, &[SessionKey]>) -> U + { + let knowledge = self.knowledge.lock(); + let res = knowledge.candidates.get(hash) + .ok_or(&[] as &_) + .and_then(|entry| entry.block_data.as_ref().ok_or(&entry.knows_block_data[..])); + + f(res) + } +} + +// 3 is chosen because sessions change infrequently and usually +// only the last 2 (current session and "last" session) are relevant. +// the extra is an error boundary. +const RECENT_SESSIONS: usize = 3; + +/// Result when inserting recent session key. +#[derive(PartialEq, Eq)] +pub(crate) enum InsertedRecentKey { + /// Key was already known. + AlreadyKnown, + /// Key was new and pushed out optional old item. + New(Option), +} + +/// Wrapper for managing recent session keys. +#[derive(Default)] +pub(crate) struct RecentSessionKeys { + inner: ArrayVec<[SessionKey; RECENT_SESSIONS]>, +} + +impl RecentSessionKeys { + /// Insert a new session key. This returns one to be pushed out if the + /// set is full. + pub(crate) fn insert(&mut self, key: SessionKey) -> InsertedRecentKey { + if self.inner.contains(&key) { return InsertedRecentKey::AlreadyKnown } + + let old = if self.inner.len() == RECENT_SESSIONS { + Some(self.inner.remove(0)) + } else { + None + }; + + self.inner.push(key); + InsertedRecentKey::New(old) + } + + /// As a slice. + pub(crate) fn as_slice(&self) -> &[SessionKey] { + &*self.inner + } + + fn remove(&mut self, key: &SessionKey) { + self.inner.retain(|k| k != key) + } +} + +/// Manages requests and session keys for live consensus instances. +pub(crate) struct LiveConsensusInstances { + // recent local session keys. + recent: RecentSessionKeys, + // live consensus instances, on `parent_hash`. + live_instances: HashMap, +} + +impl LiveConsensusInstances { + /// Create a new `LiveConsensusInstances` + pub(crate) fn new() -> Self { + LiveConsensusInstances { + recent: Default::default(), + live_instances: HashMap::new(), + } + } + + /// Note new consensus session. If the used session key is new, + /// it returns it to be broadcasted to peers. + pub(crate) fn new_consensus( + &mut self, + parent_hash: Hash, + consensus: CurrentConsensus, + ) -> Option { + let inserted_key = self.recent.insert(consensus.local_session_key); + let maybe_new = if let InsertedRecentKey::New(_) = inserted_key { + Some(consensus.local_session_key) + } else { + None + }; + + self.live_instances.insert(parent_hash, consensus); + + maybe_new + } + + /// Remove consensus session. + pub(crate) fn remove(&mut self, parent_hash: &Hash) { + if let Some(consensus) = self.live_instances.remove(parent_hash) { + let key_still_used = self.live_instances.values() + .any(|c| c.local_session_key == consensus.local_session_key); + + if !key_still_used { + self.recent.remove(&consensus.local_session_key) + } + } + } + + /// Recent session keys as a slice. + pub(crate) fn recent_keys(&self) -> &[SessionKey] { + self.recent.as_slice() + } + + /// Call a closure with block data from consensus session at parent hash. + /// + /// This calls the closure with `Some(data)` where the session and data are live, + /// `Err(Some(keys))` when the session is live but the data unknown, with a list of keys + /// who have the data, and `Err(None)` where the session is unknown. + pub(crate) fn with_block_data(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U + where F: FnOnce(Result<&BlockData, Option<&[SessionKey]>>) -> U + { + match self.live_instances.get(parent_hash) { + Some(c) => c.with_block_data(c_hash, |res| f(res.map_err(Some))), + None => f(Err(None)) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn last_keys_works() { + let a = [1; 32].into(); + let b = [2; 32].into(); + let c = [3; 32].into(); + let d = [4; 32].into(); + + let mut recent = RecentSessionKeys::default(); + + match recent.insert(a) { + InsertedRecentKey::New(None) => {}, + _ => panic!("is new, not at capacity"), + } + + match recent.insert(a) { + InsertedRecentKey::AlreadyKnown => {}, + _ => panic!("not new"), + } + + match recent.insert(b) { + InsertedRecentKey::New(None) => {}, + _ => panic!("is new, not at capacity"), + } + + match recent.insert(b) { + InsertedRecentKey::AlreadyKnown => {}, + _ => panic!("not new"), + } + + match recent.insert(c) { + InsertedRecentKey::New(None) => {}, + _ => panic!("is new, not at capacity"), + } + + match recent.insert(c) { + InsertedRecentKey::AlreadyKnown => {}, + _ => panic!("not new"), + } + + match recent.insert(d) { + InsertedRecentKey::New(Some(old)) => assert_eq!(old, a), + _ => panic!("is new, and at capacity"), + } + + match recent.insert(d) { + InsertedRecentKey::AlreadyKnown => {}, + _ => panic!("not new"), + } + } +} diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 70ecfdf24f..d513b27064 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -29,6 +29,7 @@ extern crate polkadot_consensus; extern crate polkadot_availability_store as av_store; extern crate polkadot_primitives; +extern crate arrayvec; extern crate futures; extern crate parking_lot; extern crate tokio; @@ -46,19 +47,17 @@ pub mod consensus; use codec::{Decode, Encode}; use futures::sync::oneshot; -use parking_lot::Mutex; -use polkadot_consensus::{Statement, GenericStatement}; use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header}; -use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation}; +use polkadot_primitives::parachain::{Id as ParaId, BlockData, CandidateReceipt, Collation}; use substrate_network::{NodeIndex, RequestId, Context, Severity}; use substrate_network::{message, generic_message}; use substrate_network::specialization::NetworkSpecialization as Specialization; use substrate_network::StatusMessage as GenericFullStatus; +use self::consensus::{LiveConsensusInstances, RecentSessionKeys, InsertedRecentKey}; use self::collator_pool::{CollatorPool, Role, Action}; use self::local_collations::LocalCollations; use std::collections::{HashMap, HashSet}; -use std::sync::Arc; #[cfg(test)] @@ -91,92 +90,46 @@ struct BlockDataRequest { enum CollatorState { Fresh, RolePending(Role), - Primed, + Primed(Option), } impl CollatorState { fn send_key(&mut self, key: SessionKey, mut f: F) { f(Message::SessionKey(key)); - if let CollatorState::RolePending(role) = ::std::mem::replace(self, CollatorState::Primed) { + if let CollatorState::RolePending(role) = *self { f(Message::CollatorRole(role)); + *self = CollatorState::Primed(Some(role)); } } fn set_role(&mut self, role: Role, mut f: F) { - if let CollatorState::Primed = *self { + if let CollatorState::Primed(ref mut r) = *self { f(Message::CollatorRole(role)); + *r = Some(role); } else { *self = CollatorState::RolePending(role); } } + + fn role(&self) -> Option { + match *self { + CollatorState::Fresh => None, + CollatorState::RolePending(role) => Some(role), + CollatorState::Primed(role) => role, + } + } } struct PeerInfo { collating_for: Option<(AccountId, ParaId)>, - validator_key: Option, + validator_keys: RecentSessionKeys, claimed_validator: bool, collator_state: CollatorState, } -#[derive(Default)] -struct KnowledgeEntry { - knows_block_data: Vec, - knows_extrinsic: Vec, - block_data: Option, - extrinsic: Option, -} - -/// Tracks knowledge of peers. -struct Knowledge { - candidates: HashMap, -} - -impl Knowledge { - pub fn new() -> Self { - Knowledge { - candidates: HashMap::new(), - } - } - - fn note_statement(&mut self, from: SessionKey, statement: &Statement) { - match *statement { - GenericStatement::Candidate(ref c) => { - let mut entry = self.candidates.entry(c.hash()).or_insert_with(Default::default); - entry.knows_block_data.push(from); - entry.knows_extrinsic.push(from); - } - GenericStatement::Available(ref hash) => { - let mut entry = self.candidates.entry(*hash).or_insert_with(Default::default); - entry.knows_block_data.push(from); - entry.knows_extrinsic.push(from); - } - GenericStatement::Valid(ref hash) | GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash) - .or_insert_with(Default::default) - .knows_block_data - .push(from), - } - } - - fn note_candidate(&mut self, hash: Hash, block_data: Option, extrinsic: Option) { - let entry = self.candidates.entry(hash).or_insert_with(Default::default); - entry.block_data = entry.block_data.take().or(block_data); - entry.extrinsic = entry.extrinsic.take().or(extrinsic); - } -} - -struct CurrentConsensus { - knowledge: Arc>, - parent_hash: Hash, - local_session_key: SessionKey, -} - -impl CurrentConsensus { - // get locally stored block data for a candidate. - fn block_data(&self, relay_parent: &Hash, hash: &Hash) -> Option { - if relay_parent != &self.parent_hash { return None } - - self.knowledge.lock().candidates.get(hash) - .and_then(|entry| entry.block_data.clone()) +impl PeerInfo { + fn should_send_key(&self) -> bool { + self.claimed_validator || self.collating_for.is_some() } } @@ -209,7 +162,7 @@ pub struct PolkadotProtocol { collators: CollatorPool, validators: HashMap, local_collations: LocalCollations, - live_consensus: Option, + live_consensus: LiveConsensusInstances, in_flight: HashMap<(RequestId, NodeIndex), BlockDataRequest>, pending: Vec, extrinsic_store: Option<::av_store::Store>, @@ -225,7 +178,7 @@ impl PolkadotProtocol { collating_for, validators: HashMap::new(), local_collations: LocalCollations::new(), - live_consensus: None, + live_consensus: LiveConsensusInstances::new(), in_flight: HashMap::new(), pending: Vec::new(), extrinsic_store: None, @@ -250,69 +203,75 @@ impl PolkadotProtocol { } /// Note new consensus session. - fn new_consensus(&mut self, ctx: &mut Context, consensus: CurrentConsensus) { - let old_data = self.live_consensus.as_ref().map(|c| (c.parent_hash, c.local_session_key)); - - if Some(&consensus.local_session_key) != old_data.as_ref().map(|&(_, ref key)| key) { + fn new_consensus( + &mut self, + ctx: &mut Context, + parent_hash: Hash, + consensus: consensus::CurrentConsensus, + ) { + if let Some(new_local) = self.live_consensus.new_consensus(parent_hash, consensus) { for (id, peer_data) in self.peers.iter_mut() - .filter(|&(_, ref info)| info.claimed_validator || info.collating_for.is_some()) + .filter(|&(_, ref info)| info.should_send_key()) { - peer_data.collator_state.send_key(consensus.local_session_key, |msg| send_polkadot_message( + peer_data.collator_state.send_key(new_local, |msg| send_polkadot_message( ctx, *id, msg )); } } + } - self.live_consensus = Some(consensus); + fn remove_consensus(&mut self, parent_hash: &Hash) { + self.live_consensus.remove(parent_hash); } fn dispatch_pending_requests(&mut self, ctx: &mut Context) { - let consensus = match self.live_consensus { - Some(ref mut c) => c, - None => { - self.pending.clear(); - return; - } - }; - - let knowledge = consensus.knowledge.lock(); let mut new_pending = Vec::new(); + let validator_keys = &mut self.validators; + let next_req_id = &mut self.next_req_id; + let in_flight = &mut self.in_flight; + for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) { - if pending.consensus_parent != consensus.parent_hash { continue } + let parent = pending.consensus_parent; + let c_hash = pending.candidate_hash; - if let Some(entry) = knowledge.candidates.get(&pending.candidate_hash) { - // answer locally - if let Some(ref data) = entry.block_data { + let still_pending = self.live_consensus.with_block_data(&parent, &c_hash, |x| match x { + Ok(data @ &_) => { + // answer locally. let _ = pending.sender.send(data.clone()); - continue; + None } + Err(Some(known_keys)) => { + let next_peer = known_keys.iter() + .filter_map(|x| validator_keys.get(x).map(|id| (*x, *id))) + .find(|&(ref key, _)| pending.attempted_peers.insert(*key)) + .map(|(_, id)| id); - let validator_keys = &mut self.validators; - let next_peer = entry.knows_block_data.iter() - .filter_map(|x| validator_keys.get(x).map(|id| (*x, *id))) - .find(|&(ref key, _)| pending.attempted_peers.insert(*key)) - .map(|(_, id)| id); + // dispatch to peer + if let Some(who) = next_peer { + let req_id = *next_req_id; + *next_req_id += 1; - // dispatch to peer - if let Some(who) = next_peer { - let req_id = self.next_req_id; - self.next_req_id += 1; + send_polkadot_message( + ctx, + who, + Message::RequestBlockData(req_id, parent, c_hash) + ); - send_polkadot_message( - ctx, - who, - Message::RequestBlockData(req_id, pending.consensus_parent, pending.candidate_hash) - ); + in_flight.insert((req_id, who), pending); - self.in_flight.insert((req_id, who), pending); - - continue; + None + } else { + Some(pending) + } } + Err(None) => None, // no such known consensus session. prune out. + }); + + if let Some(pending) = still_pending { + new_pending.push(pending); } - - new_pending.push(pending); } self.pending = new_pending; @@ -323,8 +282,12 @@ impl PolkadotProtocol { match msg { Message::SessionKey(key) => self.on_session_key(ctx, who, key), Message::RequestBlockData(req_id, relay_parent, candidate_hash) => { - let block_data = self.live_consensus.as_ref() - .and_then(|c| c.block_data(&relay_parent, &candidate_hash)) + let block_data = self.live_consensus + .with_block_data( + &relay_parent, + &candidate_hash, + |res| res.ok().map(|b| b.clone()), + ) .or_else(|| self.extrinsic_store.as_ref() .and_then(|s| s.block_data(relay_parent, candidate_hash)) ); @@ -352,18 +315,26 @@ impl PolkadotProtocol { return; } - if let Some(old_key) = ::std::mem::replace(&mut info.validator_key, Some(key)) { - self.validators.remove(&old_key); - - for (relay_parent, collation) in self.local_collations.fresh_key(&old_key, &key) { - send_polkadot_message( - ctx, - who, - Message::Collation(relay_parent, collation), - ) + let local_collations = &mut self.local_collations; + let new_collations = match info.validator_keys.insert(key) { + InsertedRecentKey::AlreadyKnown => Vec::new(), + InsertedRecentKey::New(Some(old_key)) => { + self.validators.remove(&old_key); + local_collations.fresh_key(&old_key, &key) } + InsertedRecentKey::New(None) => info.collator_state.role() + .map(|r| local_collations.note_validator_role(key, r)) + .unwrap_or_else(Vec::new), + }; + for (relay_parent, collation) in new_collations { + send_polkadot_message( + ctx, + who, + Message::Collation(relay_parent, collation), + ) } + self.validators.insert(key, who); } @@ -389,7 +360,7 @@ impl PolkadotProtocol { // when a validator sends us (a collator) a new role. fn on_new_role(&mut self, ctx: &mut Context, who: NodeIndex, role: Role) { - let info = match self.peers.get(&who) { + let info = match self.peers.get_mut(&who) { Some(peer) => peer, None => { trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); @@ -399,19 +370,27 @@ impl PolkadotProtocol { debug!(target: "p_net", "New collator role {:?} from {}", role, who); - match info.validator_key { - None => ctx.report_peer( + if info.validator_keys.as_slice().is_empty() { + ctx.report_peer( who, Severity::Bad("Sent collator role without registering first as validator"), - ), - Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) { + ); + } else { + // update role for all saved session keys for this validator. + let local_collations = &mut self.local_collations; + for (relay_parent, collation) in info.validator_keys + .as_slice() + .iter() + .cloned() + .flat_map(|k| local_collations.note_validator_role(k, role)) + { debug!(target: "p_net", "Broadcasting collation on relay parent {:?}", relay_parent); send_polkadot_message( ctx, who, Message::Collation(relay_parent, collation), ) - }, + } } } } @@ -430,11 +409,10 @@ impl Specialization for PolkadotProtocol { }; let validator = status.roles.contains(substrate_network::config::Roles::AUTHORITY); - let send_key = validator || local_status.collating_for.is_some(); let mut peer_info = PeerInfo { collating_for: local_status.collating_for, - validator_key: None, + validator_keys: Default::default(), claimed_validator: validator, collator_state: CollatorState::Fresh, }; @@ -454,12 +432,15 @@ impl Specialization for PolkadotProtocol { )); } - if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) { - peer_info.collator_state.send_key(consensus.local_session_key, |msg| send_polkadot_message( - ctx, - who, - msg, - )); + // send session keys. + if peer_info.should_send_key() { + for local_session_key in self.live_consensus.recent_keys() { + peer_info.collator_state.send_key(*local_session_key, |msg| send_polkadot_message( + ctx, + who, + msg, + )); + } } self.peers.insert(who, peer_info); @@ -481,9 +462,9 @@ impl Specialization for PolkadotProtocol { } } - if let Some(validator_key) = info.validator_key { - self.validators.remove(&validator_key); - self.local_collations.on_disconnect(&validator_key); + for key in info.validator_keys.as_slice().iter() { + self.validators.remove(key); + self.local_collations.on_disconnect(key); } { diff --git a/polkadot/network/src/router.rs b/polkadot/network/src/router.rs index 4307c302fb..211bc21491 100644 --- a/polkadot/network/src/router.rs +++ b/polkadot/network/src/router.rs @@ -36,7 +36,8 @@ use std::collections::{HashMap, HashSet}; use std::io; use std::sync::Arc; -use super::{NetworkService, Knowledge}; +use consensus::Knowledge; +use super::NetworkService; fn attestation_topic(parent_hash: Hash) -> Hash { let mut v = parent_hash.as_ref().to_vec(); @@ -242,6 +243,13 @@ impl TableRouter for Router

} } +impl

Drop for Router

{ + fn drop(&mut self) { + let parent_hash = &self.parent_hash; + self.network.with_spec(|spec, _| spec.remove_consensus(parent_hash)); + } +} + /// Receiver for block data. pub struct BlockDataReceiver { inner: ::futures::sync::oneshot::Receiver, diff --git a/polkadot/network/src/tests.rs b/polkadot/network/src/tests.rs index 263a931b9e..20ed401f55 100644 --- a/polkadot/network/src/tests.rs +++ b/polkadot/network/src/tests.rs @@ -16,11 +16,12 @@ //! Tests for polkadot and consensus network. -use super::{PolkadotProtocol, Status, CurrentConsensus, Knowledge, Message, FullStatus}; +use super::{PolkadotProtocol, Status, Message, FullStatus}; +use consensus::{CurrentConsensus, Knowledge}; use parking_lot::Mutex; use polkadot_consensus::GenericStatement; -use polkadot_primitives::{Block, Hash, SessionKey}; +use polkadot_primitives::{Block, SessionKey}; use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData}; use substrate_primitives::H512; use codec::Encode; @@ -84,13 +85,9 @@ fn make_status(status: &Status, roles: Roles) -> FullStatus { } } -fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus, Arc>) { +fn make_consensus(local_key: SessionKey) -> (CurrentConsensus, Arc>) { let knowledge = Arc::new(Mutex::new(Knowledge::new())); - let c = CurrentConsensus { - knowledge: knowledge.clone(), - parent_hash, - local_session_key: local_key, - }; + let c = CurrentConsensus::new(knowledge.clone(), local_key); (c, knowledge) } @@ -120,8 +117,8 @@ fn sends_session_key() { { let mut ctx = TestContext::default(); - let (consensus, _knowledge) = make_consensus(parent_hash, local_key); - protocol.new_consensus(&mut ctx, consensus); + let (consensus, _knowledge) = make_consensus(local_key); + protocol.new_consensus(&mut ctx, parent_hash, consensus); assert!(ctx.has_message(peer_a, Message::SessionKey(local_key))); } @@ -160,8 +157,8 @@ fn fetches_from_those_with_knowledge() { let status = Status { collating_for: None }; - let (consensus, knowledge) = make_consensus(parent_hash, local_key); - protocol.new_consensus(&mut TestContext::default(), consensus); + let (consensus, knowledge) = make_consensus(local_key); + protocol.new_consensus(&mut TestContext::default(), parent_hash, consensus); knowledge.lock().note_statement(a_key, &GenericStatement::Valid(candidate_hash)); let recv = protocol.fetch_block_data(&mut TestContext::default(), &candidate_receipt, parent_hash); @@ -279,3 +276,49 @@ fn remove_bad_collator() { assert!(ctx.disabled.contains(&who)); } } + +#[test] +fn many_session_keys() { + let mut protocol = PolkadotProtocol::new(None); + + let parent_a = [1; 32].into(); + let parent_b = [2; 32].into(); + + let local_key_a = [3; 32].into(); + let local_key_b = [4; 32].into(); + + let (consensus_a, _knowledge_a) = make_consensus(local_key_a); + let (consensus_b, _knowledge_b) = make_consensus(local_key_b); + + protocol.new_consensus(&mut TestContext::default(), parent_a, consensus_a); + protocol.new_consensus(&mut TestContext::default(), parent_b, consensus_b); + + assert_eq!(protocol.live_consensus.recent_keys(), &[local_key_a, local_key_b]); + + let peer_a = 1; + + // when connecting a peer, we should get both those keys. + { + let mut ctx = TestContext::default(); + + let status = Status { collating_for: None }; + protocol.on_connect(&mut ctx, peer_a, make_status(&status, Roles::AUTHORITY)); + + assert!(ctx.has_message(peer_a, Message::SessionKey(local_key_a))); + assert!(ctx.has_message(peer_a, Message::SessionKey(local_key_b))); + } + + let peer_b = 2; + + protocol.remove_consensus(&parent_a); + + { + let mut ctx = TestContext::default(); + + let status = Status { collating_for: None }; + protocol.on_connect(&mut ctx, peer_b, make_status(&status, Roles::AUTHORITY)); + + assert!(!ctx.has_message(peer_b, Message::SessionKey(local_key_a))); + assert!(ctx.has_message(peer_b, Message::SessionKey(local_key_b))); + } +}