diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index b10bacb0ac..b32a2c38ff 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -114,12 +114,20 @@ enum ApprovalState { Approved(AssignmentCert, ValidatorSignature), } +#[derive(Debug, Clone, Copy)] +enum LocalSource { + Yes, + No, +} + +type BlockDepth = usize; + /// Information about candidates in the context of a particular block they are included in. /// In other words, multiple `CandidateEntry`s may exist for the same candidate, /// if it is included by multiple blocks - this is likely the case when there are forks. #[derive(Debug, Default)] struct CandidateEntry { - approvals: HashMap, + approvals: HashMap, } #[derive(Debug, Clone)] @@ -135,6 +143,13 @@ impl MessageSource { Self::Local => None, } } + + fn as_local_source(&self) -> LocalSource { + match self { + Self::Local => LocalSource::Yes, + _ => LocalSource::No, + } + } } enum PendingMessage { @@ -229,8 +244,6 @@ impl State { ); { - let _timer = metrics.time_import_pending_now_known(); - let pending_now_known = self.pending_known.keys() .filter(|k| self.blocks.contains_key(k)) .copied() @@ -241,24 +254,34 @@ impl State { .flatten() .collect::>(); - for (peer_id, message) in to_import { - match message { - PendingMessage::Assignment(assignment, claimed_index) => { - self.import_and_circulate_assignment( - ctx, - metrics, - MessageSource::Peer(peer_id), - assignment, - claimed_index, - ).await; - } - PendingMessage::Approval(approval_vote) => { - self.import_and_circulate_approval( - ctx, - metrics, - MessageSource::Peer(peer_id), - approval_vote, - ).await; + if !to_import.is_empty() { + tracing::debug!( + target: LOG_TARGET, + num = to_import.len(), + "Processing pending assignment/approvals", + ); + + let _timer = metrics.time_import_pending_now_known(); + + for (peer_id, message) in to_import { + match message { + PendingMessage::Assignment(assignment, claimed_index) => { + self.import_and_circulate_assignment( + ctx, + metrics, + MessageSource::Peer(peer_id), + assignment, + claimed_index, + ).await; + } + PendingMessage::Approval(approval_vote) => { + self.import_and_circulate_approval( + ctx, + metrics, + MessageSource::Peer(peer_id), + approval_vote, + ).await; + } } } } @@ -489,6 +512,7 @@ impl State { tx, ))).await; + let timer = metrics.time_awaiting_approval_voting(); let result = match rx.await { Ok(result) => result, Err(_) => { @@ -499,6 +523,7 @@ impl State { return; } }; + drop(timer); tracing::trace!( target: LOG_TARGET, @@ -551,6 +576,8 @@ impl State { } } + let local_source = source.as_local_source(); + // Invariant: none of the peers except for the `source` know about the assignment. metrics.on_assignment_imported(); @@ -560,7 +587,7 @@ impl State { // unless the approval state is set already candidate_entry.approvals .entry(validator_index) - .or_insert_with(|| ApprovalState::Assigned(assignment.cert.clone())); + .or_insert_with(|| (ApprovalState::Assigned(assignment.cert.clone()), local_source)); } None => { tracing::warn!( @@ -596,10 +623,11 @@ impl State { if !peers.is_empty() { tracing::trace!( target: LOG_TARGET, - "Sending assignment (block={}, index={}) to {} peers", - block_hash, - claimed_candidate_index, - peers.len(), + ?block_hash, + ?claimed_candidate_index, + ?local_source, + num_peers = peers.len(), + "Sending an assignment to peers", ); ctx.send_message(NetworkBridgeMessage::SendValidationMessage( @@ -762,6 +790,8 @@ impl State { } } + let local_source = source.as_local_source(); + // Invariant: none of the peers except for the `source` know about the approval. metrics.on_approval_imported(); @@ -770,10 +800,10 @@ impl State { // set the approval state for validator_index to Approved // it should be in assigned state already match candidate_entry.approvals.remove(&validator_index) { - Some(ApprovalState::Assigned(cert)) => { + Some((ApprovalState::Assigned(cert), _local)) => { candidate_entry.approvals.insert( validator_index, - ApprovalState::Approved(cert, vote.signature.clone()), + (ApprovalState::Approved(cert, vote.signature.clone()), local_source), ); } _ => { @@ -819,10 +849,11 @@ impl State { if !peers.is_empty() { tracing::trace!( target: LOG_TARGET, - "Sending approval (block={}, index={}) to {} peers", - block_hash, - candidate_index, - peers.len(), + ?block_hash, + ?candidate_index, + ?local_source, + num_peers = peers.len(), + "Sending an approval to peers", ); ctx.send_message(NetworkBridgeMessage::SendValidationMessage( @@ -843,7 +874,7 @@ impl State { ) { metrics.on_unify_with_peer(); let _timer = metrics.time_unify_with_peer(); - let mut to_send = HashSet::new(); + let mut to_send: Vec<(BlockDepth, Hash)> = Vec::new(); let view_finalized_number = view.finalized_number; for head in view.into_iter() { @@ -867,7 +898,7 @@ impl State { block = entry.parent_hash.clone(); Some(interesting_block) }); - to_send.extend(interesting_blocks); + to_send.extend(interesting_blocks.enumerate()); } // step 6. // send all assignments and approvals for all candidates in those blocks to the peer @@ -883,12 +914,16 @@ impl State { entries: &HashMap, ctx: &mut impl SubsystemContext, peer_id: PeerId, - blocks: HashSet, + blocks: Vec<(BlockDepth, Hash)>, ) { + // we will only propagate local assignment/approvals after a certain depth + const DEPTH_THRESHOLD: usize = 5; + let mut assignments = Vec::new(); let mut approvals = Vec::new(); + let num_blocks = blocks.len(); - for block in blocks.into_iter() { + for (depth, block) in blocks.into_iter() { let entry = match entries.get(&block) { Some(entry) => entry, None => continue, // should be unreachable @@ -903,7 +938,10 @@ impl State { for (candidate_index, candidate_entry) in entry.candidates.iter().enumerate() { let candidate_index = candidate_index as u32; - for (validator_index, approval_state) in candidate_entry.approvals.iter() { + for (validator_index, (approval_state, is_local)) in candidate_entry.approvals.iter() { + if depth >= DEPTH_THRESHOLD && !matches!(is_local, LocalSource::Yes) { + continue; + } match approval_state { ApprovalState::Assigned(cert) => { assignments.push((IndirectAssignmentCert { @@ -926,6 +964,14 @@ impl State { } if !assignments.is_empty() { + tracing::trace!( + target: LOG_TARGET, + num = assignments.len(), + ?num_blocks, + ?peer_id, + "Sending assignments to a peer", + ); + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( vec![peer_id.clone()], protocol_v1::ValidationProtocol::ApprovalDistribution( @@ -935,6 +981,14 @@ impl State { } if !approvals.is_empty() { + tracing::trace!( + target: LOG_TARGET, + num = approvals.len(), + ?num_blocks, + ?peer_id, + "Sending approvals to a peer", + ); + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( vec![peer_id], protocol_v1::ValidationProtocol::ApprovalDistribution( diff --git a/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md b/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md index d735dd48cf..e382631afa 100644 --- a/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md @@ -237,9 +237,11 @@ Imports an approval signature referenced by block hash and candidate index: 1. Initialize a set `fresh_blocks = {}` For each block in the view: - 2. Load the `BlockEntry` for the block. If the block is unknown, or the number is less than or equal to the view's finalized number, go to step 6. + 2. Load the `BlockEntry` for the block. If the block is unknown, or the number is less than or equal to the view's finalized number go to step 6. 3. Inspect the `known_by` set of the `BlockEntry`. If the peer is already present, go to step 6. 4. Add the peer to `known_by` with a cloned version of `block_entry.knowledge`. and add the hash of the block to `fresh_blocks`. - 5. Return to step 2 with the ancestor of the block. + 5. Return to step 2 with the ancestor of the block, keeping track of the block depth (+1). -6. For each block in `fresh_blocks`, send all assignments and approvals for all candidates in those blocks to the peer. +6. For each block in `fresh_blocks`, send all assignments and approvals for all candidates in those blocks to the peer if the block depth threshold is not reached, otherwise, send only assignments and approvals origination with the local source. + +The reason we only send our local assignments and approvals when a certain block depth is reached when unifying with a peer is to avoid DoS attacks. It also helps when a node starts with a large difference between finalized and the highest block.