From d0c97539e48199376a9fea138c03c32522d3253f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 17 Dec 2020 20:09:17 +0100 Subject: [PATCH] Fix bug and further optimizations in availability distribution (#2104) * Fix bug and further optimizations in availability distribution - There was a bug that resulted in only getting one candidate per block as the candidates were put into the hashmap with the relay block hash as key. The solution for this is to use the candidate hash and the relay block hash as key. - We stored received/sent messages with the candidate hash and chunk index as key. The candidate hash wasn't required in this case, as the messages are already stored per candidate. * Update node/core/bitfield-signing/src/lib.rs Co-authored-by: Robert Habermeier * Remove the reverse map * major refactor of receipts & query_live * finish refactoring remove ancestory mapping, improve relay-parent cleanup & receipts-cache cleanup, add descriptor to `PerCandidate` * rename and rewrite query_pending_availability * add a bunch of consistency tests * Add some last changes * xy * fz * Make it compile again * Fix one test * Fix logging * Remove some buggy code * Make tests work again * Move stuff around * Remove dbg * Remove state from test_harness * More refactor and new test * New test and fixes * Move metric * Remove "duplicated code" * Fix tests * New test * Change break to continue * Update node/core/av-store/src/lib.rs * Update node/core/av-store/src/lib.rs * Update node/core/bitfield-signing/src/lib.rs Co-authored-by: Fedor Sakharov * update guide to match live_candidates changes * add comment * fix bitfield signing Co-authored-by: Robert Habermeier Co-authored-by: Bernhard Schuster Co-authored-by: Fedor Sakharov --- polkadot/Cargo.lock | 6 +- polkadot/node/core/av-store/src/lib.rs | 50 +- .../node/core/bitfield-signing/src/lib.rs | 23 +- .../availability-distribution/Cargo.toml | 6 +- .../availability-distribution/src/lib.rs | 609 +++---- .../availability-distribution/src/tests.rs | 1506 ++++++++++------- polkadot/node/subsystem/src/messages.rs | 2 +- .../availability/availability-distribution.md | 3 +- 8 files changed, 1203 insertions(+), 1002 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index a55b062db8..692a9c1c48 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4914,10 +4914,8 @@ name = "polkadot-availability-distribution" version = "0.1.0" dependencies = [ "assert_matches", - "env_logger 0.8.2", "futures 0.3.8", - "futures-timer 3.0.2", - "log", + "maplit", "parity-scale-codec", "polkadot-erasure-coding", "polkadot-node-network-protocol", @@ -4926,11 +4924,11 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-primitives", "sc-keystore", - "smallvec 1.5.1", "sp-application-crypto", "sp-core", "sp-keyring", "sp-keystore", + "sp-tracing", "thiserror", "tracing", "tracing-futures", diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index d1d81d031c..66bec12142 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -713,25 +713,51 @@ where match msg { QueryAvailableData(hash, tx) => { - tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data)) - .map_err(|_| oneshot::Canceled)?; + tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?; } QueryDataAvailability(hash, tx) => { - tx.send(available_data(&subsystem.inner, &hash).is_some()) - .map_err(|_| oneshot::Canceled)?; + let result = available_data(&subsystem.inner, &hash).is_some(); + + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?hash, + availability = ?result, + "Queried data availability", + ); + + tx.send(result).map_err(|_| oneshot::Canceled)?; } QueryChunk(hash, id, tx) => { - tx.send(get_chunk(subsystem, &hash, id)?) - .map_err(|_| oneshot::Canceled)?; + tx.send(get_chunk(subsystem, &hash, id)?).map_err(|_| oneshot::Canceled)?; } QueryChunkAvailability(hash, id, tx) => { - tx.send(get_chunk(subsystem, &hash, id)?.is_some()) - .map_err(|_| oneshot::Canceled)?; + let result = get_chunk(subsystem, &hash, id).map(|r| r.is_some()); + + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?hash, + availability = ?result, + "Queried chunk availability", + ); + + tx.send(result?).map_err(|_| oneshot::Canceled)?; } StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => { + let chunk_index = chunk.index; // Current block number is relay_parent block number + 1. let block_number = get_block_number(ctx, relay_parent).await? + 1; - match store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number) { + let result = store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number); + + tracing::trace!( + target: LOG_TARGET, + %chunk_index, + ?candidate_hash, + %block_number, + ?result, + "Stored chunk", + ); + + match result { Err(e) => { tx.send(Err(())).map_err(|_| oneshot::Canceled)?; return Err(e); @@ -742,7 +768,11 @@ where } } StoreAvailableData(hash, id, n_validators, av_data, tx) => { - match store_available_data(subsystem, &hash, id, n_validators, av_data) { + let result = store_available_data(subsystem, &hash, id, n_validators, av_data); + + tracing::trace!(target: LOG_TARGET, candidate_hash = ?hash, ?result, "Stored available data"); + + match result { Err(e) => { tx.send(Err(())).map_err(|_| oneshot::Canceled)?; return Err(e); diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index e02bd5661f..d5c7f4b29a 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -78,6 +78,8 @@ async fn get_core_availability( ) -> Result { let span = jaeger::hash_span(&relay_parent, "core_availability"); if let CoreState::Occupied(core) = core { + tracing::trace!(target: LOG_TARGET, para_id = %core.para_id, "Getting core availability"); + let _span = span.child("occupied"); let (tx, rx) = oneshot::channel(); sender @@ -93,7 +95,10 @@ async fn get_core_availability( let committed_candidate_receipt = match rx.await? { Ok(Some(ccr)) => ccr, - Ok(None) => return Ok(false), + Ok(None) => { + tracing::trace!(target: LOG_TARGET, para_id = %core.para_id, "No committed candidate"); + return Ok(false) + }, Err(e) => { // Don't take down the node on runtime API errors. tracing::warn!(target: LOG_TARGET, err = ?e, "Encountered a runtime API error"); @@ -103,6 +108,7 @@ async fn get_core_availability( drop(_span); let _span = span.child("query chunk"); + let candidate_hash = committed_candidate_receipt.hash(); let (tx, rx) = oneshot::channel(); sender @@ -110,13 +116,24 @@ async fn get_core_availability( .await .send( AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability( - committed_candidate_receipt.hash(), + candidate_hash, validator_idx, tx, )).into(), ) .await?; - return rx.await.map_err(Into::into); + + let res = rx.await.map_err(Into::into); + + tracing::trace!( + target: LOG_TARGET, + para_id = %core.para_id, + availability = ?res, + ?candidate_hash, + "Candidate availability", + ); + + return res; } Ok(false) diff --git a/polkadot/node/network/availability-distribution/Cargo.toml b/polkadot/node/network/availability-distribution/Cargo.toml index dd1c6f61c3..b3dc14422c 100644 --- a/polkadot/node/network/availability-distribution/Cargo.toml +++ b/polkadot/node/network/availability-distribution/Cargo.toml @@ -23,9 +23,7 @@ polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpe sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } -futures-timer = "3.0.2" -env_logger = "0.8.2" assert_matches = "1.4.0" -smallvec = "1.5.1" -log = "0.4.11" +maplit = "1.0" diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 67d1d4ffa8..5bf11defd2 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -38,6 +38,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_primitives::v1::{ BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId, SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash, + CandidateDescriptor, }; use polkadot_subsystem::messages::{ AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage, @@ -50,6 +51,7 @@ use polkadot_subsystem::{ SubsystemContext, SubsystemError, }; use std::collections::{HashMap, HashSet}; +use std::collections::hash_map::Entry; use std::iter; use thiserror::Error; @@ -116,6 +118,12 @@ pub struct AvailabilityGossipMessage { pub erasure_chunk: ErasureChunk, } +impl From for protocol_v1::AvailabilityDistributionMessage { + fn from(message: AvailabilityGossipMessage) -> Self { + Self::Chunk(message.candidate_hash, message.erasure_chunk) + } +} + /// Data used to track information of peers and relay parents the /// overseer ordered us to work on. #[derive(Default, Clone, Debug)] @@ -129,19 +137,8 @@ struct ProtocolState { /// Caches a mapping of relay parents or ancestor to live candidate receipts. /// Allows fast intersection of live candidates with views and consecutive unioning. - /// Maps relay parent / ancestor -> live candidate receipts + its hash. - receipts: HashMap>, - - /// Allow reverse caching of view checks. - /// Maps candidate hash -> relay parent for extracting meta information from `PerRelayParent`. - /// Note that the presence of this is not sufficient to determine if deletion is OK, i.e. - /// two histories could cover this. - reverse: HashMap, - - /// Keeps track of which candidate receipts are required due to ancestors of which relay parents - /// of our view. - /// Maps ancestor -> relay parents in view - ancestry: HashMap>, + /// Maps relay parent / ancestor -> candidate receipts. + receipts: HashMap>, /// Track things needed to start and stop work on a particular relay parent. per_relay_parent: HashMap, @@ -157,24 +154,30 @@ struct PerCandidate { /// candidate hash + erasure chunk index -> gossip message message_vault: HashMap, - /// Track received candidate hashes and validator indices from peers. - received_messages: HashMap>, + /// Track received erasure chunk indices per peer. + received_messages: HashMap>, - /// Track already sent candidate hashes and the erasure chunk index to the peers. - sent_messages: HashMap>, + /// Track sent erasure chunk indices per peer. + sent_messages: HashMap>, /// The set of validators. validators: Vec, /// If this node is a validator, note the index in the validator set. validator_index: Option, + + /// The descriptor of this candidate. + descriptor: CandidateDescriptor, + + /// The set of relay chain blocks this appears to be live in. + live_in: HashSet, } impl PerCandidate { - /// Returns `true` iff the given `message` is required by the given `peer`. - fn message_required_by_peer(&self, peer: &PeerId, message: &(CandidateHash, ValidatorIndex)) -> bool { - self.received_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true) - && self.sent_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true) + /// Returns `true` iff the given `validator_index` is required by the given `peer`. + fn message_required_by_peer(&self, peer: &PeerId, validator_index: &ValidatorIndex) -> bool { + self.received_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true) + && self.sent_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true) } } @@ -182,139 +185,85 @@ impl PerCandidate { struct PerRelayParent { /// Set of `K` ancestors for this relay parent. ancestors: Vec, + /// Live candidates, according to this relay parent. + live_candidates: HashSet, } impl ProtocolState { - /// Collects the relay_parents ancestors including the relay parents themselfes. - #[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))] - fn extend_with_ancestors<'a>( - &'a self, - relay_parents: impl IntoIterator + 'a, - ) -> HashSet { - relay_parents - .into_iter() - .map(|relay_parent| { - self.per_relay_parent - .get(relay_parent) - .into_iter() - .map(|per_relay_parent| per_relay_parent.ancestors.iter().cloned()) - .flatten() - .chain(iter::once(*relay_parent)) - }) - .flatten() - .collect::>() - } - - /// Unionize all cached entries for the given relay parents and its ancestors. + /// Unionize all live candidate hashes of the given relay parents and their recent + /// ancestors. + /// /// Ignores all non existent relay parents, so this can be used directly with a peers view. - /// Returns a map from candidate hash -> receipt + /// Returns a set of candidate hashes. #[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))] fn cached_live_candidates_unioned<'a>( &'a self, relay_parents: impl IntoIterator + 'a, - ) -> HashMap { - let relay_parents_and_ancestors = self.extend_with_ancestors(relay_parents); - relay_parents_and_ancestors + ) -> HashSet { + relay_parents .into_iter() - .filter_map(|relay_parent_or_ancestor| self.receipts.get(&relay_parent_or_ancestor)) - .map(|receipt_set| receipt_set.into_iter()) + .filter_map(|r| self.per_relay_parent.get(r)) + .map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned()) .flatten() - .map(|(receipt_hash, receipt)| (receipt_hash.clone(), receipt.clone())) .collect() } - #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] - async fn add_relay_parent( + #[tracing::instrument(level = "trace", skip(candidates), fields(subsystem = LOG_TARGET))] + fn add_relay_parent( &mut self, - ctx: &mut Context, relay_parent: Hash, validators: Vec, validator_index: Option, - ) -> Result<()> - where - Context: SubsystemContext, - { - let candidates = query_live_candidates(ctx, self, std::iter::once(relay_parent)).await?; + candidates: HashMap, + ancestors: Vec, + ) { + let candidate_hashes: Vec<_> = candidates.keys().cloned().collect(); // register the relation of relay_parent to candidate.. - // ..and the reverse association. - for (relay_parent_or_ancestor, (receipt_hash, receipt)) in candidates.clone() { - self.reverse - .insert(receipt_hash.clone(), relay_parent_or_ancestor.clone()); - let per_candidate = self.per_candidate.entry(receipt_hash.clone()).or_default(); - per_candidate.validator_index = validator_index.clone(); - per_candidate.validators = validators.clone(); + for (receipt_hash, fetched) in candidates { + let per_candidate = self.per_candidate.entry(receipt_hash).or_default(); - self.receipts - .entry(relay_parent_or_ancestor) - .or_default() - .insert((receipt_hash, receipt)); + // Cached candidates already have entries and thus don't need this + // information to be set. + if let FetchedLiveCandidate::Fresh(descriptor) = fetched { + per_candidate.validator_index = validator_index.clone(); + per_candidate.validators = validators.clone(); + per_candidate.descriptor = descriptor; + } + per_candidate.live_in.insert(relay_parent); } - // collect the ancestors again from the hash map - let ancestors = candidates - .iter() - .filter_map(|(ancestor_or_relay_parent, _receipt)| { - if ancestor_or_relay_parent == &relay_parent { - None - } else { - Some(*ancestor_or_relay_parent) - } - }) - .collect::>(); - - // mark all the ancestors as "needed" by this newly added relay parent - for ancestor in ancestors.iter() { - self.ancestry - .entry(ancestor.clone()) - .or_default() - .insert(relay_parent); - } - - self.per_relay_parent - .entry(relay_parent) - .or_default() - .ancestors = ancestors; - - Ok(()) + let per_relay_parent = self.per_relay_parent.entry(relay_parent).or_default(); + per_relay_parent.ancestors = ancestors; + per_relay_parent.live_candidates.extend(candidate_hashes); } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] fn remove_relay_parent(&mut self, relay_parent: &Hash) { - // we might be ancestor of some other relay_parent - if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) { - // if we were the last user, and it is - // not explicitly set to be worked on by the overseer - if descendants.is_empty() { - // remove from the ancestry index - self.ancestry.remove(relay_parent); - // and also remove the actual receipt - if let Some(candidates) = self.receipts.remove(relay_parent) { - candidates.into_iter().for_each(|c| { self.per_candidate.remove(&c.0); }); - } - } - } if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) { - // remove all "references" from the hash maps and sets for all ancestors - for ancestor in per_relay_parent.ancestors { - // one of our decendants might be ancestor of some other relay_parent - if let Some(ref mut descendants) = self.ancestry.get_mut(&ancestor) { - // we do not need this descendant anymore - descendants.remove(&relay_parent); - // if we were the last user, and it is - // not explicitly set to be worked on by the overseer - if descendants.is_empty() && !self.per_relay_parent.contains_key(&ancestor) { - // remove from the ancestry index - self.ancestry.remove(&ancestor); - // and also remove the actual receipt - if let Some(candidates) = self.receipts.remove(&ancestor) { - candidates.into_iter().for_each(|c| { self.per_candidate.remove(&c.0); }); - } + for candidate_hash in per_relay_parent.live_candidates { + // Prune the candidate if this was the last member of our view + // to consider it live (including its ancestors). + if let Entry::Occupied(mut occ) = self.per_candidate.entry(candidate_hash) { + occ.get_mut().live_in.remove(relay_parent); + if occ.get().live_in.is_empty() { + occ.remove(); } } } } } + + // Removes all entries from receipts which aren't referenced in the ancestry of + // one of our live relay-chain heads. + fn clean_up_receipts_cache(&mut self) { + let extended_view: HashSet<_> = self.per_relay_parent.iter() + .map(|(r_hash, v)| v.ancestors.iter().cloned().chain(std::iter::once(*r_hash))) + .flatten() + .collect(); + + self.receipts.retain(|ancestor_hash, _| extended_view.contains(ancestor_hash)); + } } /// Deal with network bridge updates and track what needs to be tracked @@ -387,27 +336,30 @@ where for added in view.difference(&old_view) { let validators = query_validators(ctx, *added).await?; let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await; - state - .add_relay_parent(ctx, *added, validators, validator_index) - .await?; + let (candidates, ancestors) + = query_live_candidates(ctx, &mut state.receipts, *added).await?; + + state.add_relay_parent( + *added, + validators, + validator_index, + candidates, + ancestors, + ); } // handle all candidates - for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(view.difference(&old_view)) { - let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); - - // assure the node has the validator role - if per_candidate.validator_index.is_none() { - continue; - }; + for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) { + // If we are not a validator for this candidate, let's skip it. + if state.per_candidate.entry(candidate_hash).or_default().validator_index.is_none() { + continue + } // check if the availability is present in the store exists if !query_data_availability(ctx, candidate_hash).await? { continue; } - let validator_count = per_candidate.validators.len(); - // obtain interested peers in the candidate hash let peers: Vec = state .peer_views @@ -417,79 +369,66 @@ where // collect all direct interests of a peer w/o ancestors state .cached_live_candidates_unioned(view.heads.iter()) - .contains_key(&candidate_hash) + .contains(&candidate_hash) }) .map(|(peer, _view)| peer.clone()) .collect(); + let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); + + let validator_count = per_candidate.validators.len(); + // distribute all erasure messages to interested peers for chunk_index in 0u32..(validator_count as u32) { - // only the peers which did not receive this particular erasure chunk - let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); - - // obtain the chunks from the cache, if not fallback - // and query the availability store - let message_id = (candidate_hash, chunk_index); - let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) { - message.erasure_chunk.clone() + let message = if let Some(message) = per_candidate.message_vault.get(&chunk_index) { + tracing::trace!( + target: LOG_TARGET, + %chunk_index, + ?candidate_hash, + "Retrieved chunk from message vault", + ); + message.clone() } else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? { - erasure_chunk + tracing::trace!( + target: LOG_TARGET, + %chunk_index, + ?candidate_hash, + "Retrieved chunk from availability storage", + ); + + AvailabilityGossipMessage { + candidate_hash, + erasure_chunk, + } } else { + tracing::error!( + target: LOG_TARGET, + %chunk_index, + ?candidate_hash, + "Availability store reported that we have the availability data, but we could not retrieve a chunk of it!", + ); continue; }; - debug_assert_eq!(erasure_chunk.index, chunk_index); + debug_assert_eq!(message.erasure_chunk.index, chunk_index); let peers = peers .iter() - .filter(|peer| per_candidate.message_required_by_peer(peer, &message_id)) + .filter(|peer| per_candidate.message_required_by_peer(peer, &chunk_index)) .cloned() .collect::>(); - let message = AvailabilityGossipMessage { - candidate_hash, - erasure_chunk, - }; - send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await; + send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await; } } // cleanup the removed relay parents and their states - let removed = old_view.difference(&view).collect::>(); - for removed in removed { - state.remove_relay_parent(&removed); - } + old_view.difference(&view).for_each(|r| state.remove_relay_parent(r)); + state.clean_up_receipts_cache(); + Ok(()) } -#[inline(always)] -async fn send_tracked_gossip_message_to_peers( - ctx: &mut Context, - per_candidate: &mut PerCandidate, - metrics: &Metrics, - peers: Vec, - message: AvailabilityGossipMessage, -) -where - Context: SubsystemContext, -{ - send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await -} - -#[inline(always)] -async fn send_tracked_gossip_messages_to_peer( - ctx: &mut Context, - per_candidate: &mut PerCandidate, - metrics: &Metrics, - peer: PeerId, - message_iter: impl IntoIterator, -) -where - Context: SubsystemContext, -{ - send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter).await -} - #[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))] async fn send_tracked_gossip_messages_to_peers( ctx: &mut Context, @@ -501,37 +440,27 @@ async fn send_tracked_gossip_messages_to_peers( where Context: SubsystemContext, { - if peers.is_empty() { - return; - } for message in message_iter { for peer in peers.iter() { - let message_id = (message.candidate_hash, message.erasure_chunk.index); per_candidate .sent_messages .entry(peer.clone()) .or_default() - .insert(message_id); + .insert(message.erasure_chunk.index); } per_candidate .message_vault .insert(message.erasure_chunk.index, message.clone()); - let wire_message = protocol_v1::AvailabilityDistributionMessage::Chunk( - message.candidate_hash, - message.erasure_chunk, - ); - - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( + if !peers.is_empty() { + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( peers.clone(), - protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message), - ), - )) - .await; + protocol_v1::ValidationProtocol::AvailabilityDistribution(message.into()), + ).into()).await; - metrics.on_chunk_distributed(); + metrics.on_chunk_distributed(); + } } } @@ -558,29 +487,25 @@ where // the union of all relay parent's candidates. let added_candidates = state.cached_live_candidates_unioned(added.iter()); - // Send all messages we've seen before and the peer is now interested - // in to that peer. - - for (candidate_hash, _receipt) in added_candidates { + // Send all messages we've seen before and the peer is now interested in. + for candidate_hash in added_candidates { let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); // obtain the relevant chunk indices not sent yet let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex)) .into_iter() .filter_map(|erasure_chunk_index: ValidatorIndex| { - let message_id = (candidate_hash, erasure_chunk_index); - // try to pick up the message from the message vault // so we send as much as we have per_candidate .message_vault .get(&erasure_chunk_index) - .filter(|_| per_candidate.message_required_by_peer(&origin, &message_id)) + .filter(|_| per_candidate.message_required_by_peer(&origin, &erasure_chunk_index)) }) .cloned() .collect::>(); - send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages).await; + send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![origin.clone()], messages).await; } } @@ -622,37 +547,59 @@ where let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter()); // check if the candidate is of interest - let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) { - live_candidate + let descriptor = if live_candidates.contains(&message.candidate_hash) { + state.per_candidate + .get(&message.candidate_hash) + .expect("All live candidates are contained in per_candidate; qed") + .descriptor + .clone() } else { + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?message.candidate_hash, + peer = %origin, + "Peer send not live candidate", + ); modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await; - return Ok(()); + return Ok(()) }; - // check the merkle proof - let root = &live_candidate.descriptor.erasure_root; - let anticipated_hash = if let Ok(hash) = branch_hash( - root, + // check the merkle proof against the erasure root in the candidate descriptor. + let anticipated_hash = match branch_hash( + &descriptor.erasure_root, &message.erasure_chunk.proof, message.erasure_chunk.index as usize, ) { - hash - } else { - modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; - return Ok(()); + Ok(hash) => hash, + Err(e) => { + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?message.candidate_hash, + peer = %origin, + error = ?e, + "Failed to calculate chunk merkle proof", + ); + modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; + return Ok(()); + }, }; let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk); if anticipated_hash != erasure_chunk_hash { + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?message.candidate_hash, + peer = %origin, + "Peer send chunk with invalid merkle proof", + ); modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; return Ok(()); } - // an internal unique identifier of this message - let message_id = (message.candidate_hash, message.erasure_chunk.index); + let erasure_chunk_index = &message.erasure_chunk.index; { - let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default(); + let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default(); // check if this particular erasure chunk was already sent by that peer before { @@ -660,18 +607,16 @@ where .received_messages .entry(origin.clone()) .or_default(); - if received_set.contains(&message_id) { + if !received_set.insert(*erasure_chunk_index) { modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await; return Ok(()); - } else { - received_set.insert(message_id.clone()); } } // insert into known messages and change reputation if per_candidate .message_vault - .insert(message_id.1, message.clone()) + .insert(*erasure_chunk_index, message.clone()) .is_some() { modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await; @@ -679,22 +624,18 @@ where modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await; // save the chunk for our index - if let Some(validator_index) = per_candidate.validator_index { - if message.erasure_chunk.index == validator_index { - if let Err(_e) = store_chunk( - ctx, - message.candidate_hash.clone(), - live_candidate.descriptor.relay_parent.clone(), - message.erasure_chunk.index, - message.erasure_chunk.clone(), - ) - .await? - { - tracing::warn!( - target: LOG_TARGET, - "Failed to store erasure chunk to availability store" - ); - } + if Some(*erasure_chunk_index) == per_candidate.validator_index { + if store_chunk( + ctx, + message.candidate_hash, + descriptor.relay_parent, + message.erasure_chunk.index, + message.erasure_chunk.clone(), + ).await?.is_err() { + tracing::warn!( + target: LOG_TARGET, + "Failed to store erasure chunk to availability store" + ); } } }; @@ -704,24 +645,24 @@ where .peer_views .clone() .into_iter() - .filter(|(_peer, view)| { + .filter(|(_, view)| { // peers view must contain the candidate hash too state .cached_live_candidates_unioned(view.heads.iter()) - .contains_key(&message_id.0) + .contains(&message.candidate_hash) }) .map(|(peer, _)| -> PeerId { peer.clone() }) .collect::>(); - let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default(); + let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default(); let peers = peers .into_iter() - .filter(|peer| per_candidate.message_required_by_peer(peer, &message_id)) + .filter(|peer| per_candidate.message_required_by_peer(peer, erasure_chunk_index)) .collect::>(); // gossip that message to interested peers - send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await; + send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await; Ok(()) } @@ -743,13 +684,21 @@ impl AvailabilityDistributionSubsystem { } /// Start processing work as passed on from the Overseer. + async fn run(self, ctx: Context) -> Result<()> + where + Context: SubsystemContext, + { + let mut state = ProtocolState::default(); + self.run_inner(ctx, &mut state).await + } + + /// Start processing work. #[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))] - async fn run(self, mut ctx: Context) -> Result<()> + async fn run_inner(self, mut ctx: Context, state: &mut ProtocolState) -> Result<()> where Context: SubsystemContext, { // work: process incoming messages from the overseer. - let mut state = ProtocolState::default(); loop { let message = ctx .recv() @@ -762,7 +711,7 @@ impl AvailabilityDistributionSubsystem { if let Err(e) = handle_network_msg( &mut ctx, &self.keystore.clone(), - &mut state, + state, &self.metrics, event, ) @@ -807,96 +756,102 @@ where } } -/// Obtain all live candidates based on an iterator of relay heads. -#[tracing::instrument(level = "trace", skip(ctx, relay_parents), fields(subsystem = LOG_TARGET))] -async fn query_live_candidates_without_ancestors( +// Metadata about a candidate that is part of the live_candidates set. +// +// Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This +// information is propagated to the higher level where it can be used to create data entries. Cached candidates +// already have entries associated with them, and thus don't need this metadata to be fetched. +#[derive(Debug)] +enum FetchedLiveCandidate { + Cached, + Fresh(CandidateDescriptor), +} + +/// Obtain all live candidates for all given `relay_blocks`. +/// +/// This returns a set of all candidate hashes pending availability within the state +/// of the explicitly referenced relay heads. +/// +/// This also queries the provided `receipts` cache before reaching into the +/// runtime and updates it with the information learned. +#[tracing::instrument(level = "trace", skip(ctx, relay_blocks, receipts), fields(subsystem = LOG_TARGET))] +async fn query_pending_availability_at( ctx: &mut Context, - relay_parents: impl IntoIterator, -) -> Result> + relay_blocks: impl IntoIterator, + receipts: &mut HashMap>, +) -> Result> where Context: SubsystemContext, { - let iter = relay_parents.into_iter(); - let hint = iter.size_hint(); + let mut live_candidates = HashMap::new(); - let mut live_candidates = HashSet::with_capacity(hint.1.unwrap_or(hint.0)); - for relay_parent in iter { - let paras = query_para_ids(ctx, relay_parent).await?; - for para in paras { + // fetch and fill out cache for each of these + for relay_parent in relay_blocks { + let receipts_for = match receipts.entry(relay_parent) { + Entry::Occupied(e) => { + live_candidates.extend( + e.get().iter().cloned().map(|c| (c, FetchedLiveCandidate::Cached)) + ); + continue + }, + e => e.or_default(), + }; + + for para in query_para_ids(ctx, relay_parent).await? { if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? { - live_candidates.insert(ccr); + let receipt_hash = ccr.hash(); + let descriptor = ccr.descriptor().clone(); + + // unfortunately we have no good way of telling the candidate was + // cached until now. But we don't clobber a `Cached` entry if there + // is one already. + live_candidates.entry(receipt_hash) + .or_insert(FetchedLiveCandidate::Fresh(descriptor)); + + receipts_for.insert(receipt_hash); } } } + Ok(live_candidates) } -/// Obtain all live candidates based on an iterator or relay heads including `k` ancestors. +/// Obtain all live candidates under a particular relay head. This implicitly includes +/// `K` ancestors of the head, such that the candidates pending availability in all of +/// the states of the head and the ancestors are unioned together to produce the +/// return type of this function. Each candidate hash is paired. /// -/// Relay parent. -#[tracing::instrument(level = "trace", skip(ctx, relay_parents), fields(subsystem = LOG_TARGET))] +/// This also updates all `receipts` cached by the protocol state and returns a list +/// of up to `K` ancestors of the relay-parent. +#[tracing::instrument(level = "trace", skip(ctx, receipts), fields(subsystem = LOG_TARGET))] async fn query_live_candidates( ctx: &mut Context, - state: &mut ProtocolState, - relay_parents: impl IntoIterator, -) -> Result> + receipts: &mut HashMap>, + relay_parent: Hash, +) -> Result<(HashMap, Vec)> where Context: SubsystemContext, { - let iter = relay_parents.into_iter(); - let hint = iter.size_hint(); + // register one of relay parents (not the ancestors) + let ancestors = query_up_to_k_ancestors_in_same_session( + ctx, + relay_parent, + AvailabilityDistributionSubsystem::K, + ) + .await?; - let capacity = hint.1.unwrap_or(hint.0) * (1 + AvailabilityDistributionSubsystem::K); - let mut live_candidates = - HashMap::::with_capacity(capacity); + // query the ones that were not present in the receipts cache and add them + // to it. + let live_candidates = query_pending_availability_at( + ctx, + ancestors.iter().cloned().chain(std::iter::once(relay_parent)), + receipts, + ).await?; - for relay_parent in iter { - // register one of relay parents (not the ancestors) - let mut ancestors = query_up_to_k_ancestors_in_same_session( - ctx, - relay_parent, - AvailabilityDistributionSubsystem::K, - ) - .await?; - - ancestors.push(relay_parent); - - // ancestors might overlap, so check the cache too - let unknown = ancestors - .into_iter() - .filter(|relay_parent_or_ancestor| { - // use the ones which we pulled before - // but keep the unknown relay parents - state - .receipts - .get(relay_parent_or_ancestor) - .and_then(|receipts| { - // directly extend the live_candidates with the cached value - live_candidates.extend(receipts.into_iter().map( - |(receipt_hash, receipt)| { - (relay_parent, (receipt_hash.clone(), receipt.clone())) - }, - )); - Some(()) - }) - .is_none() - }) - .collect::>(); - - // query the ones that were not present in the receipts cache - let receipts = query_live_candidates_without_ancestors(ctx, unknown.clone()).await?; - live_candidates.extend( - unknown.into_iter().zip( - receipts - .into_iter() - .map(|receipt| (receipt.hash(), receipt)), - ), - ); - } - Ok(live_candidates) + Ok((live_candidates, ancestors)) } -/// Query all para IDs. +/// Query all para IDs that are occupied under a given relay-parent. #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] async fn query_para_ids(ctx: &mut Context, relay_parent: Hash) -> Result> where @@ -909,7 +864,7 @@ where ))) .await; - let all_para_ids: Vec<_> = rx + let all_para_ids = rx .await .map_err(|e| Error::AvailabilityCoresResponseChannel(e))? .map_err(|e| Error::AvailabilityCores(e))?; @@ -955,8 +910,7 @@ where AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx), )).await; - rx.await - .map_err(|e| Error::QueryAvailabilityResponseChannel(e)) + rx.await.map_err(|e| Error::QueryAvailabilityResponseChannel(e)) } #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] @@ -1111,18 +1065,11 @@ where // iterate from youngest to oldest let mut iter = ancestors.into_iter().peekable(); - while let Some(ancestor) = iter.next() { - if let Some(ancestor_parent) = iter.peek() { - let session = query_session_index_for_child(ctx, *ancestor_parent).await?; - if session != desired_session { - break; - } - acc.push(ancestor); - } else { - // either ended up at genesis or the blocks were - // already pruned + while let Some((ancestor, ancestor_parent)) = iter.next().and_then(|a| iter.peek().map(|ap| (a, ap))) { + if query_session_index_for_child(ctx, *ancestor_parent).await? != desired_session { break; } + acc.push(ancestor); } debug_assert!(acc.len() <= k); diff --git a/polkadot/node/network/availability-distribution/src/tests.rs b/polkadot/node/network/availability-distribution/src/tests.rs index 3e7f0d83e4..310ca9e87c 100644 --- a/polkadot/node/network/availability-distribution/src/tests.rs +++ b/polkadot/node/network/availability-distribution/src/tests.rs @@ -26,17 +26,16 @@ use polkadot_primitives::v1::{ use polkadot_subsystem_testhelpers as test_helpers; use futures::{executor, future, Future}; -use futures_timer::Delay; use sc_keystore::LocalKeystore; -use smallvec::smallvec; use sp_application_crypto::AppKey; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; use std::{sync::Arc, time::Duration}; +use maplit::hashmap; - -macro_rules! delay { - ($delay:expr) => { - Delay::new(Duration::from_millis($delay)).await; +macro_rules! view { + ( $( $hash:expr ),* $(,)? ) => { + // Finalized number unimportant for availability distribution. + View { heads: vec![ $( $hash.clone() ),* ], finalized_number: 0 } }; } @@ -55,66 +54,43 @@ struct TestHarness { fn test_harness>( keystore: SyncCryptoStorePtr, - test: impl FnOnce(TestHarness) -> T, -) { - let _ = env_logger::builder() - .is_test(true) - .filter( - Some("polkadot_availability_distribution"), - log::LevelFilter::Trace, - ) - .try_init(); + test_fx: impl FnOnce(TestHarness) -> T, +) -> ProtocolState { + sp_tracing::try_init_simple(); let pool = sp_core::testing::TaskExecutor::new(); - let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default()); - let subsystem = subsystem.run(context); + let mut state = ProtocolState::default(); + { + let subsystem = subsystem.run_inner(context, &mut state); - let test_fut = test(TestHarness { virtual_overseer }); + let test_fut = test_fx(TestHarness { virtual_overseer }); - futures::pin_mut!(test_fut); - futures::pin_mut!(subsystem); + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); - executor::block_on(future::select(test_fut, subsystem)); -} + executor::block_on(future::select(test_fut, subsystem)); + } -const TIMEOUT: Duration = Duration::from_millis(100); - -async fn overseer_signal( - overseer: &mut test_helpers::TestSubsystemContextHandle, - signal: OverseerSignal, -) { - delay!(50); - overseer - .send(FromOverseer::Signal(signal)) - .timeout(TIMEOUT) - .await - .expect("10ms is more than enough for sending signals."); + state } async fn overseer_send( overseer: &mut test_helpers::TestSubsystemContextHandle, - msg: AvailabilityDistributionMessage, + msg: impl Into, ) { + let msg = msg.into(); tracing::trace!(msg = ?msg, "sending message"); - overseer - .send(FromOverseer::Communication { msg }) - .timeout(TIMEOUT) - .await - .expect("10ms is more than enough for sending messages."); + overseer.send(FromOverseer::Communication { msg }).await } async fn overseer_recv( overseer: &mut test_helpers::TestSubsystemContextHandle, ) -> AllMessages { tracing::trace!("waiting for message ..."); - let msg = overseer - .recv() - .timeout(TIMEOUT) - .await - .expect("TIMEOUT is enough to recv."); + let msg = overseer.recv().await; tracing::trace!(msg = ?msg, "received message"); msg } @@ -138,7 +114,6 @@ struct TestState { chain_ids: Vec, validators: Vec, validator_public: Vec, - validator_index: Option, validator_groups: (Vec>, GroupRotationInfo), head_data: HashMap, keystore: SyncCryptoStorePtr, @@ -146,6 +121,8 @@ struct TestState { ancestors: Vec, availability_cores: Vec, persisted_validation_data: PersistedValidationData, + candidates: Vec, + pov_blocks: Vec, } fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec { @@ -216,7 +193,36 @@ impl Default for TestState { max_pov_size: 1024, }; - let validator_index = Some((validators.len() - 1) as ValidatorIndex); + let pov_block_a = PoV { + block_data: BlockData(vec![42, 43, 44]), + }; + + let pov_block_b = PoV { + block_data: BlockData(vec![45, 46, 47]), + }; + + let candidates = vec![ + TestCandidateBuilder { + para_id: chain_ids[0], + relay_parent: relay_parent, + pov_hash: pov_block_a.hash(), + erasure_root: make_erasure_root(persisted_validation_data.clone(), validators.len(), pov_block_a.clone()), + head_data: head_data.get(&chain_ids[0]).unwrap().clone(), + ..Default::default() + } + .build(), + TestCandidateBuilder { + para_id: chain_ids[1], + relay_parent: relay_parent, + pov_hash: pov_block_b.hash(), + erasure_root: make_erasure_root(persisted_validation_data.clone(), validators.len(), pov_block_b.clone()), + head_data: head_data.get(&chain_ids[1]).unwrap().clone(), + ..Default::default() + } + .build(), + ]; + + let pov_blocks = vec![pov_block_a, pov_block_b]; Self { chain_ids, @@ -229,34 +235,42 @@ impl Default for TestState { persisted_validation_data, relay_parent, ancestors, - validator_index, + candidates, + pov_blocks, } } } -fn make_available_data(test: &TestState, pov: PoV) -> AvailableData { +fn make_available_data(validation_data: PersistedValidationData, pov: PoV) -> AvailableData { AvailableData { - validation_data: test.persisted_validation_data.clone(), + validation_data, pov: Arc::new(pov), } } -fn make_erasure_root(test: &TestState, pov: PoV) -> Hash { - let available_data = make_available_data(test, pov); +fn make_erasure_root(peristed: PersistedValidationData, validator_count: usize, pov: PoV) -> Hash { + let available_data = make_available_data(peristed, pov); - let chunks = obtain_chunks(test.validators.len(), &available_data).unwrap(); + let chunks = obtain_chunks(validator_count, &available_data).unwrap(); branches(&chunks).root() } +fn make_erasure_chunks(peristed: PersistedValidationData, validator_count: usize, pov: PoV) -> Vec { + let available_data = make_available_data(peristed, pov); + + derive_erasure_chunks_with_proofs(validator_count, &available_data) +} + fn make_valid_availability_gossip( test: &TestState, - candidate_hash: CandidateHash, + candidate: usize, erasure_chunk_index: u32, - pov: PoV, ) -> AvailabilityGossipMessage { - let available_data = make_available_data(test, pov); - - let erasure_chunks = derive_erasure_chunks_with_proofs(test.validators.len(), &available_data); + let erasure_chunks = make_erasure_chunks( + test.persisted_validation_data.clone(), + test.validator_public.len(), + test.pov_blocks[candidate].clone(), + ); let erasure_chunk: ErasureChunk = erasure_chunks .get(erasure_chunk_index as usize) @@ -264,7 +278,7 @@ fn make_valid_availability_gossip( .clone(); AvailabilityGossipMessage { - candidate_hash, + candidate_hash: test.candidates[candidate].hash(), erasure_chunk, } } @@ -300,25 +314,13 @@ impl TestCandidateBuilder { fn helper_integrity() { let test_state = TestState::default(); - let pov_block = PoV { - block_data: BlockData(vec![42, 43, 44]), - }; + let message = make_valid_availability_gossip( + &test_state, + 0, + 2, + ); - let pov_hash = pov_block.hash(); - - let candidate = TestCandidateBuilder { - para_id: test_state.chain_ids[0], - relay_parent: test_state.relay_parent, - pov_hash, - erasure_root: make_erasure_root(&test_state, pov_block.clone()), - ..Default::default() - } - .build(); - - let message = - make_valid_availability_gossip(&test_state, candidate.hash(), 2, pov_block.clone()); - - let root = dbg!(&candidate.descriptor.erasure_root); + let root = &test_state.candidates[0].descriptor.erasure_root; let anticipated_hash = branch_hash( root, @@ -353,652 +355,574 @@ fn derive_erasure_chunks_with_proofs( erasure_chunks } -#[test] -fn reputation_verification() { - let test_state = TestState::default(); - - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { - mut virtual_overseer, - } = test_harness; - - let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap(); - - let pov_block_a = PoV { - block_data: BlockData(vec![42, 43, 44]), - }; - - let pov_block_b = PoV { - block_data: BlockData(vec![45, 46, 47]), - }; - - let pov_block_c = PoV { - block_data: BlockData(vec![48, 49, 50]), - }; - - let pov_hash_a = pov_block_a.hash(); - let pov_hash_b = pov_block_b.hash(); - let pov_hash_c = pov_block_c.hash(); - - let candidates = vec![ - TestCandidateBuilder { - para_id: test_state.chain_ids[0], - relay_parent: test_state.relay_parent, - pov_hash: pov_hash_a, - erasure_root: make_erasure_root(&test_state, pov_block_a.clone()), - ..Default::default() - } - .build(), - TestCandidateBuilder { - para_id: test_state.chain_ids[1], - relay_parent: test_state.relay_parent, - pov_hash: pov_hash_b, - erasure_root: make_erasure_root(&test_state, pov_block_b.clone()), - head_data: expected_head_data.clone(), - ..Default::default() - } - .build(), - TestCandidateBuilder { - para_id: test_state.chain_ids[1], - relay_parent: Hash::repeat_byte(0xFA), - pov_hash: pov_hash_c, - erasure_root: make_erasure_root(&test_state, pov_block_c.clone()), - head_data: test_state - .head_data - .get(&test_state.chain_ids[1]) - .unwrap() - .clone(), - ..Default::default() - } - .build(), - ]; - - let TestState { - chain_ids, - keystore: _, - validators: _, - validator_public, - validator_groups, - availability_cores, - head_data: _, - persisted_validation_data: _, - relay_parent: current, - ancestors, - validator_index: _, - } = test_state.clone(); - - let _ = validator_groups; - let _ = availability_cores; - - let peer_a = PeerId::random(); - let peer_b = PeerId::random(); - assert_ne!(&peer_a, &peer_b); - - tracing::trace!("peer A: {:?}", peer_a); - tracing::trace!("peer B: {:?}", peer_b); - - tracing::trace!("candidate A: {:?}", candidates[0].hash()); - tracing::trace!("candidate B: {:?}", candidates[1].hash()); - - overseer_signal( - &mut virtual_overseer, - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![current.clone()], - deactivated: smallvec![], - }), - ) - .await; - - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(view![current,]), - ), - ) - .await; - - // obtain the validators per relay parent +async fn expect_chunks_network_message( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + peers: &[PeerId], + candidates: &[CandidateHash], + chunks: &[ErasureChunk], +) { + for _ in 0..chunks.len() { assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(tx), - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(validator_public.clone())).unwrap(); + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + send_peers, + protocol_v1::ValidationProtocol::AvailabilityDistribution( + protocol_v1::AvailabilityDistributionMessage::Chunk(send_candidate, send_chunk), + ), + ) + ) => { + assert!(candidates.contains(&send_candidate), format!("Could not find candidate: {:?}", send_candidate)); + assert!(chunks.iter().any(|c| c == &send_chunk), format!("Could not find chunk: {:?}", send_chunk)); + assert_eq!(peers.len(), send_peers.len()); + assert!(peers.iter().all(|p| send_peers.contains(p))); } ); + } +} - let genesis = Hash::repeat_byte(0xAA); - // query of k ancestors, we only provide one - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::Ancestors { - hash: relay_parent, - k, - response_channel: tx, - }) => { - assert_eq!(relay_parent, current); - assert_eq!(k, AvailabilityDistributionSubsystem::K + 1); - // 0xAA..AA will not be included, since there is no mean to determine - // its session index - tx.send(Ok(vec![ancestors[0].clone(), genesis])).unwrap(); - } - ); +async fn change_our_view( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + view: View, + validator_public: &[ValidatorId], + ancestors: Vec, + session_per_relay_parent: HashMap, + availability_cores_per_relay_parent: HashMap>, + candidate_pending_availabilities_per_relay_parent: HashMap>, + data_availability: HashMap, + chunk_data_per_candidate: HashMap, + send_chunks_to: HashMap>, +) { + overseer_send(virtual_overseer, NetworkBridgeEvent::OurViewChange(view.clone())).await; - // state query for each of them + // obtain the validators per relay parent + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + assert!(view.contains(&relay_parent)); + tx.send(Ok(validator_public.to_vec())).unwrap(); + } + ); + + // query of k ancestors, we only provide one + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::Ancestors { + hash: relay_parent, + k, + response_channel: tx, + }) => { + assert!(view.contains(&relay_parent)); + assert_eq!(k, AvailabilityDistributionSubsystem::K + 1); + tx.send(Ok(ancestors.clone())).unwrap(); + } + ); + + for _ in 0..session_per_relay_parent.len() { assert_matches!( - overseer_recv(&mut virtual_overseer).await, + overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::SessionIndexForChild(tx) )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(1 as SessionIndex)).unwrap(); + let index = session_per_relay_parent.get(&relay_parent) + .expect(&format!("Session index for relay parent {:?} does not exist", relay_parent)); + tx.send(Ok(*index)).unwrap(); } ); + } - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(tx) - )) => { - assert_eq!(relay_parent, genesis); - tx.send(Ok(1 as SessionIndex)).unwrap(); - } - ); - - // subsystem peer id collection - // which will query the availability cores - assert_matches!( - overseer_recv(&mut virtual_overseer).await, + for _ in 0..availability_cores_per_relay_parent.len() { + let relay_parent = assert_matches!( + overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::AvailabilityCores(tx) )) => { - assert_eq!(relay_parent, ancestors[0]); - // respond with a set of availability core states - tx.send(Ok(vec![ - dummy_occupied_core(chain_ids[0]), - dummy_occupied_core(chain_ids[1]) - ])).unwrap(); + let cores = availability_cores_per_relay_parent.get(&relay_parent) + .expect(&format!("Availability core for relay parent {:?} does not exist", relay_parent)); + + tx.send(Ok(cores.clone())).unwrap(); + relay_parent } ); - // now each of the relay parents in the view (1) will - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::CandidatePendingAvailability(para, tx) - )) => { - assert_eq!(relay_parent, ancestors[0]); - assert_eq!(para, chain_ids[0]); - tx.send(Ok(Some( - candidates[0].clone() - ))).unwrap(); - } - ); + let pending_availability = candidate_pending_availabilities_per_relay_parent.get(&relay_parent) + .expect(&format!("Candidate pending availability for relay parent {:?} does not exist", relay_parent)); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::CandidatePendingAvailability(para, tx) - )) => { - assert_eq!(relay_parent, ancestors[0]); - assert_eq!(para, chain_ids[1]); - tx.send(Ok(Some( - candidates[1].clone() - ))).unwrap(); - } - ); - - for _ in 0usize..1 { + for _ in 0..pending_availability.len() { assert_matches!( - overseer_recv(&mut virtual_overseer).await, + overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _relay_parent, - RuntimeApiRequest::AvailabilityCores(tx), + hash, + RuntimeApiRequest::CandidatePendingAvailability(para, tx) )) => { - tx.send(Ok(vec![ - CoreState::Occupied(OccupiedCore { - para_id: chain_ids[0].clone(), - next_up_on_available: None, - occupied_since: 0, - time_out_at: 10, - next_up_on_time_out: None, - availability: Default::default(), - group_responsible: GroupIndex::from(0), - }), - CoreState::Free, - CoreState::Free, - CoreState::Occupied(OccupiedCore { - para_id: chain_ids[1].clone(), - next_up_on_available: None, - occupied_since: 1, - time_out_at: 7, - next_up_on_time_out: None, - availability: Default::default(), - group_responsible: GroupIndex::from(0), - }), - CoreState::Free, - CoreState::Free, - ])).unwrap(); - } - ); + assert_eq!(relay_parent, hash); - // query the availability cores for each of the paras (2) - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request( - _relay_parent, - RuntimeApiRequest::CandidatePendingAvailability(para, tx), - ) - ) => { - assert_eq!(para, chain_ids[0]); - tx.send(Ok(Some( - candidates[0].clone() - ))).unwrap(); - } - ); + let candidate = pending_availability.iter() + .find(|c| c.descriptor.para_id == para) + .expect(&format!("Pending candidate for para {} does not exist", para)); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _relay_parent, - RuntimeApiRequest::CandidatePendingAvailability(para, tx), - )) => { - assert_eq!(para, chain_ids[1]); - tx.send(Ok(Some( - candidates[1].clone() - ))).unwrap(); + tx.send(Ok(Some(candidate.clone()))).unwrap(); } ); } + } - let mut candidates2 = candidates.clone(); - // check if the availability store can provide the desired erasure chunks - for i in 0usize..2 { - tracing::trace!("0000"); - let avail_data = make_available_data(&test_state, pov_block_a.clone()); - let chunks = - derive_erasure_chunks_with_proofs(test_state.validators.len(), &avail_data); + for _ in 0..data_availability.len() { + let (available, candidate_hash) = assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryDataAvailability( + candidate_hash, + tx, + ) + ) => { + let available = data_availability.get(&candidate_hash) + .expect(&format!("No data availability for candidate {:?}", candidate_hash)); - let expected; - // store the chunk to the av store - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::AvailabilityStore( - AvailabilityStoreMessage::QueryDataAvailability( - candidate_hash, - tx, - ) - ) => { - let index = candidates2.iter().enumerate().find(|x| { x.1.hash() == candidate_hash }).map(|x| x.0).unwrap(); - expected = candidates2.swap_remove(index).hash(); - tx.send(i == 0).unwrap(); - } - ); + tx.send(*available).unwrap(); + (available, candidate_hash) + } + ); - assert_eq!(chunks.len(), test_state.validators.len()); + if !available { + continue; + } - tracing::trace!("xxxx"); - // retrieve a stored chunk - for (j, chunk) in chunks.into_iter().enumerate() { - tracing::trace!("yyyy i={}, j={}", i, j); - if i != 0 { - // not a validator, so this never happens - break; - } - assert_matches!( - overseer_recv(&mut virtual_overseer).await, + if let Some((pov, persisted)) = chunk_data_per_candidate.get(&candidate_hash) { + let chunks = make_erasure_chunks(persisted.clone(), validator_public.len(), pov.clone()); + + for _ in 0..chunks.len() { + let chunk = assert_matches!( + overseer_recv(virtual_overseer).await, AllMessages::AvailabilityStore( AvailabilityStoreMessage::QueryChunk( candidate_hash, - idx, + index, tx, ) ) => { - assert_eq!(candidate_hash, expected); - assert_eq!(j as u32, chunk.index); - assert_eq!(idx, j as u32); - tx.send( - Some(chunk.clone()) - ).unwrap(); + tracing::trace!("Query chunk {} for candidate {:?}", index, candidate_hash); + let chunk = chunks[index as usize].clone(); + tx.send(Some(chunk.clone())).unwrap(); + chunk } ); + + if let Some(peers) = send_chunks_to.get(&candidate_hash) { + expect_chunks_network_message(virtual_overseer, &peers, &[candidate_hash], &[chunk]).await; + } } } - // setup peer a with interest in current - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full), - ), - ) - .await; + } +} - overseer_send( +async fn setup_peer_with_view( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + peer: PeerId, + view: View, +) { + overseer_send(virtual_overseer, NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full)).await; + + overseer_send(virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer, view)).await; +} + +async fn peer_send_message( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + peer: PeerId, + message: AvailabilityGossipMessage, + expected_reputation_change: Rep, +) { + overseer_send(virtual_overseer, NetworkBridgeEvent::PeerMessage(peer.clone(), chunk_protocol_message(message))).await; + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + rep_peer, + rep, + ) + ) => { + assert_eq!(peer, rep_peer); + assert_eq!(expected_reputation_change, rep); + } + ); +} + +#[test] +fn check_views() { + let test_state = TestState::default(); + + let peer_a = PeerId::random(); + let peer_a_2 = peer_a.clone(); + let peer_b = PeerId::random(); + let peer_b_2 = peer_b.clone(); + assert_ne!(&peer_a, &peer_b); + + let keystore = test_state.keystore.clone(); + let current = test_state.relay_parent; + let ancestors = test_state.ancestors.clone(); + + let state = test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + chain_ids, + validator_public, + relay_parent: current, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + let genesis = Hash::repeat_byte(0xAA); + change_our_view( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current]), - ), - ) - .await; + view![current], + &validator_public, + vec![ancestors[0], genesis], + hashmap! { current => 1, genesis => 1 }, + hashmap! { + ancestors[0] => vec![dummy_occupied_core(chain_ids[0]), dummy_occupied_core(chain_ids[1])], + current => vec![ + CoreState::Occupied(OccupiedCore { + para_id: chain_ids[0].clone(), + next_up_on_available: None, + occupied_since: 0, + time_out_at: 10, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }), + CoreState::Free, + CoreState::Free, + CoreState::Occupied(OccupiedCore { + para_id: chain_ids[1].clone(), + next_up_on_available: None, + occupied_since: 1, + time_out_at: 7, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }), + CoreState::Free, + CoreState::Free, + ] + }, + hashmap! { + ancestors[0] => vec![candidates[0].clone(), candidates[1].clone()], + current => vec![candidates[0].clone(), candidates[1].clone()], + }, + hashmap! { + candidates[0].hash() => true, + candidates[1].hash() => false, + }, + hashmap! { + candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone()), + }, + hashmap! {}, + ).await; + + // setup peer a with interest in current + setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await; // setup peer b with interest in ancestor - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), - ), - ) - .await; + setup_peer_with_view(&mut virtual_overseer, peer_b.clone(), view![ancestors[0]]).await; + }); - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![ancestors[0]]), - ), - ) - .await; + assert_matches! { + state, + ProtocolState { + peer_views, + view, + .. + } => { + assert_eq!( + peer_views, + hashmap! { + peer_a_2 => view![current], + peer_b_2 => view![ancestors[0]], + }, + ); + assert_eq!(view, view![current]); + } + }; +} - delay!(100); +#[test] +fn reputation_verification() { + let test_state = TestState::default(); - let valid: AvailabilityGossipMessage = make_valid_availability_gossip( + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + assert_ne!(&peer_a, &peer_b); + + let keystore = test_state.keystore.clone(); + + test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + relay_parent: current, + validator_public, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + let valid = make_valid_availability_gossip( &test_state, - candidates[0].hash(), + 0, 2, - pov_block_a.clone(), ); - { - // valid (first, from b) - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_b); - assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST); - } - ); - } - - { - // valid (duplicate, from b) - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - peers, - protocol_v1::ValidationProtocol::AvailabilityDistribution( - protocol_v1::AvailabilityDistributionMessage::Chunk(hash, chunk), - ), - ) - ) => { - assert_eq!(1, peers.len()); - assert_eq!(peers[0], peer_a); - assert_eq!(candidates[0].hash(), hash); - assert_eq!(valid.erasure_chunk, chunk); - } - ); - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_b); - assert_eq!(rep, COST_PEER_DUPLICATE_MESSAGE); - } - ); - } - - { - // valid (second, from a) - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_a.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_a); - assert_eq!(rep, BENEFIT_VALID_MESSAGE); - } - ); - } - - // peer a is not interested in anything anymore - overseer_send( + change_our_view( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![]), - ), - ) - .await; + view![current], + &validator_public, + vec![ancestors[0]], + hashmap! { current => 1 }, + hashmap! { + current => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + dummy_occupied_core(candidates[1].descriptor.para_id) + ], + }, + hashmap! { current => vec![candidates[0].clone(), candidates[1].clone()] }, + hashmap! { candidates[0].hash() => true, candidates[1].hash() => false }, + hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, + hashmap! {}, + ).await; - { - // send the a message again, so we should detect the duplicate - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_a.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; + // valid (first, from b) + peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_a); - assert_eq!(rep, COST_PEER_DUPLICATE_MESSAGE); - } - ); - } + // valid (duplicate, from b) + peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), COST_PEER_DUPLICATE_MESSAGE).await; + + // valid (second, from a) + peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await; + + // send the a message again, so we should detect the duplicate + peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), COST_PEER_DUPLICATE_MESSAGE).await; // peer b sends a message before we have the view // setup peer a with interest in parent x - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerDisconnected(peer_b.clone()), - ), - ) - .await; + overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerDisconnected(peer_b.clone())).await; - delay!(10); - - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), - ), - ) - .await; + overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)).await; { // send another message - let valid2 = make_valid_availability_gossip( - &test_state, - candidates[2].hash(), - 1, - pov_block_c.clone(), - ); - - // send the a message before we send a view update - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage(peer_a.clone(), chunk_protocol_message(valid2)), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_a); - assert_eq!(rep, COST_NOT_A_LIVE_CANDIDATE); - } - ); - } - - { - // send another message - let valid = make_valid_availability_gossip( - &test_state, - candidates[1].hash(), - 2, - pov_block_b.clone(), - ); + let valid = make_valid_availability_gossip(&test_state, 1, 2); // Make peer a and b listen on `current` - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current]), - ), - ) - .await; + overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current])).await; - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![current]), - ), - ) - .await; - - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_a.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_a); - assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST); - } + let mut chunks = make_erasure_chunks( + test_state.persisted_validation_data.clone(), + validator_public.len(), + pov_blocks[0].clone(), ); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - peers, - protocol_v1::ValidationProtocol::AvailabilityDistribution( - protocol_v1::AvailabilityDistributionMessage::Chunk(hash, chunk), - ), - ) - ) => { - assert_eq!(1, peers.len()); - assert_eq!(peers[0], peer_b); - assert_eq!(candidates[1].hash(), hash); - assert_eq!(valid.erasure_chunk, chunk); - } - ); + // Both peers send us this chunk already + chunks.remove(2); + + expect_chunks_network_message(&mut virtual_overseer, &[peer_a.clone()], &[candidates[0].hash()], &chunks).await; + + overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![current])).await; + + expect_chunks_network_message(&mut virtual_overseer, &[peer_b.clone()], &[candidates[0].hash()], &chunks).await; + + peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE_FIRST).await; + + expect_chunks_network_message( + &mut virtual_overseer, + &[peer_b.clone()], + &[candidates[1].hash()], + &[valid.erasure_chunk.clone()], + ).await; // Let B send the same message - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - chunk_protocol_message(valid.clone()), - ), - ), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_b); - assert_eq!(rep, BENEFIT_VALID_MESSAGE); - } - ); - - // There shouldn't be any other message. - assert!(virtual_overseer.recv().timeout(TIMEOUT).await.is_none()); + peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await; } }); } +#[test] +fn not_a_live_candidate_is_detected() { + let test_state = TestState::default(); + + let peer_a = PeerId::random(); + + let keystore = test_state.keystore.clone(); + + test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + relay_parent: current, + validator_public, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + change_our_view( + &mut virtual_overseer, + view![current], + &validator_public, + vec![ancestors[0]], + hashmap! { current => 1 }, + hashmap! { + current => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + ], + }, + hashmap! { current => vec![candidates[0].clone()] }, + hashmap! { candidates[0].hash() => true }, + hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, + hashmap! {}, + ).await; + + let valid = make_valid_availability_gossip( + &test_state, + 1, + 1, + ); + + peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), COST_NOT_A_LIVE_CANDIDATE).await; + }); +} + +#[test] +fn peer_change_view_before_us() { + let test_state = TestState::default(); + + let peer_a = PeerId::random(); + + let keystore = test_state.keystore.clone(); + + test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + relay_parent: current, + validator_public, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await; + + change_our_view( + &mut virtual_overseer, + view![current], + &validator_public, + vec![ancestors[0]], + hashmap! { current => 1 }, + hashmap! { + current => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + ], + }, + hashmap! { current => vec![candidates[0].clone()] }, + hashmap! { candidates[0].hash() => true }, + hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, + hashmap! { candidates[0].hash() => vec![peer_a.clone()] }, + ).await; + + let valid = make_valid_availability_gossip( + &test_state, + 0, + 0, + ); + + // We send peer a all the chunks of candidate0, so we just benefit him for sending a valid message + peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await; + }); +} + +#[test] +fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() { + let test_state = TestState::default(); + + let peer_a = PeerId::random(); + + let keystore = test_state.keystore.clone(); + + test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + relay_parent: current, + validator_public, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + change_our_view( + &mut virtual_overseer, + view![ancestors[0]], + &validator_public, + vec![ancestors[1]], + hashmap! { ancestors[0] => 1 }, + hashmap! { + ancestors[0] => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + ], + }, + hashmap! { ancestors[0] => vec![candidates[0].clone()] }, + hashmap! { candidates[0].hash() => true }, + hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, + hashmap! {}, + ).await; + + change_our_view( + &mut virtual_overseer, + view![current], + &validator_public, + vec![ancestors[0]], + hashmap! { current => 1 }, + hashmap! { + current => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + ], + }, + hashmap! { current => vec![candidates[0].clone()] }, + hashmap! { candidates[0].hash() => true }, + hashmap! {}, + hashmap! {}, + ).await; + + // Let peera connect, we should send him all the chunks of the candidate + setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await; + + let chunks = make_erasure_chunks( + test_state.persisted_validation_data.clone(), + validator_public.len(), + pov_blocks[0].clone(), + ); + expect_chunks_network_message( + &mut virtual_overseer, + &[peer_a], + &[candidates[0].hash()], + &chunks, + ).await; + }); +} + #[test] fn k_ancestors_in_session() { let pool = sp_core::testing::TaskExecutor::new(); @@ -1076,3 +1000,289 @@ fn k_ancestors_in_session() { executor::block_on(future::join(test_fut, sut).timeout(Duration::from_millis(1000))); } + +#[test] +fn clean_up_receipts_cache_unions_ancestors_and_view() { + let mut state = ProtocolState::default(); + + let hash_a = [0u8; 32].into(); + let hash_b = [1u8; 32].into(); + let hash_c = [2u8; 32].into(); + let hash_d = [3u8; 32].into(); + + state.receipts.insert(hash_a, HashSet::new()); + state.receipts.insert(hash_b, HashSet::new()); + state.receipts.insert(hash_c, HashSet::new()); + state.receipts.insert(hash_d, HashSet::new()); + + state.per_relay_parent.insert(hash_a, PerRelayParent { + ancestors: vec![hash_b], + live_candidates: HashSet::new(), + }); + + state.per_relay_parent.insert(hash_c, PerRelayParent::default()); + + state.clean_up_receipts_cache(); + + assert_eq!(state.receipts.len(), 3); + assert!(state.receipts.contains_key(&hash_a)); + assert!(state.receipts.contains_key(&hash_b)); + assert!(state.receipts.contains_key(&hash_c)); + assert!(!state.receipts.contains_key(&hash_d)); +} + +#[test] +fn remove_relay_parent_only_removes_per_candidate_if_final() { + let mut state = ProtocolState::default(); + + let hash_a = [0u8; 32].into(); + let hash_b = [1u8; 32].into(); + + let candidate_hash_a = CandidateHash([46u8; 32].into()); + + state.per_relay_parent.insert(hash_a, PerRelayParent { + ancestors: vec![], + live_candidates: std::iter::once(candidate_hash_a).collect(), + }); + + state.per_relay_parent.insert(hash_b, PerRelayParent { + ancestors: vec![], + live_candidates: std::iter::once(candidate_hash_a).collect(), + }); + + state.per_candidate.insert(candidate_hash_a, PerCandidate { + live_in: vec![hash_a, hash_b].into_iter().collect(), + ..Default::default() + }); + + state.remove_relay_parent(&hash_a); + + assert!(!state.per_relay_parent.contains_key(&hash_a)); + assert!(!state.per_candidate.get(&candidate_hash_a).unwrap().live_in.contains(&hash_a)); + assert!(state.per_candidate.get(&candidate_hash_a).unwrap().live_in.contains(&hash_b)); + + state.remove_relay_parent(&hash_b); + + assert!(!state.per_relay_parent.contains_key(&hash_b)); + assert!(!state.per_candidate.contains_key(&candidate_hash_a)); +} + +#[test] +fn add_relay_parent_includes_all_live_candidates() { + let relay_parent = [0u8; 32].into(); + + let mut state = ProtocolState::default(); + + let ancestor_a = [1u8; 32].into(); + + let candidate_hash_a = CandidateHash([10u8; 32].into()); + let candidate_hash_b = CandidateHash([11u8; 32].into()); + + let candidates = vec![ + (candidate_hash_a, FetchedLiveCandidate::Fresh(Default::default())), + (candidate_hash_b, FetchedLiveCandidate::Cached), + ].into_iter().collect(); + + state.add_relay_parent( + relay_parent, + Vec::new(), + None, + candidates, + vec![ancestor_a], + ); + + assert!( + state.per_candidate.get(&candidate_hash_a).unwrap().live_in.contains(&relay_parent) + ); + assert!( + state.per_candidate.get(&candidate_hash_b).unwrap().live_in.contains(&relay_parent) + ); + + let per_relay_parent = state.per_relay_parent.get(&relay_parent).unwrap(); + + assert!(per_relay_parent.live_candidates.contains(&candidate_hash_a)); + assert!(per_relay_parent.live_candidates.contains(&candidate_hash_b)); +} + +#[test] +fn query_pending_availability_at_pulls_from_and_updates_receipts() { + let hash_a = [0u8; 32].into(); + let hash_b = [1u8; 32].into(); + + let para_a = ParaId::from(1); + let para_b = ParaId::from(2); + let para_c = ParaId::from(3); + + let make_candidate = |para_id| { + let mut candidate = CommittedCandidateReceipt::default(); + candidate.descriptor.para_id = para_id; + candidate.descriptor.relay_parent = [69u8; 32].into(); + candidate + }; + + let candidate_a = make_candidate(para_a); + let candidate_b = make_candidate(para_b); + let candidate_c = make_candidate(para_c); + + let candidate_hash_a = candidate_a.hash(); + let candidate_hash_b = candidate_b.hash(); + let candidate_hash_c = candidate_c.hash(); + + // receipts has an initial entry for hash_a but not hash_b. + let mut receipts = HashMap::new(); + receipts.insert(hash_a, vec![candidate_hash_a, candidate_hash_b].into_iter().collect()); + + let pool = sp_core::testing::TaskExecutor::new(); + + let (mut ctx, mut virtual_overseer) = + test_helpers::make_subsystem_context::(pool); + + let test_fut = async move { + let live_candidates = query_pending_availability_at( + &mut ctx, + vec![hash_a, hash_b], + &mut receipts, + ).await.unwrap(); + + // although 'b' is cached from the perspective of hash_a, it gets overwritten when we query what's happening in + // + assert_eq!(live_candidates.len(), 3); + assert_matches!(live_candidates.get(&candidate_hash_a).unwrap(), FetchedLiveCandidate::Cached); + assert_matches!(live_candidates.get(&candidate_hash_b).unwrap(), FetchedLiveCandidate::Cached); + assert_matches!(live_candidates.get(&candidate_hash_c).unwrap(), FetchedLiveCandidate::Fresh(_)); + + assert!(receipts.get(&hash_b).unwrap().contains(&candidate_hash_b)); + assert!(receipts.get(&hash_b).unwrap().contains(&candidate_hash_c)); + }; + + let answer = async move { + // hash_a should be answered out of cache, so we should just have + // queried for hash_b. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + r, + RuntimeApiRequest::AvailabilityCores(tx), + ) + ) if r == hash_b => { + let _ = tx.send(Ok(vec![ + CoreState::Occupied(OccupiedCore { + para_id: para_b, + next_up_on_available: None, + occupied_since: 0, + time_out_at: 0, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }), + CoreState::Occupied(OccupiedCore { + para_id: para_c, + next_up_on_available: None, + occupied_since: 0, + time_out_at: 0, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }), + ])); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + r, + RuntimeApiRequest::CandidatePendingAvailability(p, tx), + ) + ) if r == hash_b && p == para_b => { + let _ = tx.send(Ok(Some(candidate_b))); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + r, + RuntimeApiRequest::CandidatePendingAvailability(p, tx), + ) + ) if r == hash_b && p == para_c => { + let _ = tx.send(Ok(Some(candidate_c))); + } + ); + }; + + futures::pin_mut!(test_fut); + futures::pin_mut!(answer); + + executor::block_on(future::join(test_fut, answer)); +} + +#[test] +fn new_peer_gets_all_chunks_send() { + let test_state = TestState::default(); + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + assert_ne!(&peer_a, &peer_b); + + let keystore = test_state.keystore.clone(); + + test_harness(keystore, move |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let TestState { + relay_parent: current, + validator_public, + ancestors, + candidates, + pov_blocks, + .. + } = test_state.clone(); + + let valid = make_valid_availability_gossip( + &test_state, + 1, + 2, + ); + + change_our_view( + &mut virtual_overseer, + view![current], + &validator_public, + vec![ancestors[0]], + hashmap! { current => 1 }, + hashmap! { + current => vec![ + dummy_occupied_core(candidates[0].descriptor.para_id), + dummy_occupied_core(candidates[1].descriptor.para_id) + ], + }, + hashmap! { current => vec![candidates[0].clone(), candidates[1].clone()] }, + hashmap! { candidates[0].hash() => true, candidates[1].hash() => false }, + hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, + hashmap! {}, + ).await; + + peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE_FIRST).await; + + setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await; + + let mut chunks = make_erasure_chunks( + test_state.persisted_validation_data.clone(), + validator_public.len(), + pov_blocks[0].clone(), + ); + + chunks.push(valid.erasure_chunk); + + expect_chunks_network_message( + &mut virtual_overseer, + &[peer_a], + &[candidates[0].hash(), candidates[1].hash()], + &chunks, + ).await; + }); +} diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 3b7ee32dda..bbd2bc37d7 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -231,7 +231,7 @@ impl NetworkBridgeMessage { } /// Availability Distribution Message. -#[derive(Debug)] +#[derive(Debug, derive_more::From)] pub enum AvailabilityDistributionMessage { /// Event from the network bridge. NetworkBridgeUpdateV1(NetworkBridgeEvent), diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md index de34f3b8ed..5b1941bc71 100644 --- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md @@ -23,7 +23,8 @@ Output: For each relay-parent in our local view update, look at all backed candidates pending availability. Distribute via gossip all erasure chunks for all candidates that we have to peers. -We define an operation `live_candidates(relay_heads) -> Set` which returns a set of [`CommittedCandidateReceipt`s](../../types/candidate.md#committed-candidate-receipt). +We define an operation `live_candidates(relay_heads) -> Set` which returns a set of hashes corresponding to [`CandidateReceipt`s](../../types/candidate.md#candidate-receipt). + This is defined as all candidates pending availability in any of those relay-chain heads or any of their last `K` ancestors in the same session. We assume that state is not pruned within `K` blocks of the chain-head. `K` commonly is small and is currently fixed to `K=3`. We will send any erasure-chunks that correspond to candidates in `live_candidates(peer_most_recent_view_update)`.