From 67ee212171e3b3e404bb8b1b1ccd476008e87a97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 24 Jun 2019 20:46:03 +0200 Subject: [PATCH] Store `PeerId`s in collator pool (#299) --- polkadot/network/src/collator_pool.rs | 27 +++++++++++++++------------ polkadot/network/src/lib.rs | 6 +++++- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 1b396742ae..63c96aec09 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -19,6 +19,7 @@ use parity_codec::{Encode, Decode}; use polkadot_primitives::Hash; use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation}; +use substrate_network::PeerId; use futures::sync::oneshot; use std::collections::hash_map::{HashMap, Entry}; @@ -117,7 +118,7 @@ struct ParachainCollators { /// Manages connected collators and role assignments from the perspective of a validator. pub struct CollatorPool { - collators: HashMap, + collators: HashMap, parachain_collators: HashMap, collations: HashMap<(Hash, ParaId), CollationSlot>, } @@ -133,8 +134,8 @@ impl CollatorPool { } /// Call when a new collator is authenticated. Returns the role. - pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId) -> Role { - self.collators.insert(collator_id.clone(), para_id); + pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId, peer_id: PeerId) -> Role { + self.collators.insert(collator_id.clone(), (para_id, peer_id)); match self.parachain_collators.entry(para_id) { Entry::Vacant(vacant) => { vacant.insert(ParachainCollators { @@ -155,7 +156,7 @@ impl CollatorPool { /// Called when a collator disconnects. If it was the primary, returns a new primary for that /// parachain. pub fn on_disconnect(&mut self, collator_id: CollatorId) -> Option { - self.collators.remove(&collator_id).and_then(|para_id| match self.parachain_collators.entry(para_id) { + self.collators.remove(&collator_id).and_then(|(para_id, _)| match self.parachain_collators.entry(para_id) { Entry::Vacant(_) => None, Entry::Occupied(mut occ) => { if occ.get().primary == collator_id { @@ -182,7 +183,7 @@ impl CollatorPool { /// The collator should be registered for the parachain of the collation as a precondition of this function. /// The collation should have been checked for integrity of signature before passing to this function. pub fn on_collation(&mut self, collator_id: CollatorId, relay_parent: Hash, collation: Collation) { - if let Some(para_id) = self.collators.get(&collator_id) { + if let Some((para_id, _)) = self.collators.get(&collator_id) { debug_assert_eq!(para_id, &collation.receipt.parachain_index); // TODO: punish if not primary? (https://github.com/paritytech/polkadot/issues/213) @@ -240,8 +241,8 @@ mod tests { let bad_primary: CollatorId = [0; 32].unchecked_into(); let good_backup: CollatorId = [1; 32].unchecked_into(); - assert_eq!(pool.on_new_collator(bad_primary.clone(), para_id.clone()), Role::Primary); - assert_eq!(pool.on_new_collator(good_backup.clone(), para_id.clone()), Role::Backup); + assert_eq!(pool.on_new_collator(bad_primary.clone(), para_id.clone(), PeerId::random()), Role::Primary); + assert_eq!(pool.on_new_collator(good_backup.clone(), para_id.clone(), PeerId::random()), Role::Backup); assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup.clone())); assert_eq!(pool.on_disconnect(good_backup), None); } @@ -253,8 +254,8 @@ mod tests { let primary = [0; 32].unchecked_into(); let backup: CollatorId = [1; 32].unchecked_into(); - assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary); - assert_eq!(pool.on_new_collator(backup.clone(), para_id.clone()), Role::Backup); + assert_eq!(pool.on_new_collator(primary, para_id.clone(), PeerId::random()), Role::Primary); + assert_eq!(pool.on_new_collator(backup.clone(), para_id.clone(), PeerId::random()), Role::Backup); assert_eq!(pool.on_disconnect(backup), None); assert!(pool.parachain_collators.get(¶_id).unwrap().backup.is_empty()); } @@ -263,10 +264,11 @@ mod tests { fn await_before_collation() { let mut pool = CollatorPool::new(); let para_id: ParaId = 5.into(); + let peer_id = PeerId::random(); let primary: CollatorId = [0; 32].unchecked_into(); let relay_parent = [1; 32].into(); - assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone()), Role::Primary); + assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), peer_id.clone()), Role::Primary); let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); pool.await_collation(relay_parent, para_id, tx1); @@ -274,7 +276,7 @@ mod tests { pool.on_collation(primary.clone(), relay_parent, Collation { receipt: CandidateReceipt { parachain_index: para_id, - collator: primary.into(), + collator: primary.clone().into(), signature: Default::default(), head_data: HeadData(vec![1, 2, 3]), egress_queue_roots: vec![], @@ -287,6 +289,7 @@ mod tests { rx1.wait().unwrap(); rx2.wait().unwrap(); + assert_eq!(pool.collators.get(&primary).map(|ids| &ids.1).unwrap(), &peer_id); } #[test] @@ -296,7 +299,7 @@ mod tests { let primary: CollatorId = [0; 32].unchecked_into(); let relay_parent = [1; 32].into(); - assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone()), Role::Primary); + assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), PeerId::random()), Role::Primary); pool.on_collation(primary.clone(), relay_parent, Collation { receipt: CandidateReceipt { diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 885920d050..1606778e99 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -491,7 +491,11 @@ impl Specialization for PolkadotProtocol { } ctx.report_peer(who.clone(), benefit::NEW_COLLATOR); - let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone()); + let collator_role = self.collators.on_new_collator( + acc_id.clone(), + para_id.clone(), + who.clone(), + ); peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message( ctx,