diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index a55b062db8..692a9c1c48 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4914,10 +4914,8 @@ name = "polkadot-availability-distribution" version = "0.1.0" dependencies = [ "assert_matches", - "env_logger 0.8.2", "futures 0.3.8", - "futures-timer 3.0.2", - "log", + "maplit", "parity-scale-codec", "polkadot-erasure-coding", "polkadot-node-network-protocol", @@ -4926,11 +4924,11 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-primitives", "sc-keystore", - "smallvec 1.5.1", "sp-application-crypto", "sp-core", "sp-keyring", "sp-keystore", + "sp-tracing", "thiserror", "tracing", "tracing-futures", diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index d1d81d031c..66bec12142 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -713,25 +713,51 @@ where match msg { QueryAvailableData(hash, tx) => { - tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data)) - .map_err(|_| oneshot::Canceled)?; + tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?; } QueryDataAvailability(hash, tx) => { - tx.send(available_data(&subsystem.inner, &hash).is_some()) - .map_err(|_| oneshot::Canceled)?; + let result = available_data(&subsystem.inner, &hash).is_some(); + + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?hash, + availability = ?result, + "Queried data availability", + ); + + tx.send(result).map_err(|_| oneshot::Canceled)?; } QueryChunk(hash, id, tx) => { - tx.send(get_chunk(subsystem, &hash, id)?) - .map_err(|_| oneshot::Canceled)?; + tx.send(get_chunk(subsystem, &hash, id)?).map_err(|_| oneshot::Canceled)?; } QueryChunkAvailability(hash, id, tx) => { - tx.send(get_chunk(subsystem, &hash, id)?.is_some()) - .map_err(|_| oneshot::Canceled)?; + let result = get_chunk(subsystem, &hash, id).map(|r| r.is_some()); + + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?hash, + availability = ?result, + "Queried chunk availability", + ); + + tx.send(result?).map_err(|_| oneshot::Canceled)?; } StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => { + let chunk_index = chunk.index; // Current block number is relay_parent block number + 1. let block_number = get_block_number(ctx, relay_parent).await? + 1; - match store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number) { + let result = store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number); + + tracing::trace!( + target: LOG_TARGET, + %chunk_index, + ?candidate_hash, + %block_number, + ?result, + "Stored chunk", + ); + + match result { Err(e) => { tx.send(Err(())).map_err(|_| oneshot::Canceled)?; return Err(e); @@ -742,7 +768,11 @@ where } } StoreAvailableData(hash, id, n_validators, av_data, tx) => { - match store_available_data(subsystem, &hash, id, n_validators, av_data) { + let result = store_available_data(subsystem, &hash, id, n_validators, av_data); + + tracing::trace!(target: LOG_TARGET, candidate_hash = ?hash, ?result, "Stored available data"); + + match result { Err(e) => { tx.send(Err(())).map_err(|_| oneshot::Canceled)?; return Err(e); diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index e02bd5661f..d5c7f4b29a 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -78,6 +78,8 @@ async fn get_core_availability( ) -> Result { let span = jaeger::hash_span(&relay_parent, "core_availability"); if let CoreState::Occupied(core) = core { + tracing::trace!(target: LOG_TARGET, para_id = %core.para_id, "Getting core availability"); + let _span = span.child("occupied"); let (tx, rx) = oneshot::channel(); sender @@ -93,7 +95,10 @@ async fn get_core_availability( let committed_candidate_receipt = match rx.await? { Ok(Some(ccr)) => ccr, - Ok(None) => return Ok(false), + Ok(None) => { + tracing::trace!(target: LOG_TARGET, para_id = %core.para_id, "No committed candidate"); + return Ok(false) + }, Err(e) => { // Don't take down the node on runtime API errors. tracing::warn!(target: LOG_TARGET, err = ?e, "Encountered a runtime API error"); @@ -103,6 +108,7 @@ async fn get_core_availability( drop(_span); let _span = span.child("query chunk"); + let candidate_hash = committed_candidate_receipt.hash(); let (tx, rx) = oneshot::channel(); sender @@ -110,13 +116,24 @@ async fn get_core_availability( .await .send( AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability( - committed_candidate_receipt.hash(), + candidate_hash, validator_idx, tx, )).into(), ) .await?; - return rx.await.map_err(Into::into); + + let res = rx.await.map_err(Into::into); + + tracing::trace!( + target: LOG_TARGET, + para_id = %core.para_id, + availability = ?res, + ?candidate_hash, + "Candidate availability", + ); + + return res; } Ok(false) diff --git a/polkadot/node/network/availability-distribution/Cargo.toml b/polkadot/node/network/availability-distribution/Cargo.toml index dd1c6f61c3..b3dc14422c 100644 --- a/polkadot/node/network/availability-distribution/Cargo.toml +++ b/polkadot/node/network/availability-distribution/Cargo.toml @@ -23,9 +23,7 @@ polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpe sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } -futures-timer = "3.0.2" -env_logger = "0.8.2" assert_matches = "1.4.0" -smallvec = "1.5.1" -log = "0.4.11" +maplit = "1.0" diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 67d1d4ffa8..5bf11defd2 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -38,6 +38,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_primitives::v1::{ BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId, SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash, + CandidateDescriptor, }; use polkadot_subsystem::messages::{ AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage, @@ -50,6 +51,7 @@ use polkadot_subsystem::{ SubsystemContext, SubsystemError, }; use std::collections::{HashMap, HashSet}; +use std::collections::hash_map::Entry; use std::iter; use thiserror::Error; @@ -116,6 +118,12 @@ pub struct AvailabilityGossipMessage { pub erasure_chunk: ErasureChunk, } +impl From for protocol_v1::AvailabilityDistributionMessage { + fn from(message: AvailabilityGossipMessage) -> Self { + Self::Chunk(message.candidate_hash, message.erasure_chunk) + } +} + /// Data used to track information of peers and relay parents the /// overseer ordered us to work on. #[derive(Default, Clone, Debug)] @@ -129,19 +137,8 @@ struct ProtocolState { /// Caches a mapping of relay parents or ancestor to live candidate receipts. /// Allows fast intersection of live candidates with views and consecutive unioning. - /// Maps relay parent / ancestor -> live candidate receipts + its hash. - receipts: HashMap>, - - /// Allow reverse caching of view checks. - /// Maps candidate hash -> relay parent for extracting meta information from `PerRelayParent`. - /// Note that the presence of this is not sufficient to determine if deletion is OK, i.e. - /// two histories could cover this. - reverse: HashMap, - - /// Keeps track of which candidate receipts are required due to ancestors of which relay parents - /// of our view. - /// Maps ancestor -> relay parents in view - ancestry: HashMap>, + /// Maps relay parent / ancestor -> candidate receipts. + receipts: HashMap>, /// Track things needed to start and stop work on a particular relay parent. per_relay_parent: HashMap, @@ -157,24 +154,30 @@ struct PerCandidate { /// candidate hash + erasure chunk index -> gossip message message_vault: HashMap, - /// Track received candidate hashes and validator indices from peers. - received_messages: HashMap>, + /// Track received erasure chunk indices per peer. + received_messages: HashMap>, - /// Track already sent candidate hashes and the erasure chunk index to the peers. - sent_messages: HashMap>, + /// Track sent erasure chunk indices per peer. + sent_messages: HashMap>, /// The set of validators. validators: Vec, /// If this node is a validator, note the index in the validator set. validator_index: Option, + + /// The descriptor of this candidate. + descriptor: CandidateDescriptor, + + /// The set of relay chain blocks this appears to be live in. + live_in: HashSet, } impl PerCandidate { - /// Returns `true` iff the given `message` is required by the given `peer`. - fn message_required_by_peer(&self, peer: &PeerId, message: &(CandidateHash, ValidatorIndex)) -> bool { - self.received_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true) - && self.sent_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true) + /// Returns `true` iff the given `validator_index` is required by the given `peer`. + fn message_required_by_peer(&self, peer: &PeerId, validator_index: &ValidatorIndex) -> bool { + self.received_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true) + && self.sent_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true) } } @@ -182,139 +185,85 @@ impl PerCandidate { struct PerRelayParent { /// Set of `K` ancestors for this relay parent. ancestors: Vec, + /// Live candidates, according to this relay parent. + live_candidates: HashSet, } impl ProtocolState { - /// Collects the relay_parents ancestors including the relay parents themselfes. - #[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))] - fn extend_with_ancestors<'a>( - &'a self, - relay_parents: impl IntoIterator + 'a, - ) -> HashSet { - relay_parents - .into_iter() - .map(|relay_parent| { - self.per_relay_parent - .get(relay_parent) - .into_iter() - .map(|per_relay_parent| per_relay_parent.ancestors.iter().cloned()) - .flatten() - .chain(iter::once(*relay_parent)) - }) - .flatten() - .collect::>() - } - - /// Unionize all cached entries for the given relay parents and its ancestors. + /// Unionize all live candidate hashes of the given relay parents and their recent + /// ancestors. + /// /// Ignores all non existent relay parents, so this can be used directly with a peers view. - /// Returns a map from candidate hash -> receipt + /// Returns a set of candidate hashes. #[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))] fn cached_live_candidates_unioned<'a>( &'a self, relay_parents: impl IntoIterator + 'a, - ) -> HashMap { - let relay_parents_and_ancestors = self.extend_with_ancestors(relay_parents); - relay_parents_and_ancestors + ) -> HashSet { + relay_parents .into_iter() - .filter_map(|relay_parent_or_ancestor| self.receipts.get(&relay_parent_or_ancestor)) - .map(|receipt_set| receipt_set.into_iter()) + .filter_map(|r| self.per_relay_parent.get(r)) + .map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned()) .flatten() - .map(|(receipt_hash, receipt)| (receipt_hash.clone(), receipt.clone())) .collect() } - #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] - async fn add_relay_parent( + #[tracing::instrument(level = "trace", skip(candidates), fields(subsystem = LOG_TARGET))] + fn add_relay_parent( &mut self, - ctx: &mut Context, relay_parent: Hash, validators: Vec, validator_index: Option, - ) -> Result<()> - where - Context: SubsystemContext, - { - let candidates = query_live_candidates(ctx, self, std::iter::once(relay_parent)).await?; + candidates: HashMap, + ancestors: Vec, + ) { + let candidate_hashes: Vec<_> = candidates.keys().cloned().collect(); // register the relation of relay_parent to candidate.. - // ..and the reverse association. - for (relay_parent_or_ancestor, (receipt_hash, receipt)) in candidates.clone() { - self.reverse - .insert(receipt_hash.clone(), relay_parent_or_ancestor.clone()); - let per_candidate = self.per_candidate.entry(receipt_hash.clone()).or_default(); - per_candidate.validator_index = validator_index.clone(); - per_candidate.validators = validators.clone(); + for (receipt_hash, fetched) in candidates { + let per_candidate = self.per_candidate.entry(receipt_hash).or_default(); - self.receipts - .entry(relay_parent_or_ancestor) - .or_default() - .insert((receipt_hash, receipt)); + // Cached candidates already have entries and thus don't need this + // information to be set. + if let FetchedLiveCandidate::Fresh(descriptor) = fetched { + per_candidate.validator_index = validator_index.clone(); + per_candidate.validators = validators.clone(); + per_candidate.descriptor = descriptor; + } + per_candidate.live_in.insert(relay_parent); } - // collect the ancestors again from the hash map - let ancestors = candidates - .iter() - .filter_map(|(ancestor_or_relay_parent, _receipt)| { - if ancestor_or_relay_parent == &relay_parent { - None - } else { - Some(*ancestor_or_relay_parent) - } - }) - .collect::>(); - - // mark all the ancestors as "needed" by this newly added relay parent - for ancestor in ancestors.iter() { - self.ancestry - .entry(ancestor.clone()) - .or_default() - .insert(relay_parent); - } - - self.per_relay_parent - .entry(relay_parent) - .or_default() - .ancestors = ancestors; - - Ok(()) + let per_relay_parent = self.per_relay_parent.entry(relay_parent).or_default(); + per_relay_parent.ancestors = ancestors; + per_relay_parent.live_candidates.extend(candidate_hashes); } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] fn remove_relay_parent(&mut self, relay_parent: &Hash) { - // we might be ancestor of some other relay_parent - if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) { - // if we were the last user, and it is - // not explicitly set to be worked on by the overseer - if descendants.is_empty() { - // remove from the ancestry index - self.ancestry.remove(relay_parent); - // and also remove the actual receipt - if let Some(candidates) = self.receipts.remove(relay_parent) { - candidates.into_iter().for_each(|c| { self.per_candidate.remove(&c.0); }); - } - } - } if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) { - // remove all "references" from the hash maps and sets for all ancestors - for ancestor in per_relay_parent.ancestors { - // one of our decendants might be ancestor of some other relay_parent - if let Some(ref mut descendants) = self.ancestry.get_mut(&ancestor) { - // we do not need this descendant anymore - descendants.remove(&relay_parent); - // if we were the last user, and it is - // not explicitly set to be worked on by the overseer - if descendants.is_empty() && !self.per_relay_parent.contains_key(&ancestor) { - // remove from the ancestry index - self.ancestry.remove(&ancestor); - // and also remove the actual receipt - if let Some(candidates) = self.receipts.remove(&ancestor) { - candidates.into_iter().for_each(|c| { self.per_candidate.remove(&c.0); }); - } + for candidate_hash in per_relay_parent.live_candidates { + // Prune the candidate if this was the last member of our view + // to consider it live (including its ancestors). + if let Entry::Occupied(mut occ) = self.per_candidate.entry(candidate_hash) { + occ.get_mut().live_in.remove(relay_parent); + if occ.get().live_in.is_empty() { + occ.remove(); } } } } } + + // Removes all entries from receipts which aren't referenced in the ancestry of + // one of our live relay-chain heads. + fn clean_up_receipts_cache(&mut self) { + let extended_view: HashSet<_> = self.per_relay_parent.iter() + .map(|(r_hash, v)| v.ancestors.iter().cloned().chain(std::iter::once(*r_hash))) + .flatten() + .collect(); + + self.receipts.retain(|ancestor_hash, _| extended_view.contains(ancestor_hash)); + } } /// Deal with network bridge updates and track what needs to be tracked @@ -387,27 +336,30 @@ where for added in view.difference(&old_view) { let validators = query_validators(ctx, *added).await?; let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await; - state - .add_relay_parent(ctx, *added, validators, validator_index) - .await?; + let (candidates, ancestors) + = query_live_candidates(ctx, &mut state.receipts, *added).await?; + + state.add_relay_parent( + *added, + validators, + validator_index, + candidates, + ancestors, + ); } // handle all candidates - for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(view.difference(&old_view)) { - let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); - - // assure the node has the validator role - if per_candidate.validator_index.is_none() { - continue; - }; + for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) { + // If we are not a validator for this candidate, let's skip it. + if state.per_candidate.entry(candidate_hash).or_default().validator_index.is_none() { + continue + } // check if the availability is present in the store exists if !query_data_availability(ctx, candidate_hash).await? { continue; } - let validator_count = per_candidate.validators.len(); - // obtain interested peers in the candidate hash let peers: Vec = state .peer_views @@ -417,79 +369,66 @@ where // collect all direct interests of a peer w/o ancestors state .cached_live_candidates_unioned(view.heads.iter()) - .contains_key(&candidate_hash) + .contains(&candidate_hash) }) .map(|(peer, _view)| peer.clone()) .collect(); + let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); + + let validator_count = per_candidate.validators.len(); + // distribute all erasure messages to interested peers for chunk_index in 0u32..(validator_count as u32) { - // only the peers which did not receive this particular erasure chunk - let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); - - // obtain the chunks from the cache, if not fallback - // and query the availability store - let message_id = (candidate_hash, chunk_index); - let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) { - message.erasure_chunk.clone() + let message = if let Some(message) = per_candidate.message_vault.get(&chunk_index) { + tracing::trace!( + target: LOG_TARGET, + %chunk_index, + ?candidate_hash, + "Retrieved chunk from message vault", + ); + message.clone() } else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? { - erasure_chunk + tracing::trace!( + target: LOG_TARGET, + %chunk_index, + ?candidate_hash, + "Retrieved chunk from availability storage", + ); + + AvailabilityGossipMessage { + candidate_hash, + erasure_chunk, + } } else { + tracing::error!( + target: LOG_TARGET, + %chunk_index, + ?candidate_hash, + "Availability store reported that we have the availability data, but we could not retrieve a chunk of it!", + ); continue; }; - debug_assert_eq!(erasure_chunk.index, chunk_index); + debug_assert_eq!(message.erasure_chunk.index, chunk_index); let peers = peers .iter() - .filter(|peer| per_candidate.message_required_by_peer(peer, &message_id)) + .filter(|peer| per_candidate.message_required_by_peer(peer, &chunk_index)) .cloned() .collect::>(); - let message = AvailabilityGossipMessage { - candidate_hash, - erasure_chunk, - }; - send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await; + send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await; } } // cleanup the removed relay parents and their states - let removed = old_view.difference(&view).collect::>(); - for removed in removed { - state.remove_relay_parent(&removed); - } + old_view.difference(&view).for_each(|r| state.remove_relay_parent(r)); + state.clean_up_receipts_cache(); + Ok(()) } -#[inline(always)] -async fn send_tracked_gossip_message_to_peers( - ctx: &mut Context, - per_candidate: &mut PerCandidate, - metrics: &Metrics, - peers: Vec, - message: AvailabilityGossipMessage, -) -where - Context: SubsystemContext, -{ - send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await -} - -#[inline(always)] -async fn send_tracked_gossip_messages_to_peer( - ctx: &mut Context, - per_candidate: &mut PerCandidate, - metrics: &Metrics, - peer: PeerId, - message_iter: impl IntoIterator, -) -where - Context: SubsystemContext, -{ - send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter).await -} - #[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))] async fn send_tracked_gossip_messages_to_peers( ctx: &mut Context, @@ -501,37 +440,27 @@ async fn send_tracked_gossip_messages_to_peers( where Context: SubsystemContext, { - if peers.is_empty() { - return; - } for message in message_iter { for peer in peers.iter() { - let message_id = (message.candidate_hash, message.erasure_chunk.index); per_candidate .sent_messages .entry(peer.clone()) .or_default() - .insert(message_id); + .insert(message.erasure_chunk.index); } per_candidate .message_vault .insert(message.erasure_chunk.index, message.clone()); - let wire_message = protocol_v1::AvailabilityDistributionMessage::Chunk( - message.candidate_hash, - message.erasure_chunk, - ); - - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( + if !peers.is_empty() { + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( peers.clone(), - protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message), - ), - )) - .await; + protocol_v1::ValidationProtocol::AvailabilityDistribution(message.into()), + ).into()).await; - metrics.on_chunk_distributed(); + metrics.on_chunk_distributed(); + } } } @@ -558,29 +487,25 @@ where // the union of all relay parent's candidates. let added_candidates = state.cached_live_candidates_unioned(added.iter()); - // Send all messages we've seen before and the peer is now interested - // in to that peer. - - for (candidate_hash, _receipt) in added_candidates { + // Send all messages we've seen before and the peer is now interested in. + for candidate_hash in added_candidates { let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); // obtain the relevant chunk indices not sent yet let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex)) .into_iter() .filter_map(|erasure_chunk_index: ValidatorIndex| { - let message_id = (candidate_hash, erasure_chunk_index); - // try to pick up the message from the message vault // so we send as much as we have per_candidate .message_vault .get(&erasure_chunk_index) - .filter(|_| per_candidate.message_required_by_peer(&origin, &message_id)) + .filter(|_| per_candidate.message_required_by_peer(&origin, &erasure_chunk_index)) }) .cloned() .collect::>(); - send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages).await; + send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![origin.clone()], messages).await; } } @@ -622,37 +547,59 @@ where let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter()); // check if the candidate is of interest - let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) { - live_candidate + let descriptor = if live_candidates.contains(&message.candidate_hash) { + state.per_candidate + .get(&message.candidate_hash) + .expect("All live candidates are contained in per_candidate; qed") + .descriptor + .clone() } else { + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?message.candidate_hash, + peer = %origin, + "Peer send not live candidate", + ); modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await; - return Ok(()); + return Ok(()) }; - // check the merkle proof - let root = &live_candidate.descriptor.erasure_root; - let anticipated_hash = if let Ok(hash) = branch_hash( - root, + // check the merkle proof against the erasure root in the candidate descriptor. + let anticipated_hash = match branch_hash( + &descriptor.erasure_root, &message.erasure_chunk.proof, message.erasure_chunk.index as usize, ) { - hash - } else { - modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; - return Ok(()); + Ok(hash) => hash, + Err(e) => { + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?message.candidate_hash, + peer = %origin, + error = ?e, + "Failed to calculate chunk merkle proof", + ); + modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; + return Ok(()); + }, }; let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk); if anticipated_hash != erasure_chunk_hash { + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?message.candidate_hash, + peer = %origin, + "Peer send chunk with invalid merkle proof", + ); modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; return Ok(()); } - // an internal unique identifier of this message - let message_id = (message.candidate_hash, message.erasure_chunk.index); + let erasure_chunk_index = &message.erasure_chunk.index; { - let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default(); + let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default(); // check if this particular erasure chunk was already sent by that peer before { @@ -660,18 +607,16 @@ where .received_messages .entry(origin.clone()) .or_default(); - if received_set.contains(&message_id) { + if !received_set.insert(*erasure_chunk_index) { modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await; return Ok(()); - } else { - received_set.insert(message_id.clone()); } } // insert into known messages and change reputation if per_candidate .message_vault - .insert(message_id.1, message.clone()) + .insert(*erasure_chunk_index, message.clone()) .is_some() { modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await; @@ -679,22 +624,18 @@ where modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await; // save the chunk for our index - if let Some(validator_index) = per_candidate.validator_index { - if message.erasure_chunk.index == validator_index { - if let Err(_e) = store_chunk( - ctx, - message.candidate_hash.clone(), - live_candidate.descriptor.relay_parent.clone(), - message.erasure_chunk.index, - message.erasure_chunk.clone(), - ) - .await? - { - tracing::warn!( - target: LOG_TARGET, - "Failed to store erasure chunk to availability store" - ); - } + if Some(*erasure_chunk_index) == per_candidate.validator_index { + if store_chunk( + ctx, + message.candidate_hash, + descriptor.relay_parent, + message.erasure_chunk.index, + message.erasure_chunk.clone(), + ).await?.is_err() { + tracing::warn!( + target: LOG_TARGET, + "Failed to store erasure chunk to availability store" + ); } } }; @@ -704,24 +645,24 @@ where .peer_views .clone() .into_iter() - .filter(|(_peer, view)| { + .filter(|(_, view)| { // peers view must contain the candidate hash too state .cached_live_candidates_unioned(view.heads.iter()) - .contains_key(&message_id.0) + .contains(&message.candidate_hash) }) .map(|(peer, _)| -> PeerId { peer.clone() }) .collect::>(); - let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default(); + let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default(); let peers = peers .into_iter() - .filter(|peer| per_candidate.message_required_by_peer(peer, &message_id)) + .filter(|peer| per_candidate.message_required_by_peer(peer, erasure_chunk_index)) .collect::>(); // gossip that message to interested peers - send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await; + send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await; Ok(()) } @@ -743,13 +684,21 @@ impl AvailabilityDistributionSubsystem { } /// Start processing work as passed on from the Overseer. + async fn run(self, ctx: Context) -> Result<()> + where + Context: SubsystemContext, + { + let mut state = ProtocolState::default(); + self.run_inner(ctx, &mut state).await + } + + /// Start processing work. #[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))] - async fn run(self, mut ctx: Context) -> Result<()> + async fn run_inner(self, mut ctx: Context, state: &mut ProtocolState) -> Result<()> where Context: SubsystemContext, { // work: process incoming messages from the overseer. - let mut state = ProtocolState::default(); loop { let message = ctx .recv() @@ -762,7 +711,7 @@ impl AvailabilityDistributionSubsystem { if let Err(e) = handle_network_msg( &mut ctx, &self.keystore.clone(), - &mut state, + state, &self.metrics, event, ) @@ -807,96 +756,102 @@ where } } -/// Obtain all live candidates based on an iterator of relay heads. -#[tracing::instrument(level = "trace", skip(ctx, relay_parents), fields(subsystem = LOG_TARGET))] -async fn query_live_candidates_without_ancestors( +// Metadata about a candidate that is part of the live_candidates set. +// +// Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This +// information is propagated to the higher level where it can be used to create data entries. Cached candidates +// already have entries associated with them, and thus don't need this metadata to be fetched. +#[derive(Debug)] +enum FetchedLiveCandidate { + Cached, + Fresh(CandidateDescriptor), +} + +/// Obtain all live candidates for all given `relay_blocks`. +/// +/// This returns a set of all candidate hashes pending availability within the state +/// of the explicitly referenced relay heads. +/// +/// This also queries the provided `receipts` cache before reaching into the +/// runtime and updates it with the information learned. +#[tracing::instrument(level = "trace", skip(ctx, relay_blocks, receipts), fields(subsystem = LOG_TARGET))] +async fn query_pending_availability_at( ctx: &mut Context, - relay_parents: impl IntoIterator, -) -> Result> + relay_blocks: impl IntoIterator, + receipts: &mut HashMap>, +) -> Result> where Context: SubsystemContext, { - let iter = relay_parents.into_iter(); - let hint = iter.size_hint(); + let mut live_candidates = HashMap::new(); - let mut live_candidates = HashSet::with_capacity(hint.1.unwrap_or(hint.0)); - for relay_parent in iter { - let paras = query_para_ids(ctx, relay_parent).await?; - for para in paras { + // fetch and fill out cache for each of these + for relay_parent in relay_blocks { + let receipts_for = match receipts.entry(relay_parent) { + Entry::Occupied(e) => { + live_candidates.extend( + e.get().iter().cloned().map(|c| (c, FetchedLiveCandidate::Cached)) + ); + continue + }, + e => e.or_default(), + }; + + for para in query_para_ids(ctx, relay_parent).await? { if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? { - live_candidates.insert(ccr); + let receipt_hash = ccr.hash(); + let descriptor = ccr.descriptor().clone(); + + // unfortunately we have no good way of telling the candidate was + // cached until now. But we don't clobber a `Cached` entry if there + // is one already. + live_candidates.entry(receipt_hash) + .or_insert(FetchedLiveCandidate::Fresh(descriptor)); + + receipts_for.insert(receipt_hash); } } } + Ok(live_candidates) } -/// Obtain all live candidates based on an iterator or relay heads including `k` ancestors. +/// Obtain all live candidates under a particular relay head. This implicitly includes +/// `K` ancestors of the head, such that the candidates pending availability in all of +/// the states of the head and the ancestors are unioned together to produce the +/// return type of this function. Each candidate hash is paired. /// -/// Relay parent. -#[tracing::instrument(level = "trace", skip(ctx, relay_parents), fields(subsystem = LOG_TARGET))] +/// This also updates all `receipts` cached by the protocol state and returns a list +/// of up to `K` ancestors of the relay-parent. +#[tracing::instrument(level = "trace", skip(ctx, receipts), fields(subsystem = LOG_TARGET))] async fn query_live_candidates( ctx: &mut Context, - state: &mut ProtocolState, - relay_parents: impl IntoIterator, -) -> Result> + receipts: &mut HashMap>, + relay_parent: Hash, +) -> Result<(HashMap, Vec)> where Context: SubsystemContext, { - let iter = relay_parents.into_iter(); - let hint = iter.size_hint(); + // register one of relay parents (not the ancestors) + let ancestors = query_up_to_k_ancestors_in_same_session( + ctx, + relay_parent, + AvailabilityDistributionSubsystem::K, + ) + .await?; - let capacity = hint.1.unwrap_or(hint.0) * (1 + AvailabilityDistributionSubsystem::K); - let mut live_candidates = - HashMap::::with_capacity(capacity); + // query the ones that were not present in the receipts cache and add them + // to it. + let live_candidates = query_pending_availability_at( + ctx, + ancestors.iter().cloned().chain(std::iter::once(relay_parent)), + receipts, + ).await?; - for relay_parent in iter { - // register one of relay parents (not the ancestors) - let mut ancestors = query_up_to_k_ancestors_in_same_session( - ctx, - relay_parent, - AvailabilityDistributionSubsystem::K, - ) - .await?; - - ancestors.push(relay_parent); - - // ancestors might overlap, so check the cache too - let unknown = ancestors - .into_iter() - .filter(|relay_parent_or_ancestor| { - // use the ones which we pulled before - // but keep the unknown relay parents - state - .receipts - .get(relay_parent_or_ancestor) - .and_then(|receipts| { - // directly extend the live_candidates with the cached value - live_candidates.extend(receipts.into_iter().map( - |(receipt_hash, receipt)| { - (relay_parent, (receipt_hash.clone(), receipt.clone())) - }, - )); - Some(()) - }) - .is_none() - }) - .collect::>(); - - // query the ones that were not present in the receipts cache - let receipts = query_live_candidates_without_ancestors(ctx, unknown.clone()).await?; - live_candidates.extend( - unknown.into_iter().zip( - receipts - .into_iter() - .map(|receipt| (receipt.hash(), receipt)), - ), - ); - } - Ok(live_candidates) + Ok((live_candidates, ancestors)) } -/// Query all para IDs. +/// Query all para IDs that are occupied under a given relay-parent. #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] async fn query_para_ids(ctx: &mut Context, relay_parent: Hash) -> Result> where @@ -909,7 +864,7 @@ where ))) .await; - let all_para_ids: Vec<_> = rx + let all_para_ids = rx .await .map_err(|e| Error::AvailabilityCoresResponseChannel(e))? .map_err(|e| Error::AvailabilityCores(e))?; @@ -955,8 +910,7 @@ where AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx), )).await; - rx.await - .map_err(|e| Error::QueryAvailabilityResponseChannel(e)) + rx.await.map_err(|e| Error::QueryAvailabilityResponseChannel(e)) } #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] @@ -1111,18 +1065,11 @@ where // iterate from youngest to oldest let mut iter = ancestors.into_iter().peekable(); - while let Some(ancestor) = iter.next() { - if let Some(ancestor_parent) = iter.peek() { - let session = query_session_index_for_child(ctx, *ancestor_parent).await?; - if session != desired_session { - break; - } - acc.push(ancestor); - } else { - // either ended up at genesis or the blocks were - // already pruned + while let Some((ancestor, ancestor_parent)) = iter.next().and_then(|a| iter.peek().map(|ap| (a, ap))) { + if query_session_index_for_child(ctx, *ancestor_parent).await? != desired_session { break; } + acc.push(ancestor); } debug_assert!(acc.len() <= k); diff --git a/polkadot/node/network/availability-distribution/src/tests.rs b/polkadot/node/network/availability-distribution/src/tests.rs index 3e7f0d83e4..310ca9e87c 100644 --- a/polkadot/node/network/availability-distribution/src/tests.rs +++ b/polkadot/node/network/availability-distribution/src/tests.rs @@ -26,17 +26,16 @@ use polkadot_primitives::v1::{ use polkadot_subsystem_testhelpers as test_helpers; use futures::{executor, future, Future}; -use futures_timer::Delay; use sc_keystore::LocalKeystore; -use smallvec::smallvec; use sp_application_crypto::AppKey; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; use std::{sync::Arc, time::Duration}; +use maplit::hashmap; - -macro_rules! delay { - ($delay:expr) => { - Delay::new(Duration::from_millis($delay)).await; +macro_rules! view { + ( $( $hash:expr ),* $(,)? ) => { + // Finalized number unimportant for availability distribution. + View { heads: vec![ $( $hash.clone() ),* ], finalized_number: 0 } }; } @@ -55,66 +54,43 @@ struct TestHarness { fn test_harness>( keystore: SyncCryptoStorePtr, - test: impl FnOnce(TestHarness) -> T, -) { - let _ = env_logger::builder() - .is_test(true) - .filter( - Some("polkadot_availability_distribution"), - log::LevelFilter::Trace, - ) - .try_init(); + test_fx: impl FnOnce(TestHarness) -> T, +) -> ProtocolState { + sp_tracing::try_init_simple(); let pool = sp_core::testing::TaskExecutor::new(); - let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default()); - let subsystem = subsystem.run(context); + let mut state = ProtocolState::default(); + { + let subsystem = subsystem.run_inner(context, &mut state); - let test_fut = test(TestHarness { virtual_overseer }); + let test_fut = test_fx(TestHarness { virtual_overseer }); - futures::pin_mut!(test_fut); - futures::pin_mut!(subsystem); + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); - executor::block_on(future::select(test_fut, subsystem)); -} + executor::block_on(future::select(test_fut, subsystem)); + } -const TIMEOUT: Duration = Duration::from_millis(100); - -async fn overseer_signal( - overseer: &mut test_helpers::TestSubsystemContextHandle, - signal: OverseerSignal, -) { - delay!(50); - overseer - .send(FromOverseer::Signal(signal)) - .timeout(TIMEOUT) - .await - .expect("10ms is more than enough for sending signals."); + state } async fn overseer_send( overseer: &mut test_helpers::TestSubsystemContextHandle, - msg: AvailabilityDistributionMessage, + msg: impl Into, ) { + let msg = msg.into(); tracing::trace!(msg = ?msg, "sending message"); - overseer - .send(FromOverseer::Communication { msg }) - .timeout(TIMEOUT) - .await - .expect("10ms is more than enough for sending messages."); + overseer.send(FromOverseer::Communication { msg }).await } async fn overseer_recv( overseer: &mut test_helpers::TestSubsystemContextHandle, ) -> AllMessages { tracing::trace!("waiting for message ..."); - let msg = overseer - .recv() - .timeout(TIMEOUT) - .await - .expect("TIMEOUT is enough to recv."); + let msg = overseer.recv().await; tracing::trace!(msg = ?msg, "received message"); msg } @@ -138,7 +114,6 @@ struct TestState { chain_ids: Vec, validators: Vec, validator_public: Vec, - validator_index: Option, validator_groups: (Vec>, GroupRotationInfo), head_data: HashMap, keystore: SyncCryptoStorePtr, @@ -146,6 +121,8 @@ struct TestState { ancestors: Vec, availability_cores: Vec, persisted_validation_data: PersistedValidationData, + candidates: Vec, + pov_blocks: Vec, } fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec { @@ -216,7 +193,36 @@ impl Default for TestState { max_pov_size: 1024, }; - let validator_index = Some((validators.len() - 1) as ValidatorIndex); + let pov_block_a = PoV { + block_data: BlockData(vec![42, 43, 44]), + }; + + let pov_block_b = PoV { + block_data: BlockData(vec![45, 46, 47]), + }; + + let candidates = vec![ + TestCandidateBuilder { + para_id: chain_ids[0], + relay_parent: relay_parent, + pov_hash: pov_block_a.hash(), + erasure_root: make_erasure_root(persisted_validation_data.clone(), validators.len(), pov_block_a.clone()), + head_data: head_data.get(&chain_ids[0]).unwrap().clone(), + ..Default::default() + } + .build(), + TestCandidateBuilder { + para_id: chain_ids[1], + relay_parent: relay_parent, + pov_hash: pov_block_b.hash(), + erasure_root: make_erasure_root(persisted_validation_data.clone(), validators.len(), pov_block_b.clone()), + head_data: head_data.get(&chain_ids[1]).unwrap().clone(), + ..Default::default() + } + .build(), + ]; + + let pov_blocks = vec![pov_block_a, pov_block_b]; Self { chain_ids, @@ -229,34 +235,42 @@ impl Default for TestState { persisted_validation_data, relay_parent, ancestors, - validator_index, + candidates, + pov_blocks, } } } -fn make_available_data(test: &TestState, pov: PoV) -> AvailableData { +fn make_available_data(validation_data: PersistedValidationData, pov: PoV) -> AvailableData { AvailableData { - validation_data: test.persisted_validation_data.clone(), + validation_data, pov: Arc::new(pov), } } -fn make_erasure_root(test: &TestState, pov: PoV) -> Hash { - let available_data = make_available_data(test, pov); +fn make_erasure_root(peristed: PersistedValidationData, validator_count: usize, pov: PoV) -> Hash { + let available_data = make_available_data(peristed, pov); - let chunks = obtain_chunks(test.validators.len(), &available_data).unwrap(); + let chunks = obtain_chunks(validator_count, &available_data).unwrap(); branches(&chunks).root() } +fn make_erasure_chunks(peristed: PersistedValidationData, validator_count: usize, pov: PoV) -> Vec { + let available_data = make_available_data(peristed, pov); + + derive_erasure_chunks_with_proofs(validator_count, &available_data) +} + fn make_valid_availability_gossip( test: &TestState, - candidate_hash: CandidateHash, + candidate: usize, erasure_chunk_index: u32, - pov: PoV, ) -> AvailabilityGossipMessage { - let available_data = make_available_data(test, pov); - - let erasure_chunks = derive_erasure_chunks_with_proofs(test.validators.len(), &available_data); + let erasure_chunks = make_erasure_chunks( + test.persisted_validation_data.clone(), + test.validator_public.len(), + test.pov_blocks[candidate].clone(), + ); let erasure_chunk: ErasureChunk = erasure_chunks .get(erasure_chunk_index as usize) @@ -264,7 +278,7 @@ fn make_valid_availability_gossip( .clone(); AvailabilityGossipMessage { - candidate_hash, + candidate_hash: test.candidates[candidate].hash(), erasure_chunk, } } @@ -300,25 +314,13 @@ impl TestCandidateBuilder { fn helper_integrity() { let test_state = TestState::default(); - let pov_block = PoV { - block_data: BlockData(vec![42, 43, 44]), - }; + let message = make_valid_availability_gossip( + &test_state, + 0, + 2, + ); - let pov_hash = pov_block.hash(); - - let candidate = TestCandidateBuilder { - para_id: test_state.chain_ids[0], - relay_parent: test_state.relay_parent, - pov_hash, - erasure_root: make_erasure_root(&test_state, pov_block.clone()), - ..Default::default() - } - .build(); - - let message = - make_valid_availability_gossip(&test_state, candidate.hash(), 2, pov_block.clone()); - - let root = dbg!(&candidate.descriptor.erasure_root); + let root = &test_state.candidates[0].descriptor.erasure_root; let anticipated_hash = branch_hash( root, @@ -353,652 +355,574 @@ fn derive_erasure_chunks_with_proofs( erasure_chunks } -#[test] -fn reputation_verification() { - let test_state = TestState::default(); - - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { - mut virtual_overseer, - } = test_harness; - - let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap(); - - let pov_block_a = PoV { - block_data: BlockData(vec![42, 43, 44]), - }; - - let pov_block_b = PoV { - block_data: BlockData(vec![45, 46, 47]), - }; - - let pov_block_c = PoV { - block_data: BlockData(vec![48, 49, 50]), - }; - - let pov_hash_a = pov_block_a.hash(); - let pov_hash_b = pov_block_b.hash(); - let pov_hash_c = pov_block_c.hash(); - - let candidates = vec![ - TestCandidateBuilder { - para_id: test_state.chain_ids[0], - relay_parent: test_state.relay_parent, - pov_hash: pov_hash_a, - erasure_root: make_erasure_root(&test_state, pov_block_a.clone()), - ..Default::default() - } - .build(), - TestCandidateBuilder { - para_id: test_state.chain_ids[1], - relay_parent: test_state.relay_parent, - pov_hash: pov_hash_b, - erasure_root: make_erasure_root(&test_state, pov_block_b.clone()), - head_data: expected_head_data.clone(), - ..Default::default() - } - .build(), - TestCandidateBuilder { - para_id: test_state.chain_ids[1], - relay_parent: Hash::repeat_byte(0xFA), - pov_hash: pov_hash_c, - erasure_root: make_erasure_root(&test_state, pov_block_c.clone()), - head_data: test_state - .head_data - .get(&test_state.chain_ids[1]) - .unwrap() - .clone(), - ..Default::default() - } - .build(), - ]; - - let TestState { - chain_ids, - keystore: _, - validators: _, - validator_public, - validator_groups, - availability_cores, - head_data: _, - persisted_validation_data: _, - relay_parent: current, - ancestors, - validator_index: _, - } = test_state.clone(); - - let _ = validator_groups; - let _ = availability_cores; - - let peer_a = PeerId::random(); - let peer_b = PeerId::random(); - assert_ne!(&peer_a, &peer_b); - - tracing::trace!("peer A: {:?}", peer_a); - tracing::trace!("peer B: {:?}", peer_b); - - tracing::trace!("candidate A: {:?}", candidates[0].hash()); - tracing::trace!("candidate B: {:?}", candidates[1].hash()); - - overseer_signal( - &mut virtual_overseer, - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![current.clone()], - deactivated: smallvec![], - }), - ) - .await; - - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(view![current,]), - ), - ) - .await; - - // obtain the validators per relay parent +async fn expect_chunks_network_message( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + peers: &[PeerId], + candidates: &[CandidateHash], + chunks: &[ErasureChunk], +) { + for _ in 0..chunks.len() { assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(tx), - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(validator_public.clone())).unwrap(); + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + send_peers, + protocol_v1::ValidationProtocol::AvailabilityDistribution( + protocol_v1::AvailabilityDistributionMessage::Chunk(send_candidate, send_chunk), + ), + ) + ) => { + assert!(candidates.contains(&send_candidate), format!("Could not find candidate: {:?}", send_candidate)); + assert!(chunks.iter().any(|c| c == &send_chunk), format!("Could not find chunk: {:?}", send_chunk)); + assert_eq!(peers.len(), send_peers.len()); + assert!(peers.iter().all(|p| send_peers.contains(p))); } ); + } +} - let genesis = Hash::repeat_byte(0xAA); - // query of k ancestors, we only provide one - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::Ancestors { - hash: relay_parent, - k, - response_channel: tx, - }) => { - assert_eq!(relay_parent, current); - assert_eq!(k, AvailabilityDistributionSubsystem::K + 1); - // 0xAA..AA will not be included, since there is no mean to determine - // its session index - tx.send(Ok(vec![ancestors[0].clone(), genesis])).unwrap(); - } - ); +async fn change_our_view( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + view: View, + validator_public: &[ValidatorId], + ancestors: Vec, + session_per_relay_parent: HashMap, + availability_cores_per_relay_parent: HashMap>, + candidate_pending_availabilities_per_relay_parent: HashMap>, + data_availability: HashMap, + chunk_data_per_candidate: HashMap, + send_chunks_to: HashMap>, +) { + overseer_send(virtual_overseer, NetworkBridgeEvent::OurViewChange(view.clone())).await; - // state query for each of them + // obtain the validators per relay parent + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + assert!(view.contains(&relay_parent)); + tx.send(Ok(validator_public.to_vec())).unwrap(); + } + ); + + // query of k ancestors, we only provide one + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::Ancestors { + hash: relay_parent, + k, + response_channel: tx, + }) => { + assert!(view.contains(&relay_parent)); + assert_eq!(k, AvailabilityDistributionSubsystem::K + 1); + tx.send(Ok(ancestors.clone())).unwrap(); + } + ); + + for _ in 0..session_per_relay_parent.len() { assert_matches!( - overseer_recv(&mut virtual_overseer).await, + overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::SessionIndexForChild(tx) )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(1 as SessionIndex)).unwrap(); + let index = session_per_relay_parent.get(&relay_parent) + .expect(&format!("Session index for relay parent {:?} does not exist", relay_parent)); + tx.send(Ok(*index)).unwrap(); } ); + } - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(tx) - )) => { - assert_eq!(relay_parent, genesis); - tx.send(Ok(1 as SessionIndex)).unwrap(); - } - ); - - // subsystem peer id collection - // which will query the availability cores - assert_matches!( - overseer_recv(&mut virtual_overseer).await, + for _ in 0..availability_cores_per_relay_parent.len() { + let relay_parent = assert_matches!( + overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::AvailabilityCores(tx) )) => { - assert_eq!(relay_parent, ancestors[0]); - // respond with a set of availability core states - tx.send(Ok(vec![ - dummy_occupied_core(chain_ids[0]), - dummy_occupied_core(chain_ids[1]) - ])).unwrap(); + let cores = availability_cores_per_relay_parent.get(&relay_parent) + .expect(&format!("Availability core for relay parent {:?} does not exist", relay_parent)); + + tx.send(Ok(cores.clone())).unwrap(); + relay_parent } ); - // now each of the relay parents in the view (1) will - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::CandidatePendingAvailability(para, tx) - )) => { - assert_eq!(relay_parent, ancestors[0]); - assert_eq!(para, chain_ids[0]); - tx.send(Ok(Some( - candidates[0].clone() - ))).unwrap(); - } - ); + let pending_availability = candidate_pending_availabilities_per_relay_parent.get(&relay_parent) + .expect(&format!("Candidate pending availability for relay parent {:?} does not exist", relay_parent)); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::CandidatePendingAvailability(para, tx) - )) => { - assert_eq!(relay_parent, ancestors[0]); - assert_eq!(para, chain_ids[1]); - tx.send(Ok(Some( - candidates[1].clone() - ))).unwrap(); - } - ); - - for _ in 0usize..1 { + for _ in 0..pending_availability.len() { assert_matches!( - overseer_recv(&mut virtual_overseer).await, + overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _relay_parent, - RuntimeApiRequest::AvailabilityCores(tx), + hash, + RuntimeApiRequest::CandidatePendingAvailability(para, tx) )) => { - tx.send(Ok(vec![ - CoreState::Occupied(OccupiedCore { - para_id: chain_ids[0].clone(), - next_up_on_available: None, - occupied_since: 0, - time_out_at: 10, - next_up_on_time_out: None, - availability: Default::default(), - group_responsible: GroupIndex::from(0), - }), - CoreState::Free, - CoreState::Free, - CoreState::Occupied(OccupiedCore { - para_id: chain_ids[1].clone(), - next_up_on_available: None, - occupied_since: 1, - time_out_at: 7, - next_up_on_time_out: None, - availability: Default::default(), - group_responsible: GroupIndex::from(0), - }), - CoreState::Free, - CoreState::Free, - ])).unwrap(); - } - ); + assert_eq!(relay_parent, hash); - // query the availability cores for each of the paras (2) - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request( - _relay_parent, - RuntimeApiRequest::CandidatePendingAvailability(para, tx), - ) - ) => { - assert_eq!(para, chain_ids[0]); - tx.send(Ok(Some( - candidates[0].clone() - ))).unwrap(); - } - ); + let candidate = pending_availability.iter() + .find(|c| c.descriptor.para_id == para) + .expect(&format!("Pending candidate for para {} does not exist", para)); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _relay_parent, - RuntimeApiRequest::CandidatePendingAvailability(para, tx), - )) => { - assert_eq!(para, chain_ids[1]); - tx.send(Ok(Some( - candidates[1].clone() - ))).unwrap(); + tx.send(Ok(Some(candidate.clone()))).unwrap(); } ); } + } - let mut candidates2 = candidates.clone(); - // check if the availability store can provide the desired erasure chunks - for i in 0usize..2 { - tracing::trace!("0000"); - let avail_data = make_available_data(&test_state, pov_block_a.clone()); - let chunks = - derive_erasure_chunks_with_proofs(test_state.validators.len(), &avail_data); + for _ in 0..data_availability.len() { + let (available, candidate_hash) = assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryDataAvailability( + candidate_hash, + tx, + ) + ) => { + let available = data_availability.get(&candidate_hash) + .expect(&format!("No data availability for candidate {:?}", candidate_hash)); - let expected; - // store the chunk to the av store - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::AvailabilityStore( - AvailabilityStoreMessage::QueryDataAvailability( - candidate_hash, - tx, - ) - ) => { - let index = candidates2.iter().enumerate().find(|x| { x.1.hash() == candidate_hash }).map(|x| x.0).unwrap(); - expected = candidates2.swap_remove(index).hash(); - tx.send(i == 0).unwrap(); - } - ); + tx.send(*available).unwrap(); + (available, candidate_hash) + } + ); - assert_eq!(chunks.len(), test_state.validators.len()); + if !available { + continue; + } - tracing::trace!("xxxx"); - // retrieve a stored chunk - for (j, chunk) in chunks.into_iter().enumerate() { - tracing::trace!("yyyy i={}, j={}", i, j); - if i != 0 { - // not a validator, so this never happens - break; - } - assert_matches!( - overseer_recv(&mut virtual_overseer).await, + if let Some((pov, persisted)) = chunk_data_per_candidate.get(&candidate_hash) { + let chunks = make_erasure_chunks(persisted.clone(), validator_public.len(), pov.clone()); + + for _ in 0..chunks.len() { + let chunk = assert_matches!( + overseer_recv(virtual_overseer).await, AllMessages::AvailabilityStore( AvailabilityStoreMessage::QueryChunk( candidate_hash, - idx, + index, tx, ) ) => { - assert_eq!(candidate_hash, expected); - assert_eq!(j as u32, chunk.index); - assert_eq!(idx, j as u32); - tx.send( - Some(chunk.clone()) - ).unwrap(); + tracing::trace!("Query chunk {} for candidate {:?}", index, candidate_hash); + let chunk = chunks[index as usize].clone(); + tx.send(Some(chunk.clone())).unwrap(); + chunk } ); + + if let Some(peers) = send_chunks_to.get(&candidate_hash) { + expect_chunks_network_message(virtual_overseer, &peers, &[candidate_hash], &[chunk]).await; + } } } - // setup peer a with interest in current - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full), - ), - ) - .await; + } +} - overseer_send( +async fn setup_peer_with_view( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + peer: PeerId, + view: View, +) { + overseer_send(virtual_overseer, NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full)).await; + + overseer_send(virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer, view)).await; +} + +async fn peer_send_message( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + peer: PeerId, + message: AvailabilityGossipMessage, + expected_reputation_change: Rep, +) { + overseer_send(virtual_overseer, NetworkBridgeEvent::PeerMessage(peer.clone(), chunk_protocol_message(message))).await; + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + rep_peer, + rep, + ) + ) => { + assert_eq!(peer, rep_peer); + assert_eq!(expected_reputation_change, rep); + } + ); +} + +#[test] +fn check_views() { + let test_state = TestState::default(); + + let peer_a = PeerId::random(); + let peer_a_2 = peer_a.clone(); + let peer_b = PeerId::random(); + let peer_b_2 = peer_b.clone(); + assert_ne!(&peer_a, &peer_b); + + let keystore = test_state.keystore.clone(); + let current = test_state.relay_parent; + let ancestors = test_state.ancestors.clone(); + + let state = test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + chain_ids, + validator_public, + relay_parent: current, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + let genesis = Hash::repeat_byte(0xAA); + change_our_view( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current]), - ), - ) - .await; + view![current], + &validator_public, + vec![ancestors[0], genesis], + hashmap! { current => 1, genesis => 1 }, + hashmap! { + ancestors[0] => vec![dummy_occupied_core(chain_ids[0]), dummy_occupied_core(chain_ids[1])], + current => vec![ + CoreState::Occupied(OccupiedCore { + para_id: chain_ids[0].clone(), + next_up_on_available: None, + occupied_since: 0, + time_out_at: 10, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }), + CoreState::Free, + CoreState::Free, + CoreState::Occupied(OccupiedCore { + para_id: chain_ids[1].clone(), + next_up_on_available: None, + occupied_since: 1, + time_out_at: 7, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }), + CoreState::Free, + CoreState::Free, + ] + }, + hashmap! { + ancestors[0] => vec![candidates[0].clone(), candidates[1].clone()], + current => vec![candidates[0].clone(), candidates[1].clone()], + }, + hashmap! { + candidates[0].hash() => true, + candidates[1].hash() => false, + }, + hashmap! { + candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone()), + }, + hashmap! {}, + ).await; + + // setup peer a with interest in current + setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await; // setup peer b with interest in ancestor - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), - ), - ) - .await; + setup_peer_with_view(&mut virtual_overseer, peer_b.clone(), view![ancestors[0]]).await; + }); - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![ancestors[0]]), - ), - ) - .await; + assert_matches! { + state, + ProtocolState { + peer_views, + view, + .. + } => { + assert_eq!( + peer_views, + hashmap! { + peer_a_2 => view![current], + peer_b_2 => view![ancestors[0]], + }, + ); + assert_eq!(view, view![current]); + } + }; +} - delay!(100); +#[test] +fn reputation_verification() { + let test_state = TestState::default(); - let valid: AvailabilityGossipMessage = make_valid_availability_gossip( + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + assert_ne!(&peer_a, &peer_b); + + let keystore = test_state.keystore.clone(); + + test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + relay_parent: current, + validator_public, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + let valid = make_valid_availability_gossip( &test_state, - candidates[0].hash(), + 0, 2, - pov_block_a.clone(), ); - { - // valid (first, from b) - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_b); - assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST); - } - ); - } - - { - // valid (duplicate, from b) - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - peers, - protocol_v1::ValidationProtocol::AvailabilityDistribution( - protocol_v1::AvailabilityDistributionMessage::Chunk(hash, chunk), - ), - ) - ) => { - assert_eq!(1, peers.len()); - assert_eq!(peers[0], peer_a); - assert_eq!(candidates[0].hash(), hash); - assert_eq!(valid.erasure_chunk, chunk); - } - ); - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_b); - assert_eq!(rep, COST_PEER_DUPLICATE_MESSAGE); - } - ); - } - - { - // valid (second, from a) - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_a.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_a); - assert_eq!(rep, BENEFIT_VALID_MESSAGE); - } - ); - } - - // peer a is not interested in anything anymore - overseer_send( + change_our_view( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![]), - ), - ) - .await; + view![current], + &validator_public, + vec![ancestors[0]], + hashmap! { current => 1 }, + hashmap! { + current => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + dummy_occupied_core(candidates[1].descriptor.para_id) + ], + }, + hashmap! { current => vec![candidates[0].clone(), candidates[1].clone()] }, + hashmap! { candidates[0].hash() => true, candidates[1].hash() => false }, + hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, + hashmap! {}, + ).await; - { - // send the a message again, so we should detect the duplicate - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_a.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; + // valid (first, from b) + peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_a); - assert_eq!(rep, COST_PEER_DUPLICATE_MESSAGE); - } - ); - } + // valid (duplicate, from b) + peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), COST_PEER_DUPLICATE_MESSAGE).await; + + // valid (second, from a) + peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await; + + // send the a message again, so we should detect the duplicate + peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), COST_PEER_DUPLICATE_MESSAGE).await; // peer b sends a message before we have the view // setup peer a with interest in parent x - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerDisconnected(peer_b.clone()), - ), - ) - .await; + overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerDisconnected(peer_b.clone())).await; - delay!(10); - - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), - ), - ) - .await; + overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)).await; { // send another message - let valid2 = make_valid_availability_gossip( - &test_state, - candidates[2].hash(), - 1, - pov_block_c.clone(), - ); - - // send the a message before we send a view update - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage(peer_a.clone(), chunk_protocol_message(valid2)), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_a); - assert_eq!(rep, COST_NOT_A_LIVE_CANDIDATE); - } - ); - } - - { - // send another message - let valid = make_valid_availability_gossip( - &test_state, - candidates[1].hash(), - 2, - pov_block_b.clone(), - ); + let valid = make_valid_availability_gossip(&test_state, 1, 2); // Make peer a and b listen on `current` - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current]), - ), - ) - .await; + overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current])).await; - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![current]), - ), - ) - .await; - - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_a.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_a); - assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST); - } + let mut chunks = make_erasure_chunks( + test_state.persisted_validation_data.clone(), + validator_public.len(), + pov_blocks[0].clone(), ); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - peers, - protocol_v1::ValidationProtocol::AvailabilityDistribution( - protocol_v1::AvailabilityDistributionMessage::Chunk(hash, chunk), - ), - ) - ) => { - assert_eq!(1, peers.len()); - assert_eq!(peers[0], peer_b); - assert_eq!(candidates[1].hash(), hash); - assert_eq!(valid.erasure_chunk, chunk); - } - ); + // Both peers send us this chunk already + chunks.remove(2); + + expect_chunks_network_message(&mut virtual_overseer, &[peer_a.clone()], &[candidates[0].hash()], &chunks).await; + + overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![current])).await; + + expect_chunks_network_message(&mut virtual_overseer, &[peer_b.clone()], &[candidates[0].hash()], &chunks).await; + + peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE_FIRST).await; + + expect_chunks_network_message( + &mut virtual_overseer, + &[peer_b.clone()], + &[candidates[1].hash()], + &[valid.erasure_chunk.clone()], + ).await; // Let B send the same message - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_b); - assert_eq!(rep, BENEFIT_VALID_MESSAGE); - } - ); - - // There shouldn't be any other message. - assert!(virtual_overseer.recv().timeout(TIMEOUT).await.is_none()); + peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await; } }); } +#[test] +fn not_a_live_candidate_is_detected() { + let test_state = TestState::default(); + + let peer_a = PeerId::random(); + + let keystore = test_state.keystore.clone(); + + test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + relay_parent: current, + validator_public, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + change_our_view( + &mut virtual_overseer, + view![current], + &validator_public, + vec![ancestors[0]], + hashmap! { current => 1 }, + hashmap! { + current => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + ], + }, + hashmap! { current => vec![candidates[0].clone()] }, + hashmap! { candidates[0].hash() => true }, + hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, + hashmap! {}, + ).await; + + let valid = make_valid_availability_gossip( + &test_state, + 1, + 1, + ); + + peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), COST_NOT_A_LIVE_CANDIDATE).await; + }); +} + +#[test] +fn peer_change_view_before_us() { + let test_state = TestState::default(); + + let peer_a = PeerId::random(); + + let keystore = test_state.keystore.clone(); + + test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + relay_parent: current, + validator_public, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await; + + change_our_view( + &mut virtual_overseer, + view![current], + &validator_public, + vec![ancestors[0]], + hashmap! { current => 1 }, + hashmap! { + current => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + ], + }, + hashmap! { current => vec![candidates[0].clone()] }, + hashmap! { candidates[0].hash() => true }, + hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, + hashmap! { candidates[0].hash() => vec![peer_a.clone()] }, + ).await; + + let valid = make_valid_availability_gossip( + &test_state, + 0, + 0, + ); + + // We send peer a all the chunks of candidate0, so we just benefit him for sending a valid message + peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await; + }); +} + +#[test] +fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() { + let test_state = TestState::default(); + + let peer_a = PeerId::random(); + + let keystore = test_state.keystore.clone(); + + test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + relay_parent: current, + validator_public, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + change_our_view( + &mut virtual_overseer, + view![ancestors[0]], + &validator_public, + vec![ancestors[1]], + hashmap! { ancestors[0] => 1 }, + hashmap! { + ancestors[0] => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + ], + }, + hashmap! { ancestors[0] => vec![candidates[0].clone()] }, + hashmap! { candidates[0].hash() => true }, + hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, + hashmap! {}, + ).await; + + change_our_view( + &mut virtual_overseer, + view![current], + &validator_public, + vec![ancestors[0]], + hashmap! { current => 1 }, + hashmap! { + current => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + ], + }, + hashmap! { current => vec![candidates[0].clone()] }, + hashmap! { candidates[0].hash() => true }, + hashmap! {}, + hashmap! {}, + ).await; + + // Let peera connect, we should send him all the chunks of the candidate + setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await; + + let chunks = make_erasure_chunks( + test_state.persisted_validation_data.clone(), + validator_public.len(), + pov_blocks[0].clone(), + ); + expect_chunks_network_message( + &mut virtual_overseer, + &[peer_a], + &[candidates[0].hash()], + &chunks, + ).await; + }); +} + #[test] fn k_ancestors_in_session() { let pool = sp_core::testing::TaskExecutor::new(); @@ -1076,3 +1000,289 @@ fn k_ancestors_in_session() { executor::block_on(future::join(test_fut, sut).timeout(Duration::from_millis(1000))); } + +#[test] +fn clean_up_receipts_cache_unions_ancestors_and_view() { + let mut state = ProtocolState::default(); + + let hash_a = [0u8; 32].into(); + let hash_b = [1u8; 32].into(); + let hash_c = [2u8; 32].into(); + let hash_d = [3u8; 32].into(); + + state.receipts.insert(hash_a, HashSet::new()); + state.receipts.insert(hash_b, HashSet::new()); + state.receipts.insert(hash_c, HashSet::new()); + state.receipts.insert(hash_d, HashSet::new()); + + state.per_relay_parent.insert(hash_a, PerRelayParent { + ancestors: vec![hash_b], + live_candidates: HashSet::new(), + }); + + state.per_relay_parent.insert(hash_c, PerRelayParent::default()); + + state.clean_up_receipts_cache(); + + assert_eq!(state.receipts.len(), 3); + assert!(state.receipts.contains_key(&hash_a)); + assert!(state.receipts.contains_key(&hash_b)); + assert!(state.receipts.contains_key(&hash_c)); + assert!(!state.receipts.contains_key(&hash_d)); +} + +#[test] +fn remove_relay_parent_only_removes_per_candidate_if_final() { + let mut state = ProtocolState::default(); + + let hash_a = [0u8; 32].into(); + let hash_b = [1u8; 32].into(); + + let candidate_hash_a = CandidateHash([46u8; 32].into()); + + state.per_relay_parent.insert(hash_a, PerRelayParent { + ancestors: vec![], + live_candidates: std::iter::once(candidate_hash_a).collect(), + }); + + state.per_relay_parent.insert(hash_b, PerRelayParent { + ancestors: vec![], + live_candidates: std::iter::once(candidate_hash_a).collect(), + }); + + state.per_candidate.insert(candidate_hash_a, PerCandidate { + live_in: vec![hash_a, hash_b].into_iter().collect(), + ..Default::default() + }); + + state.remove_relay_parent(&hash_a); + + assert!(!state.per_relay_parent.contains_key(&hash_a)); + assert!(!state.per_candidate.get(&candidate_hash_a).unwrap().live_in.contains(&hash_a)); + assert!(state.per_candidate.get(&candidate_hash_a).unwrap().live_in.contains(&hash_b)); + + state.remove_relay_parent(&hash_b); + + assert!(!state.per_relay_parent.contains_key(&hash_b)); + assert!(!state.per_candidate.contains_key(&candidate_hash_a)); +} + +#[test] +fn add_relay_parent_includes_all_live_candidates() { + let relay_parent = [0u8; 32].into(); + + let mut state = ProtocolState::default(); + + let ancestor_a = [1u8; 32].into(); + + let candidate_hash_a = CandidateHash([10u8; 32].into()); + let candidate_hash_b = CandidateHash([11u8; 32].into()); + + let candidates = vec![ + (candidate_hash_a, FetchedLiveCandidate::Fresh(Default::default())), + (candidate_hash_b, FetchedLiveCandidate::Cached), + ].into_iter().collect(); + + state.add_relay_parent( + relay_parent, + Vec::new(), + None, + candidates, + vec![ancestor_a], + ); + + assert!( + state.per_candidate.get(&candidate_hash_a).unwrap().live_in.contains(&relay_parent) + ); + assert!( + state.per_candidate.get(&candidate_hash_b).unwrap().live_in.contains(&relay_parent) + ); + + let per_relay_parent = state.per_relay_parent.get(&relay_parent).unwrap(); + + assert!(per_relay_parent.live_candidates.contains(&candidate_hash_a)); + assert!(per_relay_parent.live_candidates.contains(&candidate_hash_b)); +} + +#[test] +fn query_pending_availability_at_pulls_from_and_updates_receipts() { + let hash_a = [0u8; 32].into(); + let hash_b = [1u8; 32].into(); + + let para_a = ParaId::from(1); + let para_b = ParaId::from(2); + let para_c = ParaId::from(3); + + let make_candidate = |para_id| { + let mut candidate = CommittedCandidateReceipt::default(); + candidate.descriptor.para_id = para_id; + candidate.descriptor.relay_parent = [69u8; 32].into(); + candidate + }; + + let candidate_a = make_candidate(para_a); + let candidate_b = make_candidate(para_b); + let candidate_c = make_candidate(para_c); + + let candidate_hash_a = candidate_a.hash(); + let candidate_hash_b = candidate_b.hash(); + let candidate_hash_c = candidate_c.hash(); + + // receipts has an initial entry for hash_a but not hash_b. + let mut receipts = HashMap::new(); + receipts.insert(hash_a, vec![candidate_hash_a, candidate_hash_b].into_iter().collect()); + + let pool = sp_core::testing::TaskExecutor::new(); + + let (mut ctx, mut virtual_overseer) = + test_helpers::make_subsystem_context::(pool); + + let test_fut = async move { + let live_candidates = query_pending_availability_at( + &mut ctx, + vec![hash_a, hash_b], + &mut receipts, + ).await.unwrap(); + + // although 'b' is cached from the perspective of hash_a, it gets overwritten when we query what's happening in + // + assert_eq!(live_candidates.len(), 3); + assert_matches!(live_candidates.get(&candidate_hash_a).unwrap(), FetchedLiveCandidate::Cached); + assert_matches!(live_candidates.get(&candidate_hash_b).unwrap(), FetchedLiveCandidate::Cached); + assert_matches!(live_candidates.get(&candidate_hash_c).unwrap(), FetchedLiveCandidate::Fresh(_)); + + assert!(receipts.get(&hash_b).unwrap().contains(&candidate_hash_b)); + assert!(receipts.get(&hash_b).unwrap().contains(&candidate_hash_c)); + }; + + let answer = async move { + // hash_a should be answered out of cache, so we should just have + // queried for hash_b. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + r, + RuntimeApiRequest::AvailabilityCores(tx), + ) + ) if r == hash_b => { + let _ = tx.send(Ok(vec![ + CoreState::Occupied(OccupiedCore { + para_id: para_b, + next_up_on_available: None, + occupied_since: 0, + time_out_at: 0, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }), + CoreState::Occupied(OccupiedCore { + para_id: para_c, + next_up_on_available: None, + occupied_since: 0, + time_out_at: 0, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }), + ])); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + r, + RuntimeApiRequest::CandidatePendingAvailability(p, tx), + ) + ) if r == hash_b && p == para_b => { + let _ = tx.send(Ok(Some(candidate_b))); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + r, + RuntimeApiRequest::CandidatePendingAvailability(p, tx), + ) + ) if r == hash_b && p == para_c => { + let _ = tx.send(Ok(Some(candidate_c))); + } + ); + }; + + futures::pin_mut!(test_fut); + futures::pin_mut!(answer); + + executor::block_on(future::join(test_fut, answer)); +} + +#[test] +fn new_peer_gets_all_chunks_send() { + let test_state = TestState::default(); + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + assert_ne!(&peer_a, &peer_b); + + let keystore = test_state.keystore.clone(); + + test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + relay_parent: current, + validator_public, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + let valid = make_valid_availability_gossip( + &test_state, + 1, + 2, + ); + + change_our_view( + &mut virtual_overseer, + view![current], + &validator_public, + vec![ancestors[0]], + hashmap! { current => 1 }, + hashmap! { + current => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + dummy_occupied_core(candidates[1].descriptor.para_id) + ], + }, + hashmap! { current => vec![candidates[0].clone(), candidates[1].clone()] }, + hashmap! { candidates[0].hash() => true, candidates[1].hash() => false }, + hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, + hashmap! {}, + ).await; + + peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE_FIRST).await; + + setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await; + + let mut chunks = make_erasure_chunks( + test_state.persisted_validation_data.clone(), + validator_public.len(), + pov_blocks[0].clone(), + ); + + chunks.push(valid.erasure_chunk); + + expect_chunks_network_message( + &mut virtual_overseer, + &[peer_a], + &[candidates[0].hash(), candidates[1].hash()], + &chunks, + ).await; + }); +} diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 3b7ee32dda..bbd2bc37d7 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -231,7 +231,7 @@ impl NetworkBridgeMessage { } /// Availability Distribution Message. -#[derive(Debug)] +#[derive(Debug, derive_more::From)] pub enum AvailabilityDistributionMessage { /// Event from the network bridge. NetworkBridgeUpdateV1(NetworkBridgeEvent), diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md index de34f3b8ed..5b1941bc71 100644 --- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md @@ -23,7 +23,8 @@ Output: For each relay-parent in our local view update, look at all backed candidates pending availability. Distribute via gossip all erasure chunks for all candidates that we have to peers. -We define an operation `live_candidates(relay_heads) -> Set` which returns a set of [`CommittedCandidateReceipt`s](../../types/candidate.md#committed-candidate-receipt). +We define an operation `live_candidates(relay_heads) -> Set` which returns a set of hashes corresponding to [`CandidateReceipt`s](../../types/candidate.md#candidate-receipt). + This is defined as all candidates pending availability in any of those relay-chain heads or any of their last `K` ancestors in the same session. We assume that state is not pruned within `K` blocks of the chain-head. `K` commonly is small and is currently fixed to `K=3`. We will send any erasure-chunks that correspond to candidates in `live_candidates(peer_most_recent_view_update)`.