strip out all ICMP network code and begin gossip refactor for attestations (#256)

* strip out all ICMP code and begin gossip refactor

* validate incoming statements

* message_allowed logic

* compiles

* do reporting and neighbor packet validation

* tests compile

* propagate gossip messages

* test message_allowed

* some more tests

* address grumbles
This commit is contained in:
Robert Habermeier
2019-05-17 14:30:10 -04:00
committed by GitHub
parent 2bbfcc2f72
commit 164943b961
12 changed files with 738 additions and 491 deletions
+39 -272
View File
@@ -19,13 +19,14 @@
//! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called
//! each time a validation session begins on a new chain head.
use sr_primitives::traits::{BlakeTwo256, ProvideRuntimeApi, Hash as HashT};
use substrate_network::Context as NetContext;
use substrate_network::consensus_gossip::{TopicNotification, MessageRecipient as GossipMessageRecipient};
use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::{PeerId, Context as NetContext};
use substrate_network::consensus_gossip::{
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
};
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, Message, CandidateReceipt, CollatorId, ValidatorId, PoVBlock, ValidatorIndex};
use codec::{Encode, Decode};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId, ValidatorId, PoVBlock, ValidatorIndex};
use futures::prelude::*;
use futures::future::{self, Executor as FutureExecutor};
@@ -71,6 +72,17 @@ impl Executor for TaskExecutor {
}
}
/// A gossip network subservice.
pub trait GossipService {
fn send_message(&mut self, ctx: &mut NetContext<Block>, who: &PeerId, message: ConsensusMessage);
}
impl GossipService for consensus_gossip::ConsensusGossip<Block> {
fn send_message(&mut self, ctx: &mut NetContext<Block>, who: &PeerId, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message)
}
}
/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash.
@@ -79,8 +91,9 @@ pub trait NetworkService: Send + Sync + 'static {
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: Vec<u8>);
/// Drop a gossip topic.
fn drop_gossip(&self, topic: Hash);
/// Execute a closure with the gossip service.
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut GossipService, &mut NetContext<Block>);
/// Execute a closure with the polkadot protocol.
fn with_spec<F: Send + 'static>(&self, with: F)
@@ -91,7 +104,7 @@ impl NetworkService for super::NetworkService {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification> {
let (tx, rx) = std::sync::mpsc::channel();
self.with_gossip(move |gossip, _| {
super::NetworkService::with_gossip(self, move |gossip, _| {
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
let _ = tx.send(inner_rx);
});
@@ -111,7 +124,11 @@ impl NetworkService for super::NetworkService {
);
}
fn drop_gossip(&self, _topic: Hash) { }
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut GossipService, &mut NetContext<Block>)
{
super::NetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
}
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut NetContext<Block>)
@@ -199,16 +216,20 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
.collect();
let (tx, rx) = oneshot::channel();
self.network.with_spec(move |spec, ctx| {
// before requesting messages, note live consensus session.
message_validator.note_session(
parent_hash,
MessageValidationData {
authorities: params.authorities.clone(),
index_mapping,
},
);
{
let message_validator = self.message_validator.clone();
let authorities = params.authorities.clone();
self.network.with_gossip(move |gossip, ctx| {
message_validator.note_session(
parent_hash,
MessageValidationData { authorities, index_mapping },
|peer_id, message| gossip.send_message(ctx, peer_id, message),
);
});
}
self.network.with_spec(move |spec, ctx| {
let session = spec.new_validation_session(ctx, params);
let _ = tx.send(SessionDataFetcher {
network,
@@ -217,7 +238,6 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
parent_hash,
knowledge: session.knowledge().clone(),
exit,
fetch_incoming: session.fetched_incoming().clone(),
message_validator,
});
});
@@ -241,7 +261,6 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
fn communication_for(
&self,
table: Arc<SharedTable>,
outgoing: polkadot_validation::Outgoing,
authorities: &[ValidatorId],
) -> Self::BuildTableRouter {
let parent_hash = table.consensus_parent_hash().clone();
@@ -264,8 +283,6 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
message_validator,
);
table_router.broadcast_egress(outgoing);
let table_router_clone = table_router.clone();
let work = table_router.checked_statements()
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
@@ -406,22 +423,12 @@ impl Future for IncomingReceiver {
}
}
/// Incoming message gossip topic for a parachain at a given block hash.
pub(crate) fn incoming_message_topic(parent_hash: Hash, parachain: ParaId) -> Hash {
let mut v = parent_hash.as_ref().to_vec();
parachain.using_encoded(|s| v.extend(s));
v.extend(b"incoming");
BlakeTwo256::hash(&v[..])
}
/// A current validation session instance.
#[derive(Clone)]
pub(crate) struct ValidationSession {
parent_hash: Hash,
knowledge: Arc<Mutex<Knowledge>>,
local_session_key: Option<ValidatorId>,
fetch_incoming: Arc<Mutex<FetchIncoming>>,
}
impl ValidationSession {
@@ -432,7 +439,6 @@ impl ValidationSession {
parent_hash: params.parent_hash,
knowledge: Arc::new(Mutex::new(Knowledge::new())),
local_session_key: params.local_session_key,
fetch_incoming: Arc::new(Mutex::new(FetchIncoming::new())),
}
}
@@ -442,11 +448,6 @@ impl ValidationSession {
&self.knowledge
}
/// Get a handle to the shared list of parachains' incoming data fetch.
pub(crate) fn fetched_incoming(&self) -> &Arc<Mutex<FetchIncoming>> {
&self.fetch_incoming
}
// execute a closure with locally stored proof-of-validation for a candidate, or a slice of session identities
// we believe should have the data.
fn with_pov_block<F, U>(&self, hash: &Hash, f: F) -> U
@@ -646,58 +647,10 @@ impl Future for PoVReceiver {
}
}
/// Wrapper around bookkeeping for tracking which parachains we're fetching incoming messages
/// for.
pub(crate) struct FetchIncoming {
exit_signal: ::exit_future::Signal,
parachains_fetching: HashMap<ParaId, IncomingReceiver>,
}
impl FetchIncoming {
fn new() -> Self {
FetchIncoming {
exit_signal: ::exit_future::signal_only(),
parachains_fetching: HashMap::new(),
}
}
// registers intent to fetch incoming. returns an optional piece of work
// that, if some, is needed to be run to completion in order for the future to
// resolve.
//
// impl Future has a bug here where it wrongly assigns a `'static` bound to `M`.
fn fetch_with_work<M, W>(&mut self, para_id: ParaId, make_work: M)
-> (IncomingReceiver, Option<Box<Future<Item=(),Error=()> + Send>>) where
M: FnOnce() -> W,
W: Future<Item=Option<Incoming>> + Send + 'static,
{
let (tx, rx) = match self.parachains_fetching.entry(para_id) {
Entry::Occupied(entry) => return (entry.get().clone(), None),
Entry::Vacant(entry) => {
// has not been requested yet.
let (tx, rx) = oneshot::channel();
let rx = IncomingReceiver { inner: rx.shared() };
entry.insert(rx.clone());
(tx, rx)
}
};
let exit = self.exit_signal.make_exit();
let work = make_work()
.map(move |incoming| if let Some(i) = incoming { let _ = tx.send(i); })
.select2(exit)
.then(|_| Ok(()));
(rx, Some(Box::new(work)))
}
}
/// Can fetch data for a given validation session
pub struct SessionDataFetcher<P, E, N: NetworkService, T> {
network: Arc<N>,
api: Arc<P>,
fetch_incoming: Arc<Mutex<FetchIncoming>>,
exit: E,
task_executor: T,
knowledge: Arc<Mutex<Knowledge>>,
@@ -744,7 +697,6 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E
api: self.api.clone(),
task_executor: self.task_executor.clone(),
parent_hash: self.parent_hash.clone(),
fetch_incoming: self.fetch_incoming.clone(),
knowledge: self.knowledge.clone(),
exit: self.exit.clone(),
message_validator: self.message_validator.clone(),
@@ -783,130 +735,11 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
});
PoVReceiver { outer: rx, inner: None }
}
/// Fetch incoming messages for a parachain.
pub fn fetch_incoming(&self, parachain: ParaId) -> IncomingReceiver {
let (rx, work) = self.fetch_incoming.lock().fetch_with_work(parachain.clone(), move || {
let parent_hash: Hash = self.parent_hash();
let topic = incoming_message_topic(parent_hash, parachain);
let gossip_messages = self.network().gossip_messages_for(topic)
.map_err(|()| panic!("unbounded receivers do not throw errors; qed"))
.filter_map(|msg| IngressPair::decode(&mut msg.message.as_slice()));
let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)
.map_err(|e| format!("Cannot fetch ingress for parachain {:?} at {:?}: {:?}",
parachain, parent_hash, e)
);
canon_roots.into_future()
.and_then(move |ingress_roots| match ingress_roots {
None => Err(format!("No parachain {:?} registered at {}", parachain, parent_hash)),
Some(roots) => Ok(roots.0.into_iter().collect())
})
.and_then(move |ingress_roots| ComputeIngress {
inner: gossip_messages,
ingress_roots,
incoming: Vec::new(),
})
.select2(self.exit.clone())
.map(|res| match res {
future::Either::A((incoming, _)) => incoming,
future::Either::B(_) => None,
})
});
if let Some(work) = work {
self.task_executor.spawn(work);
}
rx
}
}
impl<P, E, N: NetworkService, T> Drop for SessionDataFetcher<P, E, N, T> {
fn drop(&mut self) {
// a bit of a hack...
let network = self.network.clone();
let fetch_incoming = self.fetch_incoming.clone();
let message_validator = self.message_validator.clone();
let parent_hash = self.parent_hash();
self.network.with_spec(move |spec, _| {
if !spec.remove_validation_session(parent_hash) { return }
let mut incoming_fetched = fetch_incoming.lock();
for (para_id, _) in incoming_fetched.parachains_fetching.drain() {
network.drop_gossip(incoming_message_topic(
parent_hash,
para_id,
));
}
message_validator.remove_session(&parent_hash);
});
}
}
type IngressPair = (ParaId, Vec<Message>);
// computes ingress from incoming stream of messages.
// returns `None` if the stream concludes too early.
#[must_use = "futures do nothing unless polled"]
struct ComputeIngress<S> {
ingress_roots: HashMap<ParaId, Hash>,
incoming: Vec<IngressPair>,
inner: S,
}
impl<S> Future for ComputeIngress<S> where S: Stream<Item=IngressPair> {
type Item = Option<Incoming>;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<Incoming>, Self::Error> {
loop {
if self.ingress_roots.is_empty() {
return Ok(Async::Ready(
Some(::std::mem::replace(&mut self.incoming, Vec::new()))
))
}
let (para_id, messages) = match try_ready!(self.inner.poll()) {
None => return Ok(Async::Ready(None)),
Some(next) => next,
};
match self.ingress_roots.entry(para_id) {
Entry::Vacant(_) => continue,
Entry::Occupied(occupied) => {
let canon_root = occupied.get().clone();
let messages = messages.iter().map(|m| &m.0[..]);
if ::polkadot_validation::message_queue_root(messages) != canon_root {
continue;
}
occupied.remove();
}
}
let pos = self.incoming.binary_search_by_key(
&para_id,
|&(id, _)| id,
)
.err()
.expect("incoming starts empty and only inserted when \
para_id not inserted before; qed");
self.incoming.insert(pos, (para_id, messages));
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::stream;
use substrate_primitives::crypto::UncheckedInto;
#[test]
@@ -959,72 +792,6 @@ mod tests {
}
}
#[test]
fn compute_ingress_works() {
let actual_messages = [
(
ParaId::from(1),
vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])],
),
(
ParaId::from(2),
vec![
Message(vec![1, 3, 7, 9, 1, 2, 3, 4, 5, 6]),
Message(b"hello world".to_vec()),
],
),
(
ParaId::from(5),
vec![Message(vec![1, 2, 3, 4, 5]), Message(vec![6, 9, 6, 9])],
),
];
let roots: HashMap<_, _> = actual_messages.iter()
.map(|&(para_id, ref messages)| (
para_id,
::polkadot_validation::message_queue_root(messages.iter().map(|m| &m.0)),
))
.collect();
let inputs = [
(
ParaId::from(1), // wrong message.
vec![Message(vec![1, 1, 2, 2]), Message(vec![3, 3, 4, 4])],
),
(
ParaId::from(1),
vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])],
),
(
ParaId::from(1), // duplicate
vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])],
),
(
ParaId::from(5), // out of order
vec![Message(vec![1, 2, 3, 4, 5]), Message(vec![6, 9, 6, 9])],
),
(
ParaId::from(1234), // un-routed parachain.
vec![Message(vec![9, 9, 9, 9])],
),
(
ParaId::from(2),
vec![
Message(vec![1, 3, 7, 9, 1, 2, 3, 4, 5, 6]),
Message(b"hello world".to_vec()),
],
),
];
let ingress = ComputeIngress {
ingress_roots: roots,
incoming: Vec::new(),
inner: stream::iter_ok::<_, ()>(inputs.iter().cloned()),
};
assert_eq!(ingress.wait().unwrap().unwrap(), actual_messages);
}
#[test]
fn add_new_sessions_works() {
let mut live_sessions = LiveValidationSessions::new();