Strip out old XCMP primitives (#823)

* WIP

* WIp

* Mostly get tests to compile

* Fix adder collator

* Remove more stuff

* Revert some changes to av store

* Fix av store tests

* Nitpicks

* Restore some things

* Small changes

* Remvoe unused error variants
This commit is contained in:
Ashley
2020-02-13 17:12:05 +01:00
committed by GitHub
parent 5385b9af82
commit 5f9e602af7
30 changed files with 117 additions and 2105 deletions
+1 -6
View File
@@ -236,15 +236,12 @@ impl CollatorPool {
mod tests {
use super::*;
use sp_core::crypto::UncheckedInto;
use polkadot_primitives::parachain::{
CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
};
use polkadot_primitives::parachain::{CandidateReceipt, BlockData, PoVBlock, HeadData};
use futures::executor::block_on;
fn make_pov(block_data: Vec<u8>) -> PoVBlock {
PoVBlock {
block_data: BlockData(block_data),
ingress: ConsolidatedIngress(Vec::new()),
}
}
@@ -294,7 +291,6 @@ mod tests {
signature: Default::default(),
head_data: HeadData(vec![1, 2, 3]),
parent_head: HeadData(vec![]),
egress_queue_roots: vec![],
fees: 0,
block_data_hash: [3; 32].into(),
upward_messages: Vec::new(),
@@ -324,7 +320,6 @@ mod tests {
signature: Default::default(),
head_data: HeadData(vec![1, 2, 3]),
parent_head: HeadData(vec![]),
egress_queue_roots: vec![],
fees: 0,
block_data_hash: [3; 32].into(),
upward_messages: Vec::new(),
@@ -93,11 +93,6 @@ impl PeerData {
pub(super) fn knowledge_at_mut(&mut self, parent_hash: &Hash) -> Option<&mut Knowledge> {
self.live.get_mut(parent_hash)
}
/// Get an iterator over all live leaves of this peer.
pub(super) fn leaves(&self) -> impl Iterator<Item = &Hash> {
self.live.keys()
}
}
/// An impartial view of what topics and data are valid based on attestation session data.
@@ -1,339 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Data structures and synchronous logic for ICMP message gossip.
//!
//! The parent-module documentation describes some rationale of the general
//! gossip protocol design.
//!
//! The ICMP message-routing gossip works according to those rationale.
//!
//! In this protocol, we perform work under 4 conditions:
//! ### 1. Upon observation of a new leaf in the block-DAG.
//!
//! We first communicate the best leaves to our neighbors in the gossip graph
//! by the means of a neighbor packet. Then, we query to discover the trie roots
//! of all un-routed message queues from the perspective of each of those leaves.
//!
//! For any trie root in the unrouted set for the new leaf, if we have the corresponding
//! queue, we send it to any peers with the new leaf in their latest advertised set.
//!
//! Which parachain those messages go to and from is unimportant, because this is
//! an everybody-sees-everything style protocol. The only important property is "liveness":
//! that the queue root is un-routed at one of the leaves we perceive to be at the head
//! of the block-DAG.
//!
//! In Substrate gossip, every message is associated with a topic. Typically,
//! many messages are grouped under a single topic. In this gossip system, each queue
//! gets its own topic, which is based on the root hash of the queue. This is because
//! many different chain leaves may have the same queue as un-routed, so it's better than
//! attempting to group message packets by the leaf they appear unrouted at.
//!
//! ### 2. Upon a neighbor packet from a peer.
//!
//! The neighbor packet from a peer should contain perceived chain heads of that peer.
//! If there is any overlap between our perceived chain heads and theirs, we send
//! them any known, un-routed message queue from either set.
//!
//! ### 3. Upon receiving a message queue from a peer.
//!
//! If the message queue is in the un-routed set of one of the latest leaves we've updated to,
//! we accept it and relay to any peers who need that queue as well.
//!
//! If not, we report the peer to the peer-set manager for sending us bad data.
//!
//! ### 4. Periodic Pruning
//!
//! We prune messages that are not un-routed from the view of any leaf and cease
//! to attempt to send them to any peer.
use sp_runtime::traits::{BlakeTwo256, Hash as HashT};
use polkadot_primitives::Hash;
use std::collections::{HashMap, HashSet};
use sp_blockchain::Error as ClientError;
use super::{MAX_CHAIN_HEADS, GossipValidationResult, LeavesVec, ChainContext};
/// Construct a topic for a message queue root deterministically.
pub fn queue_topic(queue_root: Hash) -> Hash {
let mut v = queue_root.as_ref().to_vec();
v.extend(b"message_queue");
BlakeTwo256::hash(&v[..])
}
/// A view of which queue roots are current for a given set of leaves.
#[derive(Default)]
pub struct View {
leaves: LeavesVec,
leaf_topics: HashMap<Hash, HashSet<Hash>>, // leaf_hash -> { topics }
expected_queues: HashMap<Hash, (Hash, bool)>, // topic -> (queue-root, known)
}
impl View {
/// Update the set of current leaves. This is called when we perceive a new bset leaf-set.
pub fn update_leaves<T: ChainContext + ?Sized, I>(&mut self, context: &T, new_leaves: I)
-> Result<(), ClientError>
where I: Iterator<Item=Hash>
{
let new_leaves = new_leaves.take(MAX_CHAIN_HEADS);
let old_leaves = std::mem::replace(&mut self.leaves, new_leaves.collect());
let expected_queues = &mut self.expected_queues;
let leaves = &self.leaves;
self.leaf_topics.retain(|l, topics| {
if leaves.contains(l) { return true }
// prune out all data about old leaves we don't follow anymore.
for topic in topics.iter() {
expected_queues.remove(topic);
}
false
});
let mut res = Ok(());
// add in new data about fresh leaves.
for new_leaf in &self.leaves {
if old_leaves.contains(new_leaf) { continue }
let mut this_leaf_topics = HashSet::new();
let r = context.leaf_unrouted_roots(new_leaf, &mut |&queue_root| {
let topic = queue_topic(queue_root);
this_leaf_topics.insert(topic);
expected_queues.entry(topic).or_insert((queue_root, false));
});
if r.is_err() {
if let Err(e) = res {
log::debug!(target: "message_routing", "Ignored duplicate error {}", e)
};
res = r;
}
self.leaf_topics.insert(*new_leaf, this_leaf_topics);
}
res
}
/// Validate an incoming message queue against this view. If it is accepted
/// by our view of un-routed message queues, we will keep and re-propagate.
pub fn validate_queue_and_note_known(&mut self, messages: &super::GossipParachainMessages)
-> (GossipValidationResult<Hash>, sc_network::ReputationChange)
{
let ostensible_topic = queue_topic(messages.queue_root);
match self.expected_queues.get_mut(&ostensible_topic) {
None => (GossipValidationResult::Discard, super::cost::UNNEEDED_ICMP_MESSAGES),
Some(&mut (_, ref mut known)) => {
if !messages.queue_root_is_correct() {
(
GossipValidationResult::Discard,
super::cost::icmp_messages_root_mismatch(messages.messages.len()),
)
} else {
*known = true;
(
GossipValidationResult::ProcessAndKeep(ostensible_topic),
super::benefit::NEW_ICMP_MESSAGES,
)
}
}
}
}
/// Whether a message with given topic is live.
pub fn is_topic_live(&self, topic: &Hash) -> bool {
self.expected_queues.get(topic).is_some()
}
/// Whether a message is allowed under the intersection of the given leaf-set
/// and our own.
pub fn allowed_intersecting(&self, other_leaves: &LeavesVec, topic: &Hash) -> bool {
for i in other_leaves {
for j in &self.leaves {
if i == j {
let leaf_topics = self.leaf_topics.get(i)
.expect("leaf_topics are mutated only in update_leaves; \
we have an entry for each item in self.leaves; \
i is in self.leaves; qed");
if leaf_topics.contains(topic) {
return true;
}
}
}
}
false
}
/// Get topics of all message queues a peer is interested in - this is useful
/// when a peer has informed us of their new best leaves.
pub fn intersection_topics(&self, other_leaves: &LeavesVec) -> impl Iterator<Item=Hash> {
let deduplicated = other_leaves.iter()
.filter_map(|l| self.leaf_topics.get(l))
.flat_map(|topics| topics.iter().cloned())
.collect::<HashSet<_>>();
deduplicated.into_iter()
}
/// Iterate over all live message queues for which the data is marked as not locally known,
/// calling a closure with `(topic, root)`. The closure will return whether the queue data is
/// unknown.
///
/// This is called when we should send un-routed message queues that we are
/// newly aware of to peers - as in when we update our leaves.
pub fn sweep_unknown_queues(&mut self, mut check_known: impl FnMut(&Hash, &Hash) -> bool) {
for (topic, &mut (ref queue_root, ref mut known)) in self.expected_queues.iter_mut() {
if !*known {
*known = check_known(topic, queue_root)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::legacy::tests::TestChainContext;
use crate::legacy::gossip::{Known, GossipParachainMessages};
use polkadot_primitives::parachain::Message as ParachainMessage;
fn hash(x: u8) -> Hash {
[x; 32].into()
}
fn message_queue(from: u8, to: u8) -> Option<[[u8; 2]; 1]> {
if from == to {
None
} else {
Some([[from, to]])
}
}
fn message_queue_root(from: u8, to: u8) -> Option<Hash> {
message_queue(from, to).map(
|q| polkadot_validation::message_queue_root(q.iter())
)
}
// check that our view has all of the roots of the message queues
// emitted in the heads identified in `our_heads`, and none of the others.
fn check_roots(view: &mut View, our_heads: &[u8], n_heads: u8) -> bool {
for i in 0..n_heads {
for j in 0..n_heads {
if let Some(messages) = message_queue(i, j) {
let queue_root = message_queue_root(i, j).unwrap();
let messages = GossipParachainMessages {
queue_root,
messages: messages.iter().map(|m| ParachainMessage(m.to_vec())).collect(),
};
let had_queue = match view.validate_queue_and_note_known(&messages).0 {
GossipValidationResult::ProcessAndKeep(topic) => topic == queue_topic(queue_root),
_ => false,
};
if our_heads.contains(&i) != had_queue {
return false
}
}
}
}
true
}
#[test]
fn update_leaves_none_in_common() {
let mut ctx = TestChainContext::default();
let n_heads = 5;
for i in 0..n_heads {
ctx.known_map.insert(hash(i as u8), Known::Leaf);
let messages_out: Vec<_> = (0..n_heads).filter_map(|j| message_queue_root(i, j)).collect();
if !messages_out.is_empty() {
ctx.ingress_roots.insert(hash(i as u8), messages_out);
}
}
// initialize the view with 2 leaves.
let mut view = View::default();
view.update_leaves(
&ctx,
[hash(0), hash(1)].iter().cloned(),
).unwrap();
// we should have all queue roots that were
// un-routed from the perspective of those 2
// leaves and no others.
assert!(check_roots(&mut view, &[0, 1], n_heads));
// after updating to a disjoint set,
// the property that we are aware of all un-routed
// from the perspective of our known leaves should
// remain the same.
view.update_leaves(
&ctx,
[hash(2), hash(3), hash(4)].iter().cloned(),
).unwrap();
assert!(check_roots(&mut view, &[2, 3, 4], n_heads));
}
#[test]
fn update_leaves_overlapping() {
let mut ctx = TestChainContext::default();
let n_heads = 5;
for i in 0..n_heads {
ctx.known_map.insert(hash(i as u8), Known::Leaf);
let messages_out: Vec<_> = (0..n_heads).filter_map(|j| message_queue_root(i, j)).collect();
if !messages_out.is_empty() {
ctx.ingress_roots.insert(hash(i as u8), messages_out);
}
}
let mut view = View::default();
view.update_leaves(
&ctx,
[hash(0), hash(1), hash(2)].iter().cloned(),
).unwrap();
assert!(check_roots(&mut view, &[0, 1, 2], n_heads));
view.update_leaves(
&ctx,
[hash(2), hash(3), hash(4)].iter().cloned(),
).unwrap();
// after updating to a leaf-set overlapping with the prior,
// the property that we are aware of all un-routed
// from the perspective of our known leaves should
// remain the same.
assert!(check_roots(&mut view, &[2, 3, 4], n_heads));
}
}
+8 -414
View File
@@ -49,7 +49,7 @@
//! Peers who send information which was not allowed under a recent neighbor packet
//! will be noted as non-beneficial to Substrate's peer-set management utility.
use sp_runtime::{generic::BlockId, traits::{BlakeTwo256, Hash as HashT}};
use sp_runtime::traits::{BlakeTwo256, Hash as HashT};
use sp_blockchain::Error as ClientError;
use sc_network::{config::Roles, Context, PeerId, ReputationChange};
use sc_network::{NetworkService as SubstrateNetworkService, specialization::NetworkSpecialization};
@@ -60,7 +60,7 @@ use sc_network_gossip::{
use polkadot_validation::{SignedStatement};
use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{
ParachainHost, ValidatorId, Message as ParachainMessage, ErasureChunk as PrimitiveChunk
ParachainHost, ValidatorId, ErasureChunk as PrimitiveChunk
};
use polkadot_erasure_coding::{self as erasure};
use codec::{Decode, Encode};
@@ -72,15 +72,12 @@ use std::sync::Arc;
use arrayvec::ArrayVec;
use futures::prelude::*;
use parking_lot::RwLock;
use log::warn;
use crate::legacy::{GossipMessageStream, NetworkService, GossipService, PolkadotProtocol, router::attestation_topic};
use attestation::{View as AttestationView, PeerData as AttestationPeerData};
use message_routing::{View as MessageRoutingView};
mod attestation;
mod message_routing;
/// The engine ID of the polkadot attestation system.
pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot1";
@@ -99,8 +96,6 @@ mod benefit {
pub const NEW_ATTESTATION: Rep = Rep::new(50, "Polkadot: New attestation");
/// When a peer sends us a previously-unknown erasure chunk.
pub const NEW_ERASURE_CHUNK: Rep = Rep::new(10, "Polkadot: New erasure chunk");
/// When a peer sends us a previously-unknown message packet.
pub const NEW_ICMP_MESSAGES: Rep = Rep::new(50, "Polkadot: New ICMP messages");
}
mod cost {
@@ -119,19 +114,10 @@ mod cost {
pub const BAD_SIGNATURE: Rep = Rep::new(-500, "Polkadot: Bad signature");
/// A peer sent us a bad neighbor packet.
pub const BAD_NEIGHBOR_PACKET: Rep = Rep::new(-300, "Polkadot: Bad neighbor");
/// A peer sent us an ICMP queue we haven't advertised a need for.
pub const UNNEEDED_ICMP_MESSAGES: Rep = Rep::new(-100, "Polkadot: Unexpected ICMP message");
/// A peer sent us an erasure chunk referring to a candidate that we are not aware of.
pub const ORPHANED_ERASURE_CHUNK: Rep = Rep::new(-10, "An erasure chunk from unknown candidate");
/// A peer sent us an erasure chunk that does not match candidate's erasure root.
pub const ERASURE_CHUNK_WRONG_ROOT: Rep = Rep::new(-100, "Chunk doesn't match encoding root");
/// A peer sent us an ICMP queue with a bad root.
pub fn icmp_messages_root_mismatch(n_messages: usize) -> Rep {
const PER_MESSAGE: i32 = -150;
Rep::new((0..n_messages).map(|_| PER_MESSAGE).sum(), "Polkadot: ICMP root mismatch")
}
}
/// A gossip message.
@@ -144,12 +130,9 @@ pub enum GossipMessage {
/// Non-candidate statements should only be sent to peers who are aware of the candidate.
#[codec(index = "2")]
Statement(GossipStatement),
/// A packet of messages from one parachain to another.
#[codec(index = "3")]
ParachainMessages(GossipParachainMessages),
// TODO: https://github.com/paritytech/polkadot/issues/253
/// A packet containing one of the erasure-coding chunks of one candidate.
#[codec(index = "4")]
#[codec(index = "3")]
ErasureChunk(ErasureChunkMessage),
}
@@ -165,12 +148,6 @@ impl From<GossipStatement> for GossipMessage {
}
}
impl From<GossipParachainMessages> for GossipMessage {
fn from(messages: GossipParachainMessages) -> Self {
GossipMessage::ParachainMessages(messages)
}
}
/// A gossip message containing a statement.
#[derive(Encode, Decode, Clone, PartialEq)]
pub struct GossipStatement {
@@ -218,19 +195,6 @@ impl From<ErasureChunkMessage> for GossipMessage {
pub struct GossipParachainMessages {
/// The root of the message queue.
pub queue_root: Hash,
/// The messages themselves.
pub messages: Vec<ParachainMessage>,
}
impl GossipParachainMessages {
// confirms that the queue-root in the struct correctly matches
// the messages.
fn queue_root_is_correct(&self) -> bool {
let root = polkadot_validation::message_queue_root(
self.messages.iter().map(|m| &m.0)
);
root == self.queue_root
}
}
/// A versioned neighbor message.
@@ -283,23 +247,9 @@ impl<F, P> ChainContext for (F, P) where
fn leaf_unrouted_roots(
&self,
&leaf: &Hash,
with_queue_root: &mut dyn FnMut(&Hash),
_leaf: &Hash,
_with_queue_root: &mut dyn FnMut(&Hash),
) -> Result<(), ClientError> {
let api = self.1.runtime_api();
let leaf_id = BlockId::Hash(leaf);
let active_parachains = api.active_parachains(&leaf_id)?;
// TODO: https://github.com/paritytech/polkadot/issues/467
for (para_id, _) in active_parachains {
if let Some(ingress) = api.ingress(&leaf_id, para_id, None)? {
for (_height, _from, queue_root) in ingress.iter() {
with_queue_root(queue_root);
}
}
}
Ok(())
}
}
@@ -325,7 +275,6 @@ pub fn register_validator<C: ChainContext + 'static, S: NetworkSpecialization<Bl
inner: RwLock::new(Inner {
peers: HashMap::new(),
attestation_view: Default::default(),
message_routing_view: Default::default(),
availability_store: None,
chain,
})
@@ -350,8 +299,6 @@ pub fn register_validator<C: ChainContext + 'static, S: NetworkSpecialization<Bl
enum NewLeafAction {
// (who, message)
TargetedMessage(PeerId, GossipMessage),
// (topic, message)
Multicast(Hash, GossipMessage),
}
/// Actions to take after noting a new block-DAG leaf.
@@ -372,8 +319,6 @@ impl NewLeafActions {
match action {
NewLeafAction::TargetedMessage(who, message)
=> gossip.send_message(who, message),
NewLeafAction::Multicast(topic, message)
=> gossip.gossip_message(topic, message),
}
}
}
@@ -428,7 +373,6 @@ impl<S: NetworkSpecialization<Block>> RegisteredMessageValidator<S> {
&self,
relay_chain_leaf: Hash,
validation: MessageValidationData,
lookup_queue_by_root: impl Fn(&Hash) -> Option<Vec<ParachainMessage>>,
) -> NewLeafActions {
// add an entry in attestation_view
// prune any entries from attestation_view which are no longer leaves
@@ -441,7 +385,6 @@ impl<S: NetworkSpecialization<Block>> RegisteredMessageValidator<S> {
let &mut Inner {
ref chain,
ref mut attestation_view,
ref mut message_routing_view,
..
} = &mut *inner;
@@ -449,10 +392,6 @@ impl<S: NetworkSpecialization<Block>> RegisteredMessageValidator<S> {
Some(Known::Leaf) => true,
_ => false,
});
if let Err(e) = message_routing_view.update_leaves(chain, attestation_view.neighbor_info()) {
warn!("Unable to fully update leaf-state: {:?}", e);
}
}
@@ -461,23 +400,6 @@ impl<S: NetworkSpecialization<Block>> RegisteredMessageValidator<S> {
|who, message| actions.push(NewLeafAction::TargetedMessage(who.clone(), message))
);
// feed any new unrouted queues into the propagation pool.
inner.message_routing_view.sweep_unknown_queues(|topic, queue_root|
match lookup_queue_by_root(queue_root) {
Some(messages) => {
let message = GossipMessage::from(GossipParachainMessages {
queue_root: *queue_root,
messages,
});
actions.push(NewLeafAction::Multicast(*topic, message));
true
}
None => false,
}
);
NewLeafActions { actions }
}
@@ -575,16 +497,9 @@ struct PeerData {
attestation: AttestationPeerData,
}
impl PeerData {
fn leaves(&self) -> impl Iterator<Item = &Hash> {
self.attestation.leaves()
}
}
struct Inner<C: ?Sized> {
peers: HashMap<PeerId, PeerData>,
attestation_view: AttestationView,
message_routing_view: MessageRoutingView,
availability_store: Option<av_store::Store>,
chain: C,
}
@@ -602,11 +517,7 @@ impl<C: ?Sized + ChainContext> Inner<C> {
let new_leaves = peer.attestation.update_leaves(&chain_heads);
let new_attestation_topics = new_leaves.iter().cloned().map(attestation_topic);
// find all topics which are from the intersection of our leaves with the peer's
// new leaves.
let new_message_routing_topics = self.message_routing_view.intersection_topics(&new_leaves);
new_attestation_topics.chain(new_message_routing_topics).collect()
new_attestation_topics.collect()
} else {
Vec::new()
};
@@ -692,7 +603,6 @@ impl<C: ChainContext + ?Sized> MessageValidator<C> {
inner: RwLock::new(Inner {
peers: HashMap::new(),
attestation_view: Default::default(),
message_routing_view: Default::default(),
availability_store: None,
chain,
}),
@@ -740,18 +650,6 @@ impl<C: ChainContext + ?Sized> sc_network_gossip::Validator<Block> for MessageVa
}
(res, cb)
}
Ok(GossipMessage::ParachainMessages(messages)) => {
let (res, cb) = {
let mut inner = self.inner.write();
let inner = &mut *inner;
inner.message_routing_view.validate_queue_and_note_known(&messages)
};
if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
context.broadcast_message(topic.clone(), data.to_vec(), false);
}
(res, cb)
}
Ok(GossipMessage::ErasureChunk(chunk)) => {
self.inner.write().validate_erasure_chunk_packet(chunk)
}
@@ -767,8 +665,7 @@ impl<C: ChainContext + ?Sized> sc_network_gossip::Validator<Block> for MessageVa
Box::new(move |topic, _data| {
// check that messages from this topic are considered live by one of our protocols.
// everything else is expired
let live = inner.attestation_view.is_topic_live(&topic)
|| !inner.message_routing_view.is_topic_live(&topic);
let live = inner.attestation_view.is_topic_live(&topic);
!live // = expired
})
@@ -780,7 +677,6 @@ impl<C: ChainContext + ?Sized> sc_network_gossip::Validator<Block> for MessageVa
let &mut Inner {
ref mut peers,
ref mut attestation_view,
ref mut message_routing_view,
..
} = &mut *inner;
@@ -806,13 +702,6 @@ impl<C: ChainContext + ?Sized> sc_network_gossip::Validator<Block> for MessageVa
)
})
}
Ok(GossipMessage::ParachainMessages(_)) => match peer {
None => false,
Some(peer) => {
let their_leaves: LeavesVec = peer.leaves().cloned().collect();
message_routing_view.allowed_intersecting(&their_leaves, topic)
}
}
_ => false,
}
})
@@ -827,9 +716,8 @@ mod tests {
use parking_lot::Mutex;
use polkadot_primitives::parachain::{CandidateReceipt, HeadData};
use sp_core::crypto::UncheckedInto;
use sp_core::sr25519::{Public as Sr25519Public, Signature as Sr25519Signature};
use sp_core::sr25519::Signature as Sr25519Signature;
use polkadot_validation::GenericStatement;
use super::message_routing::queue_topic;
use crate::legacy::tests::TestChainContext;
@@ -867,22 +755,6 @@ mod tests {
}
}
impl NewLeafActions {
fn has_message(&self, who: PeerId, message: GossipMessage) -> bool {
let x = NewLeafAction::TargetedMessage(who, message);
self.actions.iter().find(|&m| m == &x).is_some()
}
fn has_multicast(&self, topic: Hash, message: GossipMessage) -> bool {
let x = NewLeafAction::Multicast(topic, message);
self.actions.iter().find(|&m| m == &x).is_some()
}
}
fn validator_id(raw: [u8; 32]) -> ValidatorId {
Sr25519Public::from_raw(raw).into()
}
#[test]
fn message_allowed() {
let (tx, _rx) = mpsc::channel();
@@ -933,7 +805,6 @@ mod tests {
head_data: HeadData(vec![9, 9, 9]),
parent_head: HeadData(vec![]),
signature: Default::default(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [20u8; 32].into(),
upward_messages: Vec::new(),
@@ -1095,12 +966,6 @@ mod tests {
let hash_a = [1u8; 32].into();
let root_a = [11u8; 32].into();
let root_a_topic = queue_topic(root_a);
let root_a_messages = vec![
ParachainMessage(vec![1, 2, 3]),
ParachainMessage(vec![4, 5, 6]),
];
let chain = {
let mut chain = TestChainContext::default();
@@ -1111,8 +976,6 @@ mod tests {
let validator = RegisteredMessageValidator::new_test(chain, report_handle);
let authorities: Vec<ValidatorId> = vec![validator_id([0; 32]), validator_id([10; 32])];
let peer_a = PeerId::random();
let peer_b = PeerId::random();
@@ -1144,274 +1007,5 @@ mod tests {
],
);
}
// ensure that we attempt to multicast all relevant queues after noting a leaf.
{
let actions = validator.new_local_leaf(
hash_a,
MessageValidationData { authorities },
|root| if root == &root_a {
Some(root_a_messages.clone())
} else {
None
},
);
assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
})));
assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
})));
}
// ensure that we are allowed to multicast to a peer with same chain head,
// but not to one without.
{
let message = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).encode();
let mut allowed = validator.inner.message_allowed();
let intent = MessageIntent::Broadcast;
assert!(allowed(&peer_a, intent, &root_a_topic, &message[..]));
assert!(!allowed(&peer_b, intent, &root_a_topic, &message[..]));
}
}
#[test]
fn multicasts_icmp_queues_on_neighbor_update() {
let (tx, _rx) = mpsc::channel();
let tx = Mutex::new(tx);
let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap());
let hash_a = [1u8; 32].into();
let root_a = [11u8; 32].into();
let root_a_topic = queue_topic(root_a);
let root_a_messages = vec![
ParachainMessage(vec![1, 2, 3]),
ParachainMessage(vec![4, 5, 6]),
];
let chain = {
let mut chain = TestChainContext::default();
chain.known_map.insert(hash_a, Known::Leaf);
chain.ingress_roots.insert(hash_a, vec![root_a]);
chain
};
let validator = RegisteredMessageValidator::new_test(chain, report_handle);
let authorities: Vec<ValidatorId> = vec![validator_id([0; 32]), validator_id([10; 32])];
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let mut validator_context = MockValidatorContext::default();
validator.inner.new_peer(&mut validator_context, &peer_a, Roles::FULL);
validator.inner.new_peer(&mut validator_context, &peer_b, Roles::FULL);
assert!(validator_context.events.is_empty());
validator_context.clear();
// ensure that we attempt to multicast all relevant queues after noting a leaf.
{
let actions = validator.new_local_leaf(
hash_a,
MessageValidationData { authorities },
|root| if root == &root_a {
Some(root_a_messages.clone())
} else {
None
},
);
assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
})));
assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
})));
}
// ensure that we are not allowed to multicast to either peer, as they
// don't have the chain head.
{
let message = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
});
let mut allowed = validator.inner.message_allowed();
let intent = MessageIntent::Broadcast;
assert!(!allowed(&peer_a, intent, &root_a_topic, &message.encode()));
assert!(!allowed(&peer_b, intent, &root_a_topic, &message.encode()));
}
// peer A gets updated to the chain head. now we'll attempt to broadcast
// all queues to it.
{
let message = GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
}).encode();
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&message[..],
);
match res {
GossipValidationResult::Discard => {},
_ => panic!("wrong result"),
}
assert_eq!(
validator_context.events,
vec![
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
ContextEvent::SendTopic(peer_a.clone(), root_a_topic, false),
],
);
}
// ensure that we are allowed to multicast to a peer with same chain head,
// but not to one without.
{
let message = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).encode();
let mut allowed = validator.inner.message_allowed();
let intent = MessageIntent::Broadcast;
assert!(allowed(&peer_a, intent, &root_a_topic, &message[..]));
assert!(!allowed(&peer_b, intent, &root_a_topic, &message[..]));
}
}
#[test]
fn accepts_needed_unknown_icmp_message_queue() {
let (tx, _rx) = mpsc::channel();
let tx = Mutex::new(tx);
let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap());
let hash_a = [1u8; 32].into();
let root_a_messages = vec![
ParachainMessage(vec![1, 2, 3]),
ParachainMessage(vec![4, 5, 6]),
];
let not_root_a_messages = vec![
ParachainMessage(vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 1]),
ParachainMessage(vec![4, 5, 6]),
];
let root_a = polkadot_validation::message_queue_root(
root_a_messages.iter().map(|m| &m.0)
);
let not_root_a = [69u8; 32].into();
let root_a_topic = queue_topic(root_a);
let chain = {
let mut chain = TestChainContext::default();
chain.known_map.insert(hash_a, Known::Leaf);
chain.ingress_roots.insert(hash_a, vec![root_a]);
chain
};
let validator = RegisteredMessageValidator::new_test(chain, report_handle);
let authorities: Vec<ValidatorId> = vec![validator_id([0; 32]), validator_id([10; 32])];
let peer_a = PeerId::random();
let mut validator_context = MockValidatorContext::default();
validator.inner.new_peer(&mut validator_context, &peer_a, Roles::FULL);
assert!(validator_context.events.is_empty());
validator_context.clear();
let queue_messages = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
});
let not_queue_messages = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: not_root_a_messages.clone(),
});
let queue_messages_wrong_root = GossipMessage::from(GossipParachainMessages {
queue_root: not_root_a,
messages: root_a_messages.clone(),
});
// ensure that we attempt to multicast all relevant queues after noting a leaf.
{
let actions = validator.new_local_leaf(
hash_a,
MessageValidationData { authorities },
|_root| None,
);
assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
})));
// we don't know this queue! no broadcast :(
assert!(!actions.has_multicast(root_a_topic, queue_messages.clone()));
}
// rejects right queue with unknown root.
{
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&queue_messages_wrong_root.encode(),
);
match res {
GossipValidationResult::Discard => {},
_ => panic!("wrong result"),
}
assert_eq!(validator_context.events, Vec::new());
}
// rejects bad queue.
{
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&not_queue_messages.encode(),
);
match res {
GossipValidationResult::Discard => {},
_ => panic!("wrong result"),
}
assert_eq!(validator_context.events, Vec::new());
}
// accepts the right queue.
{
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&queue_messages.encode(),
);
match res {
GossipValidationResult::ProcessAndKeep(topic) if topic == root_a_topic => {},
_ => panic!("wrong result"),
}
assert_eq!(validator_context.events, vec![
ContextEvent::BroadcastMessage(root_a_topic, queue_messages.encode(), false),
]);
}
}
}
+2 -14
View File
@@ -31,7 +31,7 @@ use futures::prelude::*;
use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{
Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock,
StructuredUnroutedIngress, ValidatorId, OutgoingMessages, ErasureChunk,
ValidatorId, ErasureChunk,
};
use sc_network::{
PeerId, RequestId, Context, StatusMessage as GenericFullStatus,
@@ -169,7 +169,6 @@ struct PoVBlockRequest {
candidate_hash: Hash,
block_data_hash: Hash,
sender: oneshot::Sender<PoVBlock>,
canon_roots: StructuredUnroutedIngress,
}
impl PoVBlockRequest {
@@ -182,13 +181,7 @@ impl PoVBlockRequest {
return Err(self);
}
match polkadot_validation::validate_incoming(&self.canon_roots, &pov_block.ingress) {
Ok(()) => {
let _ = self.sender.send(pov_block);
Ok(())
}
Err(_) => Err(self)
}
Ok(())
}
}
@@ -300,7 +293,6 @@ impl PolkadotProtocol {
ctx: &mut dyn Context<Block>,
candidate: &CandidateReceipt,
relay_parent: Hash,
canon_roots: StructuredUnroutedIngress,
) -> oneshot::Receiver<PoVBlock> {
let (tx, rx) = oneshot::channel();
@@ -310,7 +302,6 @@ impl PolkadotProtocol {
candidate_hash: candidate.hash(),
block_data_hash: candidate.block_data_hash,
sender: tx,
canon_roots,
});
self.dispatch_pending_requests(ctx);
@@ -617,7 +608,6 @@ impl Specialization<Block> for PolkadotProtocol {
validation_leaf: Default::default(),
candidate_hash: Default::default(),
block_data_hash: Default::default(),
canon_roots: StructuredUnroutedIngress(Vec::new()),
sender,
}));
}
@@ -747,7 +737,6 @@ impl PolkadotProtocol {
relay_parent: Hash,
targets: HashSet<ValidatorId>,
collation: Collation,
outgoing_targeted: OutgoingMessages,
) -> impl Future<Output = ()> {
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
relay_parent, collation.info.parachain_index);
@@ -776,7 +765,6 @@ impl PolkadotProtocol {
relay_parent,
parachain_id: collation_cloned.info.parachain_index,
block_data: collation_cloned.pov.block_data.clone(),
outgoing_queues: Some(outgoing_targeted.clone().into()),
}).await;
}
}
+4 -7
View File
@@ -29,7 +29,7 @@ use polkadot_validation::{
};
use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{
OutgoingMessages, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock, ErasureChunk,
CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock, ErasureChunk,
};
use sp_api::ProvideRuntimeApi;
@@ -203,9 +203,8 @@ impl<P: ProvideRuntimeApi<Block> + Send + Sync + 'static, T> Router<P, T> where
Ok(validated) => {
// store the data before broadcasting statements, so other peers can fetch.
knowledge.lock().note_candidate(
candidate_hash,
Some(validated.0.pov_block().clone()),
validated.0.outgoing_messages().cloned(),
candidate_hash,
Some(validated.0.pov_block().clone()),
);
// propagate the statement.
@@ -241,7 +240,6 @@ impl<P: ProvideRuntimeApi<Block> + Send, T> TableRouter for Router<P, T> where
&self,
collation: Collation,
receipt: CandidateReceipt,
outgoing: OutgoingMessages,
chunks: (ValidatorIndex, &[ErasureChunk])
) -> Self::SendLocalCollation {
// produce a signed statement
@@ -250,7 +248,6 @@ impl<P: ProvideRuntimeApi<Block> + Send, T> TableRouter for Router<P, T> where
let validated = Validated::collated_local(
receipt,
collation.pov.clone(),
outgoing.clone(),
);
let statement = GossipStatement::new(
@@ -262,7 +259,7 @@ impl<P: ProvideRuntimeApi<Block> + Send, T> TableRouter for Router<P, T> where
);
// give to network to make available.
self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(outgoing));
self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov));
self.network().gossip_message(self.attestation_topic, statement.into());
for chunk in chunks.1 {
+1 -106
View File
@@ -20,20 +20,14 @@ use std::collections::HashMap;
use super::{PolkadotProtocol, Status, Message, FullStatus};
use crate::legacy::validation::LeafWorkParams;
use polkadot_validation::GenericStatement;
use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{
CandidateReceipt, HeadData, PoVBlock, BlockData, CollatorId, ValidatorId,
StructuredUnroutedIngress,
};
use polkadot_primitives::parachain::{CollatorId, ValidatorId};
use sp_core::crypto::UncheckedInto;
use codec::Encode;
use sc_network::{
PeerId, Context, ReputationChange, config::Roles, specialization::NetworkSpecialization,
};
use futures::executor::block_on;
mod validation;
#[derive(Default)]
@@ -94,13 +88,6 @@ impl crate::legacy::gossip::ChainContext for TestChainContext {
}
}
fn make_pov(block_data: Vec<u8>) -> PoVBlock {
PoVBlock {
block_data: BlockData(block_data),
ingress: polkadot_primitives::parachain::ConsolidatedIngress(Vec::new()),
}
}
fn make_status(status: &Status, roles: Roles) -> FullStatus {
FullStatus {
version: 1,
@@ -121,11 +108,6 @@ fn make_validation_leaf_work(parent_hash: Hash, local_key: ValidatorId) -> LeafW
}
}
fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: PeerId, message: Message) {
let encoded = message.encode();
protocol.on_message(ctx, from, encoded);
}
#[test]
fn sends_session_key() {
let mut protocol = PolkadotProtocol::new(None);
@@ -158,93 +140,6 @@ fn sends_session_key() {
}
}
#[test]
fn fetches_from_those_with_knowledge() {
let mut protocol = PolkadotProtocol::new(None);
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let parent_hash = [0; 32].into();
let local_key: ValidatorId = [1; 32].unchecked_into();
let block_data = BlockData(vec![1, 2, 3, 4]);
let block_data_hash = block_data.hash();
let candidate_receipt = CandidateReceipt {
parachain_index: 5.into(),
collator: [255; 32].unchecked_into(),
head_data: HeadData(vec![9, 9, 9]),
parent_head: HeadData(vec![]),
signature: Default::default(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash,
upward_messages: Vec::new(),
erasure_root: [1u8; 32].into(),
};
let candidate_hash = candidate_receipt.hash();
let a_key: ValidatorId = [3; 32].unchecked_into();
let b_key: ValidatorId = [4; 32].unchecked_into();
let status = Status { collating_for: None };
let params = make_validation_leaf_work(parent_hash, local_key.clone());
let session = protocol.new_validation_leaf_work(&mut TestContext::default(), params);
let knowledge = session.knowledge();
knowledge.lock().note_statement(a_key.clone(), &GenericStatement::Valid(candidate_hash));
let canon_roots = StructuredUnroutedIngress(Vec::new());
let recv = protocol.fetch_pov_block(
&mut TestContext::default(),
&candidate_receipt,
parent_hash,
canon_roots,
);
// connect peer A
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_a.clone(), make_status(&status, Roles::AUTHORITY));
assert!(ctx.has_message(peer_a.clone(), Message::ValidatorId(local_key)));
}
// peer A gives session key and gets asked for data.
{
let mut ctx = TestContext::default();
on_message(&mut protocol, &mut ctx, peer_a.clone(), Message::ValidatorId(a_key.clone()));
assert!(protocol.validators.contains_key(&a_key));
assert!(ctx.has_message(peer_a.clone(), Message::RequestPovBlock(1, parent_hash, candidate_hash)));
}
knowledge.lock().note_statement(b_key.clone(), &GenericStatement::Valid(candidate_hash));
// peer B connects and sends session key. request already assigned to A
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_b.clone(), make_status(&status, Roles::AUTHORITY));
on_message(&mut protocol, &mut ctx, peer_b.clone(), Message::ValidatorId(b_key.clone()));
assert!(!ctx.has_message(peer_b.clone(), Message::RequestPovBlock(2, parent_hash, candidate_hash)));
}
// peer A disconnects, triggering reassignment
{
let mut ctx = TestContext::default();
protocol.on_disconnect(&mut ctx, peer_a.clone());
assert!(!protocol.validators.contains_key(&a_key));
assert!(ctx.has_message(peer_b.clone(), Message::RequestPovBlock(2, parent_hash, candidate_hash)));
}
// peer B comes back with block data.
{
let mut ctx = TestContext::default();
let pov_block = make_pov(block_data.0);
on_message(&mut protocol, &mut ctx, peer_b, Message::PovBlock(2, Some(pov_block.clone())));
drop(protocol);
assert_eq!(block_on(recv).unwrap(), pov_block);
}
}
#[test]
fn remove_bad_collator() {
let mut protocol = PolkadotProtocol::new(None);
@@ -28,8 +28,7 @@ use crate::legacy::{PolkadotProtocol, NetworkService, GossipService, GossipMessa
use polkadot_validation::{SharedTable, Network};
use polkadot_primitives::{Block, BlockNumber, Hash, Header, BlockId};
use polkadot_primitives::parachain::{
Id as ParaId, Chain, DutyRoster, ParachainHost, TargetedMessage,
ValidatorId, StructuredUnroutedIngress, BlockIngressRoots, Status,
Id as ParaId, Chain, DutyRoster, ParachainHost, TargetedMessage, ValidatorId, Status,
FeeSchedule, HeadData, Retriable, CollatorId, ErasureChunk, CandidateReceipt,
};
use parking_lot::Mutex;
@@ -150,7 +149,6 @@ struct ApiData {
validators: Vec<ValidatorId>,
duties: Vec<Chain>,
active_parachains: Vec<(ParaId, Option<(CollatorId, Retriable)>)>,
ingress: HashMap<ParaId, StructuredUnroutedIngress>,
}
#[derive(Default, Clone)]
@@ -299,17 +297,6 @@ impl ParachainHost<Block> for RuntimeApi {
Ok(NativeOrEncoded::Native(Some(Vec::new())))
}
fn ParachainHost_ingress_runtime_api_impl(
&self,
_at: &BlockId,
_: ExecutionContext,
id: Option<(ParaId, Option<BlockNumber>)>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<Option<StructuredUnroutedIngress>>> {
let (id, _) = id.unwrap();
Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned()))
}
fn ParachainHost_get_heads_runtime_api_impl(
&self,
_at: &BlockId,
@@ -362,34 +349,6 @@ fn build_network<SP: Spawn + Clone>(n: usize, spawner: SP)-> Built<SP> {
}
}
#[derive(Default)]
struct IngressBuilder {
egress: HashMap<(ParaId, ParaId), Vec<Vec<u8>>>,
}
impl IngressBuilder {
fn add_messages(&mut self, source: ParaId, messages: &[TargetedMessage]) {
for message in messages {
let target = message.target;
self.egress.entry((source, target)).or_insert_with(Vec::new).push(message.data.clone());
}
}
fn build(self) -> HashMap<ParaId, BlockIngressRoots> {
let mut map = HashMap::new();
for ((source, target), messages) in self.egress {
map.entry(target).or_insert_with(Vec::new)
.push((source, polkadot_validation::message_queue_root(&messages)));
}
for roots in map.values_mut() {
roots.sort_by_key(|&(para_id, _)| para_id);
}
map.into_iter().map(|(k, v)| (k, BlockIngressRoots(v))).collect()
}
}
#[derive(Clone)]
struct DummyGossipMessages;
+4 -28
View File
@@ -23,9 +23,9 @@ use sc_network::PeerId;
use polkadot_validation::{
Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement, SignedStatement,
};
use polkadot_primitives::{Block, BlockId, Hash};
use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{
Id as ParaId, Collation, OutgoingMessages, ParachainHost, CandidateReceipt, CollatorId,
Id as ParaId, Collation, ParachainHost, CandidateReceipt, CollatorId,
ValidatorId, PoVBlock,
};
use sp_api::ProvideRuntimeApi;
@@ -49,8 +49,6 @@ use crate::legacy::gossip::{RegisteredMessageValidator, MessageValidationData};
use super::{NetworkService, PolkadotProtocol};
pub use polkadot_validation::Incoming;
/// Params to instantiate validation work on a block-DAG leaf.
pub struct LeafWorkParams {
/// The local session key.
@@ -123,8 +121,6 @@ impl<P, T> ValidationNetwork<P, T> where
let actions = network.new_local_leaf(
parent_hash,
MessageValidationData { authorities },
|queue_root| spec.availability_store.as_ref()
.and_then(|store| store.queue_by_root(queue_root))
);
actions.perform(&network);
@@ -264,7 +260,6 @@ struct KnowledgeEntry {
knows_block_data: Vec<ValidatorId>,
knows_outgoing: Vec<ValidatorId>,
pov: Option<PoVBlock>,
outgoing_messages: Option<OutgoingMessages>,
}
/// Tracks knowledge of peers.
@@ -308,11 +303,9 @@ impl Knowledge {
&mut self,
hash: Hash,
pov: Option<PoVBlock>,
outgoing_messages: Option<OutgoingMessages>,
) {
let entry = self.candidates.entry(hash).or_insert_with(Default::default);
entry.pov = entry.pov.take().or(pov);
entry.outgoing_messages = entry.outgoing_messages.take().or(outgoing_messages);
}
}
@@ -567,32 +560,15 @@ impl<P: ProvideRuntimeApi<Block> + Send, T> LeafWorkDataFetcher<P, T> where
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt)
-> Pin<Box<dyn Future<Output = Result<PoVBlock, io::Error>> + Send>> {
let parachain = candidate.parachain_index;
let parent_hash = self.parent_hash;
let network = self.network.clone();
let candidate = candidate.clone();
let (tx, rx) = oneshot::channel();
let canon_roots = self.api.runtime_api().ingress(
&BlockId::hash(parent_hash),
parachain,
None,
)
.map_err(|e|
format!(
"Cannot fetch ingress for parachain {:?} at {:?}: {:?}",
parachain,
parent_hash,
e,
)
);
async move {
network.with_spec(move |spec, ctx| {
if let Ok(Some(canon_roots)) = canon_roots {
let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots);
let _ = tx.send(inner_rx);
}
let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash);
let _ = tx.send(inner_rx);
});
let map_err = |_| io::Error::new(
+1 -8
View File
@@ -28,11 +28,10 @@ use futures::prelude::*;
use futures::task::{Spawn, SpawnExt};
use log::{debug, trace};
use av_store::Store as AvailabilityStore;
use polkadot_primitives::{
Hash, Block,
parachain::{
PoVBlock, ValidatorId, ValidatorIndex, Collation, CandidateReceipt, OutgoingMessages,
PoVBlock, ValidatorId, ValidatorIndex, Collation, CandidateReceipt,
ErasureChunk, ParachainHost, Id as ParaId, CollatorId,
},
};
@@ -106,7 +105,6 @@ pub struct Service {
pub fn start<C, Api, SP>(
service: Arc<PolkadotNetworkService>,
config: Config,
availability_store: AvailabilityStore,
chain_context: C,
api: Arc<Api>,
executor: SP,
@@ -129,7 +127,6 @@ pub fn start<C, Api, SP>(
executor.spawn(worker_loop(
config,
service.clone(),
availability_store,
gossip_validator,
worker_sender.clone(),
api,
@@ -563,7 +560,6 @@ impl ProtocolHandler {
async fn worker_loop<Api, Sp>(
config: Config,
service: Arc<PolkadotNetworkService>,
availability_store: AvailabilityStore,
gossip_handle: RegisteredMessageValidator,
sender: mpsc::Sender<ServiceToWorkerMsg>,
api: Arc<Api>,
@@ -624,7 +620,6 @@ async fn worker_loop<Api, Sp>(
let new_leaf_actions = gossip_handle.new_local_leaf(
relay_parent,
crate::legacy::gossip::MessageValidationData { authorities },
|queue_root| availability_store.queue_by_root(queue_root),
);
new_leaf_actions.perform(&gossip_handle);
@@ -789,7 +784,6 @@ fn distribute_local_collation(
let validated = Validated::collated_local(
receipt,
collation.pov.clone(),
OutgoingMessages { outgoing_messages: Vec::new() },
);
let statement = crate::legacy::gossip::GossipStatement::new(
@@ -914,7 +908,6 @@ impl TableRouter for Router {
&self,
collation: Collation,
receipt: CandidateReceipt,
_outgoing: OutgoingMessages,
chunks: (ValidatorIndex, &[ErasureChunk]),
) -> Self::SendLocalCollation {
let message = ServiceToWorkerMsg::LocalCollation(