diff --git a/polkadot/erasure-coding/src/lib.rs b/polkadot/erasure-coding/src/lib.rs index 370c228e34..2b335a8168 100644 --- a/polkadot/erasure-coding/src/lib.rs +++ b/polkadot/erasure-coding/src/lib.rs @@ -119,12 +119,18 @@ impl CodeParams { .expect("this struct is not created with invalid shard number; qed") } } - -fn code_params(n_validators: usize) -> Result { +/// Returns the maximum number of allowed, faulty chunks +/// which does not prevent recovery given all other pieces +/// are correct. +const fn n_faulty(n_validators: usize) -> Result { if n_validators > MAX_VALIDATORS { return Err(Error::TooManyValidators) } if n_validators <= 1 { return Err(Error::NotEnoughValidators) } - let n_faulty = n_validators.saturating_sub(1) / 3; + Ok(n_validators.saturating_sub(1) / 3) +} + +fn code_params(n_validators: usize) -> Result { + let n_faulty = n_faulty(n_validators)?; let n_good = n_validators - n_faulty; Ok(CodeParams { @@ -133,6 +139,13 @@ fn code_params(n_validators: usize) -> Result { }) } +/// Obtain a threshold of chunks that should be enough to recover the data. +pub fn recovery_threshold(n_validators: usize) -> Result { + let n_faulty = n_faulty(n_validators)?; + + Ok(n_faulty + 1) +} + /// Obtain erasure-coded chunks for v0 `AvailableData`, one for each validator. /// /// Works only up to 65536 validators, and `n_validators` must be non-zero. diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 7ce4afe2ad..bf67965ca0 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -121,7 +121,7 @@ impl From for protocol_v1::AvailabilityDistributionMe /// Data used to track information of peers and relay parents the /// overseer ordered us to work on. -#[derive(Default, Clone, Debug)] +#[derive(Debug, Default)] struct ProtocolState { /// Track all active peers and their views /// to determine what is relevant to them. @@ -142,7 +142,7 @@ struct ProtocolState { per_candidate: HashMap, } -#[derive(Debug, Clone, Default)] +#[derive(Debug)] struct PerCandidate { /// A Candidate and a set of known erasure chunks in form of messages to be gossiped / distributed if the peer view wants that. /// This is _across_ peers and not specific to a particular one. @@ -166,13 +166,30 @@ struct PerCandidate { /// The set of relay chain blocks this appears to be live in. live_in: HashSet, + + /// A Jaeger span relating to this candidate. + span: jaeger::JaegerSpan, } impl PerCandidate { /// Returns `true` iff the given `validator_index` is required by the given `peer`. - fn message_required_by_peer(&self, peer: &PeerId, validator_index: &ValidatorIndex) -> bool { - self.received_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true) - && self.sent_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true) + fn message_required_by_peer(&self, peer: &PeerId, validator_index: ValidatorIndex) -> bool { + self.received_messages.get(peer).map(|v| !v.contains(&validator_index)).unwrap_or(true) + && self.sent_messages.get(peer).map(|v| !v.contains(&validator_index)).unwrap_or(true) + } + + /// Add a chunk to the message vault. Overwrites anything that was already present. + fn add_message(&mut self, chunk_index: u32, message: AvailabilityGossipMessage) { + let _ = self.message_vault.insert(chunk_index, message); + } + + /// Clean up the span if we've got our own chunk. + fn drop_span_after_own_availability(&mut self) { + if let Some(validator_index) = self.validator_index { + if self.message_vault.contains_key(&validator_index) { + self.span = jaeger::JaegerSpan::Disabled; + } + } } } @@ -195,12 +212,10 @@ impl ProtocolState { &'a self, relay_parents: impl IntoIterator + 'a, ) -> HashSet { - relay_parents - .into_iter() - .filter_map(|r| self.per_relay_parent.get(r)) - .map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned()) - .flatten() - .collect() + cached_live_candidates_unioned( + &self.per_relay_parent, + relay_parents + ) } #[tracing::instrument(level = "trace", skip(candidates), fields(subsystem = LOG_TARGET))] @@ -218,16 +233,32 @@ impl ProtocolState { // register the relation of relay_parent to candidate.. for (receipt_hash, fetched) in candidates { - let per_candidate = self.per_candidate.entry(receipt_hash).or_default(); + let candidate_entry = match self.per_candidate.entry(receipt_hash) { + Entry::Occupied(e) => e.into_mut(), + Entry::Vacant(e) => { + if let FetchedLiveCandidate::Fresh(descriptor) = fetched { + e.insert(PerCandidate { + message_vault: HashMap::new(), + received_messages: HashMap::new(), + sent_messages: HashMap::new(), + validators: validators.clone(), + validator_index, + descriptor, + live_in: HashSet::new(), + span: if validator_index.is_some() { + jaeger::candidate_hash_span(&receipt_hash, "pending-availability") + } else { + jaeger::JaegerSpan::Disabled + }, + }) + } else { + tracing::warn!(target: LOG_TARGET, "No `per_candidate` but not fresh. logic error"); + continue; + } + } + }; - // Cached candidates already have entries and thus don't need this - // information to be set. - if let FetchedLiveCandidate::Fresh(descriptor) = fetched { - per_candidate.validator_index = validator_index.clone(); - per_candidate.validators = validators.clone(); - per_candidate.descriptor = descriptor; - } - per_candidate.live_in.insert(relay_parent); + candidate_entry.live_in.insert(relay_parent); } } @@ -259,6 +290,18 @@ impl ProtocolState { } } +fn cached_live_candidates_unioned<'a>( + per_relay_parent: &'a HashMap, + relay_parents: impl IntoIterator + 'a, +) -> HashSet { + relay_parents + .into_iter() + .filter_map(|r| per_relay_parent.get(r)) + .map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned()) + .flatten() + .collect() +} + /// Deal with network bridge updates and track what needs to be tracked /// which depends on the message type received. #[tracing::instrument(level = "trace", skip(ctx, keystore, metrics), fields(subsystem = LOG_TARGET))] @@ -297,8 +340,6 @@ where } }; - let mut _span = jaeger::hash_span(&gossiped_availability.candidate_hash.0, "availability-message-received"); - process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics) .await?; } @@ -344,9 +385,11 @@ where // handle all candidates for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) { // If we are not a validator for this candidate, let's skip it. - if state.per_candidate.entry(candidate_hash).or_default().validator_index.is_none() { - continue - } + match state.per_candidate.get(&candidate_hash) { + None => continue, + Some(c) if c.validator_index.is_none() => continue, + Some(_) => {}, + }; // check if the availability is present in the store exists if !query_data_availability(ctx, candidate_hash).await? { @@ -367,12 +410,18 @@ where .map(|(peer, _view)| peer.clone()) .collect(); - let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); + let per_candidate = state.per_candidate.get_mut(&candidate_hash) + .expect("existence checked above; qed"); let validator_count = per_candidate.validators.len(); // distribute all erasure messages to interested peers for chunk_index in 0u32..(validator_count as u32) { + let _span = { + let mut span = per_candidate.span.child("load-and-distribute"); + span.add_string_tag("chunk-index", &format!("{}", chunk_index)); + span + }; let message = if let Some(message) = per_candidate.message_vault.get(&chunk_index) { tracing::trace!( target: LOG_TARGET, @@ -389,10 +438,15 @@ where "Retrieved chunk from availability storage", ); - AvailabilityGossipMessage { + + let msg = AvailabilityGossipMessage { candidate_hash, erasure_chunk, - } + }; + + per_candidate.add_message(chunk_index, msg.clone()); + + msg } else { tracing::error!( target: LOG_TARGET, @@ -407,12 +461,15 @@ where let peers = peers .iter() - .filter(|peer| per_candidate.message_required_by_peer(peer, &chunk_index)) + .filter(|peer| per_candidate.message_required_by_peer(peer, chunk_index)) .cloned() .collect::>(); send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await; } + + // traces are better if we wait until the loop is done to drop. + per_candidate.drop_span_after_own_availability(); } // cleanup the removed relay parents and their states @@ -442,10 +499,6 @@ where .insert(message.erasure_chunk.index); } - per_candidate - .message_vault - .insert(message.erasure_chunk.index, message.clone()); - if !peers.is_empty() { ctx.send_message(NetworkBridgeMessage::SendValidationMessage( peers.clone(), @@ -482,7 +535,10 @@ where // Send all messages we've seen before and the peer is now interested in. for candidate_hash in added_candidates { - let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); + let per_candidate = match state.per_candidate.get_mut(&candidate_hash) { + Some(p) => p, + None => continue, + }; // obtain the relevant chunk indices not sent yet let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex)) @@ -493,7 +549,7 @@ where per_candidate .message_vault .get(&erasure_chunk_index) - .filter(|_| per_candidate.message_required_by_peer(&origin, &erasure_chunk_index)) + .filter(|_| per_candidate.message_required_by_peer(&origin, erasure_chunk_index)) }) .cloned() .collect::>(); @@ -540,12 +596,10 @@ where let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter()); // check if the candidate is of interest - let descriptor = if live_candidates.contains(&message.candidate_hash) { + let candidate_entry = if live_candidates.contains(&message.candidate_hash) { state.per_candidate - .get(&message.candidate_hash) + .get_mut(&message.candidate_hash) .expect("All live candidates are contained in per_candidate; qed") - .descriptor - .clone() } else { tracing::trace!( target: LOG_TARGET, @@ -557,105 +611,140 @@ where return Ok(()) }; - // check the merkle proof against the erasure root in the candidate descriptor. - let anticipated_hash = match branch_hash( - &descriptor.erasure_root, - &message.erasure_chunk.proof, - message.erasure_chunk.index as usize, - ) { - Ok(hash) => hash, - Err(e) => { - tracing::trace!( - target: LOG_TARGET, - candidate_hash = ?message.candidate_hash, - peer = %origin, - error = ?e, - "Failed to calculate chunk merkle proof", - ); - modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; - return Ok(()); - }, - }; - - let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk); - if anticipated_hash != erasure_chunk_hash { - tracing::trace!( - target: LOG_TARGET, - candidate_hash = ?message.candidate_hash, - peer = %origin, - "Peer send chunk with invalid merkle proof", - ); - modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; - return Ok(()); - } - - let erasure_chunk_index = &message.erasure_chunk.index; - - { - let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default(); - + // Handle a duplicate before doing expensive checks. + if let Some(existing) = candidate_entry.message_vault.get(&message.erasure_chunk.index) { + let span = candidate_entry.span.child("handle-duplicate"); // check if this particular erasure chunk was already sent by that peer before { - let received_set = per_candidate + let _span = span.child("check-entry"); + let received_set = candidate_entry .received_messages .entry(origin.clone()) .or_default(); - if !received_set.insert(*erasure_chunk_index) { + + if !received_set.insert(message.erasure_chunk.index) { modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await; return Ok(()); } } - // insert into known messages and change reputation - if per_candidate - .message_vault - .insert(*erasure_chunk_index, message.clone()) - .is_some() + // check that the message content matches what we have already before rewarding + // the peer. { - modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await; - } else { - modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await; - - // save the chunk for our index - if Some(*erasure_chunk_index) == per_candidate.validator_index { - if store_chunk( - ctx, - message.candidate_hash, - descriptor.relay_parent, - message.erasure_chunk.index, - message.erasure_chunk.clone(), - ).await?.is_err() { - tracing::warn!( - target: LOG_TARGET, - "Failed to store erasure chunk to availability store" - ); - } + let _span = span.child("check-accurate"); + if existing == &message { + modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await; + } else { + modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; } - }; + } + + return Ok(()); } + + let span = { + let mut span = candidate_entry.span.child("process-new-chunk"); + span.add_string_tag("peer-id", &origin.to_base58()); + span + }; + + // check the merkle proof against the erasure root in the candidate descriptor. + let anticipated_hash = { + let _span = span.child("check-merkle-root"); + match branch_hash( + &candidate_entry.descriptor.erasure_root, + &message.erasure_chunk.proof, + message.erasure_chunk.index as usize, + ) { + Ok(hash) => hash, + Err(e) => { + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?message.candidate_hash, + peer = %origin, + error = ?e, + "Failed to calculate chunk merkle proof", + ); + modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; + return Ok(()); + }, + } + }; + + { + let _span = span.child("check-chunk-hash"); + let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk); + if anticipated_hash != erasure_chunk_hash { + tracing::trace!( + target: LOG_TARGET, + candidate_hash = ?message.candidate_hash, + peer = %origin, + "Peer sent chunk with invalid merkle proof", + ); + modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; + return Ok(()); + } + } + + { + // insert into known messages and change reputation. we've guaranteed + // above that the message vault doesn't contain any message under this + // chunk index already. + + candidate_entry + .received_messages + .entry(origin.clone()) + .or_default() + .insert(message.erasure_chunk.index); + + modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await; + + // save the chunk for our index + if Some(message.erasure_chunk.index) == candidate_entry.validator_index { + let _span = span.child("store-our-chunk"); + if store_chunk( + ctx, + message.candidate_hash, + candidate_entry.descriptor.relay_parent, + message.erasure_chunk.index, + message.erasure_chunk.clone(), + ).await?.is_err() { + tracing::warn!( + target: LOG_TARGET, + "Failed to store erasure chunk to availability store" + ); + } + } + + candidate_entry.add_message(message.erasure_chunk.index, message.clone()); + candidate_entry.drop_span_after_own_availability(); + } + // condense the peers to the peers with interest on the candidate - let peers = state - .peer_views - .clone() - .into_iter() - .filter(|(_, view)| { - // peers view must contain the candidate hash too - state - .cached_live_candidates_unioned(view.heads.iter()) - .contains(&message.candidate_hash) - }) - .map(|(peer, _)| -> PeerId { peer.clone() }) - .collect::>(); + let peers = { + let _span = span.child("determine-recipient-peers"); + let per_relay_parent = &state.per_relay_parent; - let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default(); - - let peers = peers - .into_iter() - .filter(|peer| per_candidate.message_required_by_peer(peer, erasure_chunk_index)) - .collect::>(); + state + .peer_views + .clone() + .into_iter() + .filter(|(_, view)| { + // peers view must contain the candidate hash too + cached_live_candidates_unioned( + per_relay_parent, + view.heads.iter(), + ).contains(&message.candidate_hash) + }) + .map(|(peer, _)| -> PeerId { peer.clone() }) + .filter(|peer| candidate_entry.message_required_by_peer(peer, message.erasure_chunk.index)) + .collect::>() + }; + drop(span); // gossip that message to interested peers - send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await; + send_tracked_gossip_messages_to_peers(ctx, candidate_entry, metrics, peers, iter::once(message)).await; + Ok(()) } @@ -681,7 +770,14 @@ impl AvailabilityDistributionSubsystem { where Context: SubsystemContext, { - let mut state = ProtocolState::default(); + let mut state = ProtocolState { + peer_views: HashMap::new(), + view: Default::default(), + live_under: HashMap::new(), + per_relay_parent: HashMap::new(), + per_candidate: HashMap::new(), + }; + self.run_inner(ctx, &mut state).await } diff --git a/polkadot/node/network/availability-distribution/src/tests.rs b/polkadot/node/network/availability-distribution/src/tests.rs index d06a98402d..2479c8ec6c 100644 --- a/polkadot/node/network/availability-distribution/src/tests.rs +++ b/polkadot/node/network/availability-distribution/src/tests.rs @@ -50,6 +50,19 @@ fn chunk_protocol_message( ) } +fn make_per_candidate() -> PerCandidate { + PerCandidate { + live_in: HashSet::new(), + message_vault: HashMap::new(), + received_messages: HashMap::new(), + sent_messages: HashMap::new(), + validators: Vec::new(), + validator_index: None, + descriptor: Default::default(), + span: jaeger::JaegerSpan::Disabled, + } +} + struct TestHarness { virtual_overseer: test_helpers::TestSubsystemContextHandle, } @@ -1024,9 +1037,10 @@ fn remove_relay_parent_only_removes_per_candidate_if_final() { live_candidates: std::iter::once(candidate_hash_a).collect(), }); - state.per_candidate.insert(candidate_hash_a, PerCandidate { - live_in: vec![hash_a, hash_b].into_iter().collect(), - ..Default::default() + state.per_candidate.insert(candidate_hash_a, { + let mut per_candidate = make_per_candidate(); + per_candidate.live_in = vec![hash_a, hash_b].into_iter().collect(); + per_candidate }); state.remove_relay_parent(&hash_a); @@ -1052,6 +1066,8 @@ fn add_relay_parent_includes_all_live_candidates() { let candidate_hash_a = CandidateHash([10u8; 32].into()); let candidate_hash_b = CandidateHash([11u8; 32].into()); + state.per_candidate.insert(candidate_hash_b, make_per_candidate()); + let candidates = vec![ (candidate_hash_a, FetchedLiveCandidate::Fresh(Default::default())), (candidate_hash_b, FetchedLiveCandidate::Cached), diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 966cded33d..39cacf1041 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -495,8 +495,16 @@ where NetworkBridgeEvent::PeerMessage(remote, message) => { match message { protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => { - let mut _span = jaeger::hash_span(&relay_parent, "bitfield-gossip-received"); - _span.add_string_tag("peer-id", &remote.to_base58()); + let mut _span = { + let mut span = jaeger::hash_span(&relay_parent, "bitfield-gossip-received"); + span.add_string_tag("peer-id", &remote.to_base58()); + span.add_string_tag( + "claimed-validator", + &format!("{}", bitfield.validator_index()), + ); + span + }; + tracing::trace!(target: LOG_TARGET, peer_id = %remote, "received bitfield gossip from peer"); let gossiped_bitfield = BitfieldGossipMessage { relay_parent, diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 951f76d526..6fee53975c 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -731,6 +731,10 @@ async fn handle_incoming_message<'a>( "candidate-hash", &format!("{:?}", candidate_hash.0), ); + span.add_string_tag( + "peer-id", + &peer.to_base58(), + ); span }; diff --git a/polkadot/node/subsystem/src/jaeger.rs b/polkadot/node/subsystem/src/jaeger.rs index 35f5325116..9e2bb577f2 100644 --- a/polkadot/node/subsystem/src/jaeger.rs +++ b/polkadot/node/subsystem/src/jaeger.rs @@ -129,6 +129,12 @@ impl JaegerSpan { } } +impl std::fmt::Debug for JaegerSpan { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "") + } +} + impl From> for JaegerSpan { fn from(src: Option) -> Self { if let Some(span) = src { @@ -146,9 +152,12 @@ impl From for JaegerSpan { } /// Shortcut for [`candidate_hash_span`] with the hash of the `Candidate` block. -#[inline(always)] pub fn candidate_hash_span(candidate_hash: &CandidateHash, span_name: impl Into) -> JaegerSpan { - INSTANCE.read_recursive().span(|| { candidate_hash.0 }, span_name).into() + let mut span: JaegerSpan = INSTANCE.read_recursive() + .span(|| { candidate_hash.0 }, span_name).into(); + + span.add_string_tag("candidate-hash", &format!("{:?}", candidate_hash.0)); + span } /// Shortcut for [`hash_span`] with the hash of the `PoV`.