mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 15:11:02 +00:00
Store PeerIds in collator pool (#299)
This commit is contained in:
@@ -19,6 +19,7 @@
|
|||||||
use parity_codec::{Encode, Decode};
|
use parity_codec::{Encode, Decode};
|
||||||
use polkadot_primitives::Hash;
|
use polkadot_primitives::Hash;
|
||||||
use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
|
use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
|
||||||
|
use substrate_network::PeerId;
|
||||||
use futures::sync::oneshot;
|
use futures::sync::oneshot;
|
||||||
|
|
||||||
use std::collections::hash_map::{HashMap, Entry};
|
use std::collections::hash_map::{HashMap, Entry};
|
||||||
@@ -117,7 +118,7 @@ struct ParachainCollators {
|
|||||||
|
|
||||||
/// Manages connected collators and role assignments from the perspective of a validator.
|
/// Manages connected collators and role assignments from the perspective of a validator.
|
||||||
pub struct CollatorPool {
|
pub struct CollatorPool {
|
||||||
collators: HashMap<CollatorId, ParaId>,
|
collators: HashMap<CollatorId, (ParaId, PeerId)>,
|
||||||
parachain_collators: HashMap<ParaId, ParachainCollators>,
|
parachain_collators: HashMap<ParaId, ParachainCollators>,
|
||||||
collations: HashMap<(Hash, ParaId), CollationSlot>,
|
collations: HashMap<(Hash, ParaId), CollationSlot>,
|
||||||
}
|
}
|
||||||
@@ -133,8 +134,8 @@ impl CollatorPool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Call when a new collator is authenticated. Returns the role.
|
/// Call when a new collator is authenticated. Returns the role.
|
||||||
pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId) -> Role {
|
pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId, peer_id: PeerId) -> Role {
|
||||||
self.collators.insert(collator_id.clone(), para_id);
|
self.collators.insert(collator_id.clone(), (para_id, peer_id));
|
||||||
match self.parachain_collators.entry(para_id) {
|
match self.parachain_collators.entry(para_id) {
|
||||||
Entry::Vacant(vacant) => {
|
Entry::Vacant(vacant) => {
|
||||||
vacant.insert(ParachainCollators {
|
vacant.insert(ParachainCollators {
|
||||||
@@ -155,7 +156,7 @@ impl CollatorPool {
|
|||||||
/// Called when a collator disconnects. If it was the primary, returns a new primary for that
|
/// Called when a collator disconnects. If it was the primary, returns a new primary for that
|
||||||
/// parachain.
|
/// parachain.
|
||||||
pub fn on_disconnect(&mut self, collator_id: CollatorId) -> Option<CollatorId> {
|
pub fn on_disconnect(&mut self, collator_id: CollatorId) -> Option<CollatorId> {
|
||||||
self.collators.remove(&collator_id).and_then(|para_id| match self.parachain_collators.entry(para_id) {
|
self.collators.remove(&collator_id).and_then(|(para_id, _)| match self.parachain_collators.entry(para_id) {
|
||||||
Entry::Vacant(_) => None,
|
Entry::Vacant(_) => None,
|
||||||
Entry::Occupied(mut occ) => {
|
Entry::Occupied(mut occ) => {
|
||||||
if occ.get().primary == collator_id {
|
if occ.get().primary == collator_id {
|
||||||
@@ -182,7 +183,7 @@ impl CollatorPool {
|
|||||||
/// The collator should be registered for the parachain of the collation as a precondition of this function.
|
/// The collator should be registered for the parachain of the collation as a precondition of this function.
|
||||||
/// The collation should have been checked for integrity of signature before passing to this function.
|
/// The collation should have been checked for integrity of signature before passing to this function.
|
||||||
pub fn on_collation(&mut self, collator_id: CollatorId, relay_parent: Hash, collation: Collation) {
|
pub fn on_collation(&mut self, collator_id: CollatorId, relay_parent: Hash, collation: Collation) {
|
||||||
if let Some(para_id) = self.collators.get(&collator_id) {
|
if let Some((para_id, _)) = self.collators.get(&collator_id) {
|
||||||
debug_assert_eq!(para_id, &collation.receipt.parachain_index);
|
debug_assert_eq!(para_id, &collation.receipt.parachain_index);
|
||||||
|
|
||||||
// TODO: punish if not primary? (https://github.com/paritytech/polkadot/issues/213)
|
// TODO: punish if not primary? (https://github.com/paritytech/polkadot/issues/213)
|
||||||
@@ -240,8 +241,8 @@ mod tests {
|
|||||||
let bad_primary: CollatorId = [0; 32].unchecked_into();
|
let bad_primary: CollatorId = [0; 32].unchecked_into();
|
||||||
let good_backup: CollatorId = [1; 32].unchecked_into();
|
let good_backup: CollatorId = [1; 32].unchecked_into();
|
||||||
|
|
||||||
assert_eq!(pool.on_new_collator(bad_primary.clone(), para_id.clone()), Role::Primary);
|
assert_eq!(pool.on_new_collator(bad_primary.clone(), para_id.clone(), PeerId::random()), Role::Primary);
|
||||||
assert_eq!(pool.on_new_collator(good_backup.clone(), para_id.clone()), Role::Backup);
|
assert_eq!(pool.on_new_collator(good_backup.clone(), para_id.clone(), PeerId::random()), Role::Backup);
|
||||||
assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup.clone()));
|
assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup.clone()));
|
||||||
assert_eq!(pool.on_disconnect(good_backup), None);
|
assert_eq!(pool.on_disconnect(good_backup), None);
|
||||||
}
|
}
|
||||||
@@ -253,8 +254,8 @@ mod tests {
|
|||||||
let primary = [0; 32].unchecked_into();
|
let primary = [0; 32].unchecked_into();
|
||||||
let backup: CollatorId = [1; 32].unchecked_into();
|
let backup: CollatorId = [1; 32].unchecked_into();
|
||||||
|
|
||||||
assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary);
|
assert_eq!(pool.on_new_collator(primary, para_id.clone(), PeerId::random()), Role::Primary);
|
||||||
assert_eq!(pool.on_new_collator(backup.clone(), para_id.clone()), Role::Backup);
|
assert_eq!(pool.on_new_collator(backup.clone(), para_id.clone(), PeerId::random()), Role::Backup);
|
||||||
assert_eq!(pool.on_disconnect(backup), None);
|
assert_eq!(pool.on_disconnect(backup), None);
|
||||||
assert!(pool.parachain_collators.get(¶_id).unwrap().backup.is_empty());
|
assert!(pool.parachain_collators.get(¶_id).unwrap().backup.is_empty());
|
||||||
}
|
}
|
||||||
@@ -263,10 +264,11 @@ mod tests {
|
|||||||
fn await_before_collation() {
|
fn await_before_collation() {
|
||||||
let mut pool = CollatorPool::new();
|
let mut pool = CollatorPool::new();
|
||||||
let para_id: ParaId = 5.into();
|
let para_id: ParaId = 5.into();
|
||||||
|
let peer_id = PeerId::random();
|
||||||
let primary: CollatorId = [0; 32].unchecked_into();
|
let primary: CollatorId = [0; 32].unchecked_into();
|
||||||
let relay_parent = [1; 32].into();
|
let relay_parent = [1; 32].into();
|
||||||
|
|
||||||
assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone()), Role::Primary);
|
assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), peer_id.clone()), Role::Primary);
|
||||||
let (tx1, rx1) = oneshot::channel();
|
let (tx1, rx1) = oneshot::channel();
|
||||||
let (tx2, rx2) = oneshot::channel();
|
let (tx2, rx2) = oneshot::channel();
|
||||||
pool.await_collation(relay_parent, para_id, tx1);
|
pool.await_collation(relay_parent, para_id, tx1);
|
||||||
@@ -274,7 +276,7 @@ mod tests {
|
|||||||
pool.on_collation(primary.clone(), relay_parent, Collation {
|
pool.on_collation(primary.clone(), relay_parent, Collation {
|
||||||
receipt: CandidateReceipt {
|
receipt: CandidateReceipt {
|
||||||
parachain_index: para_id,
|
parachain_index: para_id,
|
||||||
collator: primary.into(),
|
collator: primary.clone().into(),
|
||||||
signature: Default::default(),
|
signature: Default::default(),
|
||||||
head_data: HeadData(vec![1, 2, 3]),
|
head_data: HeadData(vec![1, 2, 3]),
|
||||||
egress_queue_roots: vec![],
|
egress_queue_roots: vec![],
|
||||||
@@ -287,6 +289,7 @@ mod tests {
|
|||||||
|
|
||||||
rx1.wait().unwrap();
|
rx1.wait().unwrap();
|
||||||
rx2.wait().unwrap();
|
rx2.wait().unwrap();
|
||||||
|
assert_eq!(pool.collators.get(&primary).map(|ids| &ids.1).unwrap(), &peer_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -296,7 +299,7 @@ mod tests {
|
|||||||
let primary: CollatorId = [0; 32].unchecked_into();
|
let primary: CollatorId = [0; 32].unchecked_into();
|
||||||
let relay_parent = [1; 32].into();
|
let relay_parent = [1; 32].into();
|
||||||
|
|
||||||
assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone()), Role::Primary);
|
assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), PeerId::random()), Role::Primary);
|
||||||
|
|
||||||
pool.on_collation(primary.clone(), relay_parent, Collation {
|
pool.on_collation(primary.clone(), relay_parent, Collation {
|
||||||
receipt: CandidateReceipt {
|
receipt: CandidateReceipt {
|
||||||
|
|||||||
@@ -491,7 +491,11 @@ impl Specialization<Block> for PolkadotProtocol {
|
|||||||
}
|
}
|
||||||
ctx.report_peer(who.clone(), benefit::NEW_COLLATOR);
|
ctx.report_peer(who.clone(), benefit::NEW_COLLATOR);
|
||||||
|
|
||||||
let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone());
|
let collator_role = self.collators.on_new_collator(
|
||||||
|
acc_id.clone(),
|
||||||
|
para_id.clone(),
|
||||||
|
who.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message(
|
peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message(
|
||||||
ctx,
|
ctx,
|
||||||
|
|||||||
Reference in New Issue
Block a user