ICMP message-routing gossip (#304)

* core logic for ICMP gossip

* refactor gossip to make more extension friendly

* move files aroun

* extract attestation-gossip logic to its own module

* message validation and broadcast logic

* fix upstream crates' compilation

* add a test

* another test for overlapping

* Some grammar and phrasing tweaks

Co-Authored-By: Luke Schoen <ltfschoen@users.noreply.github.com>

* add since parameter to ingress runtime API

* broadcast out known unrouted message queues

* fix compilation of service and collator

* remove useless index_mapping

* some tests for icmp propagation

* fix decoding bug and test icmp queue validation

* simplify engine-id definition

Co-Authored-By: Sergei Pepyakin <sergei@parity.io>

* address some grumbles

* some cleanup of old circulation code

* give network a handle to extrinsic store on startup

* an honest collator ensures data available as well

* address some grumbles

* add docs; rename the attestation session to "leaf work"

* module docs

* move gossip back to gossip.rs

* clean up and document attestation-gossip a bit

* some more docs on the availability store

* store all outgoing message queues in the availability store

* filter `Extrinsic` out of validation crate

* expunge Extrinsic from network

* expunge Extrinsic from erasure-coding

* expunge Extrinsic from collator

* expunge from adder-collator

* rename ExtrinsicStore to AvailabilityStore everywhere

* annotate and clean up message-routing tests
This commit is contained in:
Robert Habermeier
2019-08-29 11:49:59 +02:00
committed by GitHub
parent bd8ebbfee5
commit 55c4c830fe
22 changed files with 1981 additions and 818 deletions
+74 -28
View File
@@ -14,13 +14,17 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Persistent database for parachain data. //! Persistent database for parachain data: PoV block data and outgoing messages.
//!
//! This will be written into during the block validation pipeline, and queried
//! by networking code in order to circulate required data and maintain availability
//! of it.
use codec::{Encode, Decode}; use codec::{Encode, Decode};
use kvdb::{KeyValueDB, DBTransaction}; use kvdb::{KeyValueDB, DBTransaction};
use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb_rocksdb::{Database, DatabaseConfig};
use polkadot_primitives::Hash; use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic}; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Message};
use log::warn; use log::warn;
use std::collections::HashSet; use std::collections::HashSet;
@@ -42,7 +46,7 @@ pub struct Config {
pub path: PathBuf, pub path: PathBuf,
} }
/// Some data to keep available. /// Some data to keep available about a parachain block candidate.
pub struct Data { pub struct Data {
/// The relay chain parent hash this should be localized to. /// The relay chain parent hash this should be localized to.
pub relay_parent: Hash, pub relay_parent: Hash,
@@ -52,18 +56,16 @@ pub struct Data {
pub candidate_hash: Hash, pub candidate_hash: Hash,
/// Block data. /// Block data.
pub block_data: BlockData, pub block_data: BlockData,
/// Extrinsic data. /// Outgoing message queues from execution of the block, if any.
pub extrinsic: Option<Extrinsic>, ///
/// The tuple pairs the message queue root and the queue data.
pub outgoing_queues: Option<Vec<(Hash, Vec<Message>)>>,
} }
fn block_data_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> { fn block_data_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> {
(relay_parent, candidate_hash, 0i8).encode() (relay_parent, candidate_hash, 0i8).encode()
} }
fn extrinsic_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> {
(relay_parent, candidate_hash, 1i8).encode()
}
/// Handle to the availability store. /// Handle to the availability store.
#[derive(Clone)] #[derive(Clone)]
pub struct Store { pub struct Store {
@@ -96,6 +98,16 @@ impl Store {
} }
/// Make some data available provisionally. /// Make some data available provisionally.
///
/// Validators with the responsibility of maintaining availability
/// for a block or collators collating a block will call this function
/// in order to persist that data to disk and so it can be queried and provided
/// to other nodes in the network.
///
/// The message data of `Data` is optional but is expected
/// to be present with the exception of the case where there is no message data
/// due to the block's invalidity. Determination of invalidity is beyond the
/// scope of this function.
pub fn make_available(&self, data: Data) -> io::Result<()> { pub fn make_available(&self, data: Data) -> io::Result<()> {
let mut tx = DBTransaction::new(); let mut tx = DBTransaction::new();
@@ -118,12 +130,16 @@ impl Store {
data.block_data.encode() data.block_data.encode()
); );
if let Some(extrinsic) = data.extrinsic { if let Some(outgoing_queues) = data.outgoing_queues {
tx.put_vec( // This is kept forever and not pruned.
columns::DATA, for (root, messages) in outgoing_queues {
extrinsic_key(&data.relay_parent, &data.candidate_hash).as_slice(), tx.put_vec(
extrinsic.encode(), columns::DATA,
); root.as_ref(),
messages.encode(),
);
}
} }
self.inner.write(tx) self.inner.write(tx)
@@ -146,7 +162,6 @@ impl Store {
for candidate_hash in v { for candidate_hash in v {
if !finalized_candidates.contains(&candidate_hash) { if !finalized_candidates.contains(&candidate_hash) {
tx.delete(columns::DATA, block_data_key(&parent, &candidate_hash).as_slice()); tx.delete(columns::DATA, block_data_key(&parent, &candidate_hash).as_slice());
tx.delete(columns::DATA, extrinsic_key(&parent, &candidate_hash).as_slice());
} }
} }
@@ -168,12 +183,11 @@ impl Store {
} }
} }
/// Query extrinsic data. /// Query message queue data by message queue root hash.
pub fn extrinsic(&self, relay_parent: Hash, candidate_hash: Hash) -> Option<Extrinsic> { pub fn queue_by_root(&self, queue_root: &Hash) -> Option<Vec<Message>> {
let encoded_key = extrinsic_key(&relay_parent, &candidate_hash); match self.inner.get(columns::DATA, queue_root.as_ref()) {
match self.inner.get(columns::DATA, &encoded_key[..]) {
Ok(Some(raw)) => Some( Ok(Some(raw)) => Some(
Extrinsic::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed") <_>::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed")
), ),
Ok(None) => None, Ok(None) => None,
Err(e) => { Err(e) => {
@@ -207,7 +221,7 @@ mod tests {
parachain_id: para_id_1, parachain_id: para_id_1,
candidate_hash: candidate_1, candidate_hash: candidate_1,
block_data: block_data_1.clone(), block_data: block_data_1.clone(),
extrinsic: Some(Extrinsic { outgoing_messages: Vec::new() }), outgoing_queues: None,
}).unwrap(); }).unwrap();
store.make_available(Data { store.make_available(Data {
@@ -215,21 +229,53 @@ mod tests {
parachain_id: para_id_2, parachain_id: para_id_2,
candidate_hash: candidate_2, candidate_hash: candidate_2,
block_data: block_data_2.clone(), block_data: block_data_2.clone(),
extrinsic: Some(Extrinsic { outgoing_messages: Vec::new() }), outgoing_queues: None,
}).unwrap(); }).unwrap();
assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1); assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1);
assert_eq!(store.block_data(relay_parent, candidate_2).unwrap(), block_data_2); assert_eq!(store.block_data(relay_parent, candidate_2).unwrap(), block_data_2);
assert!(store.extrinsic(relay_parent, candidate_1).is_some());
assert!(store.extrinsic(relay_parent, candidate_2).is_some());
store.candidates_finalized(relay_parent, [candidate_1].iter().cloned().collect()).unwrap(); store.candidates_finalized(relay_parent, [candidate_1].iter().cloned().collect()).unwrap();
assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1); assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1);
assert!(store.block_data(relay_parent, candidate_2).is_none()); assert!(store.block_data(relay_parent, candidate_2).is_none());
}
assert!(store.extrinsic(relay_parent, candidate_1).is_some()); #[test]
assert!(store.extrinsic(relay_parent, candidate_2).is_none()); fn queues_available_by_queue_root() {
let relay_parent = [1; 32].into();
let para_id = 5.into();
let candidate = [2; 32].into();
let block_data = BlockData(vec![1, 2, 3]);
let message_queue_root_1 = [0x42; 32].into();
let message_queue_root_2 = [0x43; 32].into();
let message_a = Message(vec![1, 2, 3, 4]);
let message_b = Message(vec![4, 5, 6, 7]);
let outgoing_queues = vec![
(message_queue_root_1, vec![message_a.clone()]),
(message_queue_root_2, vec![message_b.clone()]),
];
let store = Store::new_in_memory();
store.make_available(Data {
relay_parent,
parachain_id: para_id,
candidate_hash: candidate,
block_data: block_data.clone(),
outgoing_queues: Some(outgoing_queues),
}).unwrap();
assert_eq!(
store.queue_by_root(&message_queue_root_1),
Some(vec![message_a]),
);
assert_eq!(
store.queue_by_root(&message_queue_root_2),
Some(vec![message_b]),
);
} }
} }
+54 -43
View File
@@ -57,7 +57,7 @@ use primitives::Pair;
use polkadot_primitives::{ use polkadot_primitives::{
BlockId, Hash, Block, BlockId, Hash, Block,
parachain::{ parachain::{
self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic, self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, OutgoingMessages,
PoVBlock, Status as ParachainStatus, ValidatorId, CollatorPair, PoVBlock, Status as ParachainStatus, ValidatorId, CollatorPair,
} }
}; };
@@ -65,8 +65,8 @@ use polkadot_cli::{
Worker, IntoExit, ProvideRuntimeApi, TaskExecutor, AbstractService, Worker, IntoExit, ProvideRuntimeApi, TaskExecutor, AbstractService,
CustomConfiguration, ParachainHost, CustomConfiguration, ParachainHost,
}; };
use polkadot_network::validation::{SessionParams, ValidationNetwork}; use polkadot_network::validation::{LeafWorkParams, ValidationNetwork};
use polkadot_network::{NetworkService, PolkadotProtocol}; use polkadot_network::{PolkadotNetworkService, PolkadotProtocol};
use tokio::timer::Timeout; use tokio::timer::Timeout;
pub use polkadot_cli::VersionInfo; pub use polkadot_cli::VersionInfo;
@@ -91,7 +91,7 @@ pub trait Network: Send + Sync {
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement, Error=()>>; fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement, Error=()>>;
} }
impl<P, E> Network for ValidationNetwork<P, E, NetworkService, TaskExecutor> where impl<P, E> Network for ValidationNetwork<P, E, PolkadotNetworkService, TaskExecutor> where
P: 'static + Send + Sync, P: 'static + Send + Sync,
E: 'static + Send + Sync, E: 'static + Send + Sync,
{ {
@@ -142,7 +142,7 @@ pub trait BuildParachainContext {
/// This can be implemented through an externally attached service or a stub. /// This can be implemented through an externally attached service or a stub.
/// This is expected to be a lightweight, shared type like an Arc. /// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone { pub trait ParachainContext: Clone {
type ProduceCandidate: IntoFuture<Item=(BlockData, HeadData, Extrinsic), Error=InvalidHead>; type ProduceCandidate: IntoFuture<Item=(BlockData, HeadData, OutgoingMessages), Error=InvalidHead>;
/// Produce a candidate, given the relay parent hash, the latest ingress queue information /// Produce a candidate, given the relay parent hash, the latest ingress queue information
/// and the last parachain head. /// and the last parachain head.
@@ -177,7 +177,7 @@ pub fn collate<'a, R, P>(
para_context: P, para_context: P,
key: Arc<CollatorPair>, key: Arc<CollatorPair>,
) )
-> impl Future<Item=parachain::Collation, Error=Error<R::Error>> + 'a -> impl Future<Item=(parachain::Collation, OutgoingMessages), Error=Error<R::Error>> + 'a
where where
R: RelayChainContext, R: RelayChainContext,
R::Error: 'a, R::Error: 'a,
@@ -197,11 +197,11 @@ pub fn collate<'a, R, P>(
.map(move |x| (ingress, x)) .map(move |x| (ingress, x))
.map_err(Error::Collator) .map_err(Error::Collator)
}) })
.and_then(move |(ingress, (block_data, head_data, mut extrinsic))| { .and_then(move |(ingress, (block_data, head_data, mut outgoing))| {
let block_data_hash = block_data.hash(); let block_data_hash = block_data.hash();
let signature = key.sign(block_data_hash.as_ref()).into(); let signature = key.sign(block_data_hash.as_ref()).into();
let egress_queue_roots = let egress_queue_roots =
polkadot_validation::egress_roots(&mut extrinsic.outgoing_messages); polkadot_validation::egress_roots(&mut outgoing.outgoing_messages);
let receipt = parachain::CandidateReceipt { let receipt = parachain::CandidateReceipt {
parachain_index: local_id, parachain_index: local_id,
@@ -214,19 +214,21 @@ pub fn collate<'a, R, P>(
upward_messages: Vec::new(), upward_messages: Vec::new(),
}; };
Ok(parachain::Collation { let collation = parachain::Collation {
receipt, receipt,
pov: PoVBlock { pov: PoVBlock {
block_data, block_data,
ingress, ingress,
}, },
}) };
Ok((collation, outgoing))
}) })
} }
/// Polkadot-api context. /// Polkadot-api context.
struct ApiContext<P, E> { struct ApiContext<P, E> {
network: Arc<ValidationNetwork<P, E, NetworkService, TaskExecutor>>, network: Arc<ValidationNetwork<P, E, PolkadotNetworkService, TaskExecutor>>,
parent_hash: Hash, parent_hash: Hash,
validators: Vec<ValidatorId>, validators: Vec<ValidatorId>,
} }
@@ -243,7 +245,7 @@ impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
// TODO: https://github.com/paritytech/polkadot/issues/253 // TODO: https://github.com/paritytech/polkadot/issues/253
// //
// Fetch ingress and accumulate all unrounted egress // Fetch ingress and accumulate all unrounted egress
let _session = self.network.instantiate_session(SessionParams { let _session = self.network.instantiate_leaf_work(LeafWorkParams {
local_session_key: None, local_session_key: None,
parent_hash: self.parent_hash, parent_hash: self.parent_hash,
authorities: self.validators.clone(), authorities: self.validators.clone(),
@@ -303,26 +305,28 @@ impl<P, E> Worker for CollationNode<P, E> where
return Box::new(future::err(())); return Box::new(future::err(()));
}; };
let is_known = move |block_hash: &Hash| {
use client::BlockStatus;
use polkadot_network::gossip::Known;
match known_oracle.block_status(&BlockId::hash(*block_hash)) {
Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
Ok(BlockStatus::KnownBad) => Some(Known::Bad),
Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
match select_chain.leaves() {
Err(_) => None,
Ok(leaves) => if leaves.contains(block_hash) {
Some(Known::Leaf)
} else {
Some(Known::Old)
},
}
}
};
let message_validator = polkadot_network::gossip::register_validator( let message_validator = polkadot_network::gossip::register_validator(
network.clone(), network.clone(),
move |block_hash: &Hash| { (is_known, client.clone()),
use client::BlockStatus;
use polkadot_network::gossip::Known;
match known_oracle.block_status(&BlockId::hash(*block_hash)) {
Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
Ok(BlockStatus::KnownBad) => Some(Known::Bad),
Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
match select_chain.leaves() {
Err(_) => None,
Ok(leaves) => if leaves.contains(block_hash) {
Some(Known::Leaf)
} else {
Some(Known::Old)
},
}
}
},
); );
let validation_network = Arc::new(ValidationNetwork::new( let validation_network = Arc::new(ValidationNetwork::new(
@@ -386,13 +390,20 @@ impl<P, E> Worker for CollationNode<P, E> where
context, context,
parachain_context, parachain_context,
key, key,
).map(move |collation| { ).map(move |(collation, outgoing)| {
network.with_spec(move |spec, ctx| spec.add_local_collation( network.with_spec(move |spec, ctx| {
ctx, let res = spec.add_local_collation(
relay_parent, ctx,
targets, relay_parent,
collation, targets,
)); collation,
outgoing,
);
if let Err(e) = res {
warn!("Unable to broadcast local collation: {:?}", e);
}
})
}); });
future::Either::B(collation_work) future::Either::B(collation_work)
@@ -450,7 +461,7 @@ pub fn run_collator<P, E>(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::collections::HashMap; use std::collections::HashMap;
use polkadot_primitives::parachain::{OutgoingMessage, FeeSchedule}; use polkadot_primitives::parachain::{TargetedMessage, FeeSchedule};
use keyring::Sr25519Keyring; use keyring::Sr25519Keyring;
use super::*; use super::*;
@@ -475,20 +486,20 @@ mod tests {
struct DummyParachainContext; struct DummyParachainContext;
impl ParachainContext for DummyParachainContext { impl ParachainContext for DummyParachainContext {
type ProduceCandidate = Result<(BlockData, HeadData, Extrinsic), InvalidHead>; type ProduceCandidate = Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>;
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>( fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
&self, &self,
_relay_parent: Hash, _relay_parent: Hash,
_status: ParachainStatus, _status: ParachainStatus,
ingress: I, ingress: I,
) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> { ) -> Result<(BlockData, HeadData, OutgoingMessages), InvalidHead> {
// send messages right back. // send messages right back.
Ok(( Ok((
BlockData(vec![1, 2, 3, 4, 5,]), BlockData(vec![1, 2, 3, 4, 5,]),
HeadData(vec![9, 9, 9]), HeadData(vec![9, 9, 9]),
Extrinsic { OutgoingMessages {
outgoing_messages: ingress.into_iter().map(|(id, msg)| OutgoingMessage { outgoing_messages: ingress.into_iter().map(|(id, msg)| TargetedMessage {
target: id, target: id,
data: msg.0, data: msg.0,
}).collect(), }).collect(),
@@ -542,7 +553,7 @@ mod tests {
context.clone(), context.clone(),
DummyParachainContext, DummyParachainContext,
Arc::new(Sr25519Keyring::Alice.pair().into()), Arc::new(Sr25519Keyring::Alice.pair().into()),
).wait().unwrap(); ).wait().unwrap().0;
// ascending order by root. // ascending order by root.
assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]); assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]);
+6 -6
View File
@@ -27,7 +27,7 @@
use codec::{Encode, Decode}; use codec::{Encode, Decode};
use reed_solomon::galois_16::{self, ReedSolomon}; use reed_solomon::galois_16::{self, ReedSolomon};
use primitives::{Hash as H256, BlakeTwo256, HashT}; use primitives::{Hash as H256, BlakeTwo256, HashT};
use primitives::parachain::{BlockData, Extrinsic}; use primitives::parachain::{BlockData, OutgoingMessages};
use substrate_primitives::Blake2Hasher; use substrate_primitives::Blake2Hasher;
use trie::{EMPTY_PREFIX, MemoryDB, Trie, TrieMut, trie_types::{TrieDBMut, TrieDB}}; use trie::{EMPTY_PREFIX, MemoryDB, Trie, TrieMut, trie_types::{TrieDBMut, TrieDB}};
@@ -124,11 +124,11 @@ fn code_params(n_validators: usize) -> Result<CodeParams, Error> {
/// Obtain erasure-coded chunks, one for each validator. /// Obtain erasure-coded chunks, one for each validator.
/// ///
/// Works only up to 65536 validators, and `n_validators` must be non-zero. /// Works only up to 65536 validators, and `n_validators` must be non-zero.
pub fn obtain_chunks(n_validators: usize, block_data: &BlockData, extrinsic: &Extrinsic) pub fn obtain_chunks(n_validators: usize, block_data: &BlockData, outgoing: &OutgoingMessages)
-> Result<Vec<Vec<u8>>, Error> -> Result<Vec<Vec<u8>>, Error>
{ {
let params = code_params(n_validators)?; let params = code_params(n_validators)?;
let encoded = (block_data, extrinsic).encode(); let encoded = (block_data, outgoing).encode();
if encoded.is_empty() { if encoded.is_empty() {
return Err(Error::BadPayload); return Err(Error::BadPayload);
@@ -150,7 +150,7 @@ pub fn obtain_chunks(n_validators: usize, block_data: &BlockData, extrinsic: &Ex
/// ///
/// Works only up to 65536 validators, and `n_validators` must be non-zero. /// Works only up to 65536 validators, and `n_validators` must be non-zero.
pub fn reconstruct<'a, I: 'a>(n_validators: usize, chunks: I) pub fn reconstruct<'a, I: 'a>(n_validators: usize, chunks: I)
-> Result<(BlockData, Extrinsic), Error> -> Result<(BlockData, OutgoingMessages), Error>
where I: IntoIterator<Item=(&'a [u8], usize)> where I: IntoIterator<Item=(&'a [u8], usize)>
{ {
let params = code_params(n_validators)?; let params = code_params(n_validators)?;
@@ -399,7 +399,7 @@ mod tests {
#[test] #[test]
fn round_trip_block_data() { fn round_trip_block_data() {
let block_data = BlockData((0..255).collect()); let block_data = BlockData((0..255).collect());
let ex = Extrinsic { outgoing_messages: Vec::new() }; let ex = OutgoingMessages { outgoing_messages: Vec::new() };
let chunks = obtain_chunks( let chunks = obtain_chunks(
10, 10,
&block_data, &block_data,
@@ -428,7 +428,7 @@ mod tests {
let chunks = obtain_chunks( let chunks = obtain_chunks(
10, 10,
&block_data, &block_data,
&Extrinsic { outgoing_messages: Vec::new() }, &OutgoingMessages { outgoing_messages: Vec::new() },
).unwrap(); ).unwrap();
let chunks: Vec<_> = chunks.iter().map(|c| &c[..]).collect(); let chunks: Vec<_> = chunks.iter().map(|c| &c[..]).collect();
+1 -1
View File
@@ -18,7 +18,7 @@ sr-primitives = { git = "https://github.com/paritytech/substrate", branch = "pol
futures = "0.1" futures = "0.1"
log = "0.4" log = "0.4"
exit-future = "0.1.4" exit-future = "0.1.4"
substrate-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
[dev-dependencies] [dev-dependencies]
substrate-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
substrate-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } substrate-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
File diff suppressed because it is too large Load Diff
+264
View File
@@ -0,0 +1,264 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Gossip messages and structures for dealing with attestations (statements of
//! validity of invalidity on parachain candidates).
//!
//! This follows the same principles as other gossip modules (see parent
//! documentation for more details) by being aware of our current chain
//! heads and accepting only information relative to them. Attestations are localized to
//! relay chain head, so this is easily doable.
//!
//! This module also provides a filter, so we can only broadcast messages to
//! peers that are relevant to chain heads they have advertised.
//!
//! Furthermore, since attestations are bottlenecked by the `Candidate` statement,
//! we only accept attestations which are themselves `Candidate` messages, or reference
//! a `Candidate` we are aware of. Otherwise, it is possible we could be forced to
//! consider an infinite amount of attestations produced by a misbehaving validator.
use substrate_network::consensus_gossip::{ValidationResult as GossipValidationResult};
use polkadot_validation::GenericStatement;
use polkadot_primitives::Hash;
use std::collections::{HashMap, HashSet};
use log::warn;
use crate::router::attestation_topic;
use super::{cost, benefit, MAX_CHAIN_HEADS, LeavesVec, ChainContext, Known, MessageValidationData, GossipStatement};
// knowledge about attestations on a single parent-hash.
#[derive(Default)]
pub(super) struct Knowledge {
candidates: HashSet<Hash>,
}
impl Knowledge {
// whether the peer is aware of a candidate with given hash.
fn is_aware_of(&self, candidate_hash: &Hash) -> bool {
self.candidates.contains(candidate_hash)
}
// note that the peer is aware of a candidate with given hash. this should
// be done after observing an incoming candidate message via gossip.
fn note_aware(&mut self, candidate_hash: Hash) {
self.candidates.insert(candidate_hash);
}
}
#[derive(Default)]
pub(super) struct PeerData {
live: HashMap<Hash, Knowledge>,
}
impl PeerData {
/// Update leaves, returning a list of which leaves are new.
pub(super) fn update_leaves(&mut self, leaves: &LeavesVec) -> LeavesVec {
let mut new = LeavesVec::new();
self.live.retain(|k, _| leaves.contains(k));
for &leaf in leaves {
self.live.entry(leaf).or_insert_with(|| {
new.push(leaf);
Default::default()
});
}
new
}
#[cfg(test)]
pub(super) fn note_aware_under_leaf(&mut self, relay_chain_leaf: &Hash, candidate_hash: Hash) {
if let Some(knowledge) = self.live.get_mut(relay_chain_leaf) {
knowledge.note_aware(candidate_hash);
}
}
pub(super) fn knowledge_at_mut(&mut self, parent_hash: &Hash) -> Option<&mut Knowledge> {
self.live.get_mut(parent_hash)
}
/// Get an iterator over all live leaves of this peer.
pub(super) fn leaves(&self) -> impl Iterator<Item = &Hash> {
self.live.keys()
}
}
/// An impartial view of what topics and data are valid based on attestation session data.
pub(super) struct View {
leaf_work: Vec<(Hash, LeafView)>, // hashes of the best DAG-leaves paired with validation data.
topics: HashMap<Hash, Hash>, // maps topic hashes to block hashes.
}
impl Default for View {
fn default() -> Self {
View {
leaf_work: Vec::with_capacity(MAX_CHAIN_HEADS),
topics: Default::default(),
}
}
}
impl View {
fn leaf_view(&self, relay_chain_leaf: &Hash) -> Option<&LeafView> {
self.leaf_work.iter()
.find_map(|&(ref h, ref leaf)| if h == relay_chain_leaf { Some(leaf) } else { None } )
}
fn leaf_view_mut(&mut self, relay_chain_leaf: &Hash) -> Option<&mut LeafView> {
self.leaf_work.iter_mut()
.find_map(|&mut (ref h, ref mut leaf)| if h == relay_chain_leaf { Some(leaf) } else { None } )
}
/// Get our leaves-set. Guaranteed to have length <= MAX_CHAIN_HEADS.
pub(super) fn neighbor_info<'a>(&'a self) -> impl Iterator<Item=Hash> + 'a + Clone {
self.leaf_work.iter().take(MAX_CHAIN_HEADS).map(|(p, _)| p.clone())
}
/// Note new leaf in our local view and validation data necessary to check signatures
/// of statements issued under this leaf.
///
/// This will be pruned later on a call to `prune_old_leaves`, when this leaf
/// is not a leaf anymore.
pub(super) fn new_local_leaf(&mut self, relay_chain_leaf: Hash, validation_data: MessageValidationData) {
self.leaf_work.push((
relay_chain_leaf,
LeafView {
validation_data,
knowledge: Default::default(),
},
));
self.topics.insert(attestation_topic(relay_chain_leaf), relay_chain_leaf);
}
/// Prune old leaf-work that fails the leaf predicate.
pub(super) fn prune_old_leaves<F: Fn(&Hash) -> bool>(&mut self, is_leaf: F) {
let leaf_work = &mut self.leaf_work;
leaf_work.retain(|&(ref relay_chain_leaf, _)| is_leaf(relay_chain_leaf));
self.topics.retain(|_, v| leaf_work.iter().find(|(p, _)| p == v).is_some());
}
/// Whether a message topic is considered live relative to our view. non-live
/// topics do not pertain to our perceived leaves, and are uninteresting to us.
pub(super) fn is_topic_live(&self, topic: &Hash) -> bool {
self.topics.contains_key(topic)
}
/// The relay-chain block hash corresponding to a topic.
pub(super) fn topic_block(&self, topic: &Hash) -> Option<&Hash> {
self.topics.get(topic)
}
/// Validate the signature on an attestation statement of some kind. Should be done before
/// any repropagation of that statement.
pub(super) fn validate_statement_signature<C: ChainContext + ?Sized>(
&mut self,
message: GossipStatement,
chain: &C,
)
-> (GossipValidationResult<Hash>, i32)
{
// message must reference one of our chain heads and
// if message is not a `Candidate` we should have the candidate available
// in `attestation_view`.
match self.leaf_view(&message.relay_chain_leaf) {
None => {
let cost = match chain.is_known(&message.relay_chain_leaf) {
Some(Known::Leaf) => {
warn!(
target: "network",
"Leaf block {} not considered live for attestation",
message.relay_chain_leaf,
);
0
}
Some(Known::Old) => cost::PAST_MESSAGE,
_ => cost::FUTURE_MESSAGE,
};
(GossipValidationResult::Discard, cost)
}
Some(view) => {
// first check that we are capable of receiving this message
// in a DoS-proof manner.
let benefit = match message.signed_statement.statement {
GenericStatement::Candidate(_) => benefit::NEW_CANDIDATE,
GenericStatement::Valid(ref h) | GenericStatement::Invalid(ref h) => {
if !view.knowledge.is_aware_of(h) {
let cost = cost::ATTESTATION_NO_CANDIDATE;
return (GossipValidationResult::Discard, cost);
}
benefit::NEW_ATTESTATION
}
};
// validate signature.
let res = view.validation_data.check_statement(
&message.relay_chain_leaf,
&message.signed_statement,
);
match res {
Ok(()) => {
let topic = attestation_topic(message.relay_chain_leaf);
(GossipValidationResult::ProcessAndKeep(topic), benefit)
}
Err(()) => (GossipValidationResult::Discard, cost::BAD_SIGNATURE),
}
}
}
}
/// whether it's allowed to send a statement to a peer with given knowledge
/// about the relay parent the statement refers to.
pub(super) fn statement_allowed(
&mut self,
statement: &GossipStatement,
relay_chain_leaf: &Hash,
peer_knowledge: &mut Knowledge,
) -> bool {
let signed = &statement.signed_statement;
match signed.statement {
GenericStatement::Valid(ref h) | GenericStatement::Invalid(ref h) => {
// `valid` and `invalid` statements can only be propagated after
// a candidate message is known by that peer.
peer_knowledge.is_aware_of(h)
}
GenericStatement::Candidate(ref c) => {
// if we are sending a `Candidate` message we should make sure that
// attestation_view and their_view reflects that we know about the candidate.
let hash = c.hash();
peer_knowledge.note_aware(hash);
if let Some(attestation_view) = self.leaf_view_mut(&relay_chain_leaf) {
attestation_view.knowledge.note_aware(hash);
}
// at this point, the peer hasn't seen the message or the candidate
// and has knowledge of the relevant relay-chain parent.
true
}
}
}
}
struct LeafView {
validation_data: MessageValidationData,
knowledge: Knowledge,
}
@@ -0,0 +1,339 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Data structures and synchronous logic for ICMP message gossip.
//!
//! The parent-module documentation describes some rationale of the general
//! gossip protocol design.
//!
//! The ICMP message-routing gossip works according to those rationale.
//!
//! In this protocol, we perform work under 4 conditions:
//! ### 1. Upon observation of a new leaf in the block-DAG.
//!
//! We first communicate the best leaves to our neighbors in the gossip graph
//! by the means of a neighbor packet. Then, we query to discover the trie roots
//! of all un-routed message queues from the perspective of each of those leaves.
//!
//! For any trie root in the unrouted set for the new leaf, if we have the corresponding
//! queue, we send it to any peers with the new leaf in their latest advertised set.
//!
//! Which parachain those messages go to and from is unimportant, because this is
//! an everybody-sees-everything style protocol. The only important property is "liveness":
//! that the queue root is un-routed at one of the leaves we perceive to be at the head
//! of the block-DAG.
//!
//! In Substrate gossip, every message is associated with a topic. Typically,
//! many messages are grouped under a single topic. In this gossip system, each queue
//! gets its own topic, which is based on the root hash of the queue. This is because
//! many different chain leaves may have the same queue as un-routed, so it's better than
//! attempting to group message packets by the leaf they appear unrouted at.
//!
//! ### 2. Upon a neighbor packet from a peer.
//!
//! The neighbor packet from a peer should contain perceived chain heads of that peer.
//! If there is any overlap between our perceived chain heads and theirs, we send
//! them any known, un-routed message queue from either set.
//!
//! ### 3. Upon receiving a message queue from a peer.
//!
//! If the message queue is in the un-routed set of one of the latest leaves we've updated to,
//! we accept it and relay to any peers who need that queue as well.
//!
//! If not, we report the peer to the peer-set manager for sending us bad data.
//!
//! ### 4. Periodic Pruning
//!
//! We prune messages that are not un-routed from the view of any leaf and cease
//! to attempt to send them to any peer.
use sr_primitives::traits::{BlakeTwo256, Hash as HashT};
use polkadot_primitives::Hash;
use std::collections::{HashMap, HashSet};
use substrate_client::error::Error as ClientError;
use super::{MAX_CHAIN_HEADS, GossipValidationResult, LeavesVec, ChainContext};
/// Construct a topic for a message queue root deterministically.
pub fn queue_topic(queue_root: Hash) -> Hash {
let mut v = queue_root.as_ref().to_vec();
v.extend(b"message_queue");
BlakeTwo256::hash(&v[..])
}
/// A view of which queue roots are current for a given set of leaves.
#[derive(Default)]
pub struct View {
leaves: LeavesVec,
leaf_topics: HashMap<Hash, HashSet<Hash>>, // leaf_hash -> { topics }
expected_queues: HashMap<Hash, (Hash, bool)>, // topic -> (queue-root, known)
}
impl View {
/// Update the set of current leaves. This is called when we perceive a new bset leaf-set.
pub fn update_leaves<T: ChainContext + ?Sized, I>(&mut self, context: &T, new_leaves: I)
-> Result<(), ClientError>
where I: Iterator<Item=Hash>
{
let new_leaves = new_leaves.take(MAX_CHAIN_HEADS);
let old_leaves = std::mem::replace(&mut self.leaves, new_leaves.collect());
let expected_queues = &mut self.expected_queues;
let leaves = &self.leaves;
self.leaf_topics.retain(|l, topics| {
if leaves.contains(l) { return true }
// prune out all data about old leaves we don't follow anymore.
for topic in topics.iter() {
expected_queues.remove(topic);
}
false
});
let mut res = Ok(());
// add in new data about fresh leaves.
for new_leaf in &self.leaves {
if old_leaves.contains(new_leaf) { continue }
let mut this_leaf_topics = HashSet::new();
let r = context.leaf_unrouted_roots(new_leaf, &mut |&queue_root| {
let topic = queue_topic(queue_root);
this_leaf_topics.insert(topic);
expected_queues.entry(topic).or_insert((queue_root, false));
});
if r.is_err() {
if let Err(e) = res {
log::debug!(target: "message_routing", "Ignored duplicate error {}", e)
};
res = r;
}
self.leaf_topics.insert(*new_leaf, this_leaf_topics);
}
res
}
/// Validate an incoming message queue against this view. If it is accepted
/// by our view of un-routed message queues, we will keep and re-propagate.
pub fn validate_queue_and_note_known(&mut self, messages: &super::GossipParachainMessages)
-> (GossipValidationResult<Hash>, i32)
{
let ostensible_topic = queue_topic(messages.queue_root);
match self.expected_queues.get_mut(&ostensible_topic) {
None => (GossipValidationResult::Discard, super::cost::UNNEEDED_ICMP_MESSAGES),
Some(&mut (_, ref mut known)) => {
if !messages.queue_root_is_correct() {
(
GossipValidationResult::Discard,
super::cost::icmp_messages_root_mismatch(messages.messages.len()),
)
} else {
*known = true;
(
GossipValidationResult::ProcessAndKeep(ostensible_topic),
super::benefit::NEW_ICMP_MESSAGES,
)
}
}
}
}
/// Whether a message with given topic is live.
pub fn is_topic_live(&self, topic: &Hash) -> bool {
self.expected_queues.get(topic).is_some()
}
/// Whether a message is allowed under the intersection of the given leaf-set
/// and our own.
pub fn allowed_intersecting(&self, other_leaves: &LeavesVec, topic: &Hash) -> bool {
for i in other_leaves {
for j in &self.leaves {
if i == j {
let leaf_topics = self.leaf_topics.get(i)
.expect("leaf_topics are mutated only in update_leaves; \
we have an entry for each item in self.leaves; \
i is in self.leaves; qed");
if leaf_topics.contains(topic) {
return true;
}
}
}
}
false
}
/// Get topics of all message queues a peer is interested in - this is useful
/// when a peer has informed us of their new best leaves.
pub fn intersection_topics(&self, other_leaves: &LeavesVec) -> impl Iterator<Item=Hash> {
let deduplicated = other_leaves.iter()
.filter_map(|l| self.leaf_topics.get(l))
.flat_map(|topics| topics.iter().cloned())
.collect::<HashSet<_>>();
deduplicated.into_iter()
}
/// Iterate over all live message queues for which the data is marked as not locally known,
/// calling a closure with `(topic, root)`. The closure will return whether the queue data is
/// unknown.
///
/// This is called when we should send un-routed message queues that we are
/// newly aware of to peers - as in when we update our leaves.
pub fn sweep_unknown_queues(&mut self, mut check_known: impl FnMut(&Hash, &Hash) -> bool) {
for (topic, &mut (ref queue_root, ref mut known)) in self.expected_queues.iter_mut() {
if !*known {
*known = check_known(topic, queue_root)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::TestChainContext;
use crate::gossip::{Known, GossipParachainMessages};
use polkadot_primitives::parachain::Message as ParachainMessage;
fn hash(x: u8) -> Hash {
[x; 32].into()
}
fn message_queue(from: u8, to: u8) -> Option<[[u8; 2]; 1]> {
if from == to {
None
} else {
Some([[from, to]])
}
}
fn message_queue_root(from: u8, to: u8) -> Option<Hash> {
message_queue(from, to).map(
|q| polkadot_validation::message_queue_root(q.iter())
)
}
// check that our view has all of the roots of the message queues
// emitted in the heads identified in `our_heads`, and none of the others.
fn check_roots(view: &mut View, our_heads: &[u8], n_heads: u8) -> bool {
for i in 0..n_heads {
for j in 0..n_heads {
if let Some(messages) = message_queue(i, j) {
let queue_root = message_queue_root(i, j).unwrap();
let messages = GossipParachainMessages {
queue_root,
messages: messages.iter().map(|m| ParachainMessage(m.to_vec())).collect(),
};
let had_queue = match view.validate_queue_and_note_known(&messages).0 {
GossipValidationResult::ProcessAndKeep(topic) => topic == queue_topic(queue_root),
_ => false,
};
if our_heads.contains(&i) != had_queue {
return false
}
}
}
}
true
}
#[test]
fn update_leaves_none_in_common() {
let mut ctx = TestChainContext::default();
let n_heads = 5;
for i in 0..n_heads {
ctx.known_map.insert(hash(i as u8), Known::Leaf);
let messages_out: Vec<_> = (0..n_heads).filter_map(|j| message_queue_root(i, j)).collect();
if !messages_out.is_empty() {
ctx.ingress_roots.insert(hash(i as u8), messages_out);
}
}
// initialize the view with 2 leaves.
let mut view = View::default();
view.update_leaves(
&ctx,
[hash(0), hash(1)].iter().cloned(),
).unwrap();
// we should have all queue roots that were
// un-routed from the perspective of those 2
// leaves and no others.
assert!(check_roots(&mut view, &[0, 1], n_heads));
// after updating to a disjoint set,
// the property that we are aware of all un-routed
// from the perspective of our known leaves should
// remain the same.
view.update_leaves(
&ctx,
[hash(2), hash(3), hash(4)].iter().cloned(),
).unwrap();
assert!(check_roots(&mut view, &[2, 3, 4], n_heads));
}
#[test]
fn update_leaves_overlapping() {
let mut ctx = TestChainContext::default();
let n_heads = 5;
for i in 0..n_heads {
ctx.known_map.insert(hash(i as u8), Known::Leaf);
let messages_out: Vec<_> = (0..n_heads).filter_map(|j| message_queue_root(i, j)).collect();
if !messages_out.is_empty() {
ctx.ingress_roots.insert(hash(i as u8), messages_out);
}
}
let mut view = View::default();
view.update_leaves(
&ctx,
[hash(0), hash(1), hash(2)].iter().cloned(),
).unwrap();
assert!(check_roots(&mut view, &[0, 1, 2], n_heads));
view.update_leaves(
&ctx,
[hash(2), hash(3), hash(4)].iter().cloned(),
).unwrap();
// after updating to a leaf-set overlapping with the prior,
// the property that we are aware of all un-routed
// from the perspective of our known leaves should
// remain the same.
assert!(check_roots(&mut view, &[2, 3, 4], n_heads));
}
}
+165 -32
View File
@@ -16,8 +16,8 @@
//! Polkadot-specific network implementation. //! Polkadot-specific network implementation.
//! //!
//! This manages routing for parachain statements, parachain block and extrinsic data fetching, //! This manages routing for parachain statements, parachain block and outgoing message
//! communication between collators and validators, and more. //! data fetching, communication between collators and validators, and more.
mod collator_pool; mod collator_pool;
mod local_collations; mod local_collations;
@@ -26,23 +26,29 @@ pub mod validation;
pub mod gossip; pub mod gossip;
use codec::{Decode, Encode}; use codec::{Decode, Encode};
use futures::sync::oneshot; use futures::sync::{oneshot, mpsc};
use futures::prelude::*;
use polkadot_primitives::{Block, Hash, Header}; use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{ use polkadot_primitives::parachain::{
Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock, Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock,
StructuredUnroutedIngress, ValidatorId StructuredUnroutedIngress, ValidatorId, OutgoingMessages,
}; };
use substrate_network::{ use substrate_network::{
PeerId, RequestId, Context, StatusMessage as GenericFullStatus, PeerId, RequestId, Context, StatusMessage as GenericFullStatus,
specialization::{Event, NetworkSpecialization as Specialization}, specialization::{Event, NetworkSpecialization as Specialization},
}; };
use self::validation::{LiveValidationSessions, RecentValidatorIds, InsertedRecentKey}; use substrate_network::consensus_gossip::{
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
};
use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey};
use self::collator_pool::{CollatorPool, Role, Action}; use self::collator_pool::{CollatorPool, Role, Action};
use self::local_collations::LocalCollations; use self::local_collations::LocalCollations;
use log::{trace, debug, warn}; use log::{trace, debug, warn};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use crate::gossip::{POLKADOT_ENGINE_ID, GossipMessage};
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@@ -69,7 +75,112 @@ mod benefit {
type FullStatus = GenericFullStatus<Block>; type FullStatus = GenericFullStatus<Block>;
/// Specialization of the network service for the polkadot protocol. /// Specialization of the network service for the polkadot protocol.
pub type NetworkService = substrate_network::NetworkService<Block, PolkadotProtocol, Hash>; pub type PolkadotNetworkService = substrate_network::NetworkService<Block, PolkadotProtocol, Hash>;
/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash.
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream;
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: GossipMessage);
/// Execute a closure with the gossip service.
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn Context<Block>);
/// Execute a closure with the polkadot protocol.
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>);
}
impl NetworkService for PolkadotNetworkService {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = std::sync::mpsc::channel();
PolkadotNetworkService::with_gossip(self, move |gossip, _| {
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
let _ = tx.send(inner_rx);
});
let topic_stream = match rx.recv() {
Ok(rx) => rx,
Err(_) => mpsc::unbounded().1, // return empty channel.
};
GossipMessageStream::new(topic_stream)
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
self.gossip_consensus_message(
topic,
POLKADOT_ENGINE_ID,
message.encode(),
GossipMessageRecipient::BroadcastToAll,
);
}
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn Context<Block>)
{
PolkadotNetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
}
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>)
{
PolkadotNetworkService::with_spec(self, with)
}
}
/// A gossip network subservice.
pub trait GossipService {
fn send_message(&mut self, ctx: &mut dyn Context<Block>, who: &PeerId, message: ConsensusMessage);
fn multicast(&mut self, ctx: &mut dyn Context<Block>, topic: &Hash, message: ConsensusMessage);
}
impl GossipService for consensus_gossip::ConsensusGossip<Block> {
fn send_message(&mut self, ctx: &mut dyn Context<Block>, who: &PeerId, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message)
}
fn multicast(&mut self, ctx: &mut dyn Context<Block>, topic: &Hash, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::multicast(self, ctx, *topic, message, false)
}
}
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: mpsc::UnboundedReceiver<TopicNotification>,
}
impl GossipMessageStream {
/// Create a new instance with the given topic stream.
pub fn new(topic_stream: mpsc::UnboundedReceiver<TopicNotification>) -> Self {
Self {
topic_stream
}
}
}
impl Stream for GossipMessageStream {
type Item = (GossipMessage, Option<PeerId>);
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let msg = match futures::try_ready!(self.topic_stream.poll()) {
Some(msg) => msg,
None => return Ok(Async::Ready(None)),
};
debug!(target: "validation", "Processing statement for live validation leaf-work");
if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) {
return Ok(Async::Ready(Some((gmsg, msg.sender))))
}
}
}
}
/// Status of a Polkadot node. /// Status of a Polkadot node.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
@@ -79,7 +190,7 @@ pub struct Status {
struct PoVBlockRequest { struct PoVBlockRequest {
attempted_peers: HashSet<ValidatorId>, attempted_peers: HashSet<ValidatorId>,
validation_session_parent: Hash, validation_leaf: Hash,
candidate_hash: Hash, candidate_hash: Hash,
block_data_hash: Hash, block_data_hash: Hash,
sender: oneshot::Sender<PoVBlock>, sender: oneshot::Sender<PoVBlock>,
@@ -188,10 +299,10 @@ pub struct PolkadotProtocol {
collators: CollatorPool, collators: CollatorPool,
validators: HashMap<ValidatorId, PeerId>, validators: HashMap<ValidatorId, PeerId>,
local_collations: LocalCollations<Collation>, local_collations: LocalCollations<Collation>,
live_validation_sessions: LiveValidationSessions, live_validation_leaves: LiveValidationLeaves,
in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>, in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>,
pending: Vec<PoVBlockRequest>, pending: Vec<PoVBlockRequest>,
extrinsic_store: Option<::av_store::Store>, availability_store: Option<av_store::Store>,
next_req_id: u64, next_req_id: u64,
} }
@@ -204,10 +315,10 @@ impl PolkadotProtocol {
collating_for, collating_for,
validators: HashMap::new(), validators: HashMap::new(),
local_collations: LocalCollations::new(), local_collations: LocalCollations::new(),
live_validation_sessions: LiveValidationSessions::new(), live_validation_leaves: LiveValidationLeaves::new(),
in_flight: HashMap::new(), in_flight: HashMap::new(),
pending: Vec::new(), pending: Vec::new(),
extrinsic_store: None, availability_store: None,
next_req_id: 1, next_req_id: 1,
} }
} }
@@ -224,7 +335,7 @@ impl PolkadotProtocol {
self.pending.push(PoVBlockRequest { self.pending.push(PoVBlockRequest {
attempted_peers: Default::default(), attempted_peers: Default::default(),
validation_session_parent: relay_parent, validation_leaf: relay_parent,
candidate_hash: candidate.hash(), candidate_hash: candidate.hash(),
block_data_hash: candidate.block_data_hash, block_data_hash: candidate.block_data_hash,
sender: tx, sender: tx,
@@ -235,15 +346,15 @@ impl PolkadotProtocol {
rx rx
} }
/// Note new validation session. /// Note new leaf to do validation work at
fn new_validation_session( fn new_validation_leaf_work(
&mut self, &mut self,
ctx: &mut dyn Context<Block>, ctx: &mut dyn Context<Block>,
params: validation::SessionParams, params: validation::LeafWorkParams,
) -> validation::ValidationSession { ) -> validation::LiveValidationLeaf {
let (session, new_local) = self.live_validation_sessions let (work, new_local) = self.live_validation_leaves
.new_validation_session(params); .new_validation_leaf(params);
if let Some(new_local) = new_local { if let Some(new_local) = new_local {
for (id, peer_data) in self.peers.iter_mut() for (id, peer_data) in self.peers.iter_mut()
@@ -257,12 +368,12 @@ impl PolkadotProtocol {
} }
} }
session work
} }
// true indicates that it was removed actually. // true indicates that it was removed actually.
fn remove_validation_session(&mut self, parent_hash: Hash) -> bool { fn remove_validation_session(&mut self, parent_hash: Hash) -> bool {
self.live_validation_sessions.remove(parent_hash) self.live_validation_leaves.remove(parent_hash)
} }
fn dispatch_pending_requests(&mut self, ctx: &mut dyn Context<Block>) { fn dispatch_pending_requests(&mut self, ctx: &mut dyn Context<Block>) {
@@ -272,10 +383,10 @@ impl PolkadotProtocol {
let in_flight = &mut self.in_flight; let in_flight = &mut self.in_flight;
for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) { for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) {
let parent = pending.validation_session_parent; let parent = pending.validation_leaf;
let c_hash = pending.candidate_hash; let c_hash = pending.candidate_hash;
let still_pending = self.live_validation_sessions.with_pov_block(&parent, &c_hash, |x| match x { let still_pending = self.live_validation_leaves.with_pov_block(&parent, &c_hash, |x| match x {
Ok(data @ &_) => { Ok(data @ &_) => {
// answer locally. // answer locally.
let _ = pending.sender.send(data.clone()); let _ = pending.sender.send(data.clone());
@@ -305,7 +416,7 @@ impl PolkadotProtocol {
Some(pending) Some(pending)
} }
} }
Err(None) => None, // no such known validation session. prune out. Err(None) => None, // no such known validation leaf-work. prune out.
}); });
if let Some(pending) = still_pending { if let Some(pending) = still_pending {
@@ -321,7 +432,7 @@ impl PolkadotProtocol {
match msg { match msg {
Message::ValidatorId(key) => self.on_session_key(ctx, who, key), Message::ValidatorId(key) => self.on_session_key(ctx, who, key),
Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => { Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => {
let pov_block = self.live_validation_sessions.with_pov_block( let pov_block = self.live_validation_leaves.with_pov_block(
&relay_parent, &relay_parent,
&candidate_hash, &candidate_hash,
|res| res.ok().map(|b| b.clone()), |res| res.ok().map(|b| b.clone()),
@@ -330,13 +441,13 @@ impl PolkadotProtocol {
send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block)); send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block));
} }
Message::RequestBlockData(req_id, relay_parent, candidate_hash) => { Message::RequestBlockData(req_id, relay_parent, candidate_hash) => {
let block_data = self.live_validation_sessions let block_data = self.live_validation_leaves
.with_pov_block( .with_pov_block(
&relay_parent, &relay_parent,
&candidate_hash, &candidate_hash,
|res| res.ok().map(|b| b.block_data.clone()), |res| res.ok().map(|b| b.block_data.clone()),
) )
.or_else(|| self.extrinsic_store.as_ref() .or_else(|| self.availability_store.as_ref()
.and_then(|s| s.block_data(relay_parent, candidate_hash)) .and_then(|s| s.block_data(relay_parent, candidate_hash))
); );
@@ -507,7 +618,7 @@ impl Specialization<Block> for PolkadotProtocol {
// send session keys. // send session keys.
if peer_info.should_send_key() { if peer_info.should_send_key() {
for local_session_key in self.live_validation_sessions.recent_keys() { for local_session_key in self.live_validation_leaves.recent_keys() {
peer_info.collator_state.send_key(local_session_key.clone(), |msg| send_polkadot_message( peer_info.collator_state.send_key(local_session_key.clone(), |msg| send_polkadot_message(
ctx, ctx,
who.clone(), who.clone(),
@@ -549,7 +660,7 @@ impl Specialization<Block> for PolkadotProtocol {
let (sender, _) = oneshot::channel(); let (sender, _) = oneshot::channel();
pending.push(::std::mem::replace(val, PoVBlockRequest { pending.push(::std::mem::replace(val, PoVBlockRequest {
attempted_peers: Default::default(), attempted_peers: Default::default(),
validation_session_parent: Default::default(), validation_leaf: Default::default(),
candidate_hash: Default::default(), candidate_hash: Default::default(),
block_data_hash: Default::default(), block_data_hash: Default::default(),
canon_roots: StructuredUnroutedIngress(Vec::new()), canon_roots: StructuredUnroutedIngress(Vec::new()),
@@ -676,16 +787,35 @@ impl PolkadotProtocol {
impl PolkadotProtocol { impl PolkadotProtocol {
/// Add a local collation and broadcast it to the necessary peers. /// Add a local collation and broadcast it to the necessary peers.
///
/// This should be called by a collator intending to get the locally-collated
/// block into the hands of validators.
/// It also places the outgoing message and block data in the local availability store.
pub fn add_local_collation( pub fn add_local_collation(
&mut self, &mut self,
ctx: &mut dyn Context<Block>, ctx: &mut dyn Context<Block>,
relay_parent: Hash, relay_parent: Hash,
targets: HashSet<ValidatorId>, targets: HashSet<ValidatorId>,
collation: Collation, collation: Collation,
) { outgoing_targeted: OutgoingMessages,
) -> std::io::Result<()> {
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
relay_parent, collation.receipt.parachain_index); relay_parent, collation.receipt.parachain_index);
let outgoing_queues = polkadot_validation::outgoing_queues(&outgoing_targeted)
.map(|(_target, root, data)| (root, data))
.collect();
if let Some(ref availability_store) = self.availability_store {
availability_store.make_available(av_store::Data {
relay_parent,
parachain_id: collation.receipt.parachain_index,
candidate_hash: collation.receipt.hash(),
block_data: collation.pov.block_data.clone(),
outgoing_queues: Some(outgoing_queues),
})?;
}
for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
match self.validators.get(&primary) { match self.validators.get(&primary) {
Some(who) => { Some(who) => {
@@ -700,10 +830,13 @@ impl PolkadotProtocol {
warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary), warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary),
} }
} }
Ok(())
} }
/// register availability store. /// Give the network protocol a handle to an availability store, used for
pub fn register_availability_store(&mut self, extrinsic_store: ::av_store::Store) { /// circulation of parachain data required for validation.
self.extrinsic_store = Some(extrinsic_store); pub fn register_availability_store(&mut self, availability_store: ::av_store::Store) {
self.availability_store = Some(availability_store);
} }
} }
+9 -8
View File
@@ -29,7 +29,7 @@ use polkadot_validation::{
}; };
use polkadot_primitives::{Block, Hash}; use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{ use polkadot_primitives::parachain::{
Extrinsic, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock, OutgoingMessages, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock,
}; };
use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement}; use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement};
@@ -41,7 +41,8 @@ use std::collections::{HashMap, HashSet};
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use crate::validation::{self, SessionDataFetcher, NetworkService, Executor}; use crate::validation::{self, LeafWorkDataFetcher, Executor};
use crate::NetworkService;
/// Compute the gossip topic for attestations on the given parent hash. /// Compute the gossip topic for attestations on the given parent hash.
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash { pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
@@ -72,7 +73,7 @@ pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
pub struct Router<P, E, N: NetworkService, T> { pub struct Router<P, E, N: NetworkService, T> {
table: Arc<SharedTable>, table: Arc<SharedTable>,
attestation_topic: Hash, attestation_topic: Hash,
fetcher: SessionDataFetcher<P, E, N, T>, fetcher: LeafWorkDataFetcher<P, E, N, T>,
deferred_statements: Arc<Mutex<DeferredStatements>>, deferred_statements: Arc<Mutex<DeferredStatements>>,
message_validator: RegisteredMessageValidator, message_validator: RegisteredMessageValidator,
} }
@@ -80,7 +81,7 @@ pub struct Router<P, E, N: NetworkService, T> {
impl<P, E, N: NetworkService, T> Router<P, E, N, T> { impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
pub(crate) fn new( pub(crate) fn new(
table: Arc<SharedTable>, table: Arc<SharedTable>,
fetcher: SessionDataFetcher<P, E, N, T>, fetcher: LeafWorkDataFetcher<P, E, N, T>,
message_validator: RegisteredMessageValidator, message_validator: RegisteredMessageValidator,
) -> Self { ) -> Self {
let parent_hash = fetcher.parent_hash(); let parent_hash = fetcher.parent_hash();
@@ -196,7 +197,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
knowledge.lock().note_candidate( knowledge.lock().note_candidate(
candidate_hash, candidate_hash,
Some(validated.pov_block().clone()), Some(validated.pov_block().clone()),
validated.extrinsic().cloned(), validated.outgoing_messages().cloned(),
); );
// propagate the statement. // propagate the statement.
@@ -224,13 +225,13 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
type Error = io::Error; type Error = io::Error;
type FetchValidationProof = validation::PoVReceiver; type FetchValidationProof = validation::PoVReceiver;
fn local_collation(&self, collation: Collation, extrinsic: Extrinsic) { fn local_collation(&self, collation: Collation, outgoing: OutgoingMessages) {
// produce a signed statement // produce a signed statement
let hash = collation.receipt.hash(); let hash = collation.receipt.hash();
let validated = Validated::collated_local( let validated = Validated::collated_local(
collation.receipt, collation.receipt,
collation.pov.clone(), collation.pov.clone(),
extrinsic.clone(), outgoing.clone(),
); );
let statement = GossipStatement::new( let statement = GossipStatement::new(
@@ -242,7 +243,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
); );
// give to network to make available. // give to network to make available.
self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(extrinsic)); self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(outgoing));
self.network().gossip_message(self.attestation_topic, statement.into()); self.network().gossip_message(self.attestation_topic, statement.into());
} }
+35 -13
View File
@@ -18,7 +18,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use super::{PolkadotProtocol, Status, Message, FullStatus}; use super::{PolkadotProtocol, Status, Message, FullStatus};
use crate::validation::SessionParams; use crate::validation::LeafWorkParams;
use polkadot_validation::GenericStatement; use polkadot_validation::GenericStatement;
use polkadot_primitives::{Block, Hash}; use polkadot_primitives::{Block, Hash};
@@ -77,6 +77,28 @@ impl TestContext {
} }
} }
#[derive(Default)]
pub struct TestChainContext {
pub known_map: HashMap<Hash, crate::gossip::Known>,
pub ingress_roots: HashMap<Hash, Vec<Hash>>,
}
impl crate::gossip::ChainContext for TestChainContext {
fn is_known(&self, block_hash: &Hash) -> Option<crate::gossip::Known> {
self.known_map.get(block_hash).map(|x| x.clone())
}
fn leaf_unrouted_roots(&self, leaf: &Hash, with_queue_root: &mut dyn FnMut(&Hash))
-> Result<(), substrate_client::error::Error>
{
for root in self.ingress_roots.get(leaf).into_iter().flat_map(|roots| roots) {
with_queue_root(root)
}
Ok(())
}
}
fn make_pov(block_data: Vec<u8>) -> PoVBlock { fn make_pov(block_data: Vec<u8>) -> PoVBlock {
PoVBlock { PoVBlock {
block_data: BlockData(block_data), block_data: BlockData(block_data),
@@ -96,8 +118,8 @@ fn make_status(status: &Status, roles: Roles) -> FullStatus {
} }
} }
fn make_validation_session(parent_hash: Hash, local_key: ValidatorId) -> SessionParams { fn make_validation_leaf_work(parent_hash: Hash, local_key: ValidatorId) -> LeafWorkParams {
SessionParams { LeafWorkParams {
local_session_key: Some(local_key), local_session_key: Some(local_key),
parent_hash, parent_hash,
authorities: Vec::new(), authorities: Vec::new(),
@@ -129,8 +151,8 @@ fn sends_session_key() {
{ {
let mut ctx = TestContext::default(); let mut ctx = TestContext::default();
let params = make_validation_session(parent_hash, local_key.clone()); let params = make_validation_leaf_work(parent_hash, local_key.clone());
protocol.new_validation_session(&mut ctx, params); protocol.new_validation_leaf_work(&mut ctx, params);
assert!(ctx.has_message(peer_a, Message::ValidatorId(local_key.clone()))); assert!(ctx.has_message(peer_a, Message::ValidatorId(local_key.clone())));
} }
@@ -169,8 +191,8 @@ fn fetches_from_those_with_knowledge() {
let status = Status { collating_for: None }; let status = Status { collating_for: None };
let params = make_validation_session(parent_hash, local_key.clone()); let params = make_validation_leaf_work(parent_hash, local_key.clone());
let session = protocol.new_validation_session(&mut TestContext::default(), params); let session = protocol.new_validation_leaf_work(&mut TestContext::default(), params);
let knowledge = session.knowledge(); let knowledge = session.knowledge();
knowledge.lock().note_statement(a_key.clone(), &GenericStatement::Valid(candidate_hash)); knowledge.lock().note_statement(a_key.clone(), &GenericStatement::Valid(candidate_hash));
@@ -259,7 +281,7 @@ fn fetches_available_block_data() {
parachain_id: para_id, parachain_id: para_id,
candidate_hash, candidate_hash,
block_data: block_data.clone(), block_data: block_data.clone(),
extrinsic: None, outgoing_queues: None,
}).unwrap(); }).unwrap();
// connect peer A // connect peer A
@@ -323,13 +345,13 @@ fn many_session_keys() {
let local_key_a: ValidatorId = [3; 32].unchecked_into(); let local_key_a: ValidatorId = [3; 32].unchecked_into();
let local_key_b: ValidatorId = [4; 32].unchecked_into(); let local_key_b: ValidatorId = [4; 32].unchecked_into();
let params_a = make_validation_session(parent_a, local_key_a.clone()); let params_a = make_validation_leaf_work(parent_a, local_key_a.clone());
let params_b = make_validation_session(parent_b, local_key_b.clone()); let params_b = make_validation_leaf_work(parent_b, local_key_b.clone());
protocol.new_validation_session(&mut TestContext::default(), params_a); protocol.new_validation_leaf_work(&mut TestContext::default(), params_a);
protocol.new_validation_session(&mut TestContext::default(), params_b); protocol.new_validation_leaf_work(&mut TestContext::default(), params_b);
assert_eq!(protocol.live_validation_sessions.recent_keys(), &[local_key_a.clone(), local_key_b.clone()]); assert_eq!(protocol.live_validation_leaves.recent_keys(), &[local_key_a.clone(), local_key_b.clone()]);
let peer_a = PeerId::random(); let peer_a = PeerId::random();
+9 -10
View File
@@ -18,18 +18,17 @@
#![allow(unused)] #![allow(unused)]
use crate::validation::{NetworkService, GossipService, GossipMessageStream};
use crate::gossip::GossipMessage; use crate::gossip::GossipMessage;
use substrate_network::Context as NetContext; use substrate_network::Context as NetContext;
use substrate_network::consensus_gossip::TopicNotification; use substrate_network::consensus_gossip::TopicNotification;
use substrate_primitives::{NativeOrEncoded, ExecutionContext}; use substrate_primitives::{NativeOrEncoded, ExecutionContext};
use substrate_keyring::Sr25519Keyring; use substrate_keyring::Sr25519Keyring;
use crate::PolkadotProtocol; use crate::{GossipService, PolkadotProtocol, NetworkService, GossipMessageStream};
use polkadot_validation::{SharedTable, MessagesFrom, Network}; use polkadot_validation::{SharedTable, Network};
use polkadot_primitives::{Block, Hash, Header, BlockId}; use polkadot_primitives::{Block, BlockNumber, Hash, Header, BlockId};
use polkadot_primitives::parachain::{ use polkadot_primitives::parachain::{
Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage, Id as ParaId, Chain, DutyRoster, ParachainHost, TargetedMessage,
ValidatorId, StructuredUnroutedIngress, BlockIngressRoots, Status, ValidatorId, StructuredUnroutedIngress, BlockIngressRoots, Status,
FeeSchedule, HeadData, FeeSchedule, HeadData,
}; };
@@ -43,7 +42,7 @@ use std::sync::Arc;
use futures::{prelude::*, sync::mpsc}; use futures::{prelude::*, sync::mpsc};
use codec::Encode; use codec::Encode;
use super::TestContext; use super::{TestContext, TestChainContext};
type TaskExecutor = Arc<dyn futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>; type TaskExecutor = Arc<dyn futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;
@@ -315,10 +314,10 @@ impl ParachainHost<Block> for RuntimeApi {
&self, &self,
_at: &BlockId, _at: &BlockId,
_: ExecutionContext, _: ExecutionContext,
id: Option<ParaId>, id: Option<(ParaId, Option<BlockNumber>)>,
_: Vec<u8>, _: Vec<u8>,
) -> ClientResult<NativeOrEncoded<Option<StructuredUnroutedIngress>>> { ) -> ClientResult<NativeOrEncoded<Option<StructuredUnroutedIngress>>> {
let id = id.unwrap(); let (id, _) = id.unwrap();
Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned())) Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned()))
} }
} }
@@ -348,7 +347,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
}); });
let message_val = crate::gossip::RegisteredMessageValidator::new_test( let message_val = crate::gossip::RegisteredMessageValidator::new_test(
|_hash: &_| Some(crate::gossip::Known::Leaf), TestChainContext::default(),
Box::new(|_, _| {}), Box::new(|_, _| {}),
); );
@@ -376,7 +375,7 @@ struct IngressBuilder {
} }
impl IngressBuilder { impl IngressBuilder {
fn add_messages(&mut self, source: ParaId, messages: &[OutgoingMessage]) { fn add_messages(&mut self, source: ParaId, messages: &[TargetedMessage]) {
for message in messages { for message in messages {
let target = message.target; let target = message.target;
self.egress.entry((source, target)).or_insert_with(Vec::new).push(message.data.clone()); self.egress.entry((source, target)).or_insert_with(Vec::new).push(message.data.clone());
+93 -196
View File
@@ -14,29 +14,24 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The "validation session" networking code built on top of the base network service. //! The "validation leaf work" networking code built on top of the base network service.
//! //!
//! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called //! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called
//! each time a validation session begins on a new chain head. //! each time validation leaf work begins on a new chain head.
use crate::gossip::GossipMessage;
use sr_primitives::traits::ProvideRuntimeApi; use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::{PeerId, Context as NetContext}; use substrate_network::PeerId;
use substrate_network::consensus_gossip::{
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
};
use polkadot_validation::{ use polkadot_validation::{
Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement, SignedStatement, Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement, SignedStatement,
}; };
use polkadot_primitives::{Block, BlockId, Hash}; use polkadot_primitives::{Block, BlockId, Hash};
use polkadot_primitives::parachain::{ use polkadot_primitives::parachain::{
Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId, Id as ParaId, Collation, OutgoingMessages, ParachainHost, CandidateReceipt, CollatorId,
ValidatorId, PoVBlock, ValidatorIndex ValidatorId, PoVBlock
}; };
use futures::prelude::*; use futures::prelude::*;
use futures::future::{self, Executor as FutureExecutor}; use futures::future::{self, Executor as FutureExecutor};
use futures::sync::mpsc;
use futures::sync::oneshot::{self, Receiver}; use futures::sync::oneshot::{self, Receiver};
use std::collections::hash_map::{HashMap, Entry}; use std::collections::hash_map::{HashMap, Entry};
@@ -45,17 +40,15 @@ use std::sync::Arc;
use arrayvec::ArrayVec; use arrayvec::ArrayVec;
use parking_lot::Mutex; use parking_lot::Mutex;
use log::{debug, warn}; use log::warn;
use crate::router::Router; use crate::router::Router;
use crate::gossip::{POLKADOT_ENGINE_ID, RegisteredMessageValidator, MessageValidationData}; use crate::gossip::{RegisteredMessageValidator, MessageValidationData};
use super::PolkadotProtocol; use super::NetworkService;
pub use polkadot_validation::Incoming; pub use polkadot_validation::Incoming;
use codec::{Encode, Decode};
/// An executor suitable for dispatching async consensus tasks. /// An executor suitable for dispatching async consensus tasks.
pub trait Executor { pub trait Executor {
fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F); fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F);
@@ -83,108 +76,8 @@ impl Executor for Arc<
} }
} }
/// A gossip network subservice. /// Params to instantiate validation work on a block-DAG leaf.
pub trait GossipService { pub struct LeafWorkParams {
fn send_message(&mut self, ctx: &mut dyn NetContext<Block>, who: &PeerId, message: ConsensusMessage);
}
impl GossipService for consensus_gossip::ConsensusGossip<Block> {
fn send_message(&mut self, ctx: &mut dyn NetContext<Block>, who: &PeerId, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message)
}
}
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: mpsc::UnboundedReceiver<TopicNotification>,
}
impl GossipMessageStream {
/// Create a new instance with the given topic stream.
pub fn new(topic_stream: mpsc::UnboundedReceiver<TopicNotification>) -> Self {
Self {
topic_stream
}
}
}
impl Stream for GossipMessageStream {
type Item = (GossipMessage, Option<PeerId>);
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let msg = match futures::try_ready!(self.topic_stream.poll()) {
Some(msg) => msg,
None => return Ok(Async::Ready(None)),
};
debug!(target: "validation", "Processing statement for live validation session");
if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) {
return Ok(Async::Ready(Some((gmsg, msg.sender))))
}
}
}
}
/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash.
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream;
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: GossipMessage);
/// Execute a closure with the gossip service.
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn NetContext<Block>);
/// Execute a closure with the polkadot protocol.
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext<Block>);
}
impl NetworkService for super::NetworkService {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = std::sync::mpsc::channel();
super::NetworkService::with_gossip(self, move |gossip, _| {
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
let _ = tx.send(inner_rx);
});
let topic_stream = match rx.recv() {
Ok(rx) => rx,
Err(_) => mpsc::unbounded().1, // return empty channel.
};
GossipMessageStream::new(topic_stream)
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
self.gossip_consensus_message(
topic,
POLKADOT_ENGINE_ID,
message.encode(),
GossipMessageRecipient::BroadcastToAll,
);
}
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn NetContext<Block>)
{
super::NetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
}
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext<Block>)
{
super::NetworkService::with_spec(self, with)
}
}
/// Params to a current validation session.
pub struct SessionParams {
/// The local session key. /// The local session key.
pub local_session_key: Option<ValidatorId>, pub local_session_key: Option<ValidatorId>,
/// The parent hash. /// The parent hash.
@@ -234,20 +127,22 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
N: NetworkService, N: NetworkService,
T: Clone + Executor + Send + Sync + 'static, T: Clone + Executor + Send + Sync + 'static,
{ {
/// Instantiate session data fetcher at a parent hash. /// Instantiate block-DAG leaf work
/// (i.e. the work we want to be done by validators at some chain-head)
/// at a parent hash.
/// ///
/// If the used session key is new, it will be broadcast to peers. /// If the used session key is new, it will be broadcast to peers.
/// If a validation session was already instantiated at this parent hash, /// If any validation leaf-work was already instantiated at this parent hash,
/// the underlying instance will be shared. /// the underlying instance will be shared.
/// ///
/// If there was already a validation session instantiated and a different /// If there was already validation leaf-work instantiated and a different
/// session key was set, then the new key will be ignored. /// session key was set, then the new key will be ignored.
/// ///
/// This implies that there can be multiple services intantiating validation /// This implies that there can be multiple services intantiating validation
/// session instances safely, but they should all be coordinated on which session keys /// leaf-work instances safely, but they should all be coordinated on which session keys
/// are being used. /// are being used.
pub fn instantiate_session(&self, params: SessionParams) pub fn instantiate_leaf_work(&self, params: LeafWorkParams)
-> oneshot::Receiver<SessionDataFetcher<P, E, N, T>> -> oneshot::Receiver<LeafWorkDataFetcher<P, E, N, T>>
{ {
let parent_hash = params.parent_hash; let parent_hash = params.parent_hash;
let network = self.network.clone(); let network = self.network.clone();
@@ -255,34 +150,27 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
let task_executor = self.executor.clone(); let task_executor = self.executor.clone();
let exit = self.exit.clone(); let exit = self.exit.clone();
let message_validator = self.message_validator.clone(); let message_validator = self.message_validator.clone();
let index_mapping = params.authorities let authorities = params.authorities.clone();
.iter()
.enumerate()
.map(|(i, k)| (i as ValidatorIndex, k.clone()))
.collect();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
{
let message_validator = self.message_validator.clone();
let authorities = params.authorities.clone();
self.network.with_gossip(move |gossip, ctx| {
message_validator.note_session(
parent_hash,
MessageValidationData { authorities, index_mapping },
|peer_id, message| gossip.send_message(ctx, peer_id, message),
);
});
}
self.network.with_spec(move |spec, ctx| { self.network.with_spec(move |spec, ctx| {
let session = spec.new_validation_session(ctx, params); let actions = message_validator.new_local_leaf(
let _ = tx.send(SessionDataFetcher { parent_hash,
MessageValidationData { authorities },
|queue_root| spec.availability_store.as_ref()
.and_then(|store| store.queue_by_root(queue_root))
);
network.with_gossip(move |gossip, ctx| actions.perform(gossip, ctx));
let work = spec.new_validation_leaf_work(ctx, params);
let _ = tx.send(LeafWorkDataFetcher {
network, network,
api, api,
task_executor, task_executor,
parent_hash, parent_hash,
knowledge: session.knowledge().clone(), knowledge: work.knowledge().clone(),
exit, exit,
message_validator, message_validator,
}); });
@@ -335,7 +223,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
let parent_hash = *table.consensus_parent_hash(); let parent_hash = *table.consensus_parent_hash();
let local_session_key = table.session_key(); let local_session_key = table.session_key();
let build_fetcher = self.instantiate_session(SessionParams { let build_fetcher = self.instantiate_leaf_work(LeafWorkParams {
local_session_key, local_session_key,
parent_hash, parent_hash,
authorities: authorities.to_vec(), authorities: authorities.to_vec(),
@@ -421,9 +309,9 @@ impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where
#[derive(Default)] #[derive(Default)]
struct KnowledgeEntry { struct KnowledgeEntry {
knows_block_data: Vec<ValidatorId>, knows_block_data: Vec<ValidatorId>,
knows_extrinsic: Vec<ValidatorId>, knows_outgoing: Vec<ValidatorId>,
pov: Option<PoVBlock>, pov: Option<PoVBlock>,
extrinsic: Option<Extrinsic>, outgoing_messages: Option<OutgoingMessages>,
} }
/// Tracks knowledge of peers. /// Tracks knowledge of peers.
@@ -442,18 +330,18 @@ impl Knowledge {
/// Note a statement seen from another validator. /// Note a statement seen from another validator.
pub(crate) fn note_statement(&mut self, from: ValidatorId, statement: &Statement) { pub(crate) fn note_statement(&mut self, from: ValidatorId, statement: &Statement) {
// those proposing the candidate or declaring it valid know everything. // those proposing the candidate or declaring it valid know everything.
// those claiming it invalid do not have the extrinsic data as it is // those claiming it invalid do not have the outgoing messages data as it is
// generated by valid execution. // generated by valid execution.
match *statement { match *statement {
GenericStatement::Candidate(ref c) => { GenericStatement::Candidate(ref c) => {
let entry = self.candidates.entry(c.hash()).or_insert_with(Default::default); let entry = self.candidates.entry(c.hash()).or_insert_with(Default::default);
entry.knows_block_data.push(from.clone()); entry.knows_block_data.push(from.clone());
entry.knows_extrinsic.push(from); entry.knows_outgoing.push(from);
} }
GenericStatement::Valid(ref hash) => { GenericStatement::Valid(ref hash) => {
let entry = self.candidates.entry(*hash).or_insert_with(Default::default); let entry = self.candidates.entry(*hash).or_insert_with(Default::default);
entry.knows_block_data.push(from.clone()); entry.knows_block_data.push(from.clone());
entry.knows_extrinsic.push(from); entry.knows_outgoing.push(from);
} }
GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash) GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash)
.or_insert_with(Default::default) .or_insert_with(Default::default)
@@ -463,10 +351,15 @@ impl Knowledge {
} }
/// Note a candidate collated or seen locally. /// Note a candidate collated or seen locally.
pub(crate) fn note_candidate(&mut self, hash: Hash, pov: Option<PoVBlock>, extrinsic: Option<Extrinsic>) { pub(crate) fn note_candidate(
&mut self,
hash: Hash,
pov: Option<PoVBlock>,
outgoing_messages: Option<OutgoingMessages>,
) {
let entry = self.candidates.entry(hash).or_insert_with(Default::default); let entry = self.candidates.entry(hash).or_insert_with(Default::default);
entry.pov = entry.pov.take().or(pov); entry.pov = entry.pov.take().or(pov);
entry.extrinsic = entry.extrinsic.take().or(extrinsic); entry.outgoing_messages = entry.outgoing_messages.take().or(outgoing_messages);
} }
} }
@@ -492,19 +385,19 @@ impl Future for IncomingReceiver {
} }
} }
/// A current validation session instance. /// A current validation leaf-work instance
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ValidationSession { pub(crate) struct LiveValidationLeaf {
parent_hash: Hash, parent_hash: Hash,
knowledge: Arc<Mutex<Knowledge>>, knowledge: Arc<Mutex<Knowledge>>,
local_session_key: Option<ValidatorId>, local_session_key: Option<ValidatorId>,
} }
impl ValidationSession { impl LiveValidationLeaf {
/// Create a new validation session instance. Needs to be attached to the /// Create a new validation leaf-work instance. Needs to be attached to the
/// network. /// network.
pub(crate) fn new(params: SessionParams) -> Self { pub(crate) fn new(params: LeafWorkParams) -> Self {
ValidationSession { LiveValidationLeaf {
parent_hash: params.parent_hash, parent_hash: params.parent_hash,
knowledge: Arc::new(Mutex::new(Knowledge::new())), knowledge: Arc::new(Mutex::new(Knowledge::new())),
local_session_key: params.local_session_key, local_session_key: params.local_session_key,
@@ -577,32 +470,32 @@ impl RecentValidatorIds {
} }
} }
/// Manages requests and keys for live validation session instances. /// Manages requests and keys for live validation leaf-work instances.
pub(crate) struct LiveValidationSessions { pub(crate) struct LiveValidationLeaves {
// recent local session keys. // recent local session keys.
recent: RecentValidatorIds, recent: RecentValidatorIds,
// live validation session instances, on `parent_hash`. refcount retained alongside. // live validation leaf-work instances, on `parent_hash`. refcount retained alongside.
live_instances: HashMap<Hash, (usize, ValidationSession)>, live_instances: HashMap<Hash, (usize, LiveValidationLeaf)>,
} }
impl LiveValidationSessions { impl LiveValidationLeaves {
/// Create a new `LiveValidationSessions` /// Create a new `LiveValidationLeaves`
pub(crate) fn new() -> Self { pub(crate) fn new() -> Self {
LiveValidationSessions { LiveValidationLeaves {
recent: Default::default(), recent: Default::default(),
live_instances: HashMap::new(), live_instances: HashMap::new(),
} }
} }
/// Note new validation session. If the used session key is new, /// Note new leaf for validation work. If the used session key is new,
/// it returns it to be broadcasted to peers. /// it returns it to be broadcasted to peers.
/// ///
/// If there was already a validation session instantiated and a different /// If there was already work instantiated at this leaf and a different
/// session key was set, then the new key will be ignored. /// session key was set, then the new key will be ignored.
pub(crate) fn new_validation_session( pub(crate) fn new_validation_leaf(
&mut self, &mut self,
params: SessionParams, params: LeafWorkParams,
) -> (ValidationSession, Option<ValidatorId>) { ) -> (LiveValidationLeaf, Option<ValidatorId>) {
let parent_hash = params.parent_hash; let parent_hash = params.parent_hash;
let key = params.local_session_key.clone(); let key = params.local_session_key.clone();
@@ -629,19 +522,19 @@ impl LiveValidationSessions {
return (prev.clone(), maybe_new) return (prev.clone(), maybe_new)
} }
let session = ValidationSession::new(params); let leaf_work = LiveValidationLeaf::new(params);
self.live_instances.insert(parent_hash, (1, session.clone())); self.live_instances.insert(parent_hash, (1, leaf_work.clone()));
(session, check_new_key()) (leaf_work, check_new_key())
} }
/// Remove validation session. true indicates that it was actually removed. /// Remove validation leaf-work. true indicates that it was actually removed.
pub(crate) fn remove(&mut self, parent_hash: Hash) -> bool { pub(crate) fn remove(&mut self, parent_hash: Hash) -> bool {
let maybe_removed = if let Entry::Occupied(mut entry) = self.live_instances.entry(parent_hash) { let maybe_removed = if let Entry::Occupied(mut entry) = self.live_instances.entry(parent_hash) {
entry.get_mut().0 -= 1; entry.get_mut().0 -= 1;
if entry.get().0 == 0 { if entry.get().0 == 0 {
let (_, session) = entry.remove(); let (_, leaf_work) = entry.remove();
Some(session) Some(leaf_work)
} else { } else {
None None
} }
@@ -649,12 +542,12 @@ impl LiveValidationSessions {
None None
}; };
let session = match maybe_removed { let leaf_work = match maybe_removed {
None => return false, None => return false,
Some(s) => s, Some(s) => s,
}; };
if let Some(ref key) = session.local_session_key { if let Some(ref key) = leaf_work.local_session_key {
let key_still_used = self.live_instances.values() let key_still_used = self.live_instances.values()
.any(|c| c.1.local_session_key.as_ref() == Some(key)); .any(|c| c.1.local_session_key.as_ref() == Some(key));
@@ -671,12 +564,12 @@ impl LiveValidationSessions {
self.recent.as_slice() self.recent.as_slice()
} }
/// Call a closure with pov-data from validation session at parent hash for a given /// Call a closure with pov-data from validation leaf-work at parent hash for a given
/// candidate-receipt hash. /// candidate-receipt hash.
/// ///
/// This calls the closure with `Some(data)` where the session and data are live, /// This calls the closure with `Some(data)` where the leaf-work and data are live,
/// `Err(Some(keys))` when the session is live but the data unknown, with a list of keys /// `Err(Some(keys))` when the leaf-work is live but the data unknown, with a list of keys
/// who have the data, and `Err(None)` where the session is unknown. /// who have the data, and `Err(None)` where the leaf-work is unknown.
pub(crate) fn with_pov_block<F, U>(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U pub(crate) fn with_pov_block<F, U>(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U
where F: FnOnce(Result<&PoVBlock, Option<&[ValidatorId]>>) -> U where F: FnOnce(Result<&PoVBlock, Option<&[ValidatorId]>>) -> U
{ {
@@ -716,8 +609,8 @@ impl Future for PoVReceiver {
} }
} }
/// Can fetch data for a given validation session /// Can fetch data for a given validation leaf-work instance.
pub struct SessionDataFetcher<P, E, N: NetworkService, T> { pub struct LeafWorkDataFetcher<P, E, N: NetworkService, T> {
network: Arc<N>, network: Arc<N>,
api: Arc<P>, api: Arc<P>,
exit: E, exit: E,
@@ -727,7 +620,7 @@ pub struct SessionDataFetcher<P, E, N: NetworkService, T> {
message_validator: RegisteredMessageValidator, message_validator: RegisteredMessageValidator,
} }
impl<P, E, N: NetworkService, T> SessionDataFetcher<P, E, N, T> { impl<P, E, N: NetworkService, T> LeafWorkDataFetcher<P, E, N, T> {
/// Get the parent hash. /// Get the parent hash.
pub(crate) fn parent_hash(&self) -> Hash { pub(crate) fn parent_hash(&self) -> Hash {
self.parent_hash self.parent_hash
@@ -759,9 +652,9 @@ impl<P, E, N: NetworkService, T> SessionDataFetcher<P, E, N, T> {
} }
} }
impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E, N, T> { impl<P, E: Clone, N: NetworkService, T: Clone> Clone for LeafWorkDataFetcher<P, E, N, T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
SessionDataFetcher { LeafWorkDataFetcher {
network: self.network.clone(), network: self.network.clone(),
api: self.api.clone(), api: self.api.clone(),
task_executor: self.task_executor.clone(), task_executor: self.task_executor.clone(),
@@ -773,7 +666,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E
} }
} }
impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
P::Api: ParachainHost<Block>, P::Api: ParachainHost<Block>,
N: NetworkService, N: NetworkService,
T: Clone + Executor + Send + 'static, T: Clone + Executor + Send + 'static,
@@ -784,7 +677,11 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
let parachain = candidate.parachain_index; let parachain = candidate.parachain_index;
let parent_hash = self.parent_hash; let parent_hash = self.parent_hash;
let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain) let canon_roots = self.api.runtime_api().ingress(
&BlockId::hash(parent_hash),
parachain,
None,
)
.map_err(|e| .map_err(|e|
format!( format!(
"Cannot fetch ingress for parachain {:?} at {:?}: {:?}", "Cannot fetch ingress for parachain {:?} at {:?}: {:?}",
@@ -862,39 +759,39 @@ mod tests {
} }
#[test] #[test]
fn add_new_sessions_works() { fn add_new_leaf_work_works() {
let mut live_sessions = LiveValidationSessions::new(); let mut live_leaves = LiveValidationLeaves::new();
let key_a: ValidatorId = [0; 32].unchecked_into(); let key_a: ValidatorId = [0; 32].unchecked_into();
let key_b: ValidatorId = [1; 32].unchecked_into(); let key_b: ValidatorId = [1; 32].unchecked_into();
let parent_hash = [0xff; 32].into(); let parent_hash = [0xff; 32].into();
let (session, new_key) = live_sessions.new_validation_session(SessionParams { let (leaf_work, new_key) = live_leaves.new_validation_leaf(LeafWorkParams {
parent_hash, parent_hash,
local_session_key: None, local_session_key: None,
authorities: Vec::new(), authorities: Vec::new(),
}); });
let knowledge = session.knowledge().clone(); let knowledge = leaf_work.knowledge().clone();
assert!(new_key.is_none()); assert!(new_key.is_none());
let (session, new_key) = live_sessions.new_validation_session(SessionParams { let (leaf_work, new_key) = live_leaves.new_validation_leaf(LeafWorkParams {
parent_hash, parent_hash,
local_session_key: Some(key_a.clone()), local_session_key: Some(key_a.clone()),
authorities: Vec::new(), authorities: Vec::new(),
}); });
// check that knowledge points to the same place. // check that knowledge points to the same place.
assert_eq!(&**session.knowledge() as *const _, &*knowledge as *const _); assert_eq!(&**leaf_work.knowledge() as *const _, &*knowledge as *const _);
assert_eq!(new_key, Some(key_a.clone())); assert_eq!(new_key, Some(key_a.clone()));
let (session, new_key) = live_sessions.new_validation_session(SessionParams { let (leaf_work, new_key) = live_leaves.new_validation_leaf(LeafWorkParams {
parent_hash, parent_hash,
local_session_key: Some(key_b.clone()), local_session_key: Some(key_b.clone()),
authorities: Vec::new(), authorities: Vec::new(),
}); });
assert_eq!(&**session.knowledge() as *const _, &*knowledge as *const _); assert_eq!(&**leaf_work.knowledge() as *const _, &*knowledge as *const _);
assert!(new_key.is_none()); assert!(new_key.is_none());
} }
} }
+55 -8
View File
@@ -96,31 +96,37 @@ pub struct DutyRoster {
pub validator_duty: Vec<Chain>, pub validator_duty: Vec<Chain>,
} }
/// An outgoing message /// A message targeted to a specific parachain.
#[derive(Clone, PartialEq, Eq, Encode, Decode)] #[derive(Clone, PartialEq, Eq, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
#[cfg_attr(feature = "std", serde(rename_all = "camelCase"))] #[cfg_attr(feature = "std", serde(rename_all = "camelCase"))]
#[cfg_attr(feature = "std", serde(deny_unknown_fields))] #[cfg_attr(feature = "std", serde(deny_unknown_fields))]
pub struct OutgoingMessage { pub struct TargetedMessage {
/// The target parachain. /// The target parachain.
pub target: Id, pub target: Id,
/// The message data. /// The message data.
pub data: Vec<u8>, pub data: Vec<u8>,
} }
impl PartialOrd for OutgoingMessage { impl AsRef<[u8]> for TargetedMessage {
fn as_ref(&self) -> &[u8] {
&self.data[..]
}
}
impl PartialOrd for TargetedMessage {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> { fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.target.cmp(&other.target)) Some(self.target.cmp(&other.target))
} }
} }
impl Ord for OutgoingMessage { impl Ord for TargetedMessage {
fn cmp(&self, other: &Self) -> Ordering { fn cmp(&self, other: &Self) -> Ordering {
self.target.cmp(&other.target) self.target.cmp(&other.target)
} }
} }
/// Extrinsic data for a parachain candidate. /// Outgoing message data for a parachain candidate.
/// ///
/// This is data produced by evaluating the candidate. It contains /// This is data produced by evaluating the candidate. It contains
/// full records of all outgoing messages to other parachains. /// full records of all outgoing messages to other parachains.
@@ -128,11 +134,37 @@ impl Ord for OutgoingMessage {
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
#[cfg_attr(feature = "std", serde(rename_all = "camelCase"))] #[cfg_attr(feature = "std", serde(rename_all = "camelCase"))]
#[cfg_attr(feature = "std", serde(deny_unknown_fields))] #[cfg_attr(feature = "std", serde(deny_unknown_fields))]
pub struct Extrinsic { pub struct OutgoingMessages {
/// The outgoing messages from the execution of the parachain. /// The outgoing messages from the execution of the parachain.
/// ///
/// This must be sorted in ascending order by parachain ID. /// This must be sorted in ascending order by parachain ID.
pub outgoing_messages: Vec<OutgoingMessage> pub outgoing_messages: Vec<TargetedMessage>
}
impl OutgoingMessages {
/// Returns an iterator of slices of all outgoing message queues.
///
/// All messages in a given slice are guaranteed to have the same target.
pub fn message_queues(&'_ self) -> impl Iterator<Item=&'_ [TargetedMessage]> + '_ {
let mut outgoing = &self.outgoing_messages[..];
rstd::iter::from_fn(move || {
if outgoing.is_empty() { return None }
let target = outgoing[0].target;
let mut end = 1; // the index of the last matching item + 1.
loop {
match outgoing.get(end) {
None => break,
Some(x) => if x.target != target { break },
}
end += 1;
}
let item = &outgoing[..end];
outgoing = &outgoing[end..];
Some(item)
})
}
} }
/// Candidate receipt type. /// Candidate receipt type.
@@ -217,6 +249,18 @@ pub struct PoVBlock {
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Encode, Debug))] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Encode, Debug))]
pub struct Message(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>); pub struct Message(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);
impl AsRef<[u8]> for Message {
fn as_ref(&self) -> &[u8] {
&self.0[..]
}
}
impl From<TargetedMessage> for Message {
fn from(targeted: TargetedMessage) -> Self {
Message(targeted.data)
}
}
/// All ingress roots at one block. /// All ingress roots at one block.
/// ///
/// This is an ordered vector of other parachain's egress queue roots from a specific block. /// This is an ordered vector of other parachain's egress queue roots from a specific block.
@@ -395,7 +439,10 @@ substrate_client::decl_runtime_apis! {
fn parachain_code(id: Id) -> Option<Vec<u8>>; fn parachain_code(id: Id) -> Option<Vec<u8>>;
/// Get all the unrouted ingress roots at the given block that /// Get all the unrouted ingress roots at the given block that
/// are targeting the given parachain. /// are targeting the given parachain.
fn ingress(to: Id) -> Option<StructuredUnroutedIngress>; ///
/// If `since` is provided, only messages since (including those in) that block
/// will be included.
fn ingress(to: Id, since: Option<BlockNumber>) -> Option<StructuredUnroutedIngress>;
} }
} }
+4 -2
View File
@@ -630,8 +630,10 @@ impl_runtime_apis! {
fn parachain_code(id: parachain::Id) -> Option<Vec<u8>> { fn parachain_code(id: parachain::Id) -> Option<Vec<u8>> {
Parachains::parachain_code(&id) Parachains::parachain_code(&id)
} }
fn ingress(to: parachain::Id) -> Option<parachain::StructuredUnroutedIngress> { fn ingress(to: parachain::Id, since: Option<BlockNumber>)
Parachains::ingress(to).map(parachain::StructuredUnroutedIngress) -> Option<parachain::StructuredUnroutedIngress>
{
Parachains::ingress(to, since).map(parachain::StructuredUnroutedIngress)
} }
} }
+21 -11
View File
@@ -21,7 +21,9 @@ use rstd::collections::btree_map::BTreeMap;
use codec::{Encode, Decode, HasCompact}; use codec::{Encode, Decode, HasCompact};
use srml_support::{decl_storage, decl_module, fail, ensure}; use srml_support::{decl_storage, decl_module, fail, ensure};
use sr_primitives::traits::{Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One}; use sr_primitives::traits::{
Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One, Zero,
};
use sr_primitives::weights::SimpleDispatchInfo; use sr_primitives::weights::SimpleDispatchInfo;
use primitives::{Hash, Balance, parachain::{ use primitives::{Hash, Balance, parachain::{
self, Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement, AccountIdConversion, self, Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement, AccountIdConversion,
@@ -588,15 +590,23 @@ impl<T: Trait> Module<T> {
} }
/// Calculate the ingress to a specific parachain. /// Calculate the ingress to a specific parachain.
/// Complexity: O(n) in the number of blocks since the parachain's watermark. /// If `since` is provided, only messages since (including those in) that block
/// will be included.
/// Complexity: O(n) in the number of blocks since the supplied block.
/// invoked off-chain. /// invoked off-chain.
/// ///
/// Yields a structure containing all unrouted ingress to the parachain. /// Yields a structure containing all unrouted ingress to the parachain.
pub fn ingress(to: ParaId) -> Option<Vec<(T::BlockNumber, BlockIngressRoots)>> { pub fn ingress(to: ParaId, since: Option<T::BlockNumber>) -> Option<Vec<(T::BlockNumber, BlockIngressRoots)>> {
let watermark = <Watermarks<T>>::get(to)?; let watermark = <Watermarks<T>>::get(to)?;
let now = <system::Module<T>>::block_number(); let now = <system::Module<T>>::block_number();
Some(number_range(watermark.saturating_add(One::one()),now) let watermark_since = watermark.saturating_add(One::one());
let since = rstd::cmp::max(since.unwrap_or(Zero::zero()), watermark_since);
if since > now {
return Some(Vec::new());
}
Some(number_range(since, now)
.filter_map(|unrouted_height| { .filter_map(|unrouted_height| {
<UnroutedIngress<T>>::get(&(unrouted_height, to)).map(|roots| { <UnroutedIngress<T>>::get(&(unrouted_height, to)).map(|roots| {
(unrouted_height, BlockIngressRoots(roots)) (unrouted_height, BlockIngressRoots(roots))
@@ -1697,8 +1707,8 @@ mod tests {
]; ];
with_externalities(&mut new_test_ext(parachains), || { with_externalities(&mut new_test_ext(parachains), || {
assert_eq!(Parachains::ingress(ParaId::from(1)), Some(Vec::new())); assert_eq!(Parachains::ingress(ParaId::from(1), None), Some(Vec::new()));
assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new())); assert_eq!(Parachains::ingress(ParaId::from(99), None), Some(Vec::new()));
for i in 1..10 { for i in 1..10 {
System::set_block_number(i); System::set_block_number(i);
@@ -1755,7 +1765,7 @@ mod tests {
// parachain 1 has had a bunch of parachain candidates included, // parachain 1 has had a bunch of parachain candidates included,
// which raises the watermark. // which raises the watermark.
assert_eq!( assert_eq!(
Parachains::ingress(ParaId::from(1)), Parachains::ingress(ParaId::from(1), None),
Some(vec![ Some(vec![
(9, BlockIngressRoots(vec![ (9, BlockIngressRoots(vec![
(0.into(), [9; 32].into()) (0.into(), [9; 32].into())
@@ -1766,7 +1776,7 @@ mod tests {
// parachain 99 hasn't had any candidates included, so the // parachain 99 hasn't had any candidates included, so the
// ingress is piling up. // ingress is piling up.
assert_eq!( assert_eq!(
Parachains::ingress(ParaId::from(99)), Parachains::ingress(ParaId::from(99), None),
Some((1..10).map(|i| (i, BlockIngressRoots( Some((1..10).map(|i| (i, BlockIngressRoots(
vec![(1.into(), [i as u8; 32].into())] vec![(1.into(), [i as u8; 32].into())]
))).collect::<Vec<_>>()), ))).collect::<Vec<_>>()),
@@ -1776,8 +1786,8 @@ mod tests {
// after deregistering, there is no ingress to 1, but unrouted messages // after deregistering, there is no ingress to 1, but unrouted messages
// from 1 stick around. // from 1 stick around.
assert_eq!(Parachains::ingress(ParaId::from(1)), None); assert_eq!(Parachains::ingress(ParaId::from(1), None), None);
assert_eq!(Parachains::ingress(ParaId::from(99)), Some((1..10).map(|i| (i, BlockIngressRoots( assert_eq!(Parachains::ingress(ParaId::from(99), None), Some((1..10).map(|i| (i, BlockIngressRoots(
vec![(1.into(), [i as u8; 32].into())] vec![(1.into(), [i as u8; 32].into())]
))).collect::<Vec<_>>())); ))).collect::<Vec<_>>()));
@@ -1809,7 +1819,7 @@ mod tests {
System::set_block_number(12); System::set_block_number(12);
// at the next block, ingress to 99 should be empty. // at the next block, ingress to 99 should be empty.
assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new())); assert_eq!(Parachains::ingress(ParaId::from(99), None), Some(Vec::new()));
}); });
} }
+30 -21
View File
@@ -36,7 +36,7 @@ pub use service::{ServiceBuilderExport, ServiceBuilderImport, ServiceBuilderReve
pub use service::config::full_version_from_strs; pub use service::config::full_version_from_strs;
pub use client::{backend::Backend, runtime_api::{Core as CoreApi, ConstructRuntimeApi}, ExecutionStrategy, CallExecutor}; pub use client::{backend::Backend, runtime_api::{Core as CoreApi, ConstructRuntimeApi}, ExecutionStrategy, CallExecutor};
pub use consensus_common::SelectChain; pub use consensus_common::SelectChain;
pub use polkadot_network::{PolkadotProtocol, NetworkService}; pub use polkadot_network::{PolkadotProtocol};
pub use polkadot_primitives::parachain::{CollatorId, ParachainHost}; pub use polkadot_primitives::parachain::{CollatorId, ParachainHost};
pub use polkadot_primitives::Block; pub use polkadot_primitives::Block;
pub use polkadot_runtime::RuntimeApi; pub use polkadot_runtime::RuntimeApi;
@@ -175,32 +175,34 @@ pub fn new_full(config: Configuration<CustomConfiguration, GenesisConfig>)
info!("The node cannot start as an authority because it can't select chain."); info!("The node cannot start as an authority because it can't select chain.");
return Ok(service); return Ok(service);
}; };
let gossip_validator_select_chain = select_chain.clone(); let gossip_validator_select_chain = select_chain.clone();
let gossip_validator = network_gossip::register_validator(
service.network(),
move |block_hash: &Hash| {
use client::BlockStatus;
match known_oracle.block_status(&BlockId::hash(*block_hash)) { let is_known = move |block_hash: &Hash| {
Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None, use client::BlockStatus;
Ok(BlockStatus::KnownBad) => Some(Known::Bad),
Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => { match known_oracle.block_status(&BlockId::hash(*block_hash)) {
match gossip_validator_select_chain.leaves() { Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
Err(_) => None, Ok(BlockStatus::KnownBad) => Some(Known::Bad),
Ok(leaves) => if leaves.contains(block_hash) { Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => {
Some(Known::Leaf) match gossip_validator_select_chain.leaves() {
} else { Err(_) => None,
Some(Known::Old) Ok(leaves) => if leaves.contains(block_hash) {
}, Some(Known::Leaf)
} } else {
Some(Known::Old)
},
} }
} }
}, }
};
let gossip_validator = network_gossip::register_validator(
service.network(),
(is_known, client.clone()),
); );
if service.config().roles.is_authority() { if service.config().roles.is_authority() {
let extrinsic_store = { let availability_store = {
use std::path::PathBuf; use std::path::PathBuf;
let mut path = PathBuf::from(service.config().database_path.clone()); let mut path = PathBuf::from(service.config().database_path.clone());
@@ -212,6 +214,13 @@ pub fn new_full(config: Configuration<CustomConfiguration, GenesisConfig>)
})? })?
}; };
{
let availability_store = availability_store.clone();
service.network().with_spec(
|spec, _ctx| spec.register_availability_store(availability_store)
);
}
// collator connections and validation network both fulfilled by this // collator connections and validation network both fulfilled by this
let validation_network = ValidationNetwork::new( let validation_network = ValidationNetwork::new(
service.network(), service.network(),
@@ -228,7 +237,7 @@ pub fn new_full(config: Configuration<CustomConfiguration, GenesisConfig>)
service.transaction_pool(), service.transaction_pool(),
Arc::new(service.spawn_task_handle()), Arc::new(service.spawn_task_handle()),
service.keystore(), service.keystore(),
extrinsic_store, availability_store,
polkadot_runtime::constants::time::SLOT_DURATION, polkadot_runtime::constants::time::SLOT_DURATION,
service.config().custom.max_block_data_size, service.config().custom.max_block_data_size,
); );
@@ -25,7 +25,7 @@ use substrate_primitives::Pair;
use parachain::codec::{Encode, Decode}; use parachain::codec::{Encode, Decode};
use primitives::{ use primitives::{
Hash, Hash,
parachain::{HeadData, BlockData, Id as ParaId, Message, Extrinsic, Status as ParachainStatus}, parachain::{HeadData, BlockData, Id as ParaId, Message, OutgoingMessages, Status as ParachainStatus},
}; };
use collator::{InvalidHead, ParachainContext, VersionInfo, Network, BuildParachainContext}; use collator::{InvalidHead, ParachainContext, VersionInfo, Network, BuildParachainContext};
use parking_lot::Mutex; use parking_lot::Mutex;
@@ -53,14 +53,14 @@ struct AdderContext {
/// The parachain context. /// The parachain context.
impl ParachainContext for AdderContext { impl ParachainContext for AdderContext {
type ProduceCandidate = Result<(BlockData, HeadData, Extrinsic), InvalidHead>; type ProduceCandidate = Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>;
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>( fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
&self, &self,
_relay_parent: Hash, _relay_parent: Hash,
status: ParachainStatus, status: ParachainStatus,
ingress: I, ingress: I,
) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> ) -> Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>
{ {
let adder_head = AdderHead::decode(&mut &status.head_data.0[..]) let adder_head = AdderHead::decode(&mut &status.head_data.0[..])
.map_err(|_| InvalidHead)?; .map_err(|_| InvalidHead)?;
@@ -94,7 +94,7 @@ impl ParachainContext for AdderContext {
next_head.number, next_body.state.overflowing_add(next_body.add).0); next_head.number, next_body.state.overflowing_add(next_body.add).0);
db.insert(next_head.clone(), next_body); db.insert(next_head.clone(), next_body);
Ok((encoded_body, encoded_head, Extrinsic { outgoing_messages: Vec::new() })) Ok((encoded_body, encoded_head, OutgoingMessages { outgoing_messages: Vec::new() }))
} }
} }
+1 -1
View File
@@ -14,7 +14,7 @@ derive_more = "0.14.0"
log = "0.4.6" log = "0.4.6"
exit-future = "0.1" exit-future = "0.1"
codec = { package = "parity-scale-codec", version = "~1.0.0", default-features = false, features = ["derive"] } codec = { package = "parity-scale-codec", version = "~1.0.0", default-features = false, features = ["derive"] }
extrinsic_store = { package = "polkadot-availability-store", path = "../availability-store" } availability_store = { package = "polkadot-availability-store", path = "../availability-store" }
parachain = { package = "polkadot-parachain", path = "../parachain" } parachain = { package = "polkadot-parachain", path = "../parachain" }
polkadot-primitives = { path = "../primitives" } polkadot-primitives = { path = "../primitives" }
polkadot-runtime = { path = "../runtime" } polkadot-runtime = { path = "../runtime" }
@@ -29,13 +29,13 @@ use client::{error::Result as ClientResult, BlockchainEvents, BlockBody};
use client::block_builder::api::BlockBuilder; use client::block_builder::api::BlockBuilder;
use client::blockchain::HeaderBackend; use client::blockchain::HeaderBackend;
use consensus::SelectChain; use consensus::SelectChain;
use extrinsic_store::Store as ExtrinsicStore; use availability_store::Store as AvailabilityStore;
use futures::prelude::*; use futures::prelude::*;
use futures03::{TryStreamExt as _, StreamExt as _}; use futures03::{TryStreamExt as _, StreamExt as _};
use log::error; use log::error;
use polkadot_primitives::{Block, BlockId}; use polkadot_primitives::{Block, BlockId};
use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost}; use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost};
use runtime_primitives::traits::{ProvideRuntimeApi, Header as HeaderT}; use runtime_primitives::traits::{ProvideRuntimeApi};
use babe_primitives::BabeApi; use babe_primitives::BabeApi;
use keystore::KeyStorePtr; use keystore::KeyStorePtr;
@@ -73,7 +73,7 @@ pub(crate) fn fetch_candidates<P: BlockBody<Block>>(client: &P, block: &BlockId)
// //
// NOTE: this will need to be changed to finality notification rather than // NOTE: this will need to be changed to finality notification rather than
// block import notifications when the consensus switches to non-instant finality. // block import notifications when the consensus switches to non-instant finality.
fn prune_unneeded_availability<P>(client: Arc<P>, extrinsic_store: ExtrinsicStore) fn prune_unneeded_availability<P>(client: Arc<P>, availability_store: AvailabilityStore)
-> impl Future<Item=(),Error=()> + Send -> impl Future<Item=(),Error=()> + Send
where P: Send + Sync + BlockchainEvents<Block> + BlockBody<Block> + 'static where P: Send + Sync + BlockchainEvents<Block> + BlockBody<Block> + 'static
{ {
@@ -94,7 +94,7 @@ fn prune_unneeded_availability<P>(client: Arc<P>, extrinsic_store: ExtrinsicStor
} }
}; };
if let Err(e) = extrinsic_store.candidates_finalized(parent_hash, candidate_hashes) { if let Err(e) = availability_store.candidates_finalized(parent_hash, candidate_hashes) {
warn!(target: "validation", "Failed to prune unneeded available data: {:?}", e); warn!(target: "validation", "Failed to prune unneeded available data: {:?}", e);
} }
@@ -115,7 +115,7 @@ pub(crate) fn start<C, N, P, SC>(
parachain_validation: Arc<crate::ParachainValidation<C, N, P>>, parachain_validation: Arc<crate::ParachainValidation<C, N, P>>,
thread_pool: TaskExecutor, thread_pool: TaskExecutor,
keystore: KeyStorePtr, keystore: KeyStorePtr,
extrinsic_store: ExtrinsicStore, availability_store: AvailabilityStore,
max_block_data_size: Option<u64>, max_block_data_size: Option<u64>,
) -> ServiceHandle ) -> ServiceHandle
where where
@@ -148,7 +148,6 @@ pub(crate) fn start<C, N, P, SC>(
if notification.is_new_best { if notification.is_new_best {
let res = validation.get_or_instantiate( let res = validation.get_or_instantiate(
parent_hash, parent_hash,
notification.header.parent_hash().clone(),
&keystore, &keystore,
max_block_data_size, max_block_data_size,
); );
@@ -194,7 +193,7 @@ pub(crate) fn start<C, N, P, SC>(
error!("Failed to spawn old sessions pruning task"); error!("Failed to spawn old sessions pruning task");
} }
let prune_available = prune_unneeded_availability(client, extrinsic_store) let prune_available = prune_unneeded_availability(client, availability_store)
.select(exit.clone()) .select(exit.clone())
.then(|_| Ok(())); .then(|_| Ok(()));
+26 -26
View File
@@ -23,7 +23,7 @@ use std::sync::Arc;
use polkadot_primitives::{Block, Hash, BlockId, Balance, parachain::{ use polkadot_primitives::{Block, Hash, BlockId, Balance, parachain::{
CollatorId, ConsolidatedIngress, StructuredUnroutedIngress, CandidateReceipt, ParachainHost, CollatorId, ConsolidatedIngress, StructuredUnroutedIngress, CandidateReceipt, ParachainHost,
Id as ParaId, Collation, Extrinsic, OutgoingMessage, UpwardMessage, FeeSchedule, Id as ParaId, Collation, TargetedMessage, OutgoingMessages, UpwardMessage, FeeSchedule,
}}; }};
use runtime_primitives::traits::ProvideRuntimeApi; use runtime_primitives::traits::ProvideRuntimeApi;
use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, MessageRef, UpwardMessageRef}; use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, MessageRef, UpwardMessageRef};
@@ -100,10 +100,10 @@ impl<C: Collators, P> CollationFetch<C, P> {
impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P> impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P>
where P::Api: ParachainHost<Block>, where P::Api: ParachainHost<Block>,
{ {
type Item = (Collation, Extrinsic); type Item = (Collation, OutgoingMessages);
type Error = C::Error; type Error = C::Error;
fn poll(&mut self) -> Poll<(Collation, Extrinsic), C::Error> { fn poll(&mut self) -> Poll<(Collation, OutgoingMessages), C::Error> {
loop { loop {
let collation = { let collation = {
let parachain = self.parachain.clone(); let parachain = self.parachain.clone();
@@ -182,15 +182,15 @@ impl std::error::Error for Error {
} }
} }
/// Compute a trie root for a set of messages. /// Compute a trie root for a set of messages, given the raw message data.
pub fn message_queue_root<A, I: IntoIterator<Item=A>>(messages: I) -> Hash pub fn message_queue_root<A, I: IntoIterator<Item=A>>(messages: I) -> Hash
where A: AsRef<[u8]> where A: AsRef<[u8]>
{ {
::trie::trie_types::Layout::<primitives::Blake2Hasher>::ordered_trie_root(messages) trie::trie_types::Layout::<primitives::Blake2Hasher>::ordered_trie_root(messages)
} }
/// Compute the set of egress roots for all given outgoing messages. /// Compute the set of egress roots for all given outgoing messages.
pub fn egress_roots(outgoing: &mut [OutgoingMessage]) -> Vec<(ParaId, Hash)> { pub fn egress_roots(outgoing: &mut [TargetedMessage]) -> Vec<(ParaId, Hash)> {
// stable sort messages by parachain ID. // stable sort messages by parachain ID.
outgoing.sort_by_key(|msg| ParaId::from(msg.target)); outgoing.sort_by_key(|msg| ParaId::from(msg.target));
@@ -214,10 +214,10 @@ pub fn egress_roots(outgoing: &mut [OutgoingMessage]) -> Vec<(ParaId, Hash)> {
egress_roots egress_roots
} }
fn check_extrinsic( fn check_egress(
mut outgoing: Vec<OutgoingMessage>, mut outgoing: Vec<TargetedMessage>,
expected_egress_roots: &[(ParaId, Hash)], expected_egress_roots: &[(ParaId, Hash)],
) -> Result<Extrinsic, Error> { ) -> Result<OutgoingMessages, Error> {
// stable sort messages by parachain ID. // stable sort messages by parachain ID.
outgoing.sort_by_key(|msg| ParaId::from(msg.target)); outgoing.sort_by_key(|msg| ParaId::from(msg.target));
@@ -264,12 +264,12 @@ fn check_extrinsic(
} }
} }
Ok(Extrinsic { outgoing_messages: outgoing }) Ok(OutgoingMessages { outgoing_messages: outgoing })
} }
struct Externalities { struct Externalities {
parachain_index: ParaId, parachain_index: ParaId,
outgoing: Vec<OutgoingMessage>, outgoing: Vec<TargetedMessage>,
upward: Vec<UpwardMessage>, upward: Vec<UpwardMessage>,
fees_charged: Balance, fees_charged: Balance,
free_balance: Balance, free_balance: Balance,
@@ -284,7 +284,7 @@ impl wasm_executor::Externalities for Externalities {
} }
self.apply_message_fee(message.data.len())?; self.apply_message_fee(message.data.len())?;
self.outgoing.push(OutgoingMessage { self.outgoing.push(TargetedMessage {
target, target,
data: message.data.to_vec(), data: message.data.to_vec(),
}); });
@@ -317,11 +317,11 @@ impl Externalities {
} }
} }
// Performs final checks of validity, producing the extrinsic data. // Performs final checks of validity, producing the outgoing message data.
fn final_checks( fn final_checks(
self, self,
candidate: &CandidateReceipt, candidate: &CandidateReceipt,
) -> Result<Extrinsic, Error> { ) -> Result<OutgoingMessages, Error> {
if &self.upward != &candidate.upward_messages { if &self.upward != &candidate.upward_messages {
return Err(Error::UpwardMessagesInvalid { return Err(Error::UpwardMessagesInvalid {
expected: candidate.upward_messages.clone(), expected: candidate.upward_messages.clone(),
@@ -336,7 +336,7 @@ impl Externalities {
}); });
} }
check_extrinsic( check_egress(
self.outgoing, self.outgoing,
&candidate.egress_queue_roots[..], &candidate.egress_queue_roots[..],
) )
@@ -386,7 +386,7 @@ pub fn validate_collation<P>(
relay_parent: &BlockId, relay_parent: &BlockId,
collation: &Collation, collation: &Collation,
max_block_data_size: Option<u64>, max_block_data_size: Option<u64>,
) -> Result<Extrinsic, Error> where ) -> Result<OutgoingMessages, Error> where
P: ProvideRuntimeApi, P: ProvideRuntimeApi,
P::Api: ParachainHost<Block>, P::Api: ParachainHost<Block>,
{ {
@@ -407,7 +407,7 @@ pub fn validate_collation<P>(
let chain_status = api.parachain_status(relay_parent, para_id)? let chain_status = api.parachain_status(relay_parent, para_id)?
.ok_or_else(|| Error::InactiveParachain(para_id))?; .ok_or_else(|| Error::InactiveParachain(para_id))?;
let roots = api.ingress(relay_parent, para_id)? let roots = api.ingress(relay_parent, para_id, None)?
.ok_or_else(|| Error::InactiveParachain(para_id))?; .ok_or_else(|| Error::InactiveParachain(para_id))?;
validate_incoming(&roots, &collation.pov.ingress)?; validate_incoming(&roots, &collation.pov.ingress)?;
@@ -459,42 +459,42 @@ mod tests {
#[test] #[test]
fn compute_and_check_egress() { fn compute_and_check_egress() {
let messages = vec![ let messages = vec![
OutgoingMessage { target: 3.into(), data: vec![1, 1, 1] }, TargetedMessage { target: 3.into(), data: vec![1, 1, 1] },
OutgoingMessage { target: 1.into(), data: vec![1, 2, 3] }, TargetedMessage { target: 1.into(), data: vec![1, 2, 3] },
OutgoingMessage { target: 2.into(), data: vec![4, 5, 6] }, TargetedMessage { target: 2.into(), data: vec![4, 5, 6] },
OutgoingMessage { target: 1.into(), data: vec![7, 8, 9] }, TargetedMessage { target: 1.into(), data: vec![7, 8, 9] },
]; ];
let root_1 = message_queue_root(&[vec![1, 2, 3], vec![7, 8, 9]]); let root_1 = message_queue_root(&[vec![1, 2, 3], vec![7, 8, 9]]);
let root_2 = message_queue_root(&[vec![4, 5, 6]]); let root_2 = message_queue_root(&[vec![4, 5, 6]]);
let root_3 = message_queue_root(&[vec![1, 1, 1]]); let root_3 = message_queue_root(&[vec![1, 1, 1]]);
assert!(check_extrinsic( assert!(check_egress(
messages.clone(), messages.clone(),
&[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3)], &[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3)],
).is_ok()); ).is_ok());
let egress_roots = egress_roots(&mut messages.clone()[..]); let egress_roots = egress_roots(&mut messages.clone()[..]);
assert!(check_extrinsic( assert!(check_egress(
messages.clone(), messages.clone(),
&egress_roots[..], &egress_roots[..],
).is_ok()); ).is_ok());
// missing root. // missing root.
assert!(check_extrinsic( assert!(check_egress(
messages.clone(), messages.clone(),
&[(1.into(), root_1), (3.into(), root_3)], &[(1.into(), root_1), (3.into(), root_3)],
).is_err()); ).is_err());
// extra root. // extra root.
assert!(check_extrinsic( assert!(check_egress(
messages.clone(), messages.clone(),
&[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3), (4.into(), Default::default())], &[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3), (4.into(), Default::default())],
).is_err()); ).is_err());
// root mismatch. // root mismatch.
assert!(check_extrinsic( assert!(check_egress(
messages.clone(), messages.clone(),
&[(1.into(), root_2), (2.into(), root_1), (3.into(), root_3)], &[(1.into(), root_2), (2.into(), root_1), (3.into(), root_3)],
).is_err()); ).is_err());
+32 -65
View File
@@ -37,17 +37,17 @@ use client::blockchain::HeaderBackend;
use client::block_builder::api::BlockBuilder as BlockBuilderApi; use client::block_builder::api::BlockBuilder as BlockBuilderApi;
use codec::Encode; use codec::Encode;
use consensus::SelectChain; use consensus::SelectChain;
use extrinsic_store::Store as ExtrinsicStore; use availability_store::Store as AvailabilityStore;
use parking_lot::Mutex; use parking_lot::Mutex;
use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header}; use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header};
use polkadot_primitives::parachain::{ use polkadot_primitives::parachain::{
Id as ParaId, Chain, DutyRoster, Extrinsic as ParachainExtrinsic, CandidateReceipt, Id as ParaId, Chain, DutyRoster, OutgoingMessages, CandidateReceipt,
ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message, OutgoingMessage, ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message,
Collation, PoVBlock, ValidatorSignature, ValidatorPair, ValidatorId Collation, PoVBlock, ValidatorSignature, ValidatorPair, ValidatorId
}; };
use primitives::Pair; use primitives::Pair;
use runtime_primitives::{ use runtime_primitives::{
traits::{ProvideRuntimeApi, Header as HeaderT, DigestFor}, ApplyError traits::{ProvideRuntimeApi, DigestFor}, ApplyError
}; };
use futures_timer::{Delay, Interval}; use futures_timer::{Delay, Interval};
use transaction_pool::txpool::{Pool, ChainApi as PoolChainApi}; use transaction_pool::txpool::{Pool, ChainApi as PoolChainApi};
@@ -88,27 +88,6 @@ const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
/// Incoming messages; a series of sorted (ParaId, Message) pairs. /// Incoming messages; a series of sorted (ParaId, Message) pairs.
pub type Incoming = Vec<(ParaId, Vec<Message>)>; pub type Incoming = Vec<(ParaId, Vec<Message>)>;
/// Outgoing messages from various candidates.
pub type Outgoing = Vec<MessagesFrom>;
/// Some messages from a parachain.
pub struct MessagesFrom {
/// The parachain originating the messages.
pub from: ParaId,
/// The messages themselves.
pub messages: ParachainExtrinsic,
}
impl MessagesFrom {
/// Construct from the raw messages.
pub fn from_messages(from: ParaId, messages: Vec<OutgoingMessage>) -> Self {
MessagesFrom {
from,
messages: ParachainExtrinsic { outgoing_messages: messages },
}
}
}
/// A handle to a statement table router. /// A handle to a statement table router.
/// ///
/// This is expected to be a lightweight, shared type like an `Arc`. /// This is expected to be a lightweight, shared type like an `Arc`.
@@ -120,7 +99,7 @@ pub trait TableRouter: Clone {
/// Call with local candidate data. This will make the data available on the network, /// Call with local candidate data. This will make the data available on the network,
/// and sign, import, and broadcast a statement about the candidate. /// and sign, import, and broadcast a statement about the candidate.
fn local_collation(&self, collation: Collation, extrinsic: ParachainExtrinsic); fn local_collation(&self, collation: Collation, outgoing: OutgoingMessages);
/// Fetch validation proof for a specific candidate. /// Fetch validation proof for a specific candidate.
fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof; fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof;
@@ -227,6 +206,18 @@ pub fn make_group_info(
} }
/// Compute the (target, root, messages) of all outgoing queues.
pub fn outgoing_queues(outgoing_targeted: &'_ OutgoingMessages)
-> impl Iterator<Item=(ParaId, Hash, Vec<Message>)> + '_
{
outgoing_targeted.message_queues().filter_map(|queue| {
let target = queue.get(0)?.target;
let queue_root = message_queue_root(queue);
let queue_data = queue.iter().map(|msg| msg.clone().into()).collect();
Some((target, queue_root, queue_data))
})
}
// finds the first key we are capable of signing with out of the given set of validators, // finds the first key we are capable of signing with out of the given set of validators,
// if any. // if any.
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> { fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
@@ -249,7 +240,7 @@ struct ParachainValidation<C, N, P> {
/// handle to remote task executor /// handle to remote task executor
handle: TaskExecutor, handle: TaskExecutor,
/// Store for extrinsic data. /// Store for extrinsic data.
extrinsic_store: ExtrinsicStore, availability_store: AvailabilityStore,
/// Live agreements. Maps relay chain parent hashes to attestation /// Live agreements. Maps relay chain parent hashes to attestation
/// instances. /// instances.
live_instances: Mutex<HashMap<Hash, Arc<AttestationTracker>>>, live_instances: Mutex<HashMap<Hash, Arc<AttestationTracker>>>,
@@ -274,7 +265,6 @@ impl<C, N, P> ParachainValidation<C, N, P> where
fn get_or_instantiate( fn get_or_instantiate(
&self, &self,
parent_hash: Hash, parent_hash: Hash,
grandparent_hash: Hash,
keystore: &KeyStorePtr, keystore: &KeyStorePtr,
max_block_data_size: Option<u64>, max_block_data_size: Option<u64>,
) )
@@ -290,32 +280,6 @@ impl<C, N, P> ParachainValidation<C, N, P> where
let validators = self.client.runtime_api().validators(&id)?; let validators = self.client.runtime_api().validators(&id)?;
let sign_with = signing_key(&validators[..], keystore); let sign_with = signing_key(&validators[..], keystore);
// compute the parent candidates, if we know of them.
// this will allow us to circulate outgoing messages to other peers as necessary.
let parent_candidates: Vec<_> = crate::attestation_service::fetch_candidates(&*self.client, &id)
.ok()
.and_then(|x| x)
.map(|x| x.collect())
.unwrap_or_default();
// TODO: https://github.com/paritytech/polkadot/issues/253
//
// We probably don't only want active validators to do this, or messages
// will disappear when validators exit the set.
let _outgoing: Vec<_> = {
// extract all extrinsic data that we have and propagate to peers.
live_instances.get(&grandparent_hash).map(|parent_validation| {
parent_candidates.iter().filter_map(|c| {
let para_id = c.parachain_index;
let hash = c.hash();
parent_validation.table.extrinsic_data(&hash).map(|ex| MessagesFrom {
from: para_id,
messages: ex,
})
}).collect()
}).unwrap_or_default()
};
let duty_roster = self.client.runtime_api().duty_roster(&id)?; let duty_roster = self.client.runtime_api().duty_roster(&id)?;
let (group_info, local_duty) = make_group_info( let (group_info, local_duty) = make_group_info(
@@ -339,7 +303,7 @@ impl<C, N, P> ParachainValidation<C, N, P> where
group_info, group_info,
sign_with, sign_with,
parent_hash, parent_hash,
self.extrinsic_store.clone(), self.availability_store.clone(),
max_block_data_size, max_block_data_size,
)); ));
@@ -380,10 +344,10 @@ impl<C, N, P> ParachainValidation<C, N, P> where
max_block_data_size: Option<u64>, max_block_data_size: Option<u64>,
exit: exit_future::Exit, exit: exit_future::Exit,
) { ) {
use extrinsic_store::Data; use availability_store::Data;
let (collators, client) = (self.collators.clone(), self.client.clone()); let (collators, client) = (self.collators.clone(), self.client.clone());
let extrinsic_store = self.extrinsic_store.clone(); let availability_store = self.availability_store.clone();
let with_router = move |router: N::TableRouter| { let with_router = move |router: N::TableRouter| {
// fetch a local collation from connected collators. // fetch a local collation from connected collators.
@@ -396,20 +360,24 @@ impl<C, N, P> ParachainValidation<C, N, P> where
); );
collation_work.then(move |result| match result { collation_work.then(move |result| match result {
Ok((collation, extrinsic)) => { Ok((collation, outgoing_targeted)) => {
let res = extrinsic_store.make_available(Data { let outgoing_queues = crate::outgoing_queues(&outgoing_targeted)
.map(|(_target, root, data)| (root, data))
.collect();
let res = availability_store.make_available(Data {
relay_parent, relay_parent,
parachain_id: collation.receipt.parachain_index, parachain_id: collation.receipt.parachain_index,
candidate_hash: collation.receipt.hash(), candidate_hash: collation.receipt.hash(),
block_data: collation.pov.block_data.clone(), block_data: collation.pov.block_data.clone(),
extrinsic: Some(extrinsic.clone()), outgoing_queues: Some(outgoing_queues),
}); });
match res { match res {
Ok(()) => { Ok(()) => {
// TODO: https://github.com/paritytech/polkadot/issues/51 // TODO: https://github.com/paritytech/polkadot/issues/51
// Erasure-code and provide merkle branches. // Erasure-code and provide merkle branches.
router.local_collation(collation, extrinsic); router.local_collation(collation, outgoing_targeted);
} }
Err(e) => warn!( Err(e) => warn!(
target: "validation", target: "validation",
@@ -482,7 +450,7 @@ impl<C, N, P, SC, TxApi> ProposerFactory<C, N, P, SC, TxApi> where
transaction_pool: Arc<Pool<TxApi>>, transaction_pool: Arc<Pool<TxApi>>,
thread_pool: TaskExecutor, thread_pool: TaskExecutor,
keystore: KeyStorePtr, keystore: KeyStorePtr,
extrinsic_store: ExtrinsicStore, availability_store: AvailabilityStore,
babe_slot_duration: u64, babe_slot_duration: u64,
max_block_data_size: Option<u64>, max_block_data_size: Option<u64>,
) -> Self { ) -> Self {
@@ -491,7 +459,7 @@ impl<C, N, P, SC, TxApi> ProposerFactory<C, N, P, SC, TxApi> where
network, network,
collators, collators,
handle: thread_pool.clone(), handle: thread_pool.clone(),
extrinsic_store: extrinsic_store.clone(), availability_store: availability_store.clone(),
live_instances: Mutex::new(HashMap::new()), live_instances: Mutex::new(HashMap::new()),
}); });
@@ -501,7 +469,7 @@ impl<C, N, P, SC, TxApi> ProposerFactory<C, N, P, SC, TxApi> where
parachain_validation.clone(), parachain_validation.clone(),
thread_pool, thread_pool,
keystore.clone(), keystore.clone(),
extrinsic_store, availability_store,
max_block_data_size, max_block_data_size,
); );
@@ -540,7 +508,6 @@ impl<C, N, P, SC, TxApi> consensus::Environment<Block> for ProposerFactory<C, N,
let tracker = self.parachain_validation.get_or_instantiate( let tracker = self.parachain_validation.get_or_instantiate(
parent_hash, parent_hash,
parent_header.parent_hash().clone(),
&self.keystore, &self.keystore,
self.max_block_data_size, self.max_block_data_size,
)?; )?;
+42 -51
View File
@@ -20,11 +20,11 @@
use std::collections::hash_map::{HashMap, Entry}; use std::collections::hash_map::{HashMap, Entry};
use std::sync::Arc; use std::sync::Arc;
use extrinsic_store::{Data, Store as ExtrinsicStore}; use availability_store::{Data, Store as AvailabilityStore};
use table::{self, Table, Context as TableContextTrait}; use table::{self, Table, Context as TableContextTrait};
use polkadot_primitives::{Block, BlockId, Hash}; use polkadot_primitives::{Block, BlockId, Hash};
use polkadot_primitives::parachain::{ use polkadot_primitives::parachain::{
Id as ParaId, Collation, Extrinsic, CandidateReceipt, ValidatorPair, ValidatorId, Id as ParaId, Collation, OutgoingMessages, CandidateReceipt, ValidatorPair, ValidatorId,
AttestedCandidate, ParachainHost, PoVBlock, ValidatorIndex AttestedCandidate, ParachainHost, PoVBlock, ValidatorIndex
}; };
@@ -91,7 +91,7 @@ impl TableContext {
} }
pub(crate) enum Validation { pub(crate) enum Validation {
Valid(PoVBlock, Extrinsic), Valid(PoVBlock, OutgoingMessages),
Invalid(PoVBlock), // should take proof. Invalid(PoVBlock), // should take proof.
} }
@@ -122,7 +122,7 @@ impl ValidationWork {
struct SharedTableInner { struct SharedTableInner {
table: Table<TableContext>, table: Table<TableContext>,
trackers: Vec<IncludabilitySender>, trackers: Vec<IncludabilitySender>,
extrinsic_store: ExtrinsicStore, availability_store: AvailabilityStore,
validated: HashMap<Hash, ValidationWork>, validated: HashMap<Hash, ValidationWork>,
} }
@@ -186,7 +186,7 @@ impl SharedTableInner {
}; };
work.map(|work| ParachainWork { work.map(|work| ParachainWork {
extrinsic_store: self.extrinsic_store.clone(), availability_store: self.availability_store.clone(),
relay_parent: context.parent_hash.clone(), relay_parent: context.parent_hash.clone(),
work, work,
max_block_data_size, max_block_data_size,
@@ -221,24 +221,24 @@ impl Validated {
} }
/// Note that we've validated a candidate with given hash and it is good. /// Note that we've validated a candidate with given hash and it is good.
/// Extrinsic data required. /// outgoing message required.
pub fn known_good(hash: Hash, collation: PoVBlock, extrinsic: Extrinsic) -> Self { pub fn known_good(hash: Hash, collation: PoVBlock, outgoing: OutgoingMessages) -> Self {
Validated { Validated {
statement: GenericStatement::Valid(hash), statement: GenericStatement::Valid(hash),
result: Validation::Valid(collation, extrinsic), result: Validation::Valid(collation, outgoing),
} }
} }
/// Note that we've collated a candidate. /// Note that we've collated a candidate.
/// Extrinsic data required. /// outgoing message required.
pub fn collated_local( pub fn collated_local(
receipt: CandidateReceipt, receipt: CandidateReceipt,
collation: PoVBlock, collation: PoVBlock,
extrinsic: Extrinsic, outgoing: OutgoingMessages,
) -> Self { ) -> Self {
Validated { Validated {
statement: GenericStatement::Candidate(receipt), statement: GenericStatement::Candidate(receipt),
result: Validation::Valid(collation, extrinsic), result: Validation::Valid(collation, outgoing),
} }
} }
@@ -249,8 +249,8 @@ impl Validated {
} }
} }
/// Get a reference to the extrinsic data, if any. /// Get a reference to the outgoing messages data, if any.
pub fn extrinsic(&self) -> Option<&Extrinsic> { pub fn outgoing_messages(&self) -> Option<&OutgoingMessages> {
match self.result { match self.result {
Validation::Valid(_, ref ex) => Some(ex), Validation::Valid(_, ref ex) => Some(ex),
Validation::Invalid(_) => None, Validation::Invalid(_) => None,
@@ -262,7 +262,7 @@ impl Validated {
pub struct ParachainWork<Fetch> { pub struct ParachainWork<Fetch> {
work: Work<Fetch>, work: Work<Fetch>,
relay_parent: Hash, relay_parent: Hash,
extrinsic_store: ExtrinsicStore, availability_store: AvailabilityStore,
max_block_data_size: Option<u64>, max_block_data_size: Option<u64>,
} }
@@ -272,7 +272,7 @@ impl<Fetch: Future> ParachainWork<Fetch> {
pub fn prime<P: ProvideRuntimeApi>(self, api: Arc<P>) pub fn prime<P: ProvideRuntimeApi>(self, api: Arc<P>)
-> PrimedParachainWork< -> PrimedParachainWork<
Fetch, Fetch,
impl Send + FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()>, impl Send + FnMut(&BlockId, &Collation) -> Result<OutgoingMessages, ()>,
> >
where where
P: Send + Sync + 'static, P: Send + Sync + 'static,
@@ -301,7 +301,7 @@ impl<Fetch: Future> ParachainWork<Fetch> {
/// Prime the parachain work with a custom validation function. /// Prime the parachain work with a custom validation function.
pub fn prime_with<F>(self, validate: F) -> PrimedParachainWork<Fetch, F> pub fn prime_with<F>(self, validate: F) -> PrimedParachainWork<Fetch, F>
where F: FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()> where F: FnMut(&BlockId, &Collation) -> Result<OutgoingMessages, ()>
{ {
PrimedParachainWork { inner: self, validate } PrimedParachainWork { inner: self, validate }
} }
@@ -321,7 +321,7 @@ pub struct PrimedParachainWork<Fetch, F> {
impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F> impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F>
where where
Fetch: Future<Item=PoVBlock,Error=Err>, Fetch: Future<Item=PoVBlock,Error=Err>,
F: FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()>, F: FnMut(&BlockId, &Collation) -> Result<OutgoingMessages, ()>,
Err: From<::std::io::Error>, Err: From<::std::io::Error>,
{ {
type Item = Validated; type Item = Validated;
@@ -347,18 +347,22 @@ impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F>
GenericStatement::Invalid(candidate_hash), GenericStatement::Invalid(candidate_hash),
Validation::Invalid(pov_block), Validation::Invalid(pov_block),
), ),
Ok(extrinsic) => { Ok(outgoing_targeted) => {
self.inner.extrinsic_store.make_available(Data { let outgoing_queues = crate::outgoing_queues(&outgoing_targeted)
.map(|(_target, root, data)| (root, data))
.collect();
self.inner.availability_store.make_available(Data {
relay_parent: self.inner.relay_parent, relay_parent: self.inner.relay_parent,
parachain_id: work.candidate_receipt.parachain_index, parachain_id: work.candidate_receipt.parachain_index,
candidate_hash, candidate_hash,
block_data: pov_block.block_data.clone(), block_data: pov_block.block_data.clone(),
extrinsic: Some(extrinsic.clone()), outgoing_queues: Some(outgoing_queues),
})?; })?;
( (
GenericStatement::Valid(candidate_hash), GenericStatement::Valid(candidate_hash),
Validation::Valid(pov_block, extrinsic) Validation::Valid(pov_block, outgoing_targeted)
) )
} }
}; };
@@ -397,7 +401,7 @@ impl SharedTable {
groups: HashMap<ParaId, GroupInfo>, groups: HashMap<ParaId, GroupInfo>,
key: Option<Arc<ValidatorPair>>, key: Option<Arc<ValidatorPair>>,
parent_hash: Hash, parent_hash: Hash,
extrinsic_store: ExtrinsicStore, availability_store: AvailabilityStore,
max_block_data_size: Option<u64>, max_block_data_size: Option<u64>,
) -> Self { ) -> Self {
SharedTable { SharedTable {
@@ -407,7 +411,7 @@ impl SharedTable {
table: Table::default(), table: Table::default(),
validated: HashMap::new(), validated: HashMap::new(),
trackers: Vec::new(), trackers: Vec::new(),
extrinsic_store, availability_store,
})) }))
} }
} }
@@ -427,19 +431,6 @@ impl SharedTable {
&self.context.groups &self.context.groups
} }
/// Get extrinsic data for candidate with given hash, if any.
///
/// This will return `Some` for any candidates that have been validated
/// locally.
pub(crate) fn extrinsic_data(&self, hash: &Hash) -> Option<Extrinsic> {
self.inner.lock().validated.get(hash).and_then(|x| match *x {
ValidationWork::Error(_) => None,
ValidationWork::InProgress => None,
ValidationWork::Done(Validation::Invalid(_)) => None,
ValidationWork::Done(Validation::Valid(_, ref ex)) => Some(ex.clone()),
})
}
/// Import a single statement with remote source, whose signature has already been checked. /// Import a single statement with remote source, whose signature has already been checked.
/// ///
/// The statement producer, if any, will produce only statements concerning the same candidate /// The statement producer, if any, will produce only statements concerning the same candidate
@@ -598,7 +589,7 @@ mod tests {
type Error = ::std::io::Error; type Error = ::std::io::Error;
type FetchValidationProof = future::FutureResult<PoVBlock,Self::Error>; type FetchValidationProof = future::FutureResult<PoVBlock,Self::Error>;
fn local_collation(&self, _collation: Collation, _extrinsic: Extrinsic) { fn local_collation(&self, _collation: Collation, _outgoing: OutgoingMessages) {
} }
fn fetch_pov_block(&self, _candidate: &CandidateReceipt) -> Self::FetchValidationProof { fn fetch_pov_block(&self, _candidate: &CandidateReceipt) -> Self::FetchValidationProof {
@@ -631,7 +622,7 @@ mod tests {
groups, groups,
Some(local_key.clone()), Some(local_key.clone()),
parent_hash, parent_hash,
ExtrinsicStore::new_in_memory(), AvailabilityStore::new_in_memory(),
None, None,
); );
@@ -686,7 +677,7 @@ mod tests {
groups, groups,
Some(local_key.clone()), Some(local_key.clone()),
parent_hash, parent_hash,
ExtrinsicStore::new_in_memory(), AvailabilityStore::new_in_memory(),
None, None,
); );
@@ -718,7 +709,7 @@ mod tests {
#[test] #[test]
fn evaluate_makes_block_data_available() { fn evaluate_makes_block_data_available() {
let store = ExtrinsicStore::new_in_memory(); let store = AvailabilityStore::new_in_memory();
let relay_parent = [0; 32].into(); let relay_parent = [0; 32].into();
let para_id = 5.into(); let para_id = 5.into();
let pov_block = pov_block_with_data(vec![1, 2, 3]); let pov_block = pov_block_with_data(vec![1, 2, 3]);
@@ -742,11 +733,11 @@ mod tests {
fetch: future::ok(pov_block.clone()), fetch: future::ok(pov_block.clone()),
}, },
relay_parent, relay_parent,
extrinsic_store: store.clone(), availability_store: store.clone(),
max_block_data_size: None, max_block_data_size: None,
}; };
let validated = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) let validated = producer.prime_with(|_, _| Ok(OutgoingMessages { outgoing_messages: Vec::new() }))
.wait() .wait()
.unwrap(); .unwrap();
@@ -754,12 +745,12 @@ mod tests {
assert_eq!(validated.statement, GenericStatement::Valid(hash)); assert_eq!(validated.statement, GenericStatement::Valid(hash));
assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data); assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data);
assert!(store.extrinsic(relay_parent, hash).is_some()); // TODO: check that a message queue is included by root.
} }
#[test] #[test]
fn full_availability() { fn full_availability() {
let store = ExtrinsicStore::new_in_memory(); let store = AvailabilityStore::new_in_memory();
let relay_parent = [0; 32].into(); let relay_parent = [0; 32].into();
let para_id = 5.into(); let para_id = 5.into();
let pov_block = pov_block_with_data(vec![1, 2, 3]); let pov_block = pov_block_with_data(vec![1, 2, 3]);
@@ -783,18 +774,18 @@ mod tests {
fetch: future::ok::<_, ::std::io::Error>(pov_block.clone()), fetch: future::ok::<_, ::std::io::Error>(pov_block.clone()),
}, },
relay_parent, relay_parent,
extrinsic_store: store.clone(), availability_store: store.clone(),
max_block_data_size: None, max_block_data_size: None,
}; };
let validated = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) let validated = producer.prime_with(|_, _| Ok(OutgoingMessages { outgoing_messages: Vec::new() }))
.wait() .wait()
.unwrap(); .unwrap();
assert_eq!(validated.pov_block(), &pov_block); assert_eq!(validated.pov_block(), &pov_block);
assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data); assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data);
assert!(store.extrinsic(relay_parent, hash).is_some()); // TODO: check that a message queue is included by root.
} }
#[test] #[test]
@@ -822,7 +813,7 @@ mod tests {
groups, groups,
Some(local_key.clone()), Some(local_key.clone()),
parent_hash, parent_hash,
ExtrinsicStore::new_in_memory(), AvailabilityStore::new_in_memory(),
None, None,
); );
@@ -868,7 +859,7 @@ mod tests {
let para_id = ParaId::from(1); let para_id = ParaId::from(1);
let pov_block = pov_block_with_data(vec![1, 2, 3]); let pov_block = pov_block_with_data(vec![1, 2, 3]);
let extrinsic = Extrinsic { outgoing_messages: Vec::new() }; let outgoing_messages = OutgoingMessages { outgoing_messages: Vec::new() };
let parent_hash = Default::default(); let parent_hash = Default::default();
let local_key = Sr25519Keyring::Alice.pair(); let local_key = Sr25519Keyring::Alice.pair();
@@ -888,7 +879,7 @@ mod tests {
groups, groups,
Some(local_key.clone()), Some(local_key.clone()),
parent_hash, parent_hash,
ExtrinsicStore::new_in_memory(), AvailabilityStore::new_in_memory(),
None, None,
); );
@@ -907,7 +898,7 @@ mod tests {
let signed_statement = shared_table.import_validated(Validated::collated_local( let signed_statement = shared_table.import_validated(Validated::collated_local(
candidate, candidate,
pov_block, pov_block,
extrinsic, outgoing_messages,
)).unwrap(); )).unwrap();
assert!(shared_table.inner.lock().validated.get(&hash).expect("validation has started").is_done()); assert!(shared_table.inner.lock().validated.get(&hash).expect("validation has started").is_done());