mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 16:21:02 +00:00
Validator side of the collation protocol. (#295)
* skeleton of collators object * awaiting and handling collations. rename `collators` to CollationPool * add some tests * add tests * implement Collators trait for ConsensusNetwork * plug collators into main polkadot-network * ignore collator role message * add a couple more tests * garbage collection for collations * ensure disconnected backup collator is removed from pool * address other grumbles
This commit is contained in:
committed by
Gav Wood
parent
907eeef9dd
commit
88c40c8fb4
@@ -157,7 +157,8 @@ pub fn collate<'a, R, P>(
|
|||||||
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
|
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
|
||||||
);
|
);
|
||||||
|
|
||||||
let signature = key.sign(&block_data.0[..]).into();
|
let block_data_hash = block_data.hash();
|
||||||
|
let signature = key.sign(&block_data_hash.0[..]).into();
|
||||||
let pubkey_bytes: [u8; 32] = key.public().into();
|
let pubkey_bytes: [u8; 32] = key.public().into();
|
||||||
|
|
||||||
let receipt = parachain::CandidateReceipt {
|
let receipt = parachain::CandidateReceipt {
|
||||||
@@ -168,7 +169,7 @@ pub fn collate<'a, R, P>(
|
|||||||
balance_uploads: Vec::new(),
|
balance_uploads: Vec::new(),
|
||||||
egress_queue_roots: Vec::new(),
|
egress_queue_roots: Vec::new(),
|
||||||
fees: 0,
|
fees: 0,
|
||||||
block_data_hash: block_data.hash(),
|
block_data_hash,
|
||||||
};
|
};
|
||||||
|
|
||||||
parachain::Collation {
|
parachain::Collation {
|
||||||
|
|||||||
@@ -37,6 +37,12 @@ pub trait Collators: Clone {
|
|||||||
type Collation: IntoFuture<Item=Collation,Error=Self::Error>;
|
type Collation: IntoFuture<Item=Collation,Error=Self::Error>;
|
||||||
|
|
||||||
/// Collate on a specific parachain, building on a given relay chain parent hash.
|
/// Collate on a specific parachain, building on a given relay chain parent hash.
|
||||||
|
///
|
||||||
|
/// The returned collation should be checked for basic validity in the signature
|
||||||
|
/// and will be checked for state-transition validity by the consumer of this trait.
|
||||||
|
///
|
||||||
|
/// This does not have to guarantee local availability, as a valid collation
|
||||||
|
/// will be passed to the `TableRouter` instance.
|
||||||
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation;
|
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation;
|
||||||
|
|
||||||
/// Note a bad collator. TODO: take proof
|
/// Note a bad collator. TODO: take proof
|
||||||
|
|||||||
@@ -0,0 +1,320 @@
|
|||||||
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Polkadot.
|
||||||
|
|
||||||
|
// Polkadot is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Polkadot is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Bridge between the network and consensus service for getting collations to it.
|
||||||
|
|
||||||
|
use polkadot_primitives::{AccountId, Hash};
|
||||||
|
use polkadot_primitives::parachain::{Id as ParaId, Collation};
|
||||||
|
|
||||||
|
use futures::sync::oneshot;
|
||||||
|
|
||||||
|
use std::collections::hash_map::{HashMap, Entry};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5);
|
||||||
|
|
||||||
|
/// The role of the collator. Whether they're the primary or backup for this parachain.
|
||||||
|
#[derive(PartialEq, Debug, Serialize, Deserialize)]
|
||||||
|
pub enum Role {
|
||||||
|
/// Primary collators should send collations whenever it's time.
|
||||||
|
Primary,
|
||||||
|
/// Backup collators should not.
|
||||||
|
Backup,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A maintenance action for the collator set.
|
||||||
|
#[derive(PartialEq, Debug)]
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub enum Action {
|
||||||
|
/// Disconnect the given collator.
|
||||||
|
Disconnect(AccountId),
|
||||||
|
/// Give the collator a new role.
|
||||||
|
NewRole(AccountId, Role),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct CollationSlot {
|
||||||
|
live_at: Instant,
|
||||||
|
entries: SlotEntries,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CollationSlot {
|
||||||
|
fn blank_now() -> Self {
|
||||||
|
CollationSlot {
|
||||||
|
live_at: Instant::now(),
|
||||||
|
entries: SlotEntries::Blank,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stay_alive(&self, now: Instant) -> bool {
|
||||||
|
self.live_at + COLLATION_LIFETIME > now
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum SlotEntries {
|
||||||
|
Blank,
|
||||||
|
// not queried yet
|
||||||
|
Pending(Vec<Collation>),
|
||||||
|
// waiting for next to arrive.
|
||||||
|
Awaiting(Vec<oneshot::Sender<Collation>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SlotEntries {
|
||||||
|
fn received_collation(&mut self, collation: Collation) {
|
||||||
|
*self = match ::std::mem::replace(self, SlotEntries::Blank) {
|
||||||
|
SlotEntries::Blank => SlotEntries::Pending(vec![collation]),
|
||||||
|
SlotEntries::Pending(mut cs) => {
|
||||||
|
cs.push(collation);
|
||||||
|
SlotEntries::Pending(cs)
|
||||||
|
}
|
||||||
|
SlotEntries::Awaiting(senders) => {
|
||||||
|
for sender in senders {
|
||||||
|
let _ = sender.send(collation.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
SlotEntries::Blank
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn await_with(&mut self, sender: oneshot::Sender<Collation>) {
|
||||||
|
*self = match ::std::mem::replace(self, SlotEntries::Blank) {
|
||||||
|
SlotEntries::Blank => SlotEntries::Awaiting(vec![sender]),
|
||||||
|
SlotEntries::Awaiting(mut senders) => {
|
||||||
|
senders.push(sender);
|
||||||
|
SlotEntries::Awaiting(senders)
|
||||||
|
}
|
||||||
|
SlotEntries::Pending(mut cs) => {
|
||||||
|
let next_collation = cs.pop().expect("empty variant is always `Blank`; qed");
|
||||||
|
let _ = sender.send(next_collation);
|
||||||
|
|
||||||
|
if cs.is_empty() {
|
||||||
|
SlotEntries::Blank
|
||||||
|
} else {
|
||||||
|
SlotEntries::Pending(cs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ParachainCollators {
|
||||||
|
primary: AccountId,
|
||||||
|
backup: Vec<AccountId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Manages connected collators and role assignments from the perspective of a validator.
|
||||||
|
pub struct CollatorPool {
|
||||||
|
collators: HashMap<AccountId, ParaId>,
|
||||||
|
parachain_collators: HashMap<ParaId, ParachainCollators>,
|
||||||
|
collations: HashMap<(Hash, ParaId), CollationSlot>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CollatorPool {
|
||||||
|
/// Create a new `CollatorPool` object.
|
||||||
|
pub fn new() -> Self {
|
||||||
|
CollatorPool {
|
||||||
|
collators: HashMap::new(),
|
||||||
|
parachain_collators: HashMap::new(),
|
||||||
|
collations: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Call when a new collator is authenticated. Returns the role.
|
||||||
|
pub fn on_new_collator(&mut self, account_id: AccountId, para_id: ParaId) -> Role {
|
||||||
|
self.collators.insert(account_id.clone(), para_id);
|
||||||
|
match self.parachain_collators.entry(para_id) {
|
||||||
|
Entry::Vacant(vacant) => {
|
||||||
|
vacant.insert(ParachainCollators {
|
||||||
|
primary: account_id,
|
||||||
|
backup: Vec::new(),
|
||||||
|
});
|
||||||
|
|
||||||
|
Role::Primary
|
||||||
|
},
|
||||||
|
Entry::Occupied(mut occupied) => {
|
||||||
|
occupied.get_mut().backup.push(account_id);
|
||||||
|
|
||||||
|
Role::Backup
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Called when a collator disconnects. If it was the primary, returns a new primary for that
|
||||||
|
/// parachain.
|
||||||
|
pub fn on_disconnect(&mut self, account_id: AccountId) -> Option<AccountId> {
|
||||||
|
self.collators.remove(&account_id).and_then(|para_id| match self.parachain_collators.entry(para_id) {
|
||||||
|
Entry::Vacant(_) => None,
|
||||||
|
Entry::Occupied(mut occ) => {
|
||||||
|
if occ.get().primary == account_id {
|
||||||
|
if occ.get().backup.is_empty() {
|
||||||
|
occ.remove();
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
let mut collators = occ.get_mut();
|
||||||
|
collators.primary = collators.backup.pop().expect("backup non-empty; qed");
|
||||||
|
Some(collators.primary)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let pos = occ.get().backup.iter().position(|a| a == &account_id)
|
||||||
|
.expect("registered collator always present in backup if not primary; qed");
|
||||||
|
|
||||||
|
occ.get_mut().backup.remove(pos);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Called when a collation is received.
|
||||||
|
/// The collator should be registered for the parachain of the collation as a precondition of this function.
|
||||||
|
/// The collation should have been checked for integrity of signature before passing to this function.
|
||||||
|
pub fn on_collation(&mut self, account_id: AccountId, relay_parent: Hash, collation: Collation) {
|
||||||
|
if let Some(para_id) = self.collators.get(&account_id) {
|
||||||
|
debug_assert_eq!(para_id, &collation.receipt.parachain_index);
|
||||||
|
|
||||||
|
// TODO: punish if not primary?
|
||||||
|
|
||||||
|
self.collations.entry((relay_parent, para_id.clone()))
|
||||||
|
.or_insert_with(CollationSlot::blank_now)
|
||||||
|
.entries
|
||||||
|
.received_collation(collation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait for a collation from a parachain.
|
||||||
|
pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender<Collation>) {
|
||||||
|
self.collations.entry((relay_parent, para_id))
|
||||||
|
.or_insert_with(CollationSlot::blank_now)
|
||||||
|
.entries
|
||||||
|
.await_with(sender);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Call periodically to perform collator set maintenance.
|
||||||
|
/// Returns a set of actions to perform on the network level.
|
||||||
|
pub fn maintain_peers(&mut self) -> Vec<Action> {
|
||||||
|
// TODO: rearrange periodically to new primary, evaluate based on latency etc.
|
||||||
|
Vec::new()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// called when a block with given hash has been imported.
|
||||||
|
pub fn collect_garbage(&mut self, chain_head: Option<&Hash>) {
|
||||||
|
let now = Instant::now();
|
||||||
|
self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use polkadot_primitives::parachain::{CandidateReceipt, BlockData, HeadData};
|
||||||
|
use substrate_primitives::H512;
|
||||||
|
use futures::Future;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn disconnect_primary_gives_new_primary() {
|
||||||
|
let mut pool = CollatorPool::new();
|
||||||
|
let para_id: ParaId = 5.into();
|
||||||
|
let bad_primary = [0; 32].into();
|
||||||
|
let good_backup = [1; 32].into();
|
||||||
|
|
||||||
|
assert_eq!(pool.on_new_collator(bad_primary, para_id.clone()), Role::Primary);
|
||||||
|
assert_eq!(pool.on_new_collator(good_backup, para_id.clone()), Role::Backup);
|
||||||
|
assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup));
|
||||||
|
assert_eq!(pool.on_disconnect(good_backup), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn disconnect_backup_removes_from_pool() {
|
||||||
|
let mut pool = CollatorPool::new();
|
||||||
|
let para_id: ParaId = 5.into();
|
||||||
|
let primary = [0; 32].into();
|
||||||
|
let backup = [1; 32].into();
|
||||||
|
|
||||||
|
assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary);
|
||||||
|
assert_eq!(pool.on_new_collator(backup, para_id.clone()), Role::Backup);
|
||||||
|
assert_eq!(pool.on_disconnect(backup), None);
|
||||||
|
assert!(pool.parachain_collators.get(¶_id).unwrap().backup.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn await_before_collation() {
|
||||||
|
let mut pool = CollatorPool::new();
|
||||||
|
let para_id: ParaId = 5.into();
|
||||||
|
let primary = [0; 32].into();
|
||||||
|
let relay_parent = [1; 32].into();
|
||||||
|
|
||||||
|
assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary);
|
||||||
|
let (tx1, rx1) = oneshot::channel();
|
||||||
|
let (tx2, rx2) = oneshot::channel();
|
||||||
|
pool.await_collation(relay_parent, para_id, tx1);
|
||||||
|
pool.await_collation(relay_parent, para_id, tx2);
|
||||||
|
pool.on_collation(primary, relay_parent, Collation {
|
||||||
|
receipt: CandidateReceipt {
|
||||||
|
parachain_index: para_id,
|
||||||
|
collator: primary.into(),
|
||||||
|
signature: H512::from([2; 64]).into(),
|
||||||
|
head_data: HeadData(vec![1, 2, 3]),
|
||||||
|
balance_uploads: vec![],
|
||||||
|
egress_queue_roots: vec![],
|
||||||
|
fees: 0,
|
||||||
|
block_data_hash: [3; 32].into(),
|
||||||
|
},
|
||||||
|
block_data: BlockData(vec![4, 5, 6]),
|
||||||
|
});
|
||||||
|
|
||||||
|
rx1.wait().unwrap();
|
||||||
|
rx2.wait().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn collate_before_await() {
|
||||||
|
let mut pool = CollatorPool::new();
|
||||||
|
let para_id: ParaId = 5.into();
|
||||||
|
let primary = [0; 32].into();
|
||||||
|
let relay_parent = [1; 32].into();
|
||||||
|
|
||||||
|
assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary);
|
||||||
|
|
||||||
|
pool.on_collation(primary, relay_parent, Collation {
|
||||||
|
receipt: CandidateReceipt {
|
||||||
|
parachain_index: para_id,
|
||||||
|
collator: primary.into(),
|
||||||
|
signature: H512::from([2; 64]).into(),
|
||||||
|
head_data: HeadData(vec![1, 2, 3]),
|
||||||
|
balance_uploads: vec![],
|
||||||
|
egress_queue_roots: vec![],
|
||||||
|
fees: 0,
|
||||||
|
block_data_hash: [3; 32].into(),
|
||||||
|
},
|
||||||
|
block_data: BlockData(vec![4, 5, 6]),
|
||||||
|
});
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
pool.await_collation(relay_parent, para_id, tx);
|
||||||
|
rx.wait().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn slot_stay_alive() {
|
||||||
|
let slot = CollationSlot::blank_now();
|
||||||
|
let now = slot.live_at;
|
||||||
|
|
||||||
|
assert!(slot.stay_alive(now));
|
||||||
|
assert!(slot.stay_alive(now + Duration::from_secs(10)));
|
||||||
|
assert!(!slot.stay_alive(now + COLLATION_LIFETIME));
|
||||||
|
assert!(!slot.stay_alive(now + COLLATION_LIFETIME + Duration::from_secs(10)));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -27,7 +27,7 @@ use polkadot_consensus::{Network, SharedTable, Collators};
|
|||||||
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
|
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
|
||||||
use polkadot_primitives::parachain::{Id as ParaId, Collation};
|
use polkadot_primitives::parachain::{Id as ParaId, Collation};
|
||||||
|
|
||||||
use futures::{future, prelude::*};
|
use futures::prelude::*;
|
||||||
use futures::sync::mpsc;
|
use futures::sync::mpsc;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -304,13 +304,38 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Network for ConsensusNetwork<P
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Error when the network appears to be down.
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||||
|
pub struct NetworkDown;
|
||||||
|
|
||||||
|
/// A future that resolves when a collation is received.
|
||||||
|
pub struct AwaitingCollation(Option<::futures::sync::oneshot::Receiver<Collation>>);
|
||||||
|
|
||||||
|
impl Future for AwaitingCollation {
|
||||||
|
type Item = Collation;
|
||||||
|
type Error = NetworkDown;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Collation, NetworkDown> {
|
||||||
|
match self.0.poll().map_err(|_| NetworkDown)? {
|
||||||
|
Async::Ready(None) => Err(NetworkDown),
|
||||||
|
Async::Ready(Some(x)) => Ok(Async::Ready(x)),
|
||||||
|
Async::NotReady => Ok(Async::NotReady),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
impl<P: LocalPolkadotApi + Send + Sync + 'static> Collators for ConsensusNetwork<P> {
|
impl<P: LocalPolkadotApi + Send + Sync + 'static> Collators for ConsensusNetwork<P> {
|
||||||
type Error = ();
|
type Error = NetworkDown;
|
||||||
type Collation = future::Empty<Collation, ()>;
|
type Collation = AwaitingCollation;
|
||||||
|
|
||||||
fn collate(&self, _parachain: ParaId, _relay_parent: Hash) -> Self::Collation {
|
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation {
|
||||||
future::empty()
|
AwaitingCollation(
|
||||||
|
self.network.with_spec(|spec, _| spec.await_collation(relay_parent, parachain))
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn note_bad_collator(&self, _collator: AccountId) { }
|
fn note_bad_collator(&self, collator: AccountId) {
|
||||||
|
self.network.with_spec(|spec, ctx| spec.disconnect_bad_collator(ctx, collator));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+116
-21
@@ -43,6 +43,7 @@ extern crate rhododendron;
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
|
mod collator_pool;
|
||||||
mod router;
|
mod router;
|
||||||
pub mod consensus;
|
pub mod consensus;
|
||||||
|
|
||||||
@@ -50,17 +51,19 @@ use codec::Slicable;
|
|||||||
use futures::sync::oneshot;
|
use futures::sync::oneshot;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use polkadot_consensus::{Statement, SignedStatement, GenericStatement};
|
use polkadot_consensus::{Statement, SignedStatement, GenericStatement};
|
||||||
use polkadot_primitives::{Block, SessionKey, Hash};
|
use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header};
|
||||||
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
|
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation};
|
||||||
use substrate_network::{PeerId, RequestId, Context};
|
use substrate_network::{PeerId, RequestId, Context};
|
||||||
use substrate_network::consensus_gossip::ConsensusGossip;
|
use substrate_network::consensus_gossip::ConsensusGossip;
|
||||||
use substrate_network::{message, generic_message};
|
use substrate_network::{message, generic_message};
|
||||||
use substrate_network::specialization::Specialization;
|
use substrate_network::specialization::Specialization;
|
||||||
use substrate_network::StatusMessage as GenericFullStatus;
|
use substrate_network::StatusMessage as GenericFullStatus;
|
||||||
|
use self::collator_pool::{CollatorPool, Role, Action};
|
||||||
|
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
@@ -75,16 +78,16 @@ pub type NetworkService = ::substrate_network::Service<Block, PolkadotProtocol>;
|
|||||||
/// Status of a Polkadot node.
|
/// Status of a Polkadot node.
|
||||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||||
pub struct Status {
|
pub struct Status {
|
||||||
collating_for: Option<ParaId>,
|
collating_for: Option<(AccountId, ParaId)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Slicable for Status {
|
impl Slicable for Status {
|
||||||
fn encode(&self) -> Vec<u8> {
|
fn encode(&self) -> Vec<u8> {
|
||||||
let mut v = Vec::new();
|
let mut v = Vec::new();
|
||||||
match self.collating_for {
|
match self.collating_for {
|
||||||
Some(ref id) => {
|
Some(ref details) => {
|
||||||
v.push(1);
|
v.push(1);
|
||||||
id.using_encoded(|s| v.extend(s));
|
details.using_encoded(|s| v.extend(s));
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
v.push(0);
|
v.push(0);
|
||||||
@@ -96,7 +99,7 @@ impl Slicable for Status {
|
|||||||
fn decode<I: ::codec::Input>(input: &mut I) -> Option<Self> {
|
fn decode<I: ::codec::Input>(input: &mut I) -> Option<Self> {
|
||||||
let collating_for = match input.read_byte()? {
|
let collating_for = match input.read_byte()? {
|
||||||
0 => None,
|
0 => None,
|
||||||
1 => Some(ParaId::decode(input)?),
|
1 => Some(Slicable::decode(input)?),
|
||||||
_ => return None,
|
_ => return None,
|
||||||
};
|
};
|
||||||
Some(Status { collating_for })
|
Some(Status { collating_for })
|
||||||
@@ -196,6 +199,10 @@ pub enum Message {
|
|||||||
RequestBlockData(RequestId, Hash),
|
RequestBlockData(RequestId, Hash),
|
||||||
/// Provide block data by candidate hash or nothing if unknown.
|
/// Provide block data by candidate hash or nothing if unknown.
|
||||||
BlockData(RequestId, Option<BlockData>),
|
BlockData(RequestId, Option<BlockData>),
|
||||||
|
/// Tell a collator their role.
|
||||||
|
CollatorRole(Role),
|
||||||
|
/// A collation provided by a peer. Relay parent and collation.
|
||||||
|
Collation(Hash, Collation),
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
|
fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
|
||||||
@@ -207,8 +214,7 @@ fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message)
|
|||||||
pub struct PolkadotProtocol {
|
pub struct PolkadotProtocol {
|
||||||
peers: HashMap<PeerId, PeerInfo>,
|
peers: HashMap<PeerId, PeerInfo>,
|
||||||
consensus_gossip: ConsensusGossip<Block>,
|
consensus_gossip: ConsensusGossip<Block>,
|
||||||
collators: HashMap<ParaId, Vec<PeerId>>,
|
collators: CollatorPool,
|
||||||
collating_for: Option<ParaId>,
|
|
||||||
live_consensus: Option<CurrentConsensus>,
|
live_consensus: Option<CurrentConsensus>,
|
||||||
in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>,
|
in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>,
|
||||||
pending: Vec<BlockDataRequest>,
|
pending: Vec<BlockDataRequest>,
|
||||||
@@ -221,8 +227,7 @@ impl PolkadotProtocol {
|
|||||||
PolkadotProtocol {
|
PolkadotProtocol {
|
||||||
peers: HashMap::new(),
|
peers: HashMap::new(),
|
||||||
consensus_gossip: ConsensusGossip::new(),
|
consensus_gossip: ConsensusGossip::new(),
|
||||||
collators: HashMap::new(),
|
collators: CollatorPool::new(),
|
||||||
collating_for: None,
|
|
||||||
live_consensus: None,
|
live_consensus: None,
|
||||||
in_flight: HashMap::new(),
|
in_flight: HashMap::new(),
|
||||||
pending: Vec::new(),
|
pending: Vec::new(),
|
||||||
@@ -260,7 +265,10 @@ impl PolkadotProtocol {
|
|||||||
let parent_hash = consensus.parent_hash;
|
let parent_hash = consensus.parent_hash;
|
||||||
let old_parent = self.live_consensus.as_ref().map(|c| c.parent_hash);
|
let old_parent = self.live_consensus.as_ref().map(|c| c.parent_hash);
|
||||||
|
|
||||||
for (id, info) in self.peers.iter_mut().filter(|&(_, ref info)| info.validator) {
|
// TODO: optimize for when session key changes and only send to collators who are relevant in next few blocks.
|
||||||
|
for (id, info) in self.peers.iter_mut()
|
||||||
|
.filter(|&(_, ref info)| info.validator || info.status.collating_for.is_some())
|
||||||
|
{
|
||||||
send_polkadot_message(
|
send_polkadot_message(
|
||||||
ctx,
|
ctx,
|
||||||
*id,
|
*id,
|
||||||
@@ -363,6 +371,8 @@ impl PolkadotProtocol {
|
|||||||
send_polkadot_message(ctx, peer_id, Message::BlockData(req_id, block_data));
|
send_polkadot_message(ctx, peer_id, Message::BlockData(req_id, block_data));
|
||||||
}
|
}
|
||||||
Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data),
|
Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data),
|
||||||
|
Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation),
|
||||||
|
Message::CollatorRole(_) => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -386,7 +396,7 @@ impl PolkadotProtocol {
|
|||||||
|
|
||||||
impl Specialization<Block> for PolkadotProtocol {
|
impl Specialization<Block> for PolkadotProtocol {
|
||||||
fn status(&self) -> Vec<u8> {
|
fn status(&self) -> Vec<u8> {
|
||||||
Status { collating_for: self.collating_for.clone() }.encode()
|
Status { collating_for: None }.encode()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_connect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, status: FullStatus) {
|
fn on_connect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, status: FullStatus) {
|
||||||
@@ -397,13 +407,23 @@ impl Specialization<Block> for PolkadotProtocol {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(ref para_id) = local_status.collating_for {
|
if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
|
||||||
self.collators.entry(para_id.clone())
|
if self.collator_peer_id(acc_id.clone()).is_some() {
|
||||||
.or_insert_with(Vec::new)
|
ctx.disable_peer(peer_id);
|
||||||
.push(peer_id);
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone());
|
||||||
|
send_polkadot_message(
|
||||||
|
ctx,
|
||||||
|
peer_id,
|
||||||
|
Message::CollatorRole(collator_role),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let validator = status.roles.iter().any(|r| *r == message::Role::Authority);
|
let validator = status.roles.iter().any(|r| *r == message::Role::Authority);
|
||||||
|
let send_key = validator || local_status.collating_for.is_some();
|
||||||
|
|
||||||
self.peers.insert(peer_id, PeerInfo {
|
self.peers.insert(peer_id, PeerInfo {
|
||||||
status: local_status,
|
status: local_status,
|
||||||
session_keys: Default::default(),
|
session_keys: Default::default(),
|
||||||
@@ -411,8 +431,7 @@ impl Specialization<Block> for PolkadotProtocol {
|
|||||||
});
|
});
|
||||||
|
|
||||||
self.consensus_gossip.new_peer(ctx, peer_id, &status.roles);
|
self.consensus_gossip.new_peer(ctx, peer_id, &status.roles);
|
||||||
|
if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) {
|
||||||
if let (true, &Some(ref consensus)) = (validator, &self.live_consensus) {
|
|
||||||
send_polkadot_message(
|
send_polkadot_message(
|
||||||
ctx,
|
ctx,
|
||||||
peer_id,
|
peer_id,
|
||||||
@@ -425,9 +444,16 @@ impl Specialization<Block> for PolkadotProtocol {
|
|||||||
|
|
||||||
fn on_disconnect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId) {
|
fn on_disconnect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId) {
|
||||||
if let Some(info) = self.peers.remove(&peer_id) {
|
if let Some(info) = self.peers.remove(&peer_id) {
|
||||||
if let Some(collators) = info.status.collating_for.and_then(|id| self.collators.get_mut(&id)) {
|
if let Some((acc_id, _)) = info.status.collating_for {
|
||||||
if let Some(pos) = collators.iter().position(|x| x == &peer_id) {
|
let new_primary = self.collators.on_disconnect(acc_id)
|
||||||
collators.swap_remove(pos);
|
.and_then(|new_primary| self.collator_peer_id(new_primary));
|
||||||
|
|
||||||
|
if let Some(new_primary) = new_primary {
|
||||||
|
send_polkadot_message(
|
||||||
|
ctx,
|
||||||
|
new_primary,
|
||||||
|
Message::CollatorRole(Role::Primary),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -483,6 +509,75 @@ impl Specialization<Block> for PolkadotProtocol {
|
|||||||
|
|
||||||
fn maintain_peers(&mut self, ctx: &mut Context<Block>) {
|
fn maintain_peers(&mut self, ctx: &mut Context<Block>) {
|
||||||
self.consensus_gossip.collect_garbage(None);
|
self.consensus_gossip.collect_garbage(None);
|
||||||
|
self.collators.collect_garbage(None);
|
||||||
self.dispatch_pending_requests(ctx);
|
self.dispatch_pending_requests(ctx);
|
||||||
|
|
||||||
|
for collator_action in self.collators.maintain_peers() {
|
||||||
|
match collator_action {
|
||||||
|
Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator),
|
||||||
|
Action::NewRole(account_id, role) => if let Some(collator) = self.collator_peer_id(account_id) {
|
||||||
|
send_polkadot_message(
|
||||||
|
ctx,
|
||||||
|
collator,
|
||||||
|
Message::CollatorRole(role),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_block_imported(&mut self, _ctx: &mut Context<Block>, hash: Hash, _header: &Header) {
|
||||||
|
self.collators.collect_garbage(Some(&hash));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PolkadotProtocol {
|
||||||
|
// we received a collation from a peer
|
||||||
|
fn on_collation(&mut self, ctx: &mut Context<Block>, from: PeerId, relay_parent: Hash, collation: Collation) {
|
||||||
|
let collation_para = collation.receipt.parachain_index;
|
||||||
|
let collated_acc = collation.receipt.collator;
|
||||||
|
|
||||||
|
match self.peers.get(&from) {
|
||||||
|
None => ctx.disconnect_peer(from),
|
||||||
|
Some(peer_info) => match peer_info.status.collating_for {
|
||||||
|
None => ctx.disable_peer(from),
|
||||||
|
Some((ref acc_id, ref para_id)) => {
|
||||||
|
let structurally_valid = para_id == &collation_para && acc_id == &collated_acc;
|
||||||
|
if structurally_valid && collation.receipt.check_signature().is_ok() {
|
||||||
|
self.collators.on_collation(acc_id.clone(), relay_parent, collation)
|
||||||
|
} else {
|
||||||
|
ctx.disable_peer(from)
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver<Collation> {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.collators.await_collation(relay_parent, para_id, tx);
|
||||||
|
rx
|
||||||
|
}
|
||||||
|
|
||||||
|
// get connected peer with given account ID for collation.
|
||||||
|
fn collator_peer_id(&self, account_id: AccountId) -> Option<PeerId> {
|
||||||
|
let check_info = |info: &PeerInfo| info
|
||||||
|
.status
|
||||||
|
.collating_for
|
||||||
|
.as_ref()
|
||||||
|
.map_or(false, |&(ref acc_id, _)| acc_id == &account_id);
|
||||||
|
|
||||||
|
self.peers
|
||||||
|
.iter()
|
||||||
|
.filter(|&(_, info)| check_info(info))
|
||||||
|
.map(|(peer_id, _)| *peer_id)
|
||||||
|
.next()
|
||||||
|
}
|
||||||
|
|
||||||
|
// disconnect a collator by account-id.
|
||||||
|
fn disconnect_bad_collator(&self, ctx: &mut Context<Block>, account_id: AccountId) {
|
||||||
|
if let Some(peer_id) = self.collator_peer_id(account_id) {
|
||||||
|
ctx.disable_peer(peer_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -110,11 +110,12 @@ fn sends_session_key() {
|
|||||||
let parent_hash = [0; 32].into();
|
let parent_hash = [0; 32].into();
|
||||||
let local_key = [1; 32].into();
|
let local_key = [1; 32].into();
|
||||||
|
|
||||||
let status = Status { collating_for: None };
|
let validator_status = Status { collating_for: None };
|
||||||
|
let collator_status = Status { collating_for: Some(([2; 32].into(), 5.into())) };
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut ctx = TestContext::default();
|
let mut ctx = TestContext::default();
|
||||||
protocol.on_connect(&mut ctx, peer_a, make_status(&status, vec![Role::Authority]));
|
protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, vec![Role::Authority]));
|
||||||
assert!(ctx.messages.is_empty());
|
assert!(ctx.messages.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -128,7 +129,7 @@ fn sends_session_key() {
|
|||||||
|
|
||||||
{
|
{
|
||||||
let mut ctx = TestContext::default();
|
let mut ctx = TestContext::default();
|
||||||
protocol.on_connect(&mut ctx, peer_b, make_status(&status, vec![Role::Authority]));
|
protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, vec![]));
|
||||||
assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key)));
|
assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -207,3 +208,24 @@ fn fetches_from_those_with_knowledge() {
|
|||||||
assert_eq!(recv.wait().unwrap(), block_data);
|
assert_eq!(recv.wait().unwrap(), block_data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn remove_bad_collator() {
|
||||||
|
let mut protocol = PolkadotProtocol::new();
|
||||||
|
|
||||||
|
let peer_id = 1;
|
||||||
|
let account_id = [2; 32].into();
|
||||||
|
|
||||||
|
let status = Status { collating_for: Some((account_id, 5.into())) };
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut ctx = TestContext::default();
|
||||||
|
protocol.on_connect(&mut ctx, peer_id, make_status(&status, vec![]));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut ctx = TestContext::default();
|
||||||
|
protocol.disconnect_bad_collator(&mut ctx, account_id);
|
||||||
|
assert!(ctx.disabled.contains(&peer_id));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -144,7 +144,7 @@ pub struct CandidateReceipt {
|
|||||||
pub parachain_index: Id,
|
pub parachain_index: Id,
|
||||||
/// The collator's relay-chain account ID
|
/// The collator's relay-chain account ID
|
||||||
pub collator: super::AccountId,
|
pub collator: super::AccountId,
|
||||||
/// Signature on block data by collator.
|
/// Signature on blake2-256 of the block data by collator.
|
||||||
pub signature: CandidateSignature,
|
pub signature: CandidateSignature,
|
||||||
/// The head-data
|
/// The head-data
|
||||||
pub head_data: HeadData,
|
pub head_data: HeadData,
|
||||||
@@ -195,6 +195,17 @@ impl CandidateReceipt {
|
|||||||
use runtime_primitives::traits::{BlakeTwo256, Hash};
|
use runtime_primitives::traits::{BlakeTwo256, Hash};
|
||||||
BlakeTwo256::hash_of(self)
|
BlakeTwo256::hash_of(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check integrity vs. provided block data.
|
||||||
|
pub fn check_signature(&self) -> Result<(), ()> {
|
||||||
|
use runtime_primitives::traits::Verify;
|
||||||
|
|
||||||
|
if self.signature.verify(&self.block_data_hash.0[..], &self.collator) {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PartialOrd for CandidateReceipt {
|
impl PartialOrd for CandidateReceipt {
|
||||||
|
|||||||
Reference in New Issue
Block a user