Supercede 'Propagate Substrate#4284 to Polkadot' (#695)

* Propagate Substrate#4284 to Polkadot

* Fix tests

* Fixes

* Use hash part of fund id as child unique id.

* Add comma

* Switch branch

* run cargo update

* Update polkadot-master only

* Fix collator
This commit is contained in:
Ashley
2019-12-17 21:11:39 +01:00
committed by Bastian Köcher
parent 10ecc32c55
commit 2b98785cee
11 changed files with 311 additions and 298 deletions
+104 -54
View File
@@ -51,10 +51,10 @@
use sp_runtime::{generic::BlockId, traits::{ProvideRuntimeApi, BlakeTwo256, Hash as HashT}};
use sp_blockchain::Error as ClientError;
use sc_network::{config::Roles, PeerId, ReputationChange};
use sc_network::consensus_gossip::{
self as network_gossip, ValidationResult as GossipValidationResult,
ValidatorContext, MessageIntent, ConsensusMessage,
use sc_network::{config::Roles, Context, PeerId, ReputationChange};
use sc_network_gossip::{
ValidationResult as GossipValidationResult,
ValidatorContext, MessageIntent,
};
use polkadot_validation::{SignedStatement};
use polkadot_primitives::{Block, Hash};
@@ -68,11 +68,12 @@ use std::collections::HashMap;
use std::sync::Arc;
use arrayvec::ArrayVec;
use futures::prelude::*;
use parking_lot::RwLock;
use log::warn;
use super::PolkadotNetworkService;
use crate::router::attestation_topic;
use crate::{GossipMessageStream, NetworkService, PolkadotProtocol, router::attestation_topic};
use attestation::{View as AttestationView, PeerData as AttestationPeerData};
use message_routing::{View as MessageRoutingView};
@@ -133,7 +134,7 @@ mod cost {
}
/// A gossip message.
#[derive(Encode, Decode, Clone)]
#[derive(Encode, Decode, Clone, PartialEq)]
pub enum GossipMessage {
/// A packet sent to a neighbor but not relayed.
#[codec(index = "1")]
@@ -151,15 +152,6 @@ pub enum GossipMessage {
ErasureChunk(ErasureChunkMessage),
}
impl GossipMessage {
fn to_consensus_message(&self) -> ConsensusMessage {
ConsensusMessage {
data: self.encode(),
engine_id: POLKADOT_ENGINE_ID,
}
}
}
impl From<NeighborPacket> for GossipMessage {
fn from(packet: NeighborPacket) -> Self {
GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))
@@ -179,7 +171,7 @@ impl From<GossipParachainMessages> for GossipMessage {
}
/// A gossip message containing a statement.
#[derive(Encode, Decode, Clone)]
#[derive(Encode, Decode, Clone, PartialEq)]
pub struct GossipStatement {
/// The block hash of the relay chain being referred to. In context, this should
/// be a leaf.
@@ -200,7 +192,7 @@ impl GossipStatement {
/// A gossip message containing one erasure chunk of a candidate block.
/// For each chunk of block erasure encoding one of this messages is constructed.
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct ErasureChunkMessage {
/// The chunk itself.
pub chunk: PrimitiveChunk,
@@ -221,7 +213,7 @@ impl From<ErasureChunkMessage> for GossipMessage {
/// These are all the messages posted from one parachain to another during the
/// execution of a single parachain block. Since this parachain block may have been
/// included in many forks of the relay chain, there is no relay-chain leaf parameter.
#[derive(Encode, Decode, Clone)]
#[derive(Encode, Decode, Clone, PartialEq)]
pub struct GossipParachainMessages {
/// The root of the message queue.
pub queue_root: Hash,
@@ -241,7 +233,7 @@ impl GossipParachainMessages {
}
/// A versioned neighbor message.
#[derive(Encode, Decode, Clone)]
#[derive(Encode, Decode, Clone, PartialEq)]
pub enum VersionedNeighborPacket {
#[codec(index = "1")]
V1(NeighborPacket),
@@ -249,13 +241,13 @@ pub enum VersionedNeighborPacket {
/// Contains information on which chain heads the peer is
/// accepting messages for.
#[derive(Encode, Decode, Clone)]
#[derive(Encode, Decode, Clone, PartialEq)]
pub struct NeighborPacket {
chain_heads: Vec<Hash>,
}
/// whether a block is known.
#[derive(Clone, Copy)]
#[derive(Clone, Copy, PartialEq)]
pub enum Known {
/// The block is a known leaf.
Leaf,
@@ -318,6 +310,7 @@ impl<F, P> ChainContext for (F, P) where
pub fn register_validator<C: ChainContext + 'static>(
service: Arc<PolkadotNetworkService>,
chain: C,
executor: &impl futures::task::Spawn,
) -> RegisteredMessageValidator
{
let s = service.clone();
@@ -338,19 +331,26 @@ pub fn register_validator<C: ChainContext + 'static>(
});
let gossip_side = validator.clone();
service.with_gossip(|gossip, ctx|
gossip.register_validator(ctx, POLKADOT_ENGINE_ID, gossip_side)
let gossip_engine = sc_network_gossip::GossipEngine::new(
service.clone(),
executor,
POLKADOT_ENGINE_ID,
gossip_side,
);
RegisteredMessageValidator { inner: validator as _ }
RegisteredMessageValidator {
inner: validator as _,
service: Some(service),
gossip_engine: Some(gossip_engine),
}
}
#[derive(PartialEq)]
enum NewLeafAction {
// (who, message)
TargetedMessage(PeerId, ConsensusMessage),
TargetedMessage(PeerId, GossipMessage),
// (topic, message)
Multicast(Hash, ConsensusMessage),
Multicast(Hash, GossipMessage),
}
/// Actions to take after noting a new block-DAG leaf.
@@ -365,15 +365,14 @@ impl NewLeafActions {
/// Perform the queued actions, feeding into gossip.
pub fn perform(
self,
gossip: &mut dyn crate::GossipService,
ctx: &mut dyn sc_network::Context<Block>,
gossip: &dyn crate::NetworkService,
) {
for action in self.actions {
match action {
NewLeafAction::TargetedMessage(who, message)
=> gossip.send_message(ctx, &who, message),
=> gossip.send_message(who, message),
NewLeafAction::Multicast(topic, message)
=> gossip.multicast(ctx, &topic, message),
=> gossip.gossip_message(topic, message),
}
}
}
@@ -385,6 +384,10 @@ impl NewLeafActions {
#[derive(Clone)]
pub struct RegisteredMessageValidator {
inner: Arc<MessageValidator<dyn ChainContext>>,
// Note: this is always `Some` in real code and `None` in tests.
service: Option<Arc<PolkadotNetworkService>>,
// Note: this is always `Some` in real code and `None` in tests.
gossip_engine: Option<sc_network_gossip::GossipEngine<Block>>,
}
impl RegisteredMessageValidator {
@@ -395,7 +398,11 @@ impl RegisteredMessageValidator {
) -> Self {
let validator = Arc::new(MessageValidator::new_test(chain, report_handle));
RegisteredMessageValidator { inner: validator as _ }
RegisteredMessageValidator {
inner: validator as _,
service: None,
gossip_engine: None,
}
}
pub fn register_availability_store(&mut self, availability_store: av_store::Store) {
@@ -449,7 +456,7 @@ impl RegisteredMessageValidator {
let message = GossipMessage::from(GossipParachainMessages {
queue_root: *queue_root,
messages,
}).to_consensus_message();
});
actions.push(NewLeafAction::Multicast(*topic, message));
@@ -463,6 +470,49 @@ impl RegisteredMessageValidator {
}
}
impl NetworkService for RegisteredMessageValidator {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let topic_stream = if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.messages_for(topic)
} else {
log::error!("Called gossip_messages_for on a test engine");
futures::channel::mpsc::unbounded().1
};
GossipMessageStream::new(topic_stream.boxed())
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.gossip_message(
topic,
message.encode(),
false,
);
} else {
log::error!("Called gossip_message on a test engine");
}
}
fn send_message(&self, who: PeerId, message: GossipMessage) {
if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.send_message(vec![who], message.encode());
} else {
log::error!("Called send_message on a test engine");
}
}
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>)
{
if let Some(service) = self.service.as_ref() {
service.with_spec(with)
} else {
log::error!("Called with_spec on a test engine");
}
}
}
/// The data needed for validating gossip messages.
#[derive(Default)]
pub(crate) struct MessageValidationData {
@@ -585,13 +635,13 @@ impl<C: ?Sized + ChainContext> Inner<C> {
}
}
fn multicast_neighbor_packet<F: FnMut(&PeerId, ConsensusMessage)>(
fn multicast_neighbor_packet<F: FnMut(&PeerId, GossipMessage)>(
&self,
mut send_neighbor_packet: F,
) {
let neighbor_packet = GossipMessage::from(NeighborPacket {
chain_heads: self.attestation_view.neighbor_info().collect(),
}).to_consensus_message();
});
for peer in self.peers.keys() {
send_neighbor_packet(peer, neighbor_packet.clone())
@@ -628,7 +678,7 @@ impl<C: ChainContext + ?Sized> MessageValidator<C> {
}
}
impl<C: ChainContext + ?Sized> network_gossip::Validator<Block> for MessageValidator<C> {
impl<C: ChainContext + ?Sized> sc_network_gossip::Validator<Block> for MessageValidator<C> {
fn new_peer(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId, _roles: Roles) {
let mut inner = self.inner.write();
inner.peers.insert(who.clone(), PeerData::default());
@@ -746,7 +796,7 @@ impl<C: ChainContext + ?Sized> network_gossip::Validator<Block> for MessageValid
#[cfg(test)]
mod tests {
use super::*;
use sc_network::consensus_gossip::Validator as ValidatorT;
use sc_network_gossip::Validator as ValidatorT;
use std::sync::mpsc;
use parking_lot::Mutex;
use polkadot_primitives::parachain::{CandidateReceipt, HeadData};
@@ -776,7 +826,7 @@ mod tests {
}
}
impl network_gossip::ValidatorContext<Block> for MockValidatorContext {
impl sc_network_gossip::ValidatorContext<Block> for MockValidatorContext {
fn broadcast_topic(&mut self, topic: Hash, force: bool) {
self.events.push(ContextEvent::BroadcastTopic(topic, force));
}
@@ -792,12 +842,12 @@ mod tests {
}
impl NewLeafActions {
fn has_message(&self, who: PeerId, message: ConsensusMessage) -> bool {
fn has_message(&self, who: PeerId, message: GossipMessage) -> bool {
let x = NewLeafAction::TargetedMessage(who, message);
self.actions.iter().find(|&m| m == &x).is_some()
}
fn has_multicast(&self, topic: Hash, message: ConsensusMessage) -> bool {
fn has_multicast(&self, topic: Hash, message: GossipMessage) -> bool {
let x = NewLeafAction::Multicast(topic, message);
self.actions.iter().find(|&m| m == &x).is_some()
}
@@ -1082,12 +1132,12 @@ mod tests {
assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
}).to_consensus_message()));
})));
assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).to_consensus_message()));
})));
}
// ensure that we are allowed to multicast to a peer with same chain head,
@@ -1154,12 +1204,12 @@ mod tests {
assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
}).to_consensus_message()));
})));
assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).to_consensus_message()));
})));
}
// ensure that we are not allowed to multicast to either peer, as they
@@ -1168,12 +1218,12 @@ mod tests {
let message = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).encode();
});
let mut allowed = validator.inner.message_allowed();
let intent = MessageIntent::Broadcast;
assert!(!allowed(&peer_a, intent, &root_a_topic, &message[..]));
assert!(!allowed(&peer_b, intent, &root_a_topic, &message[..]));
assert!(!allowed(&peer_a, intent, &root_a_topic, &message.encode()));
assert!(!allowed(&peer_b, intent, &root_a_topic, &message.encode()));
}
// peer A gets updated to the chain head. now we'll attempt to broadcast
@@ -1259,17 +1309,17 @@ mod tests {
let queue_messages = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).to_consensus_message();
});
let not_queue_messages = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: not_root_a_messages.clone(),
}).encode();
});
let queue_messages_wrong_root = GossipMessage::from(GossipParachainMessages {
queue_root: not_root_a,
messages: root_a_messages.clone(),
}).encode();
});
// ensure that we attempt to multicast all relevant queues after noting a leaf.
{
@@ -1281,7 +1331,7 @@ mod tests {
assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
}).to_consensus_message()));
})));
// we don't know this queue! no broadcast :(
assert!(!actions.has_multicast(root_a_topic, queue_messages.clone()));
@@ -1292,7 +1342,7 @@ mod tests {
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&queue_messages_wrong_root[..],
&queue_messages_wrong_root.encode(),
);
match res {
@@ -1308,7 +1358,7 @@ mod tests {
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&not_queue_messages[..],
&not_queue_messages.encode(),
);
match res {
@@ -1324,7 +1374,7 @@ mod tests {
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&queue_messages.data[..],
&queue_messages.encode(),
);
match res {
@@ -1333,7 +1383,7 @@ mod tests {
}
assert_eq!(validator_context.events, vec![
ContextEvent::BroadcastMessage(root_a_topic, queue_messages.data.clone(), false),
ContextEvent::BroadcastMessage(root_a_topic, queue_messages.encode(), false),
]);
}
}
+1 -1
View File
@@ -30,7 +30,7 @@
//! a `Candidate` we are aware of. Otherwise, it is possible we could be forced to
//! consider an infinite amount of attestations produced by a misbehaving validator.
use sc_network::consensus_gossip::{ValidationResult as GossipValidationResult};
use sc_network_gossip::{ValidationResult as GossipValidationResult};
use sc_network::ReputationChange;
use polkadot_validation::GenericStatement;
use polkadot_primitives::Hash;
+9 -74
View File
@@ -26,7 +26,7 @@ pub mod validation;
pub mod gossip;
use codec::{Decode, Encode};
use futures::channel::{oneshot, mpsc};
use futures::channel::oneshot;
use futures::prelude::*;
use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{
@@ -37,9 +37,7 @@ use sc_network::{
PeerId, RequestId, Context, StatusMessage as GenericFullStatus,
specialization::NetworkSpecialization as Specialization,
};
use sc_network::consensus_gossip::{
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
};
use sc_network_gossip::TopicNotification;
use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey};
use self::collator_pool::{CollatorPool, Role, Action};
use self::local_collations::LocalCollations;
@@ -49,7 +47,7 @@ use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::task::{Context as PollContext, Poll};
use crate::gossip::{POLKADOT_ENGINE_ID, GossipMessage, ErasureChunkMessage};
use crate::gossip::{GossipMessage, ErasureChunkMessage, RegisteredMessageValidator};
#[cfg(test)]
mod tests;
@@ -90,13 +88,12 @@ pub trait NetworkService: Send + Sync + 'static {
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: GossipMessage);
/// Execute a closure with the gossip service.
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn Context<Block>);
/// Send a message to a specific peer we're connected to.
fn send_message(&self, who: PeerId, message: GossipMessage);
/// Execute a closure with the polkadot protocol.
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>);
where Self: Sized, F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>);
}
/// This is a newtype that implements a [`ProvideGossipMessages`] shim trait.
@@ -106,11 +103,10 @@ pub trait NetworkService: Send + Sync + 'static {
///
/// [`NetworkService`]: ./trait.NetworkService.html
/// [`ProvideGossipMessages`]: ../polkadot_availability_store/trait.ProvideGossipMessages.html
pub struct AvailabilityNetworkShim<T>(pub std::sync::Arc<T>);
#[derive(Clone)]
pub struct AvailabilityNetworkShim(pub RegisteredMessageValidator);
impl<T> av_store::ProvideGossipMessages for AvailabilityNetworkShim<T>
where T: NetworkService
{
impl av_store::ProvideGossipMessages for AvailabilityNetworkShim {
fn gossip_messages_for(&self, topic: Hash)
-> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>
{
@@ -145,67 +141,6 @@ impl<T> av_store::ProvideGossipMessages for AvailabilityNetworkShim<T>
}
}
impl<T> Clone for AvailabilityNetworkShim<T> {
fn clone(&self) -> Self {
AvailabilityNetworkShim(self.0.clone())
}
}
impl NetworkService for PolkadotNetworkService {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = std::sync::mpsc::channel();
PolkadotNetworkService::with_gossip(self, move |gossip, _| {
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
let _ = tx.send(inner_rx);
});
let topic_stream = match rx.recv() {
Ok(rx) => rx,
Err(_) => mpsc::unbounded().1, // return empty channel.
};
GossipMessageStream::new(topic_stream.boxed())
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
self.gossip_consensus_message(
topic,
POLKADOT_ENGINE_ID,
message.encode(),
GossipMessageRecipient::BroadcastToAll,
);
}
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn Context<Block>)
{
PolkadotNetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
}
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>)
{
PolkadotNetworkService::with_spec(self, with)
}
}
/// A gossip network subservice.
pub trait GossipService {
fn send_message(&mut self, ctx: &mut dyn Context<Block>, who: &PeerId, message: ConsensusMessage);
fn multicast(&mut self, ctx: &mut dyn Context<Block>, topic: &Hash, message: ConsensusMessage);
}
impl GossipService for consensus_gossip::ConsensusGossip<Block> {
fn send_message(&mut self, ctx: &mut dyn Context<Block>, who: &PeerId, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message)
}
fn multicast(&mut self, ctx: &mut dyn Context<Block>, topic: &Hash, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::multicast(self, ctx, *topic, message, false)
}
}
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>,
+10 -12
View File
@@ -72,18 +72,18 @@ pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
}
/// Table routing implementation.
pub struct Router<P, E, N: NetworkService, T> {
pub struct Router<P, E, T> {
table: Arc<SharedTable>,
attestation_topic: Hash,
fetcher: LeafWorkDataFetcher<P, E, N, T>,
fetcher: LeafWorkDataFetcher<P, E, T>,
deferred_statements: Arc<Mutex<DeferredStatements>>,
message_validator: RegisteredMessageValidator,
}
impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
impl<P, E, T> Router<P, E, T> {
pub(crate) fn new(
table: Arc<SharedTable>,
fetcher: LeafWorkDataFetcher<P, E, N, T>,
fetcher: LeafWorkDataFetcher<P, E, T>,
message_validator: RegisteredMessageValidator,
) -> Self {
let parent_hash = fetcher.parent_hash();
@@ -103,19 +103,19 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement> {
checked_statements(&**self.network(), self.attestation_topic)
checked_statements(&*self.network(), self.attestation_topic)
}
fn parent_hash(&self) -> Hash {
self.fetcher.parent_hash()
}
fn network(&self) -> &Arc<N> {
fn network(&self) -> &RegisteredMessageValidator {
self.fetcher.network()
}
}
impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> {
impl<P, E: Clone, T: Clone> Clone for Router<P, E, T> {
fn clone(&self) -> Self {
Router {
table: self.table.clone(),
@@ -127,9 +127,8 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> {
}
}
impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> where
impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, T> Router<P, E, T> where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Output=()> + Clone + Send + Unpin + 'static,
{
@@ -226,9 +225,8 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
}
}
impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> where
impl<P: ProvideRuntimeApi + Send, E, T> TableRouter for Router<P, E, T> where
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Output=()> + Clone + Send + 'static,
{
@@ -284,7 +282,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
}
}
impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
impl<P, E, T> Drop for Router<P, E, T> {
fn drop(&mut self) {
let parent_hash = self.parent_hash();
self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); });
+8 -12
View File
@@ -19,11 +19,11 @@
#![allow(unused)]
use crate::gossip::GossipMessage;
use sc_network::Context as NetContext;
use sc_network::consensus_gossip::TopicNotification;
use sc_network::{Context as NetContext, PeerId};
use sc_network_gossip::TopicNotification;
use sp_core::{NativeOrEncoded, ExecutionContext};
use sp_keyring::Sr25519Keyring;
use crate::{GossipService, PolkadotProtocol, NetworkService, GossipMessageStream};
use crate::{PolkadotProtocol, NetworkService, GossipMessageStream};
use polkadot_validation::{SharedTable, Network};
use polkadot_primitives::{Block, BlockNumber, Hash, Header, BlockId};
@@ -124,17 +124,15 @@ impl NetworkService for TestNetwork {
GossipMessageStream::new(rx.boxed())
}
fn send_message(&self, _: PeerId, _: GossipMessage) {
unimplemented!()
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
let notification = TopicNotification { message: message.encode(), sender: None };
let _ = self.gossip.send_message.unbounded_send((topic, notification));
}
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn NetContext<Block>)
{
unimplemented!()
}
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext<Block>)
{
@@ -310,7 +308,6 @@ impl ParachainHost<Block> for RuntimeApi {
type TestValidationNetwork = crate::validation::ValidationNetwork<
TestApi,
NeverExit,
TestNetwork,
TaskExecutor,
>;
@@ -337,9 +334,8 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
);
TestValidationNetwork::new(
net,
NeverExit,
message_val,
NeverExit,
runtime_api.clone(),
executor.clone(),
)
+23 -34
View File
@@ -62,44 +62,40 @@ pub struct LeafWorkParams {
}
/// Wrapper around the network service
pub struct ValidationNetwork<P, E, N, T> {
network: Arc<N>,
pub struct ValidationNetwork<P, E, T> {
api: Arc<P>,
executor: T,
message_validator: RegisteredMessageValidator,
network: RegisteredMessageValidator,
exit: E,
}
impl<P, E, N, T> ValidationNetwork<P, E, N, T> {
impl<P, E, T> ValidationNetwork<P, E, T> {
/// Create a new consensus networking object.
pub fn new(
network: Arc<N>,
network: RegisteredMessageValidator,
exit: E,
message_validator: RegisteredMessageValidator,
api: Arc<P>,
executor: T,
) -> Self {
ValidationNetwork { network, exit, message_validator, api, executor }
ValidationNetwork { network, exit, api, executor }
}
}
impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> {
impl<P, E: Clone, T: Clone> Clone for ValidationNetwork<P, E, T> {
fn clone(&self) -> Self {
ValidationNetwork {
network: self.network.clone(),
exit: self.exit.clone(),
api: self.api.clone(),
executor: self.executor.clone(),
message_validator: self.message_validator.clone(),
}
}
}
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
impl<P, E, T> ValidationNetwork<P, E, T> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block>,
E: Clone + Future<Output=()> + Send + Sync + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
/// Instantiate block-DAG leaf work
@@ -117,27 +113,26 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
/// leaf-work instances safely, but they should all be coordinated on which session keys
/// are being used.
pub fn instantiate_leaf_work(&self, params: LeafWorkParams)
-> oneshot::Receiver<LeafWorkDataFetcher<P, E, N, T>>
-> oneshot::Receiver<LeafWorkDataFetcher<P, E, T>>
{
let parent_hash = params.parent_hash;
let network = self.network.clone();
let api = self.api.clone();
let task_executor = self.executor.clone();
let exit = self.exit.clone();
let message_validator = self.message_validator.clone();
let authorities = params.authorities.clone();
let (tx, rx) = oneshot::channel();
self.network.with_spec(move |spec, ctx| {
let actions = message_validator.new_local_leaf(
let actions = network.new_local_leaf(
parent_hash,
MessageValidationData { authorities },
|queue_root| spec.availability_store.as_ref()
.and_then(|store| store.queue_by_root(queue_root))
);
network.with_gossip(move |gossip, ctx| actions.perform(gossip, ctx));
actions.perform(&network);
let work = spec.new_validation_leaf_work(ctx, params);
let _ = tx.send(LeafWorkDataFetcher {
@@ -147,7 +142,6 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
parent_hash,
knowledge: work.knowledge().clone(),
exit,
message_validator,
});
});
@@ -155,7 +149,7 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
}
}
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
impl<P, E, T> ValidationNetwork<P, E, T> {
/// Convert the given `CollatorId` to a `PeerId`.
pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
impl Future<Output=Option<PeerId>> + Send
@@ -177,20 +171,19 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream<Item=SignedStatement> {
crate::router::checked_statements(&*self.network, crate::router::attestation_topic(relay_parent))
crate::router::checked_statements(&self.network, crate::router::attestation_topic(relay_parent))
}
}
/// A long-lived network which can create parachain statement routing processes on demand.
impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
impl<P, E, T> ParachainNetwork for ValidationNetwork<P, E, T> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
E: Clone + Future<Output=()> + Send + Sync + Unpin + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
type Error = String;
type TableRouter = Router<P, E, N, T>;
type TableRouter = Router<P, E, T>;
type BuildTableRouter = Box<dyn Future<Output=Result<Self::TableRouter, String>> + Send + Unpin>;
fn communication_for(
@@ -207,16 +200,16 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
parent_hash,
authorities: authorities.to_vec(),
});
let message_validator = self.message_validator.clone();
let executor = self.executor.clone();
let network = self.network.clone();
let work = build_fetcher
.map_err(|e| format!("{:?}", e))
.map_ok(move |fetcher| {
let table_router = Router::new(
table,
fetcher,
message_validator,
network,
);
let table_router_clone = table_router.clone();
@@ -241,10 +234,9 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct NetworkDown;
impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where
impl<P, E: Clone, N: Clone> Collators for ValidationNetwork<P, E, N> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block>,
N: NetworkService,
{
type Error = NetworkDown;
type Collation = Pin<Box<dyn Future<Output = Result<Collation, NetworkDown>> + Send>>;
@@ -526,17 +518,16 @@ impl LiveValidationLeaves {
}
/// Can fetch data for a given validation leaf-work instance.
pub struct LeafWorkDataFetcher<P, E, N: NetworkService, T> {
network: Arc<N>,
pub struct LeafWorkDataFetcher<P, E, T> {
network: RegisteredMessageValidator,
api: Arc<P>,
exit: E,
task_executor: T,
knowledge: Arc<Mutex<Knowledge>>,
parent_hash: Hash,
message_validator: RegisteredMessageValidator,
}
impl<P, E, N: NetworkService, T> LeafWorkDataFetcher<P, E, N, T> {
impl<P, E, T> LeafWorkDataFetcher<P, E, T> {
/// Get the parent hash.
pub(crate) fn parent_hash(&self) -> Hash {
self.parent_hash
@@ -553,7 +544,7 @@ impl<P, E, N: NetworkService, T> LeafWorkDataFetcher<P, E, N, T> {
}
/// Get the network service.
pub(crate) fn network(&self) -> &Arc<N> {
pub(crate) fn network(&self) -> &RegisteredMessageValidator {
&self.network
}
@@ -568,7 +559,7 @@ impl<P, E, N: NetworkService, T> LeafWorkDataFetcher<P, E, N, T> {
}
}
impl<P, E: Clone, N: NetworkService, T: Clone> Clone for LeafWorkDataFetcher<P, E, N, T> {
impl<P, E: Clone, T: Clone> Clone for LeafWorkDataFetcher<P, E, T> {
fn clone(&self) -> Self {
LeafWorkDataFetcher {
network: self.network.clone(),
@@ -577,14 +568,12 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for LeafWorkDataFetcher<P,
parent_hash: self.parent_hash,
knowledge: self.knowledge.clone(),
exit: self.exit.clone(),
message_validator: self.message_validator.clone(),
}
}
}
impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
impl<P: ProvideRuntimeApi + Send, E, T> LeafWorkDataFetcher<P, E, T> where
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Output=()> + Clone + Send + 'static,
{