Approval Checking Improvements Omnibus (#2480)

* add tracing to approval voting

* notify if session info is not working

* add dispute period to chain specs

* propagate genesis session to parachains runtime

* use `on_genesis_session`

* protect against zero cores in computation

* tweak voting rule to be based off of best and add logs

* genesis configuration should use VRF slots only

* swallow more keystore errors

* add some docs

* make validation-worker args non-optional and update clap

* better tracing for bitfield signing and provisioner

* pass amount of bits in bitfields to inclusion instead of recomputing

* debug -> warn for some logs

* better tracing for availability recovery

* a little av-store tracing

* bridge: forward availability recovery messages

* add missing try_from impl

* some more tracing

* improve approval distribution tracing

* guide: hold onto pending approval messages until NewBlocks

* Hold onto pending approval messages until NewBlocks

* guide: adjust comment

* process all actions for one wakeup at a time

* vec

* fix network bridge test

* replace randomness-collective-flip with Babe

* remove PairNotFound
This commit is contained in:
Robert Habermeier
2021-02-23 14:12:28 -06:00
committed by GitHub
parent 3c4ed7b234
commit 3300b53306
27 changed files with 647 additions and 132 deletions
+2 -2
View File
@@ -787,9 +787,9 @@ dependencies = [
[[package]]
name = "clap"
version = "2.33.1"
version = "2.33.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129"
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
dependencies = [
"ansi_term 0.11.0",
"atty",
+2 -2
View File
@@ -60,8 +60,8 @@ pub enum Subcommand {
#[allow(missing_docs)]
#[derive(Debug, StructOpt)]
pub struct ValidationWorkerCommand {
/// The path that the executor can use for it's caching purposes.
pub cache_base_path: Option<std::path::PathBuf>,
/// The path that the executor can use for its caching purposes.
pub cache_base_path: std::path::PathBuf,
#[allow(missing_docs)]
pub mem_id: String,
+1 -1
View File
@@ -259,7 +259,7 @@ pub fn run() -> Result<()> {
#[cfg(not(any(target_os = "android", feature = "browser")))]
polkadot_parachain::wasm_executor::run_worker(
&cmd.mem_id,
cmd.cache_base_path.clone(),
Some(cmd.cache_base_path.clone()),
)?;
Ok(())
}
@@ -246,11 +246,17 @@ pub(crate) fn compute_assignments(
config: &Config,
leaving_cores: impl IntoIterator<Item = (CoreIndex, GroupIndex)> + Clone,
) -> HashMap<CoreIndex, OurAssignment> {
if config.n_cores == 0 || config.assignment_keys.is_empty() || config.validator_groups.is_empty() {
return HashMap::new()
}
let (index, assignments_key): (ValidatorIndex, AssignmentPair) = {
let key = config.assignment_keys.iter().enumerate()
.find_map(|(i, p)| match keystore.key_pair(p) {
Ok(Some(pair)) => Some((i as ValidatorIndex, pair)),
Ok(None) => None,
Err(sc_keystore::Error::Unavailable) => None,
Err(sc_keystore::Error::Io(e)) if e.kind() == std::io::ErrorKind::NotFound => None,
Err(e) => {
tracing::warn!(target: LOG_TARGET, "Encountered keystore error: {:?}", e);
None
@@ -608,6 +614,34 @@ mod tests {
assert!(assignments.get(&CoreIndex(1)).is_some());
}
#[test]
fn succeeds_empty_for_0_cores() {
let keystore = futures::executor::block_on(
make_keystore(&[Sr25519Keyring::Alice])
);
let relay_vrf_story = RelayVRFStory([42u8; 32]);
let assignments = compute_assignments(
&keystore,
relay_vrf_story,
&Config {
assignment_keys: assignment_keys(&[
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
]),
validator_groups: vec![],
n_cores: 0,
zeroth_delay_tranche_width: 10,
relay_vrf_modulo_samples: 3,
n_delay_tranches: 40,
},
vec![],
);
assert!(assignments.is_empty());
}
struct MutatedAssignment {
core: CoreIndex,
cert: AssignmentCert,
@@ -206,7 +206,16 @@ async fn load_all_sessions(
let session_info = match rx.await {
Ok(Ok(Some(s))) => s,
Ok(Ok(None)) => return Ok(None),
Ok(Ok(None)) => {
tracing::warn!(
target: LOG_TARGET,
"Session {} is missing from session-info state of block {}",
i,
block_hash,
);
return Ok(None);
}
Ok(Err(e)) => return Err(SubsystemError::with_origin("approval-voting", e)),
Err(e) => return Err(SubsystemError::with_origin("approval-voting", e)),
};
+191 -47
View File
@@ -54,7 +54,6 @@ use futures::channel::{mpsc, oneshot};
use std::collections::{BTreeMap, HashMap};
use std::collections::btree_map::Entry;
use std::sync::Arc;
use std::ops::{RangeBounds, Bound as RangeBound};
use approval_checking::RequiredTranches;
use persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry};
@@ -88,6 +87,9 @@ pub struct Config {
/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
/// LocalKeystore is needed for assignment keys, but not necessarily approval keys.
///
/// We do a lot of VRF signing and need the keys to have low latency.
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
db: Arc<dyn KeyValueDB>,
@@ -190,25 +192,29 @@ impl Wakeups {
self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
}
// drains all wakeups within the given range.
// panics if the given range is empty.
//
// only looks at the end bound of the range.
fn drain<'a, R: RangeBounds<Tick>>(&'a mut self, range: R)
-> impl Iterator<Item = (Hash, CandidateHash)> + 'a
{
let reverse = &mut self.reverse_wakeups;
// Returns the next wakeup. this future never returns if there are no wakeups.
async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Hash, CandidateHash) {
match self.first() {
None => future::pending().await,
Some(tick) => {
clock.wait(tick).await;
match self.wakeups.entry(tick) {
Entry::Vacant(_) => panic!("entry is known to exist since `first` was `Some`; qed"),
Entry::Occupied(mut entry) => {
let (hash, candidate_hash) = entry.get_mut().pop()
.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
// BTreeMap has no `drain` method :(
let after = match range.end_bound() {
RangeBound::Unbounded => BTreeMap::new(),
RangeBound::Included(last) => self.wakeups.split_off(&(last + 1)),
RangeBound::Excluded(last) => self.wakeups.split_off(&last),
};
let prev = std::mem::replace(&mut self.wakeups, after);
prev.into_iter()
.flat_map(|(_, wakeup)| wakeup)
.inspect(move |&(ref b, ref c)| { let _ = reverse.remove(&(*b, *c)); })
if entry.get().is_empty() {
let _ = entry.remove();
}
self.reverse_wakeups.remove(&(hash, candidate_hash));
(hash, candidate_hash)
}
}
}
}
}
}
@@ -323,28 +329,13 @@ async fn run<C>(
let db_writer = &*subsystem.db;
loop {
let wait_til_next_tick = match wakeups.first() {
None => future::Either::Left(future::pending()),
Some(tick) => future::Either::Right(
state.clock.wait(tick).map(move |()| tick)
),
};
futures::pin_mut!(wait_til_next_tick);
let actions = futures::select! {
tick_wakeup = wait_til_next_tick.fuse() => {
let woken = wakeups.drain(..=tick_wakeup).collect::<Vec<_>>();
let mut actions = Vec::new();
for (woken_block, woken_candidate) in woken {
actions.extend(process_wakeup(
&mut state,
woken_block,
woken_candidate,
)?);
}
actions
(woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
process_wakeup(
&mut state,
woken_block,
woken_candidate,
)?
}
next_msg = ctx.recv().fuse() => {
handle_from_overseer(
@@ -467,19 +458,36 @@ async fn handle_from_overseer(
Ok(block_imported_candidates) => {
// Schedule wakeups for all imported candidates.
for block_batch in block_imported_candidates {
tracing::debug!(
target: LOG_TARGET,
"Imported new block {} with {} included candidates",
block_batch.block_hash,
block_batch.imported_candidates.len(),
);
for (c_hash, c_entry) in block_batch.imported_candidates {
let our_tranche = c_entry
.approval_entry(&block_batch.block_hash)
.and_then(|a| a.our_assignment().map(|a| a.tranche()));
if let Some(our_tranche) = our_tranche {
let tick = our_tranche as Tick + block_batch.block_tick;
tracing::trace!(
target: LOG_TARGET,
"Scheduling first wakeup at tranche {} for candidate {} in block ({}, tick={})",
our_tranche,
c_hash,
block_batch.block_hash,
block_batch.block_tick,
);
// Our first wakeup will just be the tranche of our assignment,
// if any. This will likely be superseded by incoming assignments
// and approvals which trigger rescheduling.
actions.push(Action::ScheduleWakeup {
block_hash: block_batch.block_hash,
candidate_hash: c_hash,
tick: our_tranche as Tick + block_batch.block_tick,
tick,
});
}
}
@@ -564,6 +572,10 @@ async fn handle_approved_ancestor(
target: Hash,
lower_bound: BlockNumber,
) -> SubsystemResult<Option<(Hash, BlockNumber)>> {
const MAX_TRACING_WINDOW: usize = 200;
use bitvec::{order::Lsb0, vec::BitVec};
let mut all_approved_max = None;
let target_number = {
@@ -600,15 +612,29 @@ async fn handle_approved_ancestor(
Vec::new()
};
let mut bits: BitVec<Lsb0, u8> = Default::default();
for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
// Block entries should be present as the assumption is that
// nothing here is finalized. If we encounter any missing block
// entries we can fail.
let entry = match db.load_block_entry(&block_hash)? {
None => return Ok(None),
None => {
tracing::trace!{
target: LOG_TARGET,
"Chain between ({}, {}) and {} not fully known. Forcing vote on {}",
target,
target_number,
lower_bound,
lower_bound,
}
return Ok(None);
}
Some(b) => b,
};
// even if traversing millions of blocks this is fairly cheap and always dwarfed by the
// disk lookups.
bits.push(entry.is_fully_approved());
if entry.is_fully_approved() {
if all_approved_max.is_none() {
// First iteration of the loop is target, i = 0. After that,
@@ -620,6 +646,32 @@ async fn handle_approved_ancestor(
}
}
tracing::trace!(
target: LOG_TARGET,
"approved blocks {}-[{}]-{}",
target_number,
{
// formatting to divide bits by groups of 10.
// when comparing logs on multiple machines where the exact vote
// targets may differ, this grouping is useful.
let mut s = String::with_capacity(bits.len());
for (i, bit) in bits.iter().enumerate().take(MAX_TRACING_WINDOW) {
s.push(if *bit { '1' } else { '0' });
if (target_number - i as u32) % 10 == 0 && i != bits.len() - 1 { s.push(' '); }
}
s
},
if bits.len() > MAX_TRACING_WINDOW {
format!(
"{}... (truncated due to large window)",
target_number - MAX_TRACING_WINDOW as u32 + 1,
)
} else {
format!("{}", lower_bound + 1)
},
);
Ok(all_approved_max)
}
@@ -649,11 +701,8 @@ fn schedule_wakeup_action(
block_tick: Tick,
required_tranches: RequiredTranches,
) -> Option<Action> {
if approval_entry.is_approved() {
return None
}
match required_tranches {
let maybe_action = match required_tranches {
_ if approval_entry.is_approved() => None,
RequiredTranches::All => None,
RequiredTranches::Exact { next_no_show, .. } => next_no_show.map(|tick| Action::ScheduleWakeup {
block_hash,
@@ -686,7 +735,28 @@ fn schedule_wakeup_action(
min_prefer_some(next_non_empty_tranche, next_no_show)
.map(|tick| Action::ScheduleWakeup { block_hash, candidate_hash, tick })
}
};
match maybe_action {
Some(Action::ScheduleWakeup { ref tick, .. }) => tracing::debug!(
target: LOG_TARGET,
"Scheduling next wakeup at {} for candidate {} under block ({}, tick={})",
tick,
candidate_hash,
block_hash,
block_tick,
),
None => tracing::debug!(
target: LOG_TARGET,
"No wakeup needed for candidate {} under block ({}, tick={})",
candidate_hash,
block_hash,
block_tick,
),
Some(_) => {} // unreachable
}
maybe_action
}
fn check_and_import_assignment(
@@ -773,6 +843,13 @@ fn check_and_import_assignment(
if is_duplicate {
AssignmentCheckResult::AcceptedDuplicate
} else {
tracing::trace!(
target: LOG_TARGET,
"Imported assignment from validator {} on candidate {:?}",
assignment.validator,
(assigned_candidate_hash, candidate_entry.candidate_receipt().descriptor.para_id),
);
AssignmentCheckResult::Accepted
}
};
@@ -867,6 +944,13 @@ fn check_and_import_approval<T>(
// importing the approval can be heavy as it may trigger acceptance for a series of blocks.
let t = with_response(ApprovalCheckResult::Accepted);
tracing::trace!(
target: LOG_TARGET,
"Importing approval vote from validator {:?} on candidate {:?}",
(approval.validator, &pubkey),
(approved_candidate_hash, candidate_entry.candidate_receipt().descriptor.para_id),
);
let actions = import_checked_approval(
state,
Some((approval.block_hash, block_entry)),
@@ -976,6 +1060,13 @@ fn check_and_apply_full_approval(
);
if now_approved {
tracing::trace!(
target: LOG_TARGET,
"Candidate approved {} under block {}",
candidate_hash,
block_hash,
);
newly_approved.push(*block_hash);
block_entry.mark_approved_by_hash(&candidate_hash);
@@ -1072,6 +1163,14 @@ fn process_wakeup(
let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());
tracing::debug!(
target: LOG_TARGET,
"Processing wakeup at tranche {} for candidate {} under block {}",
tranche_now,
candidate_hash,
relay_block,
);
let (should_trigger, backing_group) = {
let approval_entry = match candidate_entry.approval_entry(&relay_block) {
Some(e) => e,
@@ -1123,6 +1222,13 @@ fn process_wakeup(
.position(|(_, h)| &candidate_hash == h);
if let Some(i) = index_in_candidate {
tracing::debug!(
target: LOG_TARGET,
"Launching approval work for candidate {:?} in block {}",
(&candidate_hash, candidate_entry.candidate_receipt().descriptor.para_id),
relay_block,
);
// sanity: should always be present.
actions.push(Action::LaunchApproval {
indirect_cert,
@@ -1173,6 +1279,14 @@ async fn launch_approval(
let (code_tx, code_rx) = oneshot::channel();
let (context_num_tx, context_num_rx) = oneshot::channel();
let candidate_hash = candidate.hash();
tracing::debug!(
target: LOG_TARGET,
"Recovering data for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
candidate.clone(),
session_index,
@@ -1208,10 +1322,21 @@ async fn launch_approval(
Err(_) => return,
Ok(Ok(a)) => a,
Ok(Err(RecoveryError::Unavailable)) => {
tracing::warn!(
target: LOG_TARGET,
"Data unavailable for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
// do nothing. we'll just be a no-show and that'll cause others to rise up.
return;
}
Ok(Err(RecoveryError::Invalid)) => {
tracing::warn!(
target: LOG_TARGET,
"Data recovery invalid for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
// TODO: dispute. Either the merkle trie is bad or the erasure root is.
// https://github.com/paritytech/polkadot/issues/2176
return;
@@ -1238,6 +1363,7 @@ async fn launch_approval(
let (val_tx, val_rx) = oneshot::channel();
let para_id = candidate.descriptor.para_id;
let _ = background_tx.send(BackgroundRequest::CandidateValidation(
available_data.validation_data,
validation_code,
@@ -1252,6 +1378,12 @@ async fn launch_approval(
// Validation checked out. Issue an approval command. If the underlying service is unreachable,
// then there isn't anything we can do.
tracing::debug!(
target: LOG_TARGET,
"Candidate Valid {:?}",
(candidate_hash, para_id),
);
let _ = background_tx.send(BackgroundRequest::ApprovalVote(ApprovalVoteRequest {
validator_index,
block_hash,
@@ -1259,6 +1391,12 @@ async fn launch_approval(
})).await;
}
Ok(Ok(ValidationResult::Invalid(_))) => {
tracing::warn!(
target: LOG_TARGET,
"Detected invalid candidate as an approval checker {:?}",
(candidate_hash, para_id),
);
// TODO: issue dispute, but not for timeouts.
// https://github.com/paritytech/polkadot/issues/2176
}
@@ -1358,6 +1496,12 @@ async fn issue_approval(
}
};
tracing::debug!(
target: LOG_TARGET,
"Issuing approval vote for candidate {:?}",
candidate_hash,
);
let actions = import_checked_approval(
state,
Some((block_hash, block_entry)),
+33 -15
View File
@@ -37,7 +37,7 @@ fn slot_to_tick(t: impl Into<Slot>) -> crate::time::Tick {
crate::time::slot_number_to_tick(SLOT_DURATION_MILLIS, t.into())
}
#[derive(Default)]
#[derive(Default, Clone)]
struct MockClock {
inner: Arc<Mutex<MockClockInner>>,
}
@@ -1209,7 +1209,7 @@ fn assignment_not_triggered_if_at_maximum_but_clock_is_before_with_drift() {
}
#[test]
fn wakeups_drain() {
fn wakeups_next() {
let mut wakeups = Wakeups::default();
let b_a = Hash::repeat_byte(0);
@@ -1224,12 +1224,24 @@ fn wakeups_drain() {
assert_eq!(wakeups.first().unwrap(), 1);
assert_eq!(
wakeups.drain(..=3).collect::<Vec<_>>(),
vec![(b_a, c_a), (b_b, c_b)],
);
let clock = MockClock::new(0);
let clock_aux = clock.clone();
assert_eq!(wakeups.first().unwrap(), 4);
let test_fut = Box::pin(async move {
assert_eq!(wakeups.next(&clock).await, (b_a, c_a));
assert_eq!(wakeups.next(&clock).await, (b_b, c_b));
assert_eq!(wakeups.next(&clock).await, (b_a, c_b));
assert!(wakeups.first().is_none());
assert!(wakeups.wakeups.is_empty());
});
let aux_fut = Box::pin(async move {
clock_aux.inner.lock().set_tick(1);
// skip direct set to 3.
clock_aux.inner.lock().set_tick(4);
});
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
#[test]
@@ -1243,14 +1255,20 @@ fn wakeup_earlier_supersedes_later() {
wakeups.schedule(b_a, c_a, 2);
wakeups.schedule(b_a, c_a, 3);
assert_eq!(wakeups.first().unwrap(), 2);
let clock = MockClock::new(0);
let clock_aux = clock.clone();
assert_eq!(
wakeups.drain(..=2).collect::<Vec<_>>(),
vec![(b_a, c_a)],
);
let test_fut = Box::pin(async move {
assert_eq!(wakeups.next(&clock).await, (b_a, c_a));
assert!(wakeups.first().is_none());
assert!(wakeups.reverse_wakeups.is_empty());
});
assert!(wakeups.first().is_none());
let aux_fut = Box::pin(async move {
clock_aux.inner.lock().set_tick(2);
});
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
#[test]
@@ -1517,7 +1535,7 @@ fn approved_ancestor_all_approved() {
);
});
futures::executor::block_on(futures::future::select(test_fut, aux_fut));
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
#[test]
@@ -1599,7 +1617,7 @@ fn approved_ancestor_missing_approval() {
);
});
futures::executor::block_on(futures::future::select(test_fut, aux_fut));
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
#[test]
+7
View File
@@ -1104,6 +1104,13 @@ fn store_available_data(
write_available_data(&mut tx, &candidate_hash, &available_data);
subsystem.db.write(tx)?;
tracing::debug!(
target: LOG_TARGET,
"Stored data and chunks for candidate={}",
candidate_hash,
);
Ok(())
}
+11 -5
View File
@@ -71,13 +71,12 @@ pub enum Error {
/// for whether we have the availability chunk for our validator index.
#[tracing::instrument(level = "trace", skip(sender, span), fields(subsystem = LOG_TARGET))]
async fn get_core_availability(
relay_parent: Hash,
core: CoreState,
core: &CoreState,
validator_idx: ValidatorIndex,
sender: &Mutex<&mut mpsc::Sender<FromJobCommand>>,
span: &jaeger::Span,
) -> Result<bool, Error> {
if let CoreState::Occupied(core) = core {
if let &CoreState::Occupied(ref core) = core {
let _span = span.child("query-chunk-availability");
let (tx, rx) = oneshot::channel();
@@ -152,10 +151,17 @@ async fn construct_availability_bitfield(
// Handle all cores concurrently
// `try_join_all` returns all results in the same order as the input futures.
let results = future::try_join_all(
availability_cores.into_iter()
.map(|core| get_core_availability(relay_parent, core, validator_idx, &sender, span)),
availability_cores.iter()
.map(|core| get_core_availability(core, validator_idx, &sender, span)),
).await?;
tracing::debug!(
target: LOG_TARGET,
"Signing Bitfield for {} cores: {:?}",
availability_cores.len(),
results,
);
Ok(AvailabilityBitfield(FromIterator::from_iter(results)))
}
+18 -2
View File
@@ -377,7 +377,7 @@ async fn select_candidates(
}
}
}
_ => continue,
CoreState::Free => continue,
};
let validation_data = match request_persisted_validation_data(
@@ -401,7 +401,16 @@ async fn select_candidates(
descriptor.para_id == scheduled_core.para_id
&& descriptor.persisted_validation_data_hash == computed_validation_data_hash
}) {
selected_candidates.push(candidate.hash());
let candidate_hash = candidate.hash();
tracing::trace!(
target: LOG_TARGET,
"Selecting candidate {}. para_id={} core={}",
candidate_hash,
candidate.descriptor.para_id,
core_idx,
);
selected_candidates.push(candidate_hash);
}
}
@@ -444,6 +453,13 @@ async fn select_candidates(
true
});
tracing::debug!(
target: LOG_TARGET,
"Selected {} candidates for {} cores",
candidates.len(),
availability_cores.len(),
);
Ok(candidates)
}
@@ -70,6 +70,14 @@ struct State {
blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
blocks: HashMap<Hash, BlockEntry>,
/// Our view updates to our peers can race with `NewBlocks` updates. We store messages received
/// against the directly mentioned blocks in our view in this map until `NewBlocks` is received.
///
/// As long as the parent is already in the `blocks` map and `NewBlocks` messages aren't delayed
/// by more than a block length, this strategy will work well for mitigating the race. This is
/// also a race that occurs typically on local networks.
pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>,
/// Peer view data is partially stored here, and partially inline within the [`BlockEntry`]s
peer_views: HashMap<PeerId, View>,
}
@@ -129,6 +137,11 @@ impl MessageSource {
}
}
enum PendingMessage {
Assignment(IndirectAssignmentCert, CandidateIndex),
Approval(IndirectSignedApprovalVote),
}
impl State {
async fn handle_network_msg(
&mut self,
@@ -150,8 +163,14 @@ impl State {
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
self.handle_peer_view_change(ctx, peer_id, view).await;
}
NetworkBridgeEvent::OurViewChange(_view) => {
// handled by `BlockFinalized` notification
NetworkBridgeEvent::OurViewChange(view) => {
for head in &view.heads {
if !self.blocks.contains_key(head) {
self.pending_known.entry(*head).or_default();
}
}
self.pending_known.retain(|h, _| view.contains(h));
}
NetworkBridgeEvent::PeerMessage(peer_id, msg) => {
self.process_incoming_peer_message(ctx, metrics, peer_id, msg).await;
@@ -162,10 +181,11 @@ impl State {
async fn handle_new_blocks(
&mut self,
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
metrics: &Metrics,
metas: Vec<BlockApprovalMeta>,
) {
let mut new_hashes = HashSet::new();
for meta in metas.into_iter() {
for meta in &metas {
match self.blocks.entry(meta.hash.clone()) {
hash_map::Entry::Vacant(entry) => {
let candidates_count = meta.candidates.len();
@@ -185,6 +205,47 @@ impl State {
}
self.blocks_by_number.entry(meta.number).or_default().push(meta.hash);
}
tracing::debug!(
target: LOG_TARGET,
"Got new blocks {:?}",
metas.iter().map(|m| (m.hash, m.number)).collect::<Vec<_>>(),
);
{
let pending_now_known = self.pending_known.keys()
.filter(|k| self.blocks.contains_key(k))
.copied()
.collect::<Vec<_>>();
let to_import = pending_now_known.into_iter()
.filter_map(|k| self.pending_known.remove(&k))
.flatten()
.collect::<Vec<_>>();
for (peer_id, message) in to_import {
match message {
PendingMessage::Assignment(assignment, claimed_index) => {
self.import_and_circulate_assignment(
ctx,
metrics,
MessageSource::Peer(peer_id),
assignment,
claimed_index,
).await;
}
PendingMessage::Approval(approval_vote) => {
self.import_and_circulate_approval(
ctx,
metrics,
MessageSource::Peer(peer_id),
approval_vote,
).await;
}
}
}
}
for (peer_id, view) in self.peer_views.iter() {
let intersection = view.iter().filter(|h| new_hashes.contains(h));
let view_intersection = View::new(
@@ -216,6 +277,15 @@ impl State {
"Processing assignments from a peer",
);
for (assignment, claimed_index) in assignments.into_iter() {
if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) {
pending.push((
peer_id.clone(),
PendingMessage::Assignment(assignment, claimed_index),
));
continue;
}
self.import_and_circulate_assignment(
ctx,
metrics,
@@ -233,6 +303,15 @@ impl State {
"Processing approvals from a peer",
);
for approval_vote in approvals.into_iter() {
if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) {
pending.push((
peer_id.clone(),
PendingMessage::Approval(approval_vote),
));
continue;
}
self.import_and_circulate_approval(
ctx,
metrics,
@@ -446,6 +525,14 @@ impl State {
}
if !peers.is_empty() {
tracing::trace!(
target: LOG_TARGET,
"Sending assignment (block={}, index={})to {} peers",
block_hash,
claimed_candidate_index,
peers.len(),
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
@@ -616,6 +703,14 @@ impl State {
let approvals = vec![vote];
if !peers.is_empty() {
tracing::trace!(
target: LOG_TARGET,
"Sending approval (block={}, index={})to {} peers",
block_hash,
candidate_index,
peers.len(),
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
@@ -681,6 +776,14 @@ impl State {
Some(entry) => entry,
None => continue, // should be unreachable
};
tracing::trace!(
target: LOG_TARGET,
"Sending all assignments and approvals in block {} to peer {}",
block,
peer_id,
);
for (candidate_index, candidate_entry) in entry.candidates.iter().enumerate() {
let candidate_index = candidate_index as u32;
for (validator_index, approval_state) in candidate_entry.approvals.iter() {
@@ -785,12 +888,18 @@ impl ApprovalDistribution {
msg: ApprovalDistributionMessage::NewBlocks(metas),
} => {
tracing::debug!(target: LOG_TARGET, "Processing NewBlocks");
state.handle_new_blocks(&mut ctx, metas).await;
state.handle_new_blocks(&mut ctx, &self.metrics, metas).await;
}
FromOverseer::Communication {
msg: ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index),
} => {
tracing::debug!(target: LOG_TARGET, "Processing DistributeAssignment");
tracing::debug!(
target: LOG_TARGET,
"Distributing our assignment on candidate (block={}, index={})",
cert.block_hash,
candidate_index,
);
state.import_and_circulate_assignment(
&mut ctx,
&self.metrics,
@@ -802,7 +911,13 @@ impl ApprovalDistribution {
FromOverseer::Communication {
msg: ApprovalDistributionMessage::DistributeApproval(vote),
} => {
tracing::debug!(target: LOG_TARGET, "Processing DistributeApproval");
tracing::debug!(
target: LOG_TARGET,
"Distributing our approval vote on candidate (block={}, index={})",
vote.block_hash,
vote.candidate_index,
);
state.import_and_circulate_approval(
&mut ctx,
&self.metrics,
@@ -846,6 +846,15 @@ async fn handle_network_update(
// message.
let chunk = query_chunk(ctx, candidate_hash, validator_index).await?;
tracing::trace!(
target: LOG_TARGET,
"Responding({}) to chunk request req_id={} candidate={} index={}",
chunk.is_some(),
request_id,
candidate_hash,
validator_index,
);
// Whatever the result, issue an
// AvailabilityRecoveryV1Message::Chunk(r_id, response) message.
let wire_message = protocol_v1::AvailabilityRecoveryMessage::Chunk(
@@ -867,6 +876,15 @@ async fn handle_network_update(
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
Some((peer_id, Awaited::Chunk(awaited_chunk))) if peer_id == peer => {
tracing::trace!(
target: LOG_TARGET,
"Received chunk response({}) req_id={} candidate={} index={}",
chunk.is_some(),
request_id,
awaited_chunk.candidate_hash,
awaited_chunk.validator_index,
);
// If there exists an entry under r_id, remove it.
// Send the chunk response on the awaited_chunk for the interaction to handle.
if let Some(chunk) = chunk {
@@ -897,6 +915,14 @@ async fn handle_network_update(
// message.
let full_data = query_full_data(ctx, candidate_hash).await?;
tracing::trace!(
target: LOG_TARGET,
"Responding({}) to full data request req_id={} candidate={}",
full_data.is_some(),
request_id,
candidate_hash,
);
// Whatever the result, issue an
// AvailabilityRecoveryV1Message::FullData(r_id, response) message.
let wire_message = protocol_v1::AvailabilityRecoveryMessage::FullData(
@@ -918,6 +944,14 @@ async fn handle_network_update(
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
Some((peer_id, Awaited::FullData(awaited))) if peer_id == peer => {
tracing::trace!(
target: LOG_TARGET,
"Received full data response({}) req_id={} candidate={}",
data.is_some(),
request_id,
awaited.candidate_hash,
);
// If there exists an entry under r_id, remove it.
// Send the response on the awaited for the interaction to handle.
if let Some(data) = data {
@@ -962,17 +996,41 @@ async fn issue_request(
state.next_request_id += 1;
let wire_message = match awaited {
Awaited::Chunk(ref awaited_chunk) => protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
awaited_chunk.candidate_hash,
awaited_chunk.validator_index,
),
Awaited::FullData(ref awaited_data) => protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
request_id,
awaited_data.candidate_hash,
),
Awaited::Chunk(ref awaited_chunk) => {
tracing::trace!(
target: LOG_TARGET,
"Requesting chunk req_id={} peer_id={} candidate={} index={}",
request_id,
peer_id,
awaited_chunk.candidate_hash,
awaited_chunk.validator_index,
);
protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
awaited_chunk.candidate_hash,
awaited_chunk.validator_index,
)
}
Awaited::FullData(ref awaited_data) => {
tracing::trace!(
target: LOG_TARGET,
"Requesting full data req_id={} peer_id={} candidate={} index={}",
request_id,
peer_id,
awaited_data.candidate_hash,
awaited_data.validator_index,
);
protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
request_id,
awaited_data.candidate_hash,
)
}
};
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
vec![peer_id.clone()],
+14 -2
View File
@@ -31,6 +31,7 @@ use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage,
BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
CollatorProtocolMessage, ApprovalDistributionMessage, NetworkBridgeEvent,
AvailabilityRecoveryMessage,
};
use polkadot_primitives::v1::{Hash, BlockNumber};
use polkadot_node_network_protocol::{
@@ -565,7 +566,7 @@ async fn dispatch_validation_events_to_all<I>(
I::IntoIter: Send,
{
let messages_for = |event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>| {
let a = std::iter::once(event.focus().ok().map(|m| AllMessages::AvailabilityDistribution(
let av_d = std::iter::once(event.focus().ok().map(|m| AllMessages::AvailabilityDistribution(
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(m)
)));
@@ -585,7 +586,11 @@ async fn dispatch_validation_events_to_all<I>(
ApprovalDistributionMessage::NetworkBridgeUpdateV1(m)
)));
a.chain(b).chain(p).chain(s).chain(ap).filter_map(|x| x)
let av_r = std::iter::once(event.focus().ok().map(|m| AllMessages::AvailabilityRecovery(
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(m)
)));
av_d.chain(b).chain(p).chain(s).chain(ap).chain(av_r).filter_map(|x| x)
};
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
@@ -847,6 +852,13 @@ mod tests {
ApprovalDistributionMessage::NetworkBridgeUpdateV1(e)
) if e == event.focus().expect("could not focus message")
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::AvailabilityRecovery(
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(e)
) if e == event.focus().expect("could not focus message")
);
}
async fn assert_sends_collation_event_to_all(
@@ -481,6 +481,7 @@ pub mod v1 {
impl_try_from!(ValidationProtocol, PoVDistribution, PoVDistributionMessage);
impl_try_from!(ValidationProtocol, StatementDistribution, StatementDistributionMessage);
impl_try_from!(ValidationProtocol, ApprovalDistribution, ApprovalDistributionMessage);
impl_try_from!(ValidationProtocol, AvailabilityRecovery, AvailabilityRecoveryMessage);
/// All network messages on the collation peer-set.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
+2
View File
@@ -892,6 +892,7 @@ fn rococo_staging_testnet_config_genesis(wasm_binary: &[u8]) -> rococo_runtime::
hrmp_max_parachain_outbound_channels: 4,
hrmp_max_parathread_outbound_channels: 4,
hrmp_max_message_num_per_candidate: 5,
dispute_period: 6,
no_show_slots: 2,
n_delay_tranches: 25,
needed_approvals: 2,
@@ -1402,6 +1403,7 @@ pub fn rococo_testnet_genesis(
hrmp_max_parachain_outbound_channels: 4,
hrmp_max_parathread_outbound_channels: 4,
hrmp_max_message_num_per_candidate: 5,
dispute_period: 6,
no_show_slots: 2,
n_delay_tranches: 25,
needed_approvals: 2,
+22 -7
View File
@@ -79,7 +79,7 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic
&self,
backend: Arc<B>,
base: &PolkadotHeader,
_best_target: &PolkadotHeader,
best_target: &PolkadotHeader,
current_target: &PolkadotHeader,
) -> grandpa::VotingRuleResult<PolkadotBlock> {
// always wait 50 blocks behind the head to finalize.
@@ -109,8 +109,16 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic
}
};
// delay blocks behind the head, but make sure we're not ahead of the current
// target.
let target_number = std::cmp::min(
best_target.number().saturating_sub(DIAGNOSTIC_GRANDPA_DELAY),
current_target.number().clone(),
);
// don't go below base
let target_number = std::cmp::max(
current_target.number().saturating_sub(DIAGNOSTIC_GRANDPA_DELAY),
target_number,
base.number().clone(),
);
@@ -123,8 +131,8 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic
let mut overseer = self.overseer.clone();
let checking_lag = self.checking_lag.clone();
let current_hash = current_target.hash();
let current_number = current_target.number.clone();
let best_hash = best_target.hash();
let best_number = best_target.number.clone();
let base_number = base.number;
@@ -132,7 +140,7 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic
let (tx, rx) = oneshot::channel();
let approval_checking_subsystem_vote = {
overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor(
current_hash,
best_hash,
base_number,
tx,
)).await;
@@ -141,14 +149,21 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic
};
let approval_checking_subsystem_lag = approval_checking_subsystem_vote.map_or(
current_number - base_number,
|(_h, n)| current_number - n,
best_number - base_number,
|(_h, n)| best_number - n,
);
if let Some(ref checking_lag) = checking_lag {
checking_lag.observe(approval_checking_subsystem_lag as _);
}
tracing::debug!(
target: "approval_voting",
"GRANDPA: voting on {:?}. Approval-checking lag behind best is {}",
actual_vote_target,
approval_checking_subsystem_lag,
);
actual_vote_target
})
}
@@ -42,6 +42,11 @@ Output:
```rust
type BlockScopedCandidate = (Hash, CandidateHash);
enum PendingMessage {
Assignment(IndirectAssignmentCert, CoreIndex),
Approval(IndirectSignedApprovalVote),
}
/// The `State` struct is responsible for tracking the overall state of the subsystem.
///
/// It tracks metadata about our view of the unfinalized chain, which assignments and approvals we have seen, and our peers' views.
@@ -50,6 +55,14 @@ struct State {
blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
blocks: HashMap<Hash, BlockEntry>,
/// Our view updates to our peers can race with `NewBlocks` updates. We store messages received
/// against the directly mentioned blocks in our view in this map until `NewBlocks` is received.
///
/// As long as the parent is already in the `blocks` map and `NewBlocks` messages aren't delayed
/// by more than a block length, this strategy will work well for mitigating the race. This is
/// also a race that occurs typically on local networks.
pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage>)>>,
// Peer view data is partially stored here, and partially inline within the `BlockEntry`s
peer_views: HashMap<PeerId, View>,
}
@@ -102,6 +115,11 @@ Remove the view under the associated `PeerId` from `State::peer_views`.
Iterate over every `BlockEntry` and remove `PeerId` from it.
#### `NetworkBridgeEvent::OurViewChange`
Remove entries in `pending_known` for all hashes not present in the view.
Ensure a vector is present in `pending_known` for each hash in the view that does not have an entry in `blocks`.
#### `NetworkBridgeEvent::PeerViewChange`
Invoke `unify_with_peer(peer, view)` to catch them up to messages we have.
@@ -116,6 +134,8 @@ From there, we can loop backwards from `constrain(view.finalized_number)` until
#### `NetworkBridgeEvent::PeerMessage`
If the block hash referenced by the message exists in `pending_known`, add it to the vector of pending messages and return.
If the message is of type `ApprovalDistributionV1Message::Assignment(assignment_cert, claimed_index)`, then call `import_and_circulate_assignment(MessageSource::Peer(sender), assignment_cert, claimed_index)`
If the message is of type `ApprovalDistributionV1Message::Approval(approval_vote)`, then call `import_and_circulate_approval(MessageSource::Peer(sender), approval_vote)`
@@ -126,6 +146,9 @@ If the message is of type `ApprovalDistributionV1Message::Approval(approval_vote
Create `BlockEntry` and `CandidateEntries` for all blocks.
For all entries in `pending_known`:
* If there is now an entry under `blocks` for the block hash, drain all messages and import with `import_and_circulate_assignment` and `import_and_circulate_approval`.
For all peers:
* Compute `view_intersection` as the intersection of the peer's view blocks with the hashes of the new blocks.
* Invoke `unify_with_peer(peer, view_intersection)`.
@@ -157,8 +180,8 @@ enum MessageSource {
Imports an assignment cert referenced by block hash and candidate index. As a postcondition, if the cert is valid, it will have distributed the cert to all peers who have the block in their view, with the exclusion of the peer referenced by the `MessageSource`.
We maintain a few invariants:
* we only send an assignment to a peer after we add its fingerpring to our knownledge
* we add a fingerprint of an assignment to our knownledge only if it's valid and hasn't been added before
* we only send an assignment to a peer after we add its fingerprint to our knowledge
* we add a fingerprint of an assignment to our knowledge only if it's valid and hasn't been added before
The algorithm is the following:
@@ -167,7 +190,7 @@ The algorithm is the following:
* If the source is `MessageSource::Peer(sender)`:
* check if `peer` appears under `known_by` and whether the fingerprint is in the `known_messages` of the peer. If the peer does not know the block, report for providing data out-of-view and proceed. If the peer does know the block and the knowledge contains the fingerprint, report for providing replicate data and return.
* If the message fingerprint appears under the `BlockEntry`'s `Knowledge`, give the peer a small positive reputation boost,
add the fingerpring to the peer's knownledge only if it knows about the block and return.
add the fingerprint to the peer's knowledge only if it knows about the block and return.
Note that we must do this after checking for out-of-view and if the peers knows about the block to avoid being spammed.
If we did this check earlier, a peer could provide data out-of-view repeatedly and be rewarded for it.
* Dispatch `ApprovalVotingMessage::CheckAndImportAssignment(assignment)` and wait for the response.
@@ -194,7 +217,7 @@ Imports an approval signature referenced by block hash and candidate index:
* If the source is `MessageSource::Peer(sender)`:
* check if `peer` appears under `known_by` and whether the fingerprint is in the `known_messages` of the peer. If the peer does not know the block, report for providing data out-of-view and proceed. If the peer does know the block and the knowledge contains the fingerprint, report for providing replicate data and return.
* If the message fingerprint appears under the `BlockEntry`'s `Knowledge`, give the peer a small positive reputation boost,
add the fingerpring to the peer's knownledge only if it knows about the block and return.
add the fingerprint to the peer's knowledge only if it knows about the block and return.
Note that we must do this after checking for out-of-view to avoid being spammed. If we did this check earlier, a peer could provide data out-of-view repeatedly and be rewarded for it.
* Dispatch `ApprovalVotingMessage::CheckAndImportApproval(approval)` and wait for the response.
* If the result is `VoteCheckResult::Accepted(())`:
@@ -135,7 +135,10 @@ struct State {
session_info: Vec<SessionInfo>,
babe_epoch: Option<BabeEpoch>, // information about a cached BABE epoch.
keystore: KeyStorePtr,
wakeups: BTreeMap<Tick, Vec<(Hash, Hash)>>, // Tick -> [(Relay Block, Candidate Hash)]
// A scheduler which keeps at most one wakeup per hash, candidate hash pair and
// maps such pairs to `Tick`s.
wakeups: Wakeups,
// These are connected to each other.
background_tx: mpsc::Sender<BackgroundRequest>,
@@ -48,8 +48,8 @@ Validators: Vec<ValidatorId>;
All failed checks should lead to an unrecoverable error making the block invalid.
* `process_bitfields(Bitfields, core_lookup: Fn(CoreIndex) -> Option<ParaId>)`:
1. check that the number of bitfields and bits in each bitfield is correct.
* `process_bitfields(expected_bits, Bitfields, core_lookup: Fn(CoreIndex) -> Option<ParaId>)`:
1. check that there is at most 1 bitfield per validator and that the number of bits in each bitfield is equal to expected_bits.
1. check that there are no duplicates
1. check all validator signatures.
1. apply each bit of bitfield to the corresponding pending candidate. looking up parathread cores using the `core_lookup`. Disregard bitfields that have a `1` bit for any free cores.
@@ -27,7 +27,7 @@ Included: Option<()>,
1. Invoke `Disputes::provide_multi_dispute_data`.
1. If `Disputes::is_frozen`, return and set `Included` to `Some(())`.
1. If there are any created disputes from the current session, invoke `Inclusion::collect_disputed` with the disputed candidates. Annotate each returned core with `FreedReason::Concluded`.
1. The `Bitfields` are first forwarded to the `Inclusion::process_bitfields` routine, returning a set of freed cores. Provide a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. Annotate each of these freed cores with `FreedReason::Concluded`.
1. The `Bitfields` are first forwarded to the `Inclusion::process_bitfields` routine, returning a set of freed cores. Provide the number of availability cores (`Scheduler::availability_cores().len()`) as the expected number of bits and a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. Annotate each of these freed cores with `FreedReason::Concluded`.
1. For each freed candidate from the `Inclusion::process_bitfields` call, invoke `Disputes::note_included(current_session, candidate)`.
1. If `Scheduler::availability_timeout_predicate` is `Some`, invoke `Inclusion::collect_pending` using it and annotate each of those freed cores with `FreedReason::TimedOut`.
1. Combine and sort the dispute-freed cores, the bitfield-freed cores, and the timed-out cores.
+1 -1
View File
@@ -1240,7 +1240,7 @@ sp_api::impl_runtime_apis! {
c: PRIMARY_PROBABILITY,
genesis_authorities: Babe::authorities(),
randomness: Babe::randomness(),
allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryPlainSlots,
allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryVRFSlots,
}
}
+38 -12
View File
@@ -238,17 +238,14 @@ impl<T: Config> Module<T> {
/// Process a set of incoming bitfields. Return a vec of cores freed by candidates
/// becoming available.
pub(crate) fn process_bitfields(
expected_bits: usize,
signed_bitfields: SignedAvailabilityBitfields,
core_lookup: impl Fn(CoreIndex) -> Option<ParaId>,
) -> Result<Vec<CoreIndex>, DispatchError> {
let validators = Validators::get();
let session_index = shared::Module::<T>::session_index();
let config = <configuration::Module<T>>::config();
let parachains = <paras::Module<T>>::parachains();
let n_bits = parachains.len() + config.parathread_cores as usize;
let mut assigned_paras_record: Vec<_> = (0..n_bits)
let mut assigned_paras_record: Vec<_> = (0..expected_bits)
.map(|bit_index| core_lookup(CoreIndex::from(bit_index as u32)))
.map(|core_para| core_para.map(|p| (p, PendingAvailability::<T>::get(&p))))
.collect();
@@ -256,7 +253,7 @@ impl<T: Config> Module<T> {
// do sanity checks on the bitfields:
// 1. no more than one bitfield per validator
// 2. bitfields are ascending by validator index.
// 3. each bitfield has exactly `n_bits`
// 3. each bitfield has exactly `expected_bits`
// 4. signature is valid.
{
let occupied_bitmask: BitVec<BitOrderLsb0, u8> = assigned_paras_record.iter()
@@ -274,7 +271,7 @@ impl<T: Config> Module<T> {
for signed_bitfield in &signed_bitfields {
ensure!(
signed_bitfield.payload().0.len() == n_bits,
signed_bitfield.payload().0.len() == expected_bits,
Error::<T>::WrongBitfieldSize,
);
@@ -336,7 +333,7 @@ impl<T: Config> Module<T> {
let threshold = availability_threshold(validators.len());
let mut freed_cores = Vec::with_capacity(n_bits);
let mut freed_cores = Vec::with_capacity(expected_bits);
for (para_id, pending_availability) in assigned_paras_record.into_iter()
.filter_map(|x| x)
.filter_map(|(id, p)| p.map(|p| (id, p)))
@@ -1060,10 +1057,12 @@ mod tests {
}
}
fn default_bitfield() -> AvailabilityBitfield {
let n_bits = Paras::parachains().len() + Configuration::config().parathread_cores as usize;
fn expected_bits() -> usize {
Paras::parachains().len() + Configuration::config().parathread_cores as usize
}
AvailabilityBitfield(bitvec::bitvec![BitOrderLsb0, u8; 0; n_bits])
fn default_bitfield() -> AvailabilityBitfield {
AvailabilityBitfield(bitvec::bitvec![BitOrderLsb0, u8; 0; expected_bits()])
}
fn default_availability_votes() -> BitVec<BitOrderLsb0, u8> {
@@ -1228,7 +1227,8 @@ mod tests {
core if core == CoreIndex::from(0) => Some(chain_a),
core if core == CoreIndex::from(1) => Some(chain_b),
core if core == CoreIndex::from(2) => Some(thread_a),
_ => panic!("Core out of bounds for 2 parachains and 1 parathread core."),
core if core == CoreIndex::from(3) => None, // for the expected_cores() + 1 test below.
_ => panic!("out of bounds for testing"),
};
// wrong number of bits.
@@ -1244,6 +1244,25 @@ mod tests {
));
assert!(Inclusion::process_bitfields(
expected_bits(),
vec![signed],
&core_lookup,
).is_err());
}
// wrong number of bits: other way around.
{
let bare_bitfield = default_bitfield();
let signed = block_on(sign_bitfield(
&keystore,
&validators[0],
0,
bare_bitfield,
&signing_context,
));
assert!(Inclusion::process_bitfields(
expected_bits() + 1,
vec![signed],
&core_lookup,
).is_err());
@@ -1261,6 +1280,7 @@ mod tests {
));
assert!(Inclusion::process_bitfields(
expected_bits(),
vec![signed.clone(), signed],
&core_lookup,
).is_err());
@@ -1286,6 +1306,7 @@ mod tests {
));
assert!(Inclusion::process_bitfields(
expected_bits(),
vec![signed_1, signed_0],
&core_lookup,
).is_err());
@@ -1304,6 +1325,7 @@ mod tests {
));
assert!(Inclusion::process_bitfields(
expected_bits(),
vec![signed],
&core_lookup,
).is_err());
@@ -1321,6 +1343,7 @@ mod tests {
));
assert!(Inclusion::process_bitfields(
expected_bits(),
vec![signed],
&core_lookup,
).is_ok());
@@ -1355,6 +1378,7 @@ mod tests {
));
assert!(Inclusion::process_bitfields(
expected_bits(),
vec![signed],
&core_lookup,
).is_ok());
@@ -1393,6 +1417,7 @@ mod tests {
// no core is freed
assert_eq!(
Inclusion::process_bitfields(
expected_bits(),
vec![signed],
&core_lookup,
),
@@ -1516,6 +1541,7 @@ mod tests {
}).collect();
assert!(Inclusion::process_bitfields(
expected_bits(),
signed_bitfields,
&core_lookup,
).is_ok());
@@ -107,7 +107,9 @@ decl_module! {
// Process new availability bitfields, yielding any availability cores whose
// work has now concluded.
let expected_bits = <scheduler::Module<T>>::availability_cores().len();
let freed_concluded = <inclusion::Module<T>>::process_bitfields(
expected_bits,
signed_bitfields,
<scheduler::Module<T>>::core_para,
)?;
+32 -8
View File
@@ -229,11 +229,17 @@ impl<T: Config> Module<T> {
validators.clone()
};
BufferedSessionChanges::mutate(|v| v.push(BufferedSessionChange {
validators,
queued,
session_index,
}));
if session_index == 0 {
// Genesis session should be immediately enacted.
Self::apply_new_session(0, validators, queued);
} else {
BufferedSessionChanges::mutate(|v| v.push(BufferedSessionChange {
validators,
queued,
session_index,
}));
}
}
}
@@ -244,10 +250,10 @@ impl<T: Config> sp_runtime::BoundToRuntimeAppPublic for Module<T> {
impl<T: pallet_session::Config + Config> OneSessionHandler<T::AccountId> for Module<T> {
type Key = ValidatorId;
fn on_genesis_session<'a, I: 'a>(_validators: I)
fn on_genesis_session<'a, I: 'a>(validators: I)
where I: Iterator<Item=(&'a T::AccountId, Self::Key)>
{
<Module<T>>::on_new_session(false, 0, validators, None);
}
fn on_new_session<'a, I: 'a>(changed: bool, validators: I, queued: I)
@@ -266,7 +272,7 @@ mod tests {
use primitives::v1::{Id as ParaId};
use crate::mock::{
new_test_ext,
Initializer, System, Dmp, Paras, Configuration, MockGenesisConfig,
Initializer, System, Dmp, Paras, Configuration, SessionInfo, MockGenesisConfig,
};
use frame_support::{
@@ -274,6 +280,24 @@ mod tests {
traits::{OnFinalize, OnInitialize},
};
#[test]
fn session_0_is_instantly_applied() {
new_test_ext(Default::default()).execute_with(|| {
Initializer::on_new_session(
false,
0,
Vec::new().into_iter(),
Some(Vec::new().into_iter()),
);
let v = <BufferedSessionChanges>::get();
assert!(v.is_empty());
assert_eq!(SessionInfo::earliest_stored_session(), 0);
assert!(SessionInfo::session_info(0).is_some());
});
}
#[test]
fn session_change_before_initialize_is_still_buffered_after() {
new_test_ext(Default::default()).execute_with(|| {
+1 -1
View File
@@ -807,7 +807,7 @@ sp_api::impl_runtime_apis! {
c: PRIMARY_PROBABILITY,
genesis_authorities: Babe::authorities(),
randomness: Babe::randomness(),
allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryPlainSlots,
allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryVRFSlots,
}
}
+2 -2
View File
@@ -467,7 +467,7 @@ impl parachains_inclusion::Config for Runtime {
impl parachains_inclusion_inherent::Config for Runtime {}
impl parachains_initializer::Config for Runtime {
type Randomness = RandomnessCollectiveFlip;
type Randomness = Babe;
}
impl parachains_session_info::Config for Runtime {}
@@ -742,7 +742,7 @@ sp_api::impl_runtime_apis! {
c: PRIMARY_PROBABILITY,
genesis_authorities: Babe::authorities(),
randomness: Babe::randomness(),
allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryPlainSlots,
allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryVRFSlots,
}
}
+1 -1
View File
@@ -955,7 +955,7 @@ sp_api::impl_runtime_apis! {
c: PRIMARY_PROBABILITY,
genesis_authorities: Babe::authorities(),
randomness: Babe::randomness(),
allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryPlainSlots,
allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryVRFSlots,
}
}