update substrate reference (#244)

* port polkadot_runtime and polkadot_validation

* update storages build (#245)

* all tests pass

* rebuild wasm
This commit is contained in:
Robert Habermeier
2019-05-06 18:25:55 +02:00
committed by GitHub
parent e42019e1dc
commit a65be1b2df
18 changed files with 1127 additions and 620 deletions
+19 -10
View File
@@ -16,11 +16,13 @@
//! Gossip messages and the message validator
use substrate_network::PeerId;
use substrate_network::consensus_gossip::{
self as network_gossip, ValidationResult as GossipValidationResult,
ValidatorContext,
};
use polkadot_validation::SignedStatement;
use polkadot_primitives::{Hash, SessionKey};
use polkadot_primitives::{Block, Hash, SessionKey};
use codec::Decode;
use std::collections::HashMap;
@@ -80,7 +82,9 @@ pub fn register_validator<O: KnownOracle + 'static>(
});
let gossip_side = validator.clone();
service.with_gossip(|gossip, _| gossip.register_validator(POLKADOT_ENGINE_ID, gossip_side));
service.with_gossip(|gossip, ctx|
gossip.register_validator(ctx, POLKADOT_ENGINE_ID, gossip_side)
);
RegisteredMessageValidator { inner: validator as _ }
}
@@ -140,29 +144,34 @@ pub struct MessageValidator<O: ?Sized> {
oracle: O,
}
impl<O: KnownOracle + ?Sized> network_gossip::Validator<Hash> for MessageValidator<O> {
fn validate(&self, mut data: &[u8]) -> GossipValidationResult<Hash> {
impl<O: KnownOracle + ?Sized> network_gossip::Validator<Block> for MessageValidator<O> {
fn validate(&self, context: &mut ValidatorContext<Block>, _sender: &PeerId, mut data: &[u8])
-> GossipValidationResult<Hash>
{
let orig_data = data;
match GossipMessage::decode(&mut data) {
Some(GossipMessage { relay_parent, statement }) => {
let live = self.live_session.read();
let topic = || ::router::attestation_topic(relay_parent.clone());
if let Some(validation) = live.get(&relay_parent) {
if validation.check_statement(&relay_parent, &statement) {
GossipValidationResult::Valid(topic())
// repropagate
let topic = topic();
context.broadcast_message(topic, orig_data.to_owned(), false);
GossipValidationResult::ProcessAndKeep(topic)
} else {
GossipValidationResult::Invalid
GossipValidationResult::Discard
}
} else {
match self.oracle.is_known(&relay_parent) {
None | Some(Known::Leaf) => GossipValidationResult::Future(topic()),
Some(Known::Old) => GossipValidationResult::Expired,
Some(Known::Bad) => GossipValidationResult::Invalid,
None | Some(Known::Leaf) => GossipValidationResult::ProcessAndKeep(topic()),
Some(Known::Old) | Some(Known::Bad) => GossipValidationResult::Discard,
}
}
}
None => {
debug!(target: "validation", "Error decoding gossip message");
GossipValidationResult::Invalid
GossipValidationResult::Discard
}
}
}
+1 -4
View File
@@ -73,9 +73,6 @@ use std::collections::{HashMap, HashSet};
#[cfg(test)]
mod tests;
/// Polkadot protocol id.
pub const DOT_PROTOCOL_ID: ::substrate_network::ProtocolId = *b"dot";
type FullStatus = GenericFullStatus<Block>;
/// Specialization of the network service for the polkadot protocol.
@@ -188,7 +185,7 @@ pub enum Message {
fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message);
let encoded = message.encode();
ctx.send_message(to, generic_message::Message::ChainSpecific(encoded))
ctx.send_chain_specific(to, encoded)
}
/// Polkadot protocol attachment for substrate.
+1 -1
View File
@@ -88,7 +88,7 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
self.network().gossip_messages_for(self.attestation_topic)
.filter_map(|msg| {
debug!(target: "validation", "Processing statement for live validation session");
crate::gossip::GossipMessage::decode(&mut &msg[..])
crate::gossip::GossipMessage::decode(&mut &msg.message[..])
})
.map(|msg| msg.statement)
}
+16 -11
View File
@@ -29,8 +29,8 @@ use substrate_primitives::crypto::UncheckedInto;
use codec::Encode;
use substrate_network::{
Severity, PeerId, PeerInfo, ClientHandle, Context, config::Roles,
message::Message as SubstrateMessage, specialization::NetworkSpecialization,
generic_message::Message as GenericMessage
message::{BlockRequest, generic::ConsensusMessage},
specialization::NetworkSpecialization, generic_message::Message as GenericMessage
};
use futures::Future;
@@ -41,7 +41,7 @@ mod validation;
struct TestContext {
disabled: Vec<PeerId>,
disconnected: Vec<PeerId>,
messages: Vec<(PeerId, SubstrateMessage<Block>)>,
messages: Vec<(PeerId, Vec<u8>)>,
}
impl Context<Block> for TestContext {
@@ -60,20 +60,25 @@ impl Context<Block> for TestContext {
unimplemented!()
}
fn send_message(&mut self, who: PeerId, data: SubstrateMessage<Block>) {
self.messages.push((who, data))
fn send_block_request(&mut self, _who: PeerId, _request: BlockRequest<Block>) {
unimplemented!()
}
fn send_consensus(&mut self, _who: PeerId, _consensus: ConsensusMessage) {
unimplemented!()
}
fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>){
self.messages.push((who, message))
}
}
impl TestContext {
fn has_message(&self, to: PeerId, message: Message) -> bool {
use substrate_network::generic_message::Message as GenericMessage;
let encoded = message.encode();
self.messages.iter().any(|&(ref peer, ref msg)| match msg {
GenericMessage::ChainSpecific(ref data) => peer == &to && data == &encoded,
_ => false,
})
self.messages.iter().any(|&(ref peer, ref data)|
peer == &to && data == &encoded
)
}
}
+27 -12
View File
@@ -18,6 +18,7 @@
use validation::NetworkService;
use substrate_network::Context as NetContext;
use substrate_network::consensus_gossip::TopicNotification;
use substrate_primitives::{NativeOrEncoded, ExecutionContext};
use substrate_keyring::AuthorityKeyring;
use {PolkadotProtocol};
@@ -52,25 +53,32 @@ impl Future for NeverExit {
}
}
fn clone_gossip(n: &TopicNotification) -> TopicNotification {
TopicNotification {
message: n.message.clone(),
sender: n.sender.clone(),
}
}
struct GossipRouter {
incoming_messages: mpsc::UnboundedReceiver<(Hash, Vec<u8>)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
outgoing: Vec<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
messages: Vec<(Hash, Vec<u8>)>,
incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
messages: Vec<(Hash, TopicNotification)>,
}
impl GossipRouter {
fn add_message(&mut self, topic: Hash, message: Vec<u8>) {
fn add_message(&mut self, topic: Hash, message: TopicNotification) {
self.outgoing.retain(|&(ref o_topic, ref sender)| {
o_topic != &topic || sender.unbounded_send(message.clone()).is_ok()
o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok()
});
self.messages.push((topic, message));
}
fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<Vec<u8>>) {
fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<TopicNotification>) {
for message in self.messages.iter()
.filter(|&&(ref t, _)| t == &topic)
.map(|&(_, ref msg)| msg.clone())
.map(|&(_, ref msg)| clone_gossip(msg))
{
if let Err(_) = sender.unbounded_send(message) { return }
}
@@ -107,8 +115,8 @@ impl Future for GossipRouter {
#[derive(Clone)]
struct GossipHandle {
send_message: mpsc::UnboundedSender<(Hash, Vec<u8>)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
}
fn make_gossip() -> (GossipRouter, GossipHandle) {
@@ -132,14 +140,15 @@ struct TestNetwork {
}
impl NetworkService for TestNetwork {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification> {
let (tx, rx) = mpsc::unbounded();
let _ = self.gossip.send_listener.unbounded_send((topic, tx));
rx
}
fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
let _ = self.gossip.send_message.unbounded_send((topic, message));
let notification = TopicNotification { message, sender: None };
let _ = self.gossip.send_message.unbounded_send((topic, notification));
}
fn drop_gossip(&self, _topic: Hash) {}
@@ -233,6 +242,12 @@ impl ApiExt<Block> for RuntimeApi {
fn runtime_version_at(&self, _: &BlockId) -> ClientResult<RuntimeVersion> {
unimplemented!("Not required for testing!")
}
fn record_proof(&mut self) { }
fn extract_proof(&mut self) -> Option<Vec<Vec<u8>>> {
None
}
}
impl ParachainHost<Block> for RuntimeApi {
+10 -4
View File
@@ -21,6 +21,7 @@
use sr_primitives::traits::{BlakeTwo256, ProvideRuntimeApi, Hash as HashT};
use substrate_network::Context as NetContext;
use substrate_network::consensus_gossip::{TopicNotification, MessageRecipient as GossipMessageRecipient};
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
use polkadot_primitives::parachain::{
@@ -76,7 +77,7 @@ impl Executor for TaskExecutor {
/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash.
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>>;
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification>;
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: Vec<u8>);
@@ -90,7 +91,7 @@ pub trait NetworkService: Send + Sync + 'static {
}
impl NetworkService for super::NetworkService {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification> {
let (tx, rx) = std::sync::mpsc::channel();
self.with_gossip(move |gossip, _| {
@@ -105,7 +106,12 @@ impl NetworkService for super::NetworkService {
}
fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
self.gossip_consensus_message(topic, POLKADOT_ENGINE_ID, message, false);
self.gossip_consensus_message(
topic,
POLKADOT_ENGINE_ID,
message,
GossipMessageRecipient::BroadcastToAll,
);
}
fn drop_gossip(&self, _topic: Hash) { }
@@ -781,7 +787,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
let gossip_messages = self.network().gossip_messages_for(topic)
.map_err(|()| panic!("unbounded receivers do not throw errors; qed"))
.filter_map(|msg| IngressPair::decode(&mut msg.as_slice()));
.filter_map(|msg| IngressPair::decode(&mut msg.message.as_slice()));
let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)
.map_err(|e| format!("Cannot fetch ingress for parachain {:?} at {:?}: {:?}",