Fix bug and further optimizations in availability distribution (#2104)

* Fix bug and further optimizations in availability distribution

- There was a bug that resulted in only getting one candidate per block
as the candidates were put into the hashmap with the relay block hash as
key. The solution for this is to use the candidate hash and the relay
block hash as key.
- We stored received/sent messages with the candidate hash and chunk
index as key. The candidate hash wasn't required in this case, as the
messages are already stored per candidate.

* Update node/core/bitfield-signing/src/lib.rs

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>

* Remove the reverse map

* major refactor of receipts & query_live

* finish refactoring

remove ancestory mapping,

improve relay-parent cleanup & receipts-cache cleanup,
add descriptor to `PerCandidate`

* rename and rewrite query_pending_availability

* add a bunch of consistency tests

* Add some last changes

* xy

* fz

* Make it compile again

* Fix one test

* Fix logging

* Remove some buggy code

* Make tests work again

* Move stuff around

* Remove dbg

* Remove state from test_harness

* More refactor and new test

* New test and fixes

* Move metric

* Remove "duplicated code"

* Fix tests

* New test

* Change break to continue

* Update node/core/av-store/src/lib.rs

* Update node/core/av-store/src/lib.rs

* Update node/core/bitfield-signing/src/lib.rs

Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>

* update guide to match live_candidates changes

* add comment

* fix bitfield signing

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>
Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
This commit is contained in:
Bastian Köcher
2020-12-17 20:09:17 +01:00
committed by GitHub
parent 3f5156e866
commit d0c97539e4
8 changed files with 1203 additions and 1002 deletions
+2 -4
View File
@@ -4914,10 +4914,8 @@ name = "polkadot-availability-distribution"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"assert_matches", "assert_matches",
"env_logger 0.8.2",
"futures 0.3.8", "futures 0.3.8",
"futures-timer 3.0.2", "maplit",
"log",
"parity-scale-codec", "parity-scale-codec",
"polkadot-erasure-coding", "polkadot-erasure-coding",
"polkadot-node-network-protocol", "polkadot-node-network-protocol",
@@ -4926,11 +4924,11 @@ dependencies = [
"polkadot-node-subsystem-util", "polkadot-node-subsystem-util",
"polkadot-primitives", "polkadot-primitives",
"sc-keystore", "sc-keystore",
"smallvec 1.5.1",
"sp-application-crypto", "sp-application-crypto",
"sp-core", "sp-core",
"sp-keyring", "sp-keyring",
"sp-keystore", "sp-keystore",
"sp-tracing",
"thiserror", "thiserror",
"tracing", "tracing",
"tracing-futures", "tracing-futures",
+40 -10
View File
@@ -713,25 +713,51 @@ where
match msg { match msg {
QueryAvailableData(hash, tx) => { QueryAvailableData(hash, tx) => {
tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data)) tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?;
.map_err(|_| oneshot::Canceled)?;
} }
QueryDataAvailability(hash, tx) => { QueryDataAvailability(hash, tx) => {
tx.send(available_data(&subsystem.inner, &hash).is_some()) let result = available_data(&subsystem.inner, &hash).is_some();
.map_err(|_| oneshot::Canceled)?;
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?hash,
availability = ?result,
"Queried data availability",
);
tx.send(result).map_err(|_| oneshot::Canceled)?;
} }
QueryChunk(hash, id, tx) => { QueryChunk(hash, id, tx) => {
tx.send(get_chunk(subsystem, &hash, id)?) tx.send(get_chunk(subsystem, &hash, id)?).map_err(|_| oneshot::Canceled)?;
.map_err(|_| oneshot::Canceled)?;
} }
QueryChunkAvailability(hash, id, tx) => { QueryChunkAvailability(hash, id, tx) => {
tx.send(get_chunk(subsystem, &hash, id)?.is_some()) let result = get_chunk(subsystem, &hash, id).map(|r| r.is_some());
.map_err(|_| oneshot::Canceled)?;
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?hash,
availability = ?result,
"Queried chunk availability",
);
tx.send(result?).map_err(|_| oneshot::Canceled)?;
} }
StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => { StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => {
let chunk_index = chunk.index;
// Current block number is relay_parent block number + 1. // Current block number is relay_parent block number + 1.
let block_number = get_block_number(ctx, relay_parent).await? + 1; let block_number = get_block_number(ctx, relay_parent).await? + 1;
match store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number) { let result = store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number);
tracing::trace!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
%block_number,
?result,
"Stored chunk",
);
match result {
Err(e) => { Err(e) => {
tx.send(Err(())).map_err(|_| oneshot::Canceled)?; tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
return Err(e); return Err(e);
@@ -742,7 +768,11 @@ where
} }
} }
StoreAvailableData(hash, id, n_validators, av_data, tx) => { StoreAvailableData(hash, id, n_validators, av_data, tx) => {
match store_available_data(subsystem, &hash, id, n_validators, av_data) { let result = store_available_data(subsystem, &hash, id, n_validators, av_data);
tracing::trace!(target: LOG_TARGET, candidate_hash = ?hash, ?result, "Stored available data");
match result {
Err(e) => { Err(e) => {
tx.send(Err(())).map_err(|_| oneshot::Canceled)?; tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
return Err(e); return Err(e);
+20 -3
View File
@@ -78,6 +78,8 @@ async fn get_core_availability(
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let span = jaeger::hash_span(&relay_parent, "core_availability"); let span = jaeger::hash_span(&relay_parent, "core_availability");
if let CoreState::Occupied(core) = core { if let CoreState::Occupied(core) = core {
tracing::trace!(target: LOG_TARGET, para_id = %core.para_id, "Getting core availability");
let _span = span.child("occupied"); let _span = span.child("occupied");
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
sender sender
@@ -93,7 +95,10 @@ async fn get_core_availability(
let committed_candidate_receipt = match rx.await? { let committed_candidate_receipt = match rx.await? {
Ok(Some(ccr)) => ccr, Ok(Some(ccr)) => ccr,
Ok(None) => return Ok(false), Ok(None) => {
tracing::trace!(target: LOG_TARGET, para_id = %core.para_id, "No committed candidate");
return Ok(false)
},
Err(e) => { Err(e) => {
// Don't take down the node on runtime API errors. // Don't take down the node on runtime API errors.
tracing::warn!(target: LOG_TARGET, err = ?e, "Encountered a runtime API error"); tracing::warn!(target: LOG_TARGET, err = ?e, "Encountered a runtime API error");
@@ -103,6 +108,7 @@ async fn get_core_availability(
drop(_span); drop(_span);
let _span = span.child("query chunk"); let _span = span.child("query chunk");
let candidate_hash = committed_candidate_receipt.hash();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
sender sender
@@ -110,13 +116,24 @@ async fn get_core_availability(
.await .await
.send( .send(
AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability( AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability(
committed_candidate_receipt.hash(), candidate_hash,
validator_idx, validator_idx,
tx, tx,
)).into(), )).into(),
) )
.await?; .await?;
return rx.await.map_err(Into::into);
let res = rx.await.map_err(Into::into);
tracing::trace!(
target: LOG_TARGET,
para_id = %core.para_id,
availability = ?res,
?candidate_hash,
"Candidate availability",
);
return res;
} }
Ok(false) Ok(false)
@@ -23,9 +23,7 @@ polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpe
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures-timer = "3.0.2"
env_logger = "0.8.2"
assert_matches = "1.4.0" assert_matches = "1.4.0"
smallvec = "1.5.1" maplit = "1.0"
log = "0.4.11"
@@ -38,6 +38,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId, BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId,
SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash, SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash,
CandidateDescriptor,
}; };
use polkadot_subsystem::messages::{ use polkadot_subsystem::messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage, AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
@@ -50,6 +51,7 @@ use polkadot_subsystem::{
SubsystemContext, SubsystemError, SubsystemContext, SubsystemError,
}; };
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::iter; use std::iter;
use thiserror::Error; use thiserror::Error;
@@ -116,6 +118,12 @@ pub struct AvailabilityGossipMessage {
pub erasure_chunk: ErasureChunk, pub erasure_chunk: ErasureChunk,
} }
impl From<AvailabilityGossipMessage> for protocol_v1::AvailabilityDistributionMessage {
fn from(message: AvailabilityGossipMessage) -> Self {
Self::Chunk(message.candidate_hash, message.erasure_chunk)
}
}
/// Data used to track information of peers and relay parents the /// Data used to track information of peers and relay parents the
/// overseer ordered us to work on. /// overseer ordered us to work on.
#[derive(Default, Clone, Debug)] #[derive(Default, Clone, Debug)]
@@ -129,19 +137,8 @@ struct ProtocolState {
/// Caches a mapping of relay parents or ancestor to live candidate receipts. /// Caches a mapping of relay parents or ancestor to live candidate receipts.
/// Allows fast intersection of live candidates with views and consecutive unioning. /// Allows fast intersection of live candidates with views and consecutive unioning.
/// Maps relay parent / ancestor -> live candidate receipts + its hash. /// Maps relay parent / ancestor -> candidate receipts.
receipts: HashMap<Hash, HashSet<(CandidateHash, CommittedCandidateReceipt)>>, receipts: HashMap<Hash, HashSet<CandidateHash>>,
/// Allow reverse caching of view checks.
/// Maps candidate hash -> relay parent for extracting meta information from `PerRelayParent`.
/// Note that the presence of this is not sufficient to determine if deletion is OK, i.e.
/// two histories could cover this.
reverse: HashMap<CandidateHash, Hash>,
/// Keeps track of which candidate receipts are required due to ancestors of which relay parents
/// of our view.
/// Maps ancestor -> relay parents in view
ancestry: HashMap<Hash, HashSet<Hash>>,
/// Track things needed to start and stop work on a particular relay parent. /// Track things needed to start and stop work on a particular relay parent.
per_relay_parent: HashMap<Hash, PerRelayParent>, per_relay_parent: HashMap<Hash, PerRelayParent>,
@@ -157,24 +154,30 @@ struct PerCandidate {
/// candidate hash + erasure chunk index -> gossip message /// candidate hash + erasure chunk index -> gossip message
message_vault: HashMap<u32, AvailabilityGossipMessage>, message_vault: HashMap<u32, AvailabilityGossipMessage>,
/// Track received candidate hashes and validator indices from peers. /// Track received erasure chunk indices per peer.
received_messages: HashMap<PeerId, HashSet<(CandidateHash, ValidatorIndex)>>, received_messages: HashMap<PeerId, HashSet<ValidatorIndex>>,
/// Track already sent candidate hashes and the erasure chunk index to the peers. /// Track sent erasure chunk indices per peer.
sent_messages: HashMap<PeerId, HashSet<(CandidateHash, ValidatorIndex)>>, sent_messages: HashMap<PeerId, HashSet<ValidatorIndex>>,
/// The set of validators. /// The set of validators.
validators: Vec<ValidatorId>, validators: Vec<ValidatorId>,
/// If this node is a validator, note the index in the validator set. /// If this node is a validator, note the index in the validator set.
validator_index: Option<ValidatorIndex>, validator_index: Option<ValidatorIndex>,
/// The descriptor of this candidate.
descriptor: CandidateDescriptor,
/// The set of relay chain blocks this appears to be live in.
live_in: HashSet<Hash>,
} }
impl PerCandidate { impl PerCandidate {
/// Returns `true` iff the given `message` is required by the given `peer`. /// Returns `true` iff the given `validator_index` is required by the given `peer`.
fn message_required_by_peer(&self, peer: &PeerId, message: &(CandidateHash, ValidatorIndex)) -> bool { fn message_required_by_peer(&self, peer: &PeerId, validator_index: &ValidatorIndex) -> bool {
self.received_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true) self.received_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true)
&& self.sent_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true) && self.sent_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true)
} }
} }
@@ -182,138 +185,84 @@ impl PerCandidate {
struct PerRelayParent { struct PerRelayParent {
/// Set of `K` ancestors for this relay parent. /// Set of `K` ancestors for this relay parent.
ancestors: Vec<Hash>, ancestors: Vec<Hash>,
/// Live candidates, according to this relay parent.
live_candidates: HashSet<CandidateHash>,
} }
impl ProtocolState { impl ProtocolState {
/// Collects the relay_parents ancestors including the relay parents themselfes. /// Unionize all live candidate hashes of the given relay parents and their recent
#[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))] /// ancestors.
fn extend_with_ancestors<'a>( ///
&'a self,
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
) -> HashSet<Hash> {
relay_parents
.into_iter()
.map(|relay_parent| {
self.per_relay_parent
.get(relay_parent)
.into_iter()
.map(|per_relay_parent| per_relay_parent.ancestors.iter().cloned())
.flatten()
.chain(iter::once(*relay_parent))
})
.flatten()
.collect::<HashSet<Hash>>()
}
/// Unionize all cached entries for the given relay parents and its ancestors.
/// Ignores all non existent relay parents, so this can be used directly with a peers view. /// Ignores all non existent relay parents, so this can be used directly with a peers view.
/// Returns a map from candidate hash -> receipt /// Returns a set of candidate hashes.
#[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))]
fn cached_live_candidates_unioned<'a>( fn cached_live_candidates_unioned<'a>(
&'a self, &'a self,
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a, relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
) -> HashMap<CandidateHash, CommittedCandidateReceipt> { ) -> HashSet<CandidateHash> {
let relay_parents_and_ancestors = self.extend_with_ancestors(relay_parents); relay_parents
relay_parents_and_ancestors
.into_iter() .into_iter()
.filter_map(|relay_parent_or_ancestor| self.receipts.get(&relay_parent_or_ancestor)) .filter_map(|r| self.per_relay_parent.get(r))
.map(|receipt_set| receipt_set.into_iter()) .map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned())
.flatten() .flatten()
.map(|(receipt_hash, receipt)| (receipt_hash.clone(), receipt.clone()))
.collect() .collect()
} }
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(candidates), fields(subsystem = LOG_TARGET))]
async fn add_relay_parent<Context>( fn add_relay_parent(
&mut self, &mut self,
ctx: &mut Context,
relay_parent: Hash, relay_parent: Hash,
validators: Vec<ValidatorId>, validators: Vec<ValidatorId>,
validator_index: Option<ValidatorIndex>, validator_index: Option<ValidatorIndex>,
) -> Result<()> candidates: HashMap<CandidateHash, FetchedLiveCandidate>,
where ancestors: Vec<Hash>,
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, ) {
{ let candidate_hashes: Vec<_> = candidates.keys().cloned().collect();
let candidates = query_live_candidates(ctx, self, std::iter::once(relay_parent)).await?;
// register the relation of relay_parent to candidate.. // register the relation of relay_parent to candidate..
// ..and the reverse association. for (receipt_hash, fetched) in candidates {
for (relay_parent_or_ancestor, (receipt_hash, receipt)) in candidates.clone() { let per_candidate = self.per_candidate.entry(receipt_hash).or_default();
self.reverse
.insert(receipt_hash.clone(), relay_parent_or_ancestor.clone()); // Cached candidates already have entries and thus don't need this
let per_candidate = self.per_candidate.entry(receipt_hash.clone()).or_default(); // information to be set.
if let FetchedLiveCandidate::Fresh(descriptor) = fetched {
per_candidate.validator_index = validator_index.clone(); per_candidate.validator_index = validator_index.clone();
per_candidate.validators = validators.clone(); per_candidate.validators = validators.clone();
per_candidate.descriptor = descriptor;
self.receipts }
.entry(relay_parent_or_ancestor) per_candidate.live_in.insert(relay_parent);
.or_default()
.insert((receipt_hash, receipt));
} }
// collect the ancestors again from the hash map let per_relay_parent = self.per_relay_parent.entry(relay_parent).or_default();
let ancestors = candidates per_relay_parent.ancestors = ancestors;
.iter() per_relay_parent.live_candidates.extend(candidate_hashes);
.filter_map(|(ancestor_or_relay_parent, _receipt)| {
if ancestor_or_relay_parent == &relay_parent {
None
} else {
Some(*ancestor_or_relay_parent)
}
})
.collect::<Vec<Hash>>();
// mark all the ancestors as "needed" by this newly added relay parent
for ancestor in ancestors.iter() {
self.ancestry
.entry(ancestor.clone())
.or_default()
.insert(relay_parent);
}
self.per_relay_parent
.entry(relay_parent)
.or_default()
.ancestors = ancestors;
Ok(())
} }
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn remove_relay_parent(&mut self, relay_parent: &Hash) { fn remove_relay_parent(&mut self, relay_parent: &Hash) {
// we might be ancestor of some other relay_parent
if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) {
// if we were the last user, and it is
// not explicitly set to be worked on by the overseer
if descendants.is_empty() {
// remove from the ancestry index
self.ancestry.remove(relay_parent);
// and also remove the actual receipt
if let Some(candidates) = self.receipts.remove(relay_parent) {
candidates.into_iter().for_each(|c| { self.per_candidate.remove(&c.0); });
}
}
}
if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) { if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) {
// remove all "references" from the hash maps and sets for all ancestors for candidate_hash in per_relay_parent.live_candidates {
for ancestor in per_relay_parent.ancestors { // Prune the candidate if this was the last member of our view
// one of our decendants might be ancestor of some other relay_parent // to consider it live (including its ancestors).
if let Some(ref mut descendants) = self.ancestry.get_mut(&ancestor) { if let Entry::Occupied(mut occ) = self.per_candidate.entry(candidate_hash) {
// we do not need this descendant anymore occ.get_mut().live_in.remove(relay_parent);
descendants.remove(&relay_parent); if occ.get().live_in.is_empty() {
// if we were the last user, and it is occ.remove();
// not explicitly set to be worked on by the overseer
if descendants.is_empty() && !self.per_relay_parent.contains_key(&ancestor) {
// remove from the ancestry index
self.ancestry.remove(&ancestor);
// and also remove the actual receipt
if let Some(candidates) = self.receipts.remove(&ancestor) {
candidates.into_iter().for_each(|c| { self.per_candidate.remove(&c.0); });
} }
} }
} }
} }
} }
// Removes all entries from receipts which aren't referenced in the ancestry of
// one of our live relay-chain heads.
fn clean_up_receipts_cache(&mut self) {
let extended_view: HashSet<_> = self.per_relay_parent.iter()
.map(|(r_hash, v)| v.ancestors.iter().cloned().chain(std::iter::once(*r_hash)))
.flatten()
.collect();
self.receipts.retain(|ancestor_hash, _| extended_view.contains(ancestor_hash));
} }
} }
@@ -387,27 +336,30 @@ where
for added in view.difference(&old_view) { for added in view.difference(&old_view) {
let validators = query_validators(ctx, *added).await?; let validators = query_validators(ctx, *added).await?;
let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await; let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await;
state let (candidates, ancestors)
.add_relay_parent(ctx, *added, validators, validator_index) = query_live_candidates(ctx, &mut state.receipts, *added).await?;
.await?;
state.add_relay_parent(
*added,
validators,
validator_index,
candidates,
ancestors,
);
} }
// handle all candidates // handle all candidates
for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(view.difference(&old_view)) { for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) {
let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); // If we are not a validator for this candidate, let's skip it.
if state.per_candidate.entry(candidate_hash).or_default().validator_index.is_none() {
// assure the node has the validator role continue
if per_candidate.validator_index.is_none() { }
continue;
};
// check if the availability is present in the store exists // check if the availability is present in the store exists
if !query_data_availability(ctx, candidate_hash).await? { if !query_data_availability(ctx, candidate_hash).await? {
continue; continue;
} }
let validator_count = per_candidate.validators.len();
// obtain interested peers in the candidate hash // obtain interested peers in the candidate hash
let peers: Vec<PeerId> = state let peers: Vec<PeerId> = state
.peer_views .peer_views
@@ -417,79 +369,66 @@ where
// collect all direct interests of a peer w/o ancestors // collect all direct interests of a peer w/o ancestors
state state
.cached_live_candidates_unioned(view.heads.iter()) .cached_live_candidates_unioned(view.heads.iter())
.contains_key(&candidate_hash) .contains(&candidate_hash)
}) })
.map(|(peer, _view)| peer.clone()) .map(|(peer, _view)| peer.clone())
.collect(); .collect();
// distribute all erasure messages to interested peers
for chunk_index in 0u32..(validator_count as u32) {
// only the peers which did not receive this particular erasure chunk
let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// obtain the chunks from the cache, if not fallback let validator_count = per_candidate.validators.len();
// and query the availability store
let message_id = (candidate_hash, chunk_index); // distribute all erasure messages to interested peers
let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) { for chunk_index in 0u32..(validator_count as u32) {
message.erasure_chunk.clone() let message = if let Some(message) = per_candidate.message_vault.get(&chunk_index) {
tracing::trace!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
"Retrieved chunk from message vault",
);
message.clone()
} else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? { } else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? {
erasure_chunk tracing::trace!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
"Retrieved chunk from availability storage",
);
AvailabilityGossipMessage {
candidate_hash,
erasure_chunk,
}
} else { } else {
tracing::error!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
"Availability store reported that we have the availability data, but we could not retrieve a chunk of it!",
);
continue; continue;
}; };
debug_assert_eq!(erasure_chunk.index, chunk_index); debug_assert_eq!(message.erasure_chunk.index, chunk_index);
let peers = peers let peers = peers
.iter() .iter()
.filter(|peer| per_candidate.message_required_by_peer(peer, &message_id)) .filter(|peer| per_candidate.message_required_by_peer(peer, &chunk_index))
.cloned() .cloned()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let message = AvailabilityGossipMessage {
candidate_hash,
erasure_chunk,
};
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await; send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;
} }
} }
// cleanup the removed relay parents and their states // cleanup the removed relay parents and their states
let removed = old_view.difference(&view).collect::<Vec<_>>(); old_view.difference(&view).for_each(|r| state.remove_relay_parent(r));
for removed in removed { state.clean_up_receipts_cache();
state.remove_relay_parent(&removed);
}
Ok(()) Ok(())
} }
#[inline(always)]
async fn send_tracked_gossip_message_to_peers<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peers: Vec<PeerId>,
message: AvailabilityGossipMessage,
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await
}
#[inline(always)]
async fn send_tracked_gossip_messages_to_peer<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peer: PeerId,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter).await
}
#[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))]
async fn send_tracked_gossip_messages_to_peers<Context>( async fn send_tracked_gossip_messages_to_peers<Context>(
ctx: &mut Context, ctx: &mut Context,
@@ -501,38 +440,28 @@ async fn send_tracked_gossip_messages_to_peers<Context>(
where where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{ {
if peers.is_empty() {
return;
}
for message in message_iter { for message in message_iter {
for peer in peers.iter() { for peer in peers.iter() {
let message_id = (message.candidate_hash, message.erasure_chunk.index);
per_candidate per_candidate
.sent_messages .sent_messages
.entry(peer.clone()) .entry(peer.clone())
.or_default() .or_default()
.insert(message_id); .insert(message.erasure_chunk.index);
} }
per_candidate per_candidate
.message_vault .message_vault
.insert(message.erasure_chunk.index, message.clone()); .insert(message.erasure_chunk.index, message.clone());
let wire_message = protocol_v1::AvailabilityDistributionMessage::Chunk( if !peers.is_empty() {
message.candidate_hash, ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
message.erasure_chunk,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
peers.clone(), peers.clone(),
protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message), protocol_v1::ValidationProtocol::AvailabilityDistribution(message.into()),
), ).into()).await;
))
.await;
metrics.on_chunk_distributed(); metrics.on_chunk_distributed();
} }
}
} }
// Send the difference between two views which were not sent // Send the difference between two views which were not sent
@@ -558,29 +487,25 @@ where
// the union of all relay parent's candidates. // the union of all relay parent's candidates.
let added_candidates = state.cached_live_candidates_unioned(added.iter()); let added_candidates = state.cached_live_candidates_unioned(added.iter());
// Send all messages we've seen before and the peer is now interested // Send all messages we've seen before and the peer is now interested in.
// in to that peer. for candidate_hash in added_candidates {
for (candidate_hash, _receipt) in added_candidates {
let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// obtain the relevant chunk indices not sent yet // obtain the relevant chunk indices not sent yet
let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex)) let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex))
.into_iter() .into_iter()
.filter_map(|erasure_chunk_index: ValidatorIndex| { .filter_map(|erasure_chunk_index: ValidatorIndex| {
let message_id = (candidate_hash, erasure_chunk_index);
// try to pick up the message from the message vault // try to pick up the message from the message vault
// so we send as much as we have // so we send as much as we have
per_candidate per_candidate
.message_vault .message_vault
.get(&erasure_chunk_index) .get(&erasure_chunk_index)
.filter(|_| per_candidate.message_required_by_peer(&origin, &message_id)) .filter(|_| per_candidate.message_required_by_peer(&origin, &erasure_chunk_index))
}) })
.cloned() .cloned()
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();
send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages).await; send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![origin.clone()], messages).await;
} }
} }
@@ -622,37 +547,59 @@ where
let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter()); let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter());
// check if the candidate is of interest // check if the candidate is of interest
let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) { let descriptor = if live_candidates.contains(&message.candidate_hash) {
live_candidate state.per_candidate
.get(&message.candidate_hash)
.expect("All live candidates are contained in per_candidate; qed")
.descriptor
.clone()
} else { } else {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?message.candidate_hash,
peer = %origin,
"Peer send not live candidate",
);
modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await; modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await;
return Ok(()); return Ok(())
}; };
// check the merkle proof // check the merkle proof against the erasure root in the candidate descriptor.
let root = &live_candidate.descriptor.erasure_root; let anticipated_hash = match branch_hash(
let anticipated_hash = if let Ok(hash) = branch_hash( &descriptor.erasure_root,
root,
&message.erasure_chunk.proof, &message.erasure_chunk.proof,
message.erasure_chunk.index as usize, message.erasure_chunk.index as usize,
) { ) {
hash Ok(hash) => hash,
} else { Err(e) => {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?message.candidate_hash,
peer = %origin,
error = ?e,
"Failed to calculate chunk merkle proof",
);
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
return Ok(()); return Ok(());
},
}; };
let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk); let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
if anticipated_hash != erasure_chunk_hash { if anticipated_hash != erasure_chunk_hash {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?message.candidate_hash,
peer = %origin,
"Peer send chunk with invalid merkle proof",
);
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
return Ok(()); return Ok(());
} }
// an internal unique identifier of this message let erasure_chunk_index = &message.erasure_chunk.index;
let message_id = (message.candidate_hash, message.erasure_chunk.index);
{ {
let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default(); let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default();
// check if this particular erasure chunk was already sent by that peer before // check if this particular erasure chunk was already sent by that peer before
{ {
@@ -660,18 +607,16 @@ where
.received_messages .received_messages
.entry(origin.clone()) .entry(origin.clone())
.or_default(); .or_default();
if received_set.contains(&message_id) { if !received_set.insert(*erasure_chunk_index) {
modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await; modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
return Ok(()); return Ok(());
} else {
received_set.insert(message_id.clone());
} }
} }
// insert into known messages and change reputation // insert into known messages and change reputation
if per_candidate if per_candidate
.message_vault .message_vault
.insert(message_id.1, message.clone()) .insert(*erasure_chunk_index, message.clone())
.is_some() .is_some()
{ {
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await; modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
@@ -679,24 +624,20 @@ where
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await; modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await;
// save the chunk for our index // save the chunk for our index
if let Some(validator_index) = per_candidate.validator_index { if Some(*erasure_chunk_index) == per_candidate.validator_index {
if message.erasure_chunk.index == validator_index { if store_chunk(
if let Err(_e) = store_chunk(
ctx, ctx,
message.candidate_hash.clone(), message.candidate_hash,
live_candidate.descriptor.relay_parent.clone(), descriptor.relay_parent,
message.erasure_chunk.index, message.erasure_chunk.index,
message.erasure_chunk.clone(), message.erasure_chunk.clone(),
) ).await?.is_err() {
.await?
{
tracing::warn!( tracing::warn!(
target: LOG_TARGET, target: LOG_TARGET,
"Failed to store erasure chunk to availability store" "Failed to store erasure chunk to availability store"
); );
} }
} }
}
}; };
} }
// condense the peers to the peers with interest on the candidate // condense the peers to the peers with interest on the candidate
@@ -704,24 +645,24 @@ where
.peer_views .peer_views
.clone() .clone()
.into_iter() .into_iter()
.filter(|(_peer, view)| { .filter(|(_, view)| {
// peers view must contain the candidate hash too // peers view must contain the candidate hash too
state state
.cached_live_candidates_unioned(view.heads.iter()) .cached_live_candidates_unioned(view.heads.iter())
.contains_key(&message_id.0) .contains(&message.candidate_hash)
}) })
.map(|(peer, _)| -> PeerId { peer.clone() }) .map(|(peer, _)| -> PeerId { peer.clone() })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default(); let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default();
let peers = peers let peers = peers
.into_iter() .into_iter()
.filter(|peer| per_candidate.message_required_by_peer(peer, &message_id)) .filter(|peer| per_candidate.message_required_by_peer(peer, erasure_chunk_index))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// gossip that message to interested peers // gossip that message to interested peers
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await; send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;
Ok(()) Ok(())
} }
@@ -743,13 +684,21 @@ impl AvailabilityDistributionSubsystem {
} }
/// Start processing work as passed on from the Overseer. /// Start processing work as passed on from the Overseer.
async fn run<Context>(self, ctx: Context) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let mut state = ProtocolState::default();
self.run_inner(ctx, &mut state).await
}
/// Start processing work.
#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))] #[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
async fn run<Context>(self, mut ctx: Context) -> Result<()> async fn run_inner<Context>(self, mut ctx: Context, state: &mut ProtocolState) -> Result<()>
where where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{ {
// work: process incoming messages from the overseer. // work: process incoming messages from the overseer.
let mut state = ProtocolState::default();
loop { loop {
let message = ctx let message = ctx
.recv() .recv()
@@ -762,7 +711,7 @@ impl AvailabilityDistributionSubsystem {
if let Err(e) = handle_network_msg( if let Err(e) = handle_network_msg(
&mut ctx, &mut ctx,
&self.keystore.clone(), &self.keystore.clone(),
&mut state, state,
&self.metrics, &self.metrics,
event, event,
) )
@@ -807,96 +756,102 @@ where
} }
} }
/// Obtain all live candidates based on an iterator of relay heads. // Metadata about a candidate that is part of the live_candidates set.
#[tracing::instrument(level = "trace", skip(ctx, relay_parents), fields(subsystem = LOG_TARGET))] //
async fn query_live_candidates_without_ancestors<Context>( // Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This
// information is propagated to the higher level where it can be used to create data entries. Cached candidates
// already have entries associated with them, and thus don't need this metadata to be fetched.
#[derive(Debug)]
enum FetchedLiveCandidate {
Cached,
Fresh(CandidateDescriptor),
}
/// Obtain all live candidates for all given `relay_blocks`.
///
/// This returns a set of all candidate hashes pending availability within the state
/// of the explicitly referenced relay heads.
///
/// This also queries the provided `receipts` cache before reaching into the
/// runtime and updates it with the information learned.
#[tracing::instrument(level = "trace", skip(ctx, relay_blocks, receipts), fields(subsystem = LOG_TARGET))]
async fn query_pending_availability_at<Context>(
ctx: &mut Context, ctx: &mut Context,
relay_parents: impl IntoIterator<Item = Hash>, relay_blocks: impl IntoIterator<Item = Hash>,
) -> Result<HashSet<CommittedCandidateReceipt>> receipts: &mut HashMap<Hash, HashSet<CandidateHash>>,
) -> Result<HashMap<CandidateHash, FetchedLiveCandidate>>
where where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{ {
let iter = relay_parents.into_iter(); let mut live_candidates = HashMap::new();
let hint = iter.size_hint();
let mut live_candidates = HashSet::with_capacity(hint.1.unwrap_or(hint.0)); // fetch and fill out cache for each of these
for relay_parent in iter { for relay_parent in relay_blocks {
let paras = query_para_ids(ctx, relay_parent).await?; let receipts_for = match receipts.entry(relay_parent) {
for para in paras { Entry::Occupied(e) => {
live_candidates.extend(
e.get().iter().cloned().map(|c| (c, FetchedLiveCandidate::Cached))
);
continue
},
e => e.or_default(),
};
for para in query_para_ids(ctx, relay_parent).await? {
if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? { if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? {
live_candidates.insert(ccr); let receipt_hash = ccr.hash();
let descriptor = ccr.descriptor().clone();
// unfortunately we have no good way of telling the candidate was
// cached until now. But we don't clobber a `Cached` entry if there
// is one already.
live_candidates.entry(receipt_hash)
.or_insert(FetchedLiveCandidate::Fresh(descriptor));
receipts_for.insert(receipt_hash);
} }
} }
} }
Ok(live_candidates) Ok(live_candidates)
} }
/// Obtain all live candidates based on an iterator or relay heads including `k` ancestors. /// Obtain all live candidates under a particular relay head. This implicitly includes
/// `K` ancestors of the head, such that the candidates pending availability in all of
/// the states of the head and the ancestors are unioned together to produce the
/// return type of this function. Each candidate hash is paired.
/// ///
/// Relay parent. /// This also updates all `receipts` cached by the protocol state and returns a list
#[tracing::instrument(level = "trace", skip(ctx, relay_parents), fields(subsystem = LOG_TARGET))] /// of up to `K` ancestors of the relay-parent.
#[tracing::instrument(level = "trace", skip(ctx, receipts), fields(subsystem = LOG_TARGET))]
async fn query_live_candidates<Context>( async fn query_live_candidates<Context>(
ctx: &mut Context, ctx: &mut Context,
state: &mut ProtocolState, receipts: &mut HashMap<Hash, HashSet<CandidateHash>>,
relay_parents: impl IntoIterator<Item = Hash>, relay_parent: Hash,
) -> Result<HashMap<Hash, (CandidateHash, CommittedCandidateReceipt)>> ) -> Result<(HashMap<CandidateHash, FetchedLiveCandidate>, Vec<Hash>)>
where where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{ {
let iter = relay_parents.into_iter();
let hint = iter.size_hint();
let capacity = hint.1.unwrap_or(hint.0) * (1 + AvailabilityDistributionSubsystem::K);
let mut live_candidates =
HashMap::<Hash, (CandidateHash, CommittedCandidateReceipt)>::with_capacity(capacity);
for relay_parent in iter {
// register one of relay parents (not the ancestors) // register one of relay parents (not the ancestors)
let mut ancestors = query_up_to_k_ancestors_in_same_session( let ancestors = query_up_to_k_ancestors_in_same_session(
ctx, ctx,
relay_parent, relay_parent,
AvailabilityDistributionSubsystem::K, AvailabilityDistributionSubsystem::K,
) )
.await?; .await?;
ancestors.push(relay_parent); // query the ones that were not present in the receipts cache and add them
// to it.
let live_candidates = query_pending_availability_at(
ctx,
ancestors.iter().cloned().chain(std::iter::once(relay_parent)),
receipts,
).await?;
// ancestors might overlap, so check the cache too Ok((live_candidates, ancestors))
let unknown = ancestors
.into_iter()
.filter(|relay_parent_or_ancestor| {
// use the ones which we pulled before
// but keep the unknown relay parents
state
.receipts
.get(relay_parent_or_ancestor)
.and_then(|receipts| {
// directly extend the live_candidates with the cached value
live_candidates.extend(receipts.into_iter().map(
|(receipt_hash, receipt)| {
(relay_parent, (receipt_hash.clone(), receipt.clone()))
},
));
Some(())
})
.is_none()
})
.collect::<Vec<_>>();
// query the ones that were not present in the receipts cache
let receipts = query_live_candidates_without_ancestors(ctx, unknown.clone()).await?;
live_candidates.extend(
unknown.into_iter().zip(
receipts
.into_iter()
.map(|receipt| (receipt.hash(), receipt)),
),
);
}
Ok(live_candidates)
} }
/// Query all para IDs. /// Query all para IDs that are occupied under a given relay-parent.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_para_ids<Context>(ctx: &mut Context, relay_parent: Hash) -> Result<Vec<ParaId>> async fn query_para_ids<Context>(ctx: &mut Context, relay_parent: Hash) -> Result<Vec<ParaId>>
where where
@@ -909,7 +864,7 @@ where
))) )))
.await; .await;
let all_para_ids: Vec<_> = rx let all_para_ids = rx
.await .await
.map_err(|e| Error::AvailabilityCoresResponseChannel(e))? .map_err(|e| Error::AvailabilityCoresResponseChannel(e))?
.map_err(|e| Error::AvailabilityCores(e))?; .map_err(|e| Error::AvailabilityCores(e))?;
@@ -955,8 +910,7 @@ where
AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx), AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
)).await; )).await;
rx.await rx.await.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
} }
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
@@ -1111,18 +1065,11 @@ where
// iterate from youngest to oldest // iterate from youngest to oldest
let mut iter = ancestors.into_iter().peekable(); let mut iter = ancestors.into_iter().peekable();
while let Some(ancestor) = iter.next() { while let Some((ancestor, ancestor_parent)) = iter.next().and_then(|a| iter.peek().map(|ap| (a, ap))) {
if let Some(ancestor_parent) = iter.peek() { if query_session_index_for_child(ctx, *ancestor_parent).await? != desired_session {
let session = query_session_index_for_child(ctx, *ancestor_parent).await?;
if session != desired_session {
break; break;
} }
acc.push(ancestor); acc.push(ancestor);
} else {
// either ended up at genesis or the blocks were
// already pruned
break;
}
} }
debug_assert!(acc.len() <= k); debug_assert!(acc.len() <= k);
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -231,7 +231,7 @@ impl NetworkBridgeMessage {
} }
/// Availability Distribution Message. /// Availability Distribution Message.
#[derive(Debug)] #[derive(Debug, derive_more::From)]
pub enum AvailabilityDistributionMessage { pub enum AvailabilityDistributionMessage {
/// Event from the network bridge. /// Event from the network bridge.
NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>), NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>),
@@ -23,7 +23,8 @@ Output:
For each relay-parent in our local view update, look at all backed candidates pending availability. Distribute via gossip all erasure chunks for all candidates that we have to peers. For each relay-parent in our local view update, look at all backed candidates pending availability. Distribute via gossip all erasure chunks for all candidates that we have to peers.
We define an operation `live_candidates(relay_heads) -> Set<CommittedCandidateReceipt>` which returns a set of [`CommittedCandidateReceipt`s](../../types/candidate.md#committed-candidate-receipt). We define an operation `live_candidates(relay_heads) -> Set<CandidateHash>` which returns a set of hashes corresponding to [`CandidateReceipt`s](../../types/candidate.md#candidate-receipt).
This is defined as all candidates pending availability in any of those relay-chain heads or any of their last `K` ancestors in the same session. We assume that state is not pruned within `K` blocks of the chain-head. `K` commonly is small and is currently fixed to `K=3`. This is defined as all candidates pending availability in any of those relay-chain heads or any of their last `K` ancestors in the same session. We assume that state is not pruned within `K` blocks of the chain-head. `K` commonly is small and is currently fixed to `K=3`.
We will send any erasure-chunks that correspond to candidates in `live_candidates(peer_most_recent_view_update)`. We will send any erasure-chunks that correspond to candidates in `live_candidates(peer_most_recent_view_update)`.