Update to new gossip system. (#172)

* Integrates new gossip system into Polkadot (#166)

* new gossip validation in network

* integrate new gossip into service

* Fix build

* Fix claims module

* fix warning

* update to latest master again

* update runtime
This commit is contained in:
Robert Habermeier
2019-03-06 20:47:09 +01:00
committed by Bastian Köcher
parent a918f5b1a0
commit 7e6183f74c
17 changed files with 833 additions and 619 deletions
+40 -33
View File
@@ -20,7 +20,7 @@
//! each time a validation session begins on a new chain head.
use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::{consensus_gossip::ConsensusMessage, Context as NetContext};
use substrate_network::Context as NetContext;
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData};
@@ -38,6 +38,8 @@ use tokio::runtime::TaskExecutor;
use parking_lot::Mutex;
use router::Router;
use gossip::{POLKADOT_ENGINE_ID, GossipMessage, RegisteredMessageValidator, MessageValidationData};
use super::PolkadotProtocol;
/// An executor suitable for dispatching async consensus tasks.
@@ -67,7 +69,7 @@ impl Executor for TaskExecutor {
/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash.
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<ConsensusMessage>;
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>>;
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: Vec<u8>);
@@ -81,11 +83,11 @@ pub trait NetworkService: Send + Sync + 'static {
}
impl NetworkService for super::NetworkService {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<ConsensusMessage> {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
let (tx, rx) = std::sync::mpsc::channel();
self.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(topic);
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
let _ = tx.send(inner_rx);
});
@@ -96,14 +98,10 @@ impl NetworkService for super::NetworkService {
}
fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
self.gossip_consensus_message(topic, message, false);
self.gossip_consensus_message(topic, POLKADOT_ENGINE_ID, message);
}
fn drop_gossip(&self, topic: Hash) {
self.with_gossip(move |gossip, _| {
gossip.collect_garbage_for_topic(topic);
})
}
fn drop_gossip(&self, _topic: Hash) { }
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut NetContext<Block>)
@@ -115,8 +113,7 @@ impl NetworkService for super::NetworkService {
// task that processes all gossipped consensus messages,
// checking signatures
struct MessageProcessTask<P, E, N: NetworkService, T> {
inner_stream: mpsc::UnboundedReceiver<ConsensusMessage>,
parent_hash: Hash,
inner_stream: mpsc::UnboundedReceiver<Vec<u8>>,
table_router: Router<P, E, N, T>,
}
@@ -127,19 +124,12 @@ impl<P, E, N, T> MessageProcessTask<P, E, N, T> where
N: NetworkService,
T: Clone + Executor + Send + 'static,
{
fn process_message(&self, msg: ConsensusMessage) -> Option<Async<()>> {
use polkadot_validation::SignedStatement;
fn process_message(&self, msg: Vec<u8>) -> Option<Async<()>> {
debug!(target: "validation", "Processing consensus statement for live consensus");
debug!(target: "validation", "Processing validation statement for live session");
if let Some(statement) = SignedStatement::decode(&mut msg.as_slice()) {
if ::polkadot_validation::check_statement(
&statement.statement,
&statement.signature,
statement.sender,
&self.parent_hash
) {
self.table_router.import_statement(statement);
}
// statements are already checked by gossip validator.
if let Some(message) = GossipMessage::decode(&mut &msg[..]) {
self.table_router.import_statement(message.statement);
}
None
@@ -175,13 +165,20 @@ pub struct ValidationNetwork<P, E, N, T> {
network: Arc<N>,
api: Arc<P>,
executor: T,
message_validator: RegisteredMessageValidator,
exit: E,
}
impl<P, E, N, T> ValidationNetwork<P, E, N, T> {
/// Create a new validation session networking object.
pub fn new(network: Arc<N>, exit: E, api: Arc<P>, executor: T) -> Self {
ValidationNetwork { network, exit, api, executor }
/// Create a new consensus networking object.
pub fn new(
network: Arc<N>,
exit: E,
message_validator: RegisteredMessageValidator,
api: Arc<P>,
executor: T,
) -> Self {
ValidationNetwork { network, exit, message_validator, api, executor }
}
}
@@ -192,6 +189,7 @@ impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> {
exit: self.exit.clone(),
api: self.api.clone(),
executor: self.executor.clone(),
message_validator: self.message_validator.clone(),
}
}
}
@@ -210,6 +208,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
&self,
table: Arc<SharedTable>,
outgoing: polkadot_validation::Outgoing,
authorities: &[SessionKey],
) -> Self::TableRouter {
let parent_hash = table.consensus_parent_hash().clone();
@@ -224,6 +223,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
parent_hash,
knowledge.clone(),
self.exit.clone(),
self.message_validator.clone(),
);
table_router.broadcast_egress(outgoing);
@@ -234,6 +234,12 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
let executor = self.executor.clone();
let exit = self.exit.clone();
// before requesting messages, note live consensus session.
self.message_validator.note_session(
parent_hash,
MessageValidationData { authorities: authorities.to_vec() },
);
// spin up a task in the background that processes all incoming statements
// TODO: propagate statements on a timer?
let inner_stream = self.network.gossip_messages_for(attestation_topic);
@@ -245,7 +251,6 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
});
let process_task = MessageProcessTask {
inner_stream,
parent_hash,
table_router: table_router_clone,
};
@@ -276,12 +281,14 @@ impl Future for AwaitingCollation {
.poll()
.map_err(|_| NetworkDown)
}
if let Ok(futures::Async::Ready(mut inner)) = self.outer.poll() {
let poll_result = inner.poll();
self.inner = Some(inner);
return poll_result.map_err(|_| NetworkDown)
match self.outer.poll() {
Ok(futures::Async::Ready(inner)) => {
self.inner = Some(inner);
self.poll()
},
Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady),
Err(_) => Err(NetworkDown)
}
Ok(futures::Async::NotReady)
}
}