Track and accumulate ingress roots in runtime (#287)

* track unrouted ingress in runtime

* test ingress routing

* fix compilation

* add space

Co-Authored-By: Gavin Wood <github@gavwood.com>
This commit is contained in:
Robert Habermeier
2019-06-17 14:38:03 +02:00
committed by GitHub
parent bc59254f41
commit 58ab4f6b9f
7 changed files with 263 additions and 101 deletions
+2 -2
View File
@@ -356,8 +356,8 @@ impl_runtime_apis! {
fn parachain_code(id: parachain::Id) -> Option<Vec<u8>> {
Parachains::parachain_code(&id)
}
fn ingress(to: parachain::Id) -> Option<parachain::ConsolidatedIngressRoots> {
Parachains::ingress(to).map(Into::into)
fn ingress(to: parachain::Id) -> Option<parachain::StructuredUnroutedIngress> {
Parachains::ingress(to).map(parachain::StructuredUnroutedIngress)
}
}
+211 -66
View File
@@ -17,14 +17,17 @@
//! Main parachains logic. For now this is just the determination of which validators do what.
use rstd::prelude::*;
use rstd::collections::btree_map::BTreeMap;
use parity_codec::{Decode, HasCompact};
use srml_support::{decl_storage, decl_module, fail, ensure};
use bitvec::{bitvec, BigEndian};
use sr_primitives::traits::{Hash as HashT, BlakeTwo256, Member, CheckedConversion};
use sr_primitives::traits::{
Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One, Zero,
};
use primitives::{Hash, parachain::{
Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement, AccountIdConversion,
ParachainDispatchOrigin, UpwardMessage
ParachainDispatchOrigin, UpwardMessage, BlockIngressRoots,
}};
use {system, session};
use srml_support::{
@@ -44,6 +47,32 @@ use rstd::marker::PhantomData;
use system::ensure_none;
// ranges for iteration of general block number don't work, so this
// is a utility to get around that.
struct BlockNumberRange<N> {
low: N,
high: N,
}
impl<N: Saturating + One + PartialOrd + PartialEq + Clone> Iterator for BlockNumberRange<N> {
type Item = N;
fn next(&mut self) -> Option<N> {
if self.low >= self.high {
return None
}
let item = self.low.clone();
self.low = self.low.clone().saturating_add(One::one());
Some(item)
}
}
// creates a range iterator between `low` and `high`. `low` must be <= `high`.
fn number_range<N>(low: N, high: N) -> BlockNumberRange<N> {
BlockNumberRange { low, high }
}
/// Parachain registration API.
pub trait ParachainRegistrar<AccountId> {
/// An identifier for a parachain.
@@ -76,6 +105,12 @@ impl<T: Trait> ParachainRegistrar<T::AccountId> for Module<T> {
<Parachains<T>>::put(parachains);
<Heads<T>>::insert(id, initial_head_data);
// Because there are no ordering guarantees that inherents
// are applied before regular transactions, a parachain candidate could
// be registered before the `UpdateHeads` inherent is processed. If so, messages
// could be sent to a parachain in the block it is registered.
<Watermarks<T>>::insert(id, <system::Module<T>>::block_number().saturating_sub(One::one()));
Ok(())
}
fn deregister_parachain(id: ParaId) -> Result {
@@ -88,10 +123,17 @@ impl<T: Trait> ParachainRegistrar<T::AccountId> for Module<T> {
<Code<T>>::remove(id);
<Heads<T>>::remove(id);
// clear all routing entries to and from other parachains.
for other in parachains.iter().cloned() {
<Routing<T>>::remove((id, other));
<Routing<T>>::remove((other, id));
let watermark = <Watermarks<T>>::take(id);
// clear all routing entries _to_. But not those _from_.
if let Some(watermark) = watermark {
let now = <system::Module<T>>::block_number();
// iterate over all blocks between watermark and now + 1 (since messages might
// have already been sent to `id` in this block.
for unrouted_block in number_range(watermark, now).map(|n| n.saturating_add(One::one())) {
<UnroutedIngress<T>>::remove(&(unrouted_block, id));
}
}
<Parachains<T>>::put(parachains);
@@ -137,8 +179,16 @@ decl_storage! {
pub Code get(parachain_code): map ParaId => Option<Vec<u8>>;
// The heads of the parachains registered at present.
pub Heads get(parachain_head): map ParaId => Option<Vec<u8>>;
// message routing roots (from, to).
pub Routing: map (ParaId, ParaId) => Option<Hash>;
// The watermark heights of the parachains registered at present.
// For every parachain, this is the block height from which all messages targeting
// that parachain have been processed. Can be `None` only if the parachain doesn't exist.
pub Watermarks get(watermark): map ParaId => Option<T::BlockNumber>;
/// Unrouted ingress. Maps (BlockNumber, to_chain) pairs to [(from_chain, egress_root)].
///
/// There may be an entry under (i, p) in this map for every i between the parachain's
/// watermark and the current block.
pub UnroutedIngress: map (T::BlockNumber, ParaId) => Option<Vec<(ParaId, Hash)>>;
/// Messages ready to be dispatched onto the relay chain. It is subject to
/// `MAX_MESSAGE_COUNT` and `WATERMARK_MESSAGE_SIZE`.
@@ -170,6 +220,7 @@ decl_storage! {
// no ingress -- a chain cannot be routed to until it is live.
<Code<T> as generator::StorageMap<_, _>>::insert(&id, &code, storage);
<Heads<T> as generator::StorageMap<_, _>>::insert(&id, &genesis, storage);
<Watermarks<T> as generator::StorageMap<_, _>>::insert(&id, &Zero::zero(), storage);
}
});
}
@@ -220,21 +271,15 @@ decl_module! {
Self::check_attestations(&heads)?;
for head in heads.iter() {
let id = head.parachain_index();
<Heads<T>>::insert(id, &head.candidate.head_data.0);
let current_number = <system::Module<T>>::block_number();
// update egress.
for &(to, root) in &head.candidate.egress_queue_roots {
<Routing<T>>::insert((id, to), root);
}
// Queue up upwards messages (from parachains to relay chain).
Self::queue_upward_messages(id, &head.candidate.upward_messages);
}
Self::update_routing(
current_number,
&heads
);
Self::dispatch_upward_messages(
<system::Module<T>>::block_number(),
current_number,
&active_parachains,
MAX_QUEUE_COUNT,
WATERMARK_QUEUE_SIZE,
@@ -327,6 +372,48 @@ impl<T: Trait> Module<T> {
Ok(())
}
/// Update routing information from the parachain heads. This queues upwards
/// messages to the relay chain as well.
fn update_routing(
now: T::BlockNumber,
heads: &[AttestedCandidate],
) {
// TODO: per-chain watermark
// https://github.com/paritytech/polkadot/issues/286
let watermark = now.saturating_sub(One::one());
let mut ingress_update = BTreeMap::new();
for head in heads.iter() {
let id = head.parachain_index();
<Heads<T>>::insert(id, &head.candidate.head_data.0);
let last_watermark = <Watermarks<T>>::mutate(id, |mark| {
rstd::mem::replace(mark, Some(watermark))
});
if let Some(last_watermark) = last_watermark {
// Discard routed ingress.
for routed_height in number_range(last_watermark, watermark) {
<UnroutedIngress<T>>::remove(&(routed_height, id));
}
}
// place our egress root to `to` into the ingress table for (now, `to`).
for &(to, root) in &head.candidate.egress_queue_roots {
ingress_update.entry(to).or_insert_with(Vec::new).push((id, root));
}
// Queue up upwards messages (from parachains to relay chain).
Self::queue_upward_messages(id, &head.candidate.upward_messages);
}
// apply the ingress update.
for (to, ingress_roots) in ingress_update {
<UnroutedIngress<T>>::insert((now, to), ingress_roots);
}
}
/// Place any new upward messages into our queue for later dispatch.
fn queue_upward_messages(id: ParaId, upward_messages: &[UpwardMessage]) {
if !upward_messages.is_empty() {
@@ -445,16 +532,19 @@ impl<T: Trait> Module<T> {
}
/// Calculate the ingress to a specific parachain.
/// Complexity: O(n) in the number of blocks since the parachain's watermark.
/// invoked off-chain.
///
/// Yields a list of parachains being routed from, and the egress
/// queue roots to consider.
pub fn ingress(to: ParaId) -> Option<Vec<(ParaId, Hash)>> {
let active_parachains = Self::active_parachains();
if !active_parachains.contains(&to) { return None }
/// Yields a structure containing all unrouted ingress to the parachain.
pub fn ingress(to: ParaId) -> Option<Vec<(T::BlockNumber, BlockIngressRoots)>> {
let watermark = <Watermarks<T>>::get(to)?;
let now = <system::Module<T>>::block_number();
Some(active_parachains.into_iter().filter(|i| i != &to)
.filter_map(move |from| {
<Routing<T>>::get((from, to.clone())).map(move |h| (from, h))
Some(number_range(watermark.saturating_add(One::one()),now)
.filter_map(|unrouted_height| {
<UnroutedIngress<T>>::get(&(unrouted_height, to)).map(|roots| {
(unrouted_height, BlockIngressRoots(roots))
})
})
.collect())
}
@@ -1299,6 +1389,8 @@ mod tests {
#[test]
fn ingress_works() {
use sr_primitives::traits::OnFinalize;
let parachains = vec![
(0u32.into(), vec![], vec![]),
(1u32.into(), vec![], vec![]),
@@ -1306,62 +1398,115 @@ mod tests {
];
with_externalities(&mut new_test_ext(parachains), || {
let from_a = vec![(1.into(), [1; 32].into())];
let mut candidate_a = AttestedCandidate {
validity_votes: vec![],
candidate: CandidateReceipt {
parachain_index: 0.into(),
collator: Default::default(),
signature: Default::default(),
head_data: HeadData(vec![1, 2, 3]),
egress_queue_roots: from_a.clone(),
fees: 0,
block_data_hash: Default::default(),
upward_messages: vec![],
}
};
let from_b = vec![(99.into(), [1; 32].into())];
let mut candidate_b = AttestedCandidate {
validity_votes: vec![],
candidate: CandidateReceipt {
parachain_index: 1.into(),
collator: Default::default(),
signature: Default::default(),
head_data: HeadData(vec![1, 2, 3]),
egress_queue_roots: from_b.clone(),
fees: 0,
block_data_hash: Default::default(),
upward_messages: vec![],
}
};
make_attestations(&mut candidate_a);
make_attestations(&mut candidate_b);
assert_eq!(Parachains::ingress(ParaId::from(1)), Some(Vec::new()));
assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new()));
for i in 1..10 {
System::set_block_number(i);
let from_a = vec![(1.into(), [i as u8; 32].into())];
let mut candidate_a = AttestedCandidate {
validity_votes: vec![],
candidate: CandidateReceipt {
parachain_index: 0.into(),
collator: Default::default(),
signature: Default::default(),
head_data: HeadData(vec![1, 2, 3]),
egress_queue_roots: from_a.clone(),
fees: 0,
block_data_hash: Default::default(),
upward_messages: vec![],
}
};
let from_b = vec![(99.into(), [i as u8; 32].into())];
let mut candidate_b = AttestedCandidate {
validity_votes: vec![],
candidate: CandidateReceipt {
parachain_index: 1.into(),
collator: Default::default(),
signature: Default::default(),
head_data: HeadData(vec![1, 2, 3]),
egress_queue_roots: from_b.clone(),
fees: 0,
block_data_hash: Default::default(),
upward_messages: vec![],
}
};
make_attestations(&mut candidate_a);
make_attestations(&mut candidate_b);
assert!(Parachains::dispatch(
set_heads(vec![candidate_a, candidate_b]),
Origin::NONE,
).is_ok());
Parachains::on_finalize(i);
}
System::set_block_number(10);
assert!(Parachains::dispatch(
set_heads(vec![candidate_a, candidate_b]),
set_heads(vec![]),
Origin::NONE,
).is_ok());
// parachain 1 has had a bunch of parachain candidates included,
// which raises the watermark.
assert_eq!(
Parachains::ingress(ParaId::from(1)),
Some(vec![(0.into(), [1; 32].into())]),
Some(vec![
(9, BlockIngressRoots(vec![
(0.into(), [9; 32].into())
]))
]),
);
// parachain 99 hasn't had any candidates included, so the
// ingress is piling up.
assert_eq!(
Parachains::ingress(ParaId::from(99)),
Some(vec![(1.into(), [1; 32].into())]),
Some((1..10).map(|i| (i, BlockIngressRoots(
vec![(1.into(), [i as u8; 32].into())]
))).collect::<Vec<_>>()),
);
assert_ok!(Parachains::deregister_parachain(1u32.into()));
// after deregistering, there is no ingress to 1 and we stop routing
// from 1.
// after deregistering, there is no ingress to 1, but unrouted messages
// from 1 stick around.
assert_eq!(Parachains::ingress(ParaId::from(1)), None);
assert_eq!(Parachains::ingress(ParaId::from(99)), Some((1..10).map(|i| (i, BlockIngressRoots(
vec![(1.into(), [i as u8; 32].into())]
))).collect::<Vec<_>>()));
Parachains::on_finalize(10);
System::set_block_number(11);
let mut candidate_c = AttestedCandidate {
validity_votes: vec![],
candidate: CandidateReceipt {
parachain_index: 99.into(),
collator: Default::default(),
signature: Default::default(),
head_data: HeadData(vec![1, 2, 3]),
egress_queue_roots: Vec::new(),
fees: 0,
block_data_hash: Default::default(),
upward_messages: vec![],
}
};
make_attestations(&mut candidate_c);
assert!(Parachains::dispatch(
set_heads(vec![candidate_c]),
Origin::NONE,
).is_ok());
Parachains::on_finalize(11);
System::set_block_number(12);
// at the next block, ingress to 99 should be empty.
assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new()));
});
}