Make validation::NetworkService strongly typed (#295)

By using a strongly typed network service, we make sure that we send and
receive the correct messages. Before there was a bug, a `SignedStatement`
was sent and a `GossipMessage` was decoded, but this could never work.
This commit is contained in:
Bastian Köcher
2019-06-24 11:43:07 +02:00
committed by GitHub
parent a016bac6ad
commit 664dea075a
6 changed files with 95 additions and 34 deletions
+46 -8
View File
@@ -19,6 +19,7 @@
//! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called
//! each time a validation session begins on a new chain head.
use crate::gossip::GossipMessage;
use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::{PeerId, Context as NetContext};
use substrate_network::consensus_gossip::{
@@ -43,7 +44,7 @@ use std::sync::Arc;
use arrayvec::ArrayVec;
use tokio::runtime::TaskExecutor;
use parking_lot::Mutex;
use log::warn;
use log::{debug, warn};
use crate::router::Router;
use crate::gossip::{POLKADOT_ENGINE_ID, RegisteredMessageValidator, MessageValidationData};
@@ -52,6 +53,8 @@ use super::PolkadotProtocol;
pub use polkadot_validation::Incoming;
use parity_codec::{Encode, Decode};
/// An executor suitable for dispatching async consensus tasks.
pub trait Executor {
fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F);
@@ -87,13 +90,46 @@ impl GossipService for consensus_gossip::ConsensusGossip<Block> {
}
}
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: mpsc::UnboundedReceiver<TopicNotification>,
}
impl GossipMessageStream {
/// Create a new instance with the given topic stream.
pub fn new(topic_stream: mpsc::UnboundedReceiver<TopicNotification>) -> Self {
Self {
topic_stream
}
}
}
impl Stream for GossipMessageStream {
type Item = (GossipMessage, Option<PeerId>);
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let msg = match futures::try_ready!(self.topic_stream.poll()) {
Some(msg) => msg,
None => return Ok(Async::Ready(None)),
};
debug!(target: "validation", "Processing statement for live validation session");
if let Some(gmsg) = GossipMessage::decode(&mut &msg.message[..]) {
return Ok(Async::Ready(Some((gmsg, msg.sender))))
}
}
}
}
/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash.
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification>;
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream;
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: Vec<u8>);
fn gossip_message(&self, topic: Hash, message: GossipMessage);
/// Execute a closure with the gossip service.
fn with_gossip<F: Send + 'static>(&self, with: F)
@@ -105,7 +141,7 @@ pub trait NetworkService: Send + Sync + 'static {
}
impl NetworkService for super::NetworkService {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification> {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = std::sync::mpsc::channel();
super::NetworkService::with_gossip(self, move |gossip, _| {
@@ -113,17 +149,19 @@ impl NetworkService for super::NetworkService {
let _ = tx.send(inner_rx);
});
match rx.recv() {
let topic_stream = match rx.recv() {
Ok(rx) => rx,
Err(_) => mpsc::unbounded().1, // return empty channel.
}
};
GossipMessageStream::new(topic_stream)
}
fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
self.gossip_consensus_message(
topic,
POLKADOT_ENGINE_ID,
message,
message.encode(),
GossipMessageRecipient::BroadcastToAll,
);
}