diff --git a/polkadot/network/Cargo.toml b/polkadot/network/Cargo.toml index 742169d128..474d2ddcbd 100644 --- a/polkadot/network/Cargo.toml +++ b/polkadot/network/Cargo.toml @@ -11,7 +11,7 @@ parking_lot = "0.7.1" av_store = { package = "polkadot-availability-store", path = "../availability-store" } polkadot-validation = { path = "../validation" } polkadot-primitives = { path = "../primitives" } -parity-codec = { version = "3.0", features = ["derive"] } +parity-codec = { version = "3.5.1", features = ["derive"] } substrate-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } substrate-primitives = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sr-primitives = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } diff --git a/polkadot/network/src/gossip.rs b/polkadot/network/src/gossip.rs index bd5c52418f..af987f12dd 100644 --- a/polkadot/network/src/gossip.rs +++ b/polkadot/network/src/gossip.rs @@ -64,7 +64,7 @@ mod cost { /// A gossip message. #[derive(Encode, Decode, Clone)] -pub(crate) enum GossipMessage { +pub enum GossipMessage { /// A packet sent to a neighbor but not relayed. #[codec(index = "1")] Neighbor(VersionedNeighborPacket), @@ -76,13 +76,29 @@ pub(crate) enum GossipMessage { // erasure-coded chunks. } +impl From for GossipMessage { + fn from(stmt: GossipStatement) -> Self { + GossipMessage::Statement(stmt) + } +} + /// A gossip message containing a statement. #[derive(Encode, Decode, Clone)] -pub(crate) struct GossipStatement { +pub struct GossipStatement { /// The relay chain parent hash. - pub(crate) relay_parent: Hash, + pub relay_parent: Hash, /// The signed statement being gossipped. - pub(crate) signed_statement: SignedStatement, + pub signed_statement: SignedStatement, +} + +impl GossipStatement { + /// Create a new instance. + pub fn new(relay_parent: Hash, signed_statement: SignedStatement) -> Self { + Self { + relay_parent, + signed_statement, + } + } } /// A versioned neighbor message. diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index c62808371f..885920d050 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -69,7 +69,7 @@ mod benefit { type FullStatus = GenericFullStatus; /// Specialization of the network service for the polkadot protocol. -pub type NetworkService = ::substrate_network::NetworkService; +pub type NetworkService = substrate_network::NetworkService; /// Status of a Polkadot node. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] diff --git a/polkadot/network/src/router.rs b/polkadot/network/src/router.rs index 221ab8b743..925df7d2d4 100644 --- a/polkadot/network/src/router.rs +++ b/polkadot/network/src/router.rs @@ -31,9 +31,8 @@ use polkadot_primitives::{Block, Hash}; use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock, }; -use crate::gossip::RegisteredMessageValidator; +use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement}; -use parity_codec::{Encode, Decode}; use futures::prelude::*; use parking_lot::Mutex; use log::{debug, trace}; @@ -79,19 +78,18 @@ impl Router { /// Return a future of checked messages. These should be imported into the router /// with `import_statement`. - pub(crate) fn checked_statements(&self) -> impl Stream { + /// + /// The returned stream will not terminate, so it is required to make sure that the stream is + /// dropped when it is not required anymore. Otherwise, it will stick around in memory + /// infinitely. + pub(crate) fn checked_statements(&self) -> impl Stream { // spin up a task in the background that processes all incoming statements // validation has been done already by the gossip validator. // this will block internally until the gossip messages stream is obtained. self.network().gossip_messages_for(self.attestation_topic) - .filter_map(|msg| { - use crate::gossip::GossipMessage; - - debug!(target: "validation", "Processing statement for live validation session"); - match GossipMessage::decode(&mut &msg.message[..]) { - Some(GossipMessage::Statement(s)) => Some(s.signed_statement), - _ => None, - } + .filter_map(|msg| match msg.0 { + GossipMessage::Statement(s) => Some(s.signed_statement), + _ => None }) } @@ -180,6 +178,7 @@ impl Router w let network = self.network().clone(); let knowledge = self.fetcher.knowledge().clone(); let attestation_topic = self.attestation_topic.clone(); + let parent_hash = self.parent_hash(); producer.prime(self.fetcher.api().clone()) .map(move |validated| { @@ -193,8 +192,11 @@ impl Router w // propagate the statement. // consider something more targeted than gossip in the future. - let signed = table.import_validated(validated); - network.gossip_message(attestation_topic, signed.encode()); + let statement = GossipStatement::new( + parent_hash, + table.import_validated(validated), + ); + network.gossip_message(attestation_topic, statement.into()); }) .map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e)) } @@ -213,11 +215,14 @@ impl TableRouter for Router wh // produce a signed statement let hash = collation.receipt.hash(); let validated = Validated::collated_local(collation.receipt, collation.pov.clone(), extrinsic.clone()); - let statement = self.table.import_validated(validated); + let statement = GossipStatement::new( + self.parent_hash(), + self.table.import_validated(validated), + ); // give to network to make available. self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(extrinsic)); - self.network().gossip_message(self.attestation_topic, statement.encode()); + self.network().gossip_message(self.attestation_topic, statement.into()); } fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof { diff --git a/polkadot/network/src/tests/validation.rs b/polkadot/network/src/tests/validation.rs index 92d8bb30ca..3ec328f805 100644 --- a/polkadot/network/src/tests/validation.rs +++ b/polkadot/network/src/tests/validation.rs @@ -18,7 +18,8 @@ #![allow(unused)] -use crate::validation::{NetworkService, GossipService}; +use crate::validation::{NetworkService, GossipService, GossipMessageStream}; +use crate::gossip::GossipMessage; use substrate_network::Context as NetContext; use substrate_network::consensus_gossip::TopicNotification; use substrate_primitives::{NativeOrEncoded, ExecutionContext}; @@ -40,6 +41,7 @@ use std::collections::HashMap; use std::sync::Arc; use futures::{prelude::*, sync::mpsc}; use tokio::runtime::{Runtime, TaskExecutor}; +use parity_codec::Encode; use super::TestContext; @@ -142,14 +144,14 @@ struct TestNetwork { } impl NetworkService for TestNetwork { - fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver { + fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { let (tx, rx) = mpsc::unbounded(); let _ = self.gossip.send_listener.unbounded_send((topic, tx)); - rx + GossipMessageStream::new(rx) } - fn gossip_message(&self, topic: Hash, message: Vec) { - let notification = TopicNotification { message, sender: None }; + fn gossip_message(&self, topic: Hash, message: GossipMessage) { + let notification = TopicNotification { message: message.encode(), sender: None }; let _ = self.gossip.send_message.unbounded_send((topic, notification)); } diff --git a/polkadot/network/src/validation.rs b/polkadot/network/src/validation.rs index 72bedbae35..fabf58a3bd 100644 --- a/polkadot/network/src/validation.rs +++ b/polkadot/network/src/validation.rs @@ -19,6 +19,7 @@ //! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called //! each time a validation session begins on a new chain head. +use crate::gossip::GossipMessage; use sr_primitives::traits::ProvideRuntimeApi; use substrate_network::{PeerId, Context as NetContext}; use substrate_network::consensus_gossip::{ @@ -43,7 +44,7 @@ use std::sync::Arc; use arrayvec::ArrayVec; use tokio::runtime::TaskExecutor; use parking_lot::Mutex; -use log::warn; +use log::{debug, warn}; use crate::router::Router; use crate::gossip::{POLKADOT_ENGINE_ID, RegisteredMessageValidator, MessageValidationData}; @@ -52,6 +53,8 @@ use super::PolkadotProtocol; pub use polkadot_validation::Incoming; +use parity_codec::{Encode, Decode}; + /// An executor suitable for dispatching async consensus tasks. pub trait Executor { fn spawn + Send + 'static>(&self, f: F); @@ -87,13 +90,46 @@ impl GossipService for consensus_gossip::ConsensusGossip { } } +/// A stream of gossip messages and an optional sender for a topic. +pub struct GossipMessageStream { + topic_stream: mpsc::UnboundedReceiver, +} + +impl GossipMessageStream { + /// Create a new instance with the given topic stream. + pub fn new(topic_stream: mpsc::UnboundedReceiver) -> Self { + Self { + topic_stream + } + } +} + +impl Stream for GossipMessageStream { + type Item = (GossipMessage, Option); + type Error = (); + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + let msg = match futures::try_ready!(self.topic_stream.poll()) { + Some(msg) => msg, + None => return Ok(Async::Ready(None)), + }; + + debug!(target: "validation", "Processing statement for live validation session"); + if let Some(gmsg) = GossipMessage::decode(&mut &msg.message[..]) { + return Ok(Async::Ready(Some((gmsg, msg.sender)))) + } + } + } +} + /// Basic functionality that a network has to fulfill. pub trait NetworkService: Send + Sync + 'static { /// Get a stream of gossip messages for a given hash. - fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver; + fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream; /// Gossip a message on given topic. - fn gossip_message(&self, topic: Hash, message: Vec); + fn gossip_message(&self, topic: Hash, message: GossipMessage); /// Execute a closure with the gossip service. fn with_gossip(&self, with: F) @@ -105,7 +141,7 @@ pub trait NetworkService: Send + Sync + 'static { } impl NetworkService for super::NetworkService { - fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver { + fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { let (tx, rx) = std::sync::mpsc::channel(); super::NetworkService::with_gossip(self, move |gossip, _| { @@ -113,17 +149,19 @@ impl NetworkService for super::NetworkService { let _ = tx.send(inner_rx); }); - match rx.recv() { + let topic_stream = match rx.recv() { Ok(rx) => rx, Err(_) => mpsc::unbounded().1, // return empty channel. - } + }; + + GossipMessageStream::new(topic_stream) } - fn gossip_message(&self, topic: Hash, message: Vec) { + fn gossip_message(&self, topic: Hash, message: GossipMessage) { self.gossip_consensus_message( topic, POLKADOT_ENGINE_ID, - message, + message.encode(), GossipMessageRecipient::BroadcastToAll, ); }