diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 9ac33b0313..a46bec3d30 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4884,6 +4884,27 @@ dependencies = [ "thiserror", ] +[[package]] +name = "polkadot-approval-distribution" +version = "0.1.0" +dependencies = [ + "assert_matches", + "env_logger 0.8.2", + "futures 0.3.12", + "log", + "polkadot-node-network-protocol", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", + "polkadot-primitives", + "rand_core 0.5.1", + "schnorrkel", + "sp-core", + "tracing", + "tracing-futures", +] + [[package]] name = "polkadot-availability-bitfield-distribution" version = "0.1.0" @@ -5289,6 +5310,7 @@ dependencies = [ "parity-scale-codec", "polkadot-primitives", "polkadot-statement-table", + "sp-consensus-slots", "sp-consensus-vrf", "sp-core", "sp-runtime", @@ -5683,6 +5705,7 @@ dependencies = [ "pallet-im-online", "pallet-staking", "pallet-transaction-payment-rpc-runtime-api", + "polkadot-approval-distribution", "polkadot-availability-bitfield-distribution", "polkadot-availability-distribution", "polkadot-availability-recovery", diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index a8d85ab365..329e8a51fd 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -52,6 +52,7 @@ members = [ "node/core/proposer", "node/core/provisioner", "node/core/runtime-api", + "node/network/approval-distribution", "node/network/bridge", "node/network/pov-distribution", "node/network/protocol", diff --git a/polkadot/node/network/approval-distribution/Cargo.toml b/polkadot/node/network/approval-distribution/Cargo.toml new file mode 100644 index 0000000000..26081d34d3 --- /dev/null +++ b/polkadot/node/network/approval-distribution/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "polkadot-approval-distribution" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +polkadot-node-primitives = { path = 
"../../primitives" } +polkadot-node-network-protocol = { path = "../protocol" } +polkadot-node-subsystem = { path = "../../subsystem" } +polkadot-node-subsystem-util = { path = "../../subsystem-util" } +polkadot-primitives = { path = "../../../primitives" } + +futures = "0.3.8" +tracing = "0.1.22" +tracing-futures = "0.2.4" + +[dev-dependencies] +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } + +polkadot-node-subsystem-util = { path = "../../subsystem-util" } +polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } + +assert_matches = "1.4.0" +schnorrkel = { version = "0.9.1", default-features = false } +rand_core = "0.5.1" # should match schnorrkel +env_logger = "0.8.2" +log = "0.4.13" diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs new file mode 100644 index 0000000000..fa543481e0 --- /dev/null +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -0,0 +1,889 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! [`ApprovalDistributionSubsystem`] implementation. +//! +//! 
https://w3f.github.io/parachain-implementers-guide/node/approval/approval-distribution.html + +#![warn(missing_docs)] + +#[cfg(test)] +mod tests; + + +use std::collections::{BTreeMap, HashMap, HashSet, hash_map}; +use futures::{channel::oneshot, FutureExt as _}; +use polkadot_primitives::v1::{ + Hash, BlockNumber, ValidatorIndex, ValidatorSignature, CandidateIndex, +}; +use polkadot_node_primitives::{ + approval::{AssignmentCert, BlockApprovalMeta, IndirectSignedApprovalVote, IndirectAssignmentCert}, +}; +use polkadot_node_subsystem::{ + messages::{ + AllMessages, ApprovalDistributionMessage, ApprovalVotingMessage, NetworkBridgeMessage, + AssignmentCheckResult, ApprovalCheckResult, + }, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, +}; +use polkadot_node_subsystem_util::metrics::{self, prometheus}; +use polkadot_node_network_protocol::{ + PeerId, View, NetworkBridgeEvent, v1 as protocol_v1, ReputationChange as Rep, +}; + +const LOG_TARGET: &str = "approval_distribution"; + +const COST_UNEXPECTED_MESSAGE: Rep = Rep::new(-100, "Peer sent an out-of-view assignment or approval"); +const COST_DUPLICATE_MESSAGE: Rep = Rep::new(-100, "Peer sent identical messages"); +const COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE: Rep = Rep::new(-30, "The vote was valid but too far in the future"); +const COST_INVALID_MESSAGE: Rep = Rep::new(-1000, "The vote was bad"); + +const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Peer sent a valid message"); +const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information"); + + +/// The Approval Distribution subsystem. +pub struct ApprovalDistribution { + metrics: Metrics, +} + +/// The [`State`] struct is responsible for tracking the overall state of the subsystem. +/// +/// It tracks metadata about our view of the unfinalized chain, +/// which assignments and approvals we have seen, and our peers' views. 
+#[derive(Default)]
+struct State {
+	/// These two fields are used in conjunction to construct a view over the unfinalized chain.
+	blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
+	blocks: HashMap<Hash, BlockEntry>,
+
+	/// Peer view data is partially stored here, and partially inline within the [`BlockEntry`]s
+	peer_views: HashMap<PeerId, View>,
+}
+
+#[derive(Debug, Clone, Hash, PartialEq, Eq)]
+enum MessageFingerprint {
+	Assignment(Hash, CandidateIndex, ValidatorIndex),
+	Approval(Hash, CandidateIndex, ValidatorIndex),
+}
+
+#[derive(Debug, Clone, Default)]
+struct Knowledge {
+	known_messages: HashSet<MessageFingerprint>,
+}
+
+/// Information about blocks in our current view as well as whether peers know of them.
+struct BlockEntry {
+	/// Peers who we know are aware of this block and thus, the candidates within it.
+	/// This maps to their knowledge of messages.
+	known_by: HashMap<PeerId, Knowledge>,
+	/// The number of the block.
+	number: BlockNumber,
+	/// The parent hash of the block.
+	parent_hash: Hash,
+	/// Our knowledge of messages.
+	knowledge: Knowledge,
+	/// A votes entry for each candidate indexed by [`CandidateIndex`].
+	candidates: Vec<CandidateEntry>,
+}
+
+#[derive(Debug)]
+enum ApprovalState {
+	Assigned(AssignmentCert),
+	Approved(AssignmentCert, ValidatorSignature),
+}
+
+/// Information about candidates in the context of a particular block they are included in.
+/// In other words, multiple `CandidateEntry`s may exist for the same candidate,
+/// if it is included by multiple blocks - this is likely the case when there are forks.
+#[derive(Debug, Default)] +struct CandidateEntry { + approvals: HashMap, +} + +#[derive(Debug, Clone)] +enum MessageSource { + Peer(PeerId), + Local, +} + +impl MessageSource { + fn peer_id(&self) -> Option { + match self { + Self::Peer(id) => Some(id.clone()), + Self::Local => None, + } + } +} + +impl State { + async fn handle_network_msg( + &mut self, + ctx: &mut impl SubsystemContext, + metrics: &Metrics, + event: NetworkBridgeEvent, + ) { + match event { + NetworkBridgeEvent::PeerConnected(peer_id, _role) => { + // insert a blank view if none already present + self.peer_views.entry(peer_id).or_default(); + } + NetworkBridgeEvent::PeerDisconnected(peer_id) => { + self.peer_views.remove(&peer_id); + self.blocks.iter_mut().for_each(|(_hash, entry)| { + entry.known_by.remove(&peer_id); + }) + } + NetworkBridgeEvent::PeerViewChange(peer_id, view) => { + self.handle_peer_view_change(ctx, peer_id, view).await; + } + NetworkBridgeEvent::OurViewChange(_view) => { + // handled by `BlockFinalized` notification + } + NetworkBridgeEvent::PeerMessage(peer_id, msg) => { + self.process_incoming_peer_message(ctx, metrics, peer_id, msg).await; + } + } + } + + async fn handle_new_blocks( + &mut self, + ctx: &mut impl SubsystemContext, + metas: Vec, + ) { + let mut new_hashes = HashSet::new(); + for meta in metas.into_iter() { + match self.blocks.entry(meta.hash.clone()) { + hash_map::Entry::Vacant(entry) => { + let candidates_count = meta.candidates.len(); + let mut candidates = Vec::with_capacity(candidates_count); + candidates.resize_with(candidates_count, Default::default); + + entry.insert(BlockEntry { + known_by: HashMap::new(), + number: meta.number, + parent_hash: meta.parent_hash.clone(), + knowledge: Knowledge::default(), + candidates, + }); + new_hashes.insert(meta.hash.clone()); + } + _ => continue, + } + self.blocks_by_number.entry(meta.number).or_default().push(meta.hash); + } + for (peer_id, view) in self.peer_views.iter() { + let intersection = 
view.heads.iter().filter(|h| new_hashes.contains(h)); + let view_intersection = View { + heads: intersection.cloned().collect(), + finalized_number: view.finalized_number, + }; + Self::unify_with_peer( + &mut self.blocks, + ctx, + peer_id.clone(), + view_intersection, + ).await; + } + } + + async fn process_incoming_peer_message( + &mut self, + ctx: &mut impl SubsystemContext, + metrics: &Metrics, + peer_id: PeerId, + msg: protocol_v1::ApprovalDistributionMessage, + ) { + match msg { + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) => { + tracing::trace!( + target: LOG_TARGET, + peer_id = %peer_id, + num = assignments.len(), + "Processing assignments from a peer", + ); + for (assignment, claimed_index) in assignments.into_iter() { + self.import_and_circulate_assignment( + ctx, + metrics, + MessageSource::Peer(peer_id.clone()), + assignment, + claimed_index, + ).await; + } + } + protocol_v1::ApprovalDistributionMessage::Approvals(approvals) => { + tracing::trace!( + target: LOG_TARGET, + peer_id = %peer_id, + num = approvals.len(), + "Processing approvals from a peer", + ); + for approval_vote in approvals.into_iter() { + self.import_and_circulate_approval( + ctx, + metrics, + MessageSource::Peer(peer_id.clone()), + approval_vote, + ).await; + } + } + } + } + + async fn handle_peer_view_change( + &mut self, + ctx: &mut impl SubsystemContext, + peer_id: PeerId, + view: View, + ) { + Self::unify_with_peer(&mut self.blocks, ctx, peer_id.clone(), view.clone()).await; + let finalized_number = view.finalized_number; + let old_view = self.peer_views.insert(peer_id.clone(), view); + let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0); + + // we want to prune every block known_by peer up to (including) view.finalized_number + let blocks = &mut self.blocks; + // the `BTreeMap::range` is constrained by stored keys + // so the loop won't take ages if the new finalized_number skyrockets + // but we need to make sure the range is not 
empty, otherwise it will panic + // it shouldn't be, we make sure of this in the network bridge + let range = old_finalized_number..=finalized_number; + if !range.is_empty() { + self.blocks_by_number + .range(range) + .map(|(_number, hashes)| hashes) + .flatten() + .for_each(|hash| { + if let Some(entry) = blocks.get_mut(hash) { + entry.known_by.remove(&peer_id); + } + }); + } + } + + fn handle_block_finalized( + &mut self, + finalized_number: BlockNumber, + ) { + // we want to prune every block up to (including) finalized_number + // why +1 here? + // split_off returns everything after the given key, including the key + let split_point = finalized_number.saturating_add(1); + let mut old_blocks = self.blocks_by_number.split_off(&split_point); + // after split_off old_blocks actually contains new blocks, we need to swap + std::mem::swap(&mut self.blocks_by_number, &mut old_blocks); + + // now that we pruned `self.blocks_by_number`, let's clean up `self.blocks` too + old_blocks.values() + .flatten() + .for_each(|h| { + self.blocks.remove(h); + }); + } + + async fn import_and_circulate_assignment( + &mut self, + ctx: &mut impl SubsystemContext, + metrics: &Metrics, + source: MessageSource, + assignment: IndirectAssignmentCert, + claimed_candidate_index: CandidateIndex, + ) { + let block_hash = assignment.block_hash.clone(); + let validator_index = assignment.validator; + + let entry = match self.blocks.get_mut(&block_hash) { + Some(entry) => entry, + None => { + if let Some(peer_id) = source.peer_id() { + modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await; + } + return; + } + }; + + // compute a fingerprint of the assignment + let fingerprint = MessageFingerprint::Assignment( + block_hash, + claimed_candidate_index, + validator_index, + ); + + if let Some(peer_id) = source.peer_id() { + // check if our knowledge of the peer already contains this assignment + match entry.known_by.entry(peer_id.clone()) { + hash_map::Entry::Occupied(knowledge) => { + if 
knowledge.get().known_messages.contains(&fingerprint) { + modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await; + return; + } + } + hash_map::Entry::Vacant(_) => { + modify_reputation(ctx, peer_id.clone(), COST_UNEXPECTED_MESSAGE).await; + } + } + + // if the assignment is known to be valid, reward the peer + if entry.knowledge.known_messages.contains(&fingerprint) { + modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE).await; + if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { + peer_knowledge.known_messages.insert(fingerprint.clone()); + } + return; + } + + let (tx, rx) = oneshot::channel(); + + ctx.send_message(AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment( + assignment.clone(), + tx, + ))).await; + + let result = match rx.await { + Ok(result) => result, + Err(_) => { + tracing::debug!( + target: LOG_TARGET, + "The approval voting subsystem is down", + ); + return; + } + }; + + match result { + AssignmentCheckResult::Accepted => { + modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE_FIRST).await; + entry.knowledge.known_messages.insert(fingerprint.clone()); + if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { + peer_knowledge.known_messages.insert(fingerprint.clone()); + } + } + AssignmentCheckResult::AcceptedDuplicate => { + // "duplicate" assignments aren't necessarily equal. + // There is more than one way each validator can be assigned to each core. + // cf. 
https://github.com/paritytech/polkadot/pull/2160#discussion_r557628699 + if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { + peer_knowledge.known_messages.insert(fingerprint); + } + return; + } + AssignmentCheckResult::TooFarInFuture => { + modify_reputation(ctx, peer_id, COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE).await; + return; + } + AssignmentCheckResult::Bad => { + modify_reputation(ctx, peer_id, COST_INVALID_MESSAGE).await; + tracing::info!( + target: LOG_TARGET, + peer = ?peer_id, + "Got a bad assignment from peer", + ); + return; + } + } + } else { + if !entry.knowledge.known_messages.insert(fingerprint.clone()) { + // if we already imported an assignment, there is no need to distribute it again + return; + } + } + + // Invariant: none of the peers except for the `source` know about the assignment. + metrics.on_assignment_imported(); + + match entry.candidates.get_mut(claimed_candidate_index as usize) { + Some(candidate_entry) => { + // set the approval state for validator_index to Assigned + // unless the approval state is set already + candidate_entry.approvals + .entry(validator_index) + .or_insert_with(|| ApprovalState::Assigned(assignment.cert.clone())); + } + None => { + tracing::warn!( + target: LOG_TARGET, + hash = ?block_hash, + ?claimed_candidate_index, + "Expected a candidate entry on import_and_circulate_assignment", + ); + } + } + + // Dispatch a ApprovalDistributionV1Message::Assignment(assignment, candidate_index) + // to all peers in the BlockEntry's known_by set who know about the block, + // excluding the peer in the source, if source has kind MessageSource::Peer. + let maybe_peer_id = source.peer_id(); + let peers = entry + .known_by + .keys() + .cloned() + .filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key)) + .collect::>(); + + let assignments = vec![(assignment, claimed_candidate_index)]; + + // Add the fingerprint of the assignment to the knowledge of each peer. 
+ for peer in peers.iter() { + // we already filtered peers above, so this should always be Some + if let Some(entry) = entry.known_by.get_mut(peer) { + entry.known_messages.insert(fingerprint.clone()); + } + } + + if !peers.is_empty() { + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + ), + ).into()).await; + } + } + + async fn import_and_circulate_approval( + &mut self, + ctx: &mut impl SubsystemContext, + metrics: &Metrics, + source: MessageSource, + vote: IndirectSignedApprovalVote, + ) { + let block_hash = vote.block_hash.clone(); + let validator_index = vote.validator; + let candidate_index = vote.candidate_index; + + let entry = match self.blocks.get_mut(&block_hash) { + Some(entry) if entry.candidates.get(candidate_index as usize).is_some() => entry, + _ => { + if let Some(peer_id) = source.peer_id() { + modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await; + } + return; + } + }; + + // compute a fingerprint of the approval + let fingerprint = MessageFingerprint::Approval( + block_hash.clone(), + candidate_index, + validator_index, + ); + + if let Some(peer_id) = source.peer_id() { + let assignment_fingerprint = MessageFingerprint::Assignment( + block_hash.clone(), + candidate_index, + validator_index, + ); + + if !entry.knowledge.known_messages.contains(&assignment_fingerprint) { + modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await; + return; + } + + // check if our knowledge of the peer already contains this approval + match entry.known_by.entry(peer_id.clone()) { + hash_map::Entry::Occupied(knowledge) => { + if knowledge.get().known_messages.contains(&fingerprint) { + modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await; + return; + } + } + hash_map::Entry::Vacant(_) => { + modify_reputation(ctx, peer_id.clone(), COST_UNEXPECTED_MESSAGE).await; + } + } + + // if the approval 
is known to be valid, reward the peer + if entry.knowledge.known_messages.contains(&fingerprint) { + modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE).await; + if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { + peer_knowledge.known_messages.insert(fingerprint.clone()); + } + return; + } + + let (tx, rx) = oneshot::channel(); + + ctx.send_message(AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval( + vote.clone(), + tx, + ))).await; + + let result = match rx.await { + Ok(result) => result, + Err(_) => { + tracing::debug!( + target: LOG_TARGET, + "The approval voting subsystem is down", + ); + return; + } + }; + + match result { + ApprovalCheckResult::Accepted => { + modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE_FIRST).await; + + entry.knowledge.known_messages.insert(fingerprint.clone()); + if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { + peer_knowledge.known_messages.insert(fingerprint.clone()); + } + } + ApprovalCheckResult::Bad => { + modify_reputation(ctx, peer_id, COST_INVALID_MESSAGE).await; + tracing::info!( + target: LOG_TARGET, + peer = ?peer_id, + "Got a bad approval from peer", + ); + return; + } + } + } else { + if !entry.knowledge.known_messages.insert(fingerprint.clone()) { + // if we already imported an approval, there is no need to distribute it again + return; + } + } + + // Invariant: none of the peers except for the `source` know about the approval. 
+		metrics.on_approval_imported();
+
+		match entry.candidates.get_mut(candidate_index as usize) {
+			Some(candidate_entry) => {
+				// set the approval state for validator_index to Approved
+				// it should be in assigned state already
+				match candidate_entry.approvals.remove(&validator_index) {
+					Some(ApprovalState::Assigned(cert)) => {
+						candidate_entry.approvals.insert(
+							validator_index,
+							ApprovalState::Approved(cert, vote.signature.clone()),
+						);
+					}
+					_ => {
+						tracing::warn!(
+							target: LOG_TARGET,
+							hash = ?block_hash,
+							?candidate_index,
+							"Expected a candidate entry with `ApprovalState::Assigned`",
+						);
+					}
+				}
+			}
+			None => {
+				tracing::warn!(
+					target: LOG_TARGET,
+					hash = ?block_hash,
+					?candidate_index,
+					"Expected a candidate entry on import_and_circulate_approval",
+				);
+			}
+		}
+
+		// Dispatch a ApprovalDistributionV1Message::Approval(vote)
+		// to all peers in the BlockEntry's known_by set who know about the block,
+		// excluding the peer in the source, if source has kind MessageSource::Peer.
+		let maybe_peer_id = source.peer_id();
+		let peers = entry
+			.known_by
+			.keys()
+			.cloned()
+			.filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key))
+			.collect::<Vec<_>>();
+
+		// Add the fingerprint of the approval to the knowledge of each peer.
+ for peer in peers.iter() { + // we already filtered peers above, so this should always be Some + if let Some(entry) = entry.known_by.get_mut(peer) { + entry.known_messages.insert(fingerprint.clone()); + } + } + + let approvals = vec![vote]; + if !peers.is_empty() { + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(approvals) + ), + ).into()).await; + } + } + + async fn unify_with_peer( + entries: &mut HashMap, + ctx: &mut impl SubsystemContext, + peer_id: PeerId, + view: View, + ) { + let mut to_send = HashSet::new(); + + let view_finalized_number = view.finalized_number; + for head in view.heads.into_iter() { + let mut block = head; + let interesting_blocks = std::iter::from_fn(|| { + // step 2. + let entry = match entries.get_mut(&block) { + Some(entry) if entry.number > view_finalized_number => entry, + _ => return None, + }; + let interesting_block = match entry.known_by.entry(peer_id.clone()) { + // step 3. + hash_map::Entry::Occupied(_) => return None, + // step 4. + hash_map::Entry::Vacant(vacant) => { + vacant.insert(entry.knowledge.clone()); + block + } + }; + // step 5. + block = entry.parent_hash.clone(); + Some(interesting_block) + }); + to_send.extend(interesting_blocks); + } + // step 6. 
+ // send all assignments and approvals for all candidates in those blocks to the peer + Self::send_gossip_messages_to_peer( + entries, + ctx, + peer_id, + to_send + ).await; + } + + async fn send_gossip_messages_to_peer( + entries: &HashMap, + ctx: &mut impl SubsystemContext, + peer_id: PeerId, + blocks: HashSet, + ) { + let mut assignments = Vec::new(); + let mut approvals = Vec::new(); + + for block in blocks.into_iter() { + let entry = match entries.get(&block) { + Some(entry) => entry, + None => continue, // should be unreachable + }; + for (candidate_index, candidate_entry) in entry.candidates.iter().enumerate() { + let candidate_index = candidate_index as u32; + for (validator_index, approval_state) in candidate_entry.approvals.iter() { + match approval_state { + ApprovalState::Assigned(cert) => { + assignments.push((IndirectAssignmentCert { + block_hash: block.clone(), + validator: validator_index.clone(), + cert: cert.clone(), + }, candidate_index.clone())); + } + ApprovalState::Approved(_, signature) => { + approvals.push(IndirectSignedApprovalVote { + block_hash: block.clone(), + validator: validator_index.clone(), + candidate_index: candidate_index.clone(), + signature: signature.clone(), + }); + } + } + } + } + } + + if !assignments.is_empty() { + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + vec![peer_id.clone()], + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + ), + ).into()).await; + } + + if !approvals.is_empty() { + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + vec![peer_id], + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(approvals) + ), + ).into()).await; + } + } +} + + +/// Modify the reputation of a peer based on its behavior. 
+#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] +async fn modify_reputation( + ctx: &mut impl SubsystemContext, + peer_id: PeerId, + rep: Rep, +) { + tracing::trace!( + target: LOG_TARGET, + reputation = ?rep, + ?peer_id, + "Reputation change for peer", + ); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer_id, rep), + )).await; +} + +impl ApprovalDistribution { + /// Create a new instance of the [`ApprovalDistribution`] subsystem. + pub fn new(metrics: Metrics) -> Self { + Self { metrics } + } + + #[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))] + async fn run(self, ctx: Context) + where + Context: SubsystemContext, + { + let mut state = State::default(); + self.run_inner(ctx, &mut state).await + } + + /// Used for testing. + #[tracing::instrument(skip(self, ctx, state), fields(subsystem = LOG_TARGET))] + async fn run_inner(self, mut ctx: Context, state: &mut State) + where + Context: SubsystemContext, + { + loop { + let message = match ctx.recv().await { + Ok(message) => message, + Err(e) => { + tracing::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting"); + return; + }, + }; + match message { + FromOverseer::Communication { + msg: ApprovalDistributionMessage::NetworkBridgeUpdateV1(event), + } => { + tracing::debug!(target: LOG_TARGET, "Processing network message"); + state.handle_network_msg(&mut ctx, &self.metrics, event).await; + } + FromOverseer::Communication { + msg: ApprovalDistributionMessage::NewBlocks(metas), + } => { + tracing::debug!(target: LOG_TARGET, "Processing NewBlocks"); + state.handle_new_blocks(&mut ctx, metas).await; + } + FromOverseer::Communication { + msg: ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index), + } => { + tracing::debug!(target: LOG_TARGET, "Processing DistributeAssignment"); + state.import_and_circulate_assignment( + &mut ctx, + &self.metrics, + MessageSource::Local, + 
cert, + candidate_index, + ).await; + } + FromOverseer::Communication { + msg: ApprovalDistributionMessage::DistributeApproval(vote), + } => { + tracing::debug!(target: LOG_TARGET, "Processing DistributeApproval"); + state.import_and_circulate_approval( + &mut ctx, + &self.metrics, + MessageSource::Local, + vote, + ).await; + } + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { .. })) => { + tracing::trace!(target: LOG_TARGET, "active leaves signal (ignored)"); + // handled by NewBlocks + } + FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { + tracing::trace!(target: LOG_TARGET, number = %number, "finalized signal"); + state.handle_block_finalized(number); + }, + FromOverseer::Signal(OverseerSignal::Conclude) => { + return; + } + } + } + } +} + +impl Subsystem for ApprovalDistribution +where + C: SubsystemContext + Sync + Send, +{ + fn start(self, ctx: C) -> SpawnedSubsystem { + let future = self.run(ctx) + .map(|_| Ok(())) + .boxed(); + + SpawnedSubsystem { + name: "approval-distribution-subsystem", + future, + } + } +} + + +/// Approval Distribution metrics. 
+#[derive(Default, Clone)]
+pub struct Metrics(Option<MetricsInner>);
+
+#[derive(Clone)]
+struct MetricsInner {
+	assignments_imported_total: prometheus::Counter<prometheus::U64>,
+	approvals_imported_total: prometheus::Counter<prometheus::U64>,
+}
+
+impl Metrics {
+	fn on_assignment_imported(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.assignments_imported_total.inc();
+		}
+	}
+
+	fn on_approval_imported(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.approvals_imported_total.inc();
+		}
+	}
+}
+
+impl metrics::Metrics for Metrics {
+	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+		let metrics = MetricsInner {
+			assignments_imported_total: prometheus::register(
+				prometheus::Counter::new(
+					"parachain_assignments_imported_total",
+					"Number of valid assignments imported locally or from other peers.",
+				)?,
+				registry,
+			)?,
+			approvals_imported_total: prometheus::register(
+				prometheus::Counter::new(
+					"parachain_approvals_imported_total",
+					"Number of valid approvals imported locally or from other peers.",
+				)?,
+				registry,
+			)?,
+		};
+		Ok(Metrics(Some(metrics)))
+	}
+}
diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs
new file mode 100644
index 0000000000..df13004fca
--- /dev/null
+++ b/polkadot/node/network/approval-distribution/src/tests.rs
@@ -0,0 +1,822 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+ +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use std::time::Duration; +use futures::{future, Future, executor}; +use assert_matches::assert_matches; +use polkadot_node_subsystem_test_helpers as test_helpers; +use polkadot_node_subsystem_util::TimeoutExt as _; +use polkadot_node_network_protocol::{view, ObservedRole}; +use polkadot_node_primitives::approval::{ + AssignmentCertKind, RELAY_VRF_MODULO_CONTEXT, VRFOutput, VRFProof, +}; +use super::*; + +type VirtualOverseer = test_helpers::TestSubsystemContextHandle; + +fn test_harness>( + mut state: State, + test_fn: impl FnOnce(VirtualOverseer) -> T, +) -> State { + let _ = env_logger::builder() + .is_test(true) + .filter( + Some(LOG_TARGET), + log::LevelFilter::Trace, + ) + .try_init(); + + let pool = sp_core::testing::TaskExecutor::new(); + let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); + + let subsystem = ApprovalDistribution::new(Default::default()); + { + let subsystem = subsystem.run_inner(context, &mut state); + + let test_fut = test_fn(virtual_overseer); + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + + executor::block_on(future::select(test_fut, subsystem)); + } + + state +} + +const TIMEOUT: Duration = Duration::from_millis(100); + +async fn overseer_send( + overseer: &mut VirtualOverseer, + msg: ApprovalDistributionMessage, +) { + tracing::trace!(msg = ?msg, "Sending message"); + overseer + .send(FromOverseer::Communication { msg }) + .timeout(TIMEOUT) + .await + .expect("msg send timeout"); +} + +async fn overseer_signal_block_finalized( + overseer: &mut VirtualOverseer, + number: BlockNumber, +) { + tracing::trace!( + ?number, + "Sending a finalized signal", + ); + // we don't care about the block hash + overseer + .send(FromOverseer::Signal(OverseerSignal::BlockFinalized(Hash::zero(), number))) + .timeout(TIMEOUT) + .await + .expect("signal send timeout"); +} + +async fn 
overseer_recv( + overseer: &mut VirtualOverseer, +) -> AllMessages { + tracing::trace!("Waiting for a message"); + let msg = overseer + .recv() + .timeout(TIMEOUT) + .await + .expect("msg recv timeout"); + + tracing::trace!(msg = ?msg, "Received message"); + + msg +} + +async fn setup_peer_with_view( + virtual_overseer: &mut VirtualOverseer, + peer_id: &PeerId, + view: View, +) { + overseer_send( + virtual_overseer, + ApprovalDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected(peer_id.clone(), ObservedRole::Full) + ) + ).await; + overseer_send( + virtual_overseer, + ApprovalDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_id.clone(), view) + ) + ).await; +} + +async fn send_message_from_peer( + virtual_overseer: &mut VirtualOverseer, + peer_id: &PeerId, + msg: protocol_v1::ApprovalDistributionMessage, +) { + overseer_send( + virtual_overseer, + ApprovalDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage(peer_id.clone(), msg) + ) + ).await; +} + +fn fake_assignment_cert( + block_hash: Hash, + validator: ValidatorIndex, +) -> IndirectAssignmentCert { + let ctx = schnorrkel::signing_context(RELAY_VRF_MODULO_CONTEXT); + let msg = b"WhenParachains?"; + let mut prng = rand_core::OsRng; + let keypair = schnorrkel::Keypair::generate_with(&mut prng); + let (inout, proof, _) = keypair.vrf_sign(ctx.bytes(msg)); + let out = inout.to_output(); + + IndirectAssignmentCert { + block_hash, + validator, + cert: AssignmentCert { + kind: AssignmentCertKind::RelayVRFModulo { + sample: 1, + }, + vrf: (VRFOutput(out), VRFProof(proof)), + } + } +} + +async fn expect_reputation_change( + virtual_overseer: &mut VirtualOverseer, + peer_id: &PeerId, + expected_reputation_change: Rep, +) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + rep_peer, + rep, + ) + ) => { + assert_eq!(peer_id, &rep_peer); + 
assert_eq!(expected_reputation_change, rep); + } + ); +} + + +/// import an assignment +/// connect a new peer +/// the new peer sends us the same assignment +#[test] +fn try_import_the_same_assignment() { + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + let peer_d = PeerId::random(); + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let _ = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // setup peers + setup_peer_with_view(overseer, &peer_a, view![]).await; + setup_peer_with_view(overseer, &peer_b, view![hash]).await; + setup_peer_with_view(overseer, &peer_c, view![hash]).await; + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 2, + candidates: vec![Default::default(); 1], + slot_number: 1, + }; + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + // send the assignment related to `hash` + let validator_index = 0u32; + let cert = fake_assignment_cert(hash, validator_index); + let assignments = vec![(cert.clone(), 0u32)]; + + let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone()); + send_message_from_peer(overseer, &peer_a, msg).await; + + expect_reputation_change(overseer, &peer_a, COST_UNEXPECTED_MESSAGE).await; + + // send an `Accept` message from the Approval Voting subsystem + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment( + assignment, + tx, + )) => { + assert_eq!(assignment, cert); + tx.send(AssignmentCheckResult::Accepted).unwrap(); + } + ); + + expect_reputation_change(overseer, &peer_a, BENEFIT_VALID_MESSAGE_FIRST).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + peers, + 
protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + ) + )) => { + assert_eq!(peers.len(), 2); + assert_eq!(assignments.len(), 1); + } + ); + + // setup new peer + setup_peer_with_view(overseer, &peer_d, view![]).await; + + // send the same assignment from peer_d + let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments); + send_message_from_peer(overseer, &peer_d, msg).await; + + expect_reputation_change(overseer, &peer_d, COST_UNEXPECTED_MESSAGE).await; + expect_reputation_change(overseer, &peer_d, BENEFIT_VALID_MESSAGE).await; + + assert!(overseer + .recv() + .timeout(TIMEOUT) + .await + .is_none(), + "no message should be sent", + ); + }); +} + +/// https://github.com/paritytech/polkadot/pull/2160#discussion_r547594835 +/// +/// 1. Send a view update that removes block B from their view. +/// 2. Send a message from B that they incur COST_UNEXPECTED_MESSAGE for, +/// but then they receive BENEFIT_VALID_MESSAGE. +/// 3. Send all other messages related to B. 
+#[test] +fn spam_attack_results_in_negative_reputation_change() { + let parent_hash = Hash::repeat_byte(0xFF); + let peer_a = PeerId::random(); + let hash_b = Hash::repeat_byte(0xBB); + + let _ = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + let peer = &peer_a; + setup_peer_with_view(overseer, peer, view![]).await; + + // new block `hash_b` with 20 candidates + let candidates_count = 20; + let meta = BlockApprovalMeta { + hash: hash_b.clone(), + parent_hash, + number: 2, + candidates: vec![Default::default(); candidates_count], + slot_number: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + // send 20 assignments related to `hash_b` + // to populate our knowledge + let assignments: Vec<_> = (0..candidates_count) + .map(|candidate_index| { + let validator_index = candidate_index as u32; + let cert = fake_assignment_cert(hash_b, validator_index); + (cert, candidate_index as u32) + }).collect(); + + let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone()); + send_message_from_peer(overseer, peer, msg.clone()).await; + + for i in 0..candidates_count { + expect_reputation_change(overseer, peer, COST_UNEXPECTED_MESSAGE).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment( + assignment, + tx, + )) => { + assert_eq!(assignment, assignments[i].0); + tx.send(AssignmentCheckResult::Accepted).unwrap(); + } + ); + + expect_reputation_change(overseer, peer, BENEFIT_VALID_MESSAGE_FIRST).await; + } + + // send a view update that removes block B from peer's view by bumping the finalized_number + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View { heads: Default::default(), finalized_number: 2 }) + ) + ).await; + + // send the assignments again + 
send_message_from_peer(overseer, peer, msg.clone()).await; + + // each of them will incur `COST_UNEXPECTED_MESSAGE`, not only the first one + for _ in 0..candidates_count { + expect_reputation_change(overseer, peer, COST_UNEXPECTED_MESSAGE).await; + expect_reputation_change(overseer, peer, BENEFIT_VALID_MESSAGE).await; + } + }); +} + +#[test] +fn import_approval_happy_path() { + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let _ = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // setup peers + setup_peer_with_view(overseer, &peer_a, view![]).await; + setup_peer_with_view(overseer, &peer_b, view![hash]).await; + setup_peer_with_view(overseer, &peer_c, view![hash]).await; + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot_number: 1, + }; + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + // import an assignment related to `hash` locally + let validator_index = 0u32; + let candidate_index = 0u32; + let cert = fake_assignment_cert(hash, validator_index); + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index) + ).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + ) + )) => { + assert_eq!(peers.len(), 2); + assert_eq!(assignments.len(), 1); + } + ); + + // send the an approval from peer_b + let approval = IndirectSignedApprovalVote { + block_hash: hash, + candidate_index, + validator: validator_index, + signature: Default::default(), + }; + let 
msg = protocol_v1::ApprovalDistributionMessage::Approvals(vec![approval.clone()]); + send_message_from_peer(overseer, &peer_b, msg).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval( + vote, + tx, + )) => { + assert_eq!(vote, approval); + tx.send(ApprovalCheckResult::Accepted).unwrap(); + } + ); + + expect_reputation_change(overseer, &peer_b, BENEFIT_VALID_MESSAGE_FIRST).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(approvals) + ) + )) => { + assert_eq!(peers.len(), 1); + assert_eq!(approvals.len(), 1); + } + ); + }); +} + +#[test] +fn import_approval_bad() { + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let _ = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // setup peers + setup_peer_with_view(overseer, &peer_a, view![]).await; + setup_peer_with_view(overseer, &peer_b, view![hash]).await; + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot_number: 1, + }; + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + let validator_index = 0u32; + let candidate_index = 0u32; + let cert = fake_assignment_cert(hash, validator_index); + + // send the an approval from peer_b, we don't have an assignment yet + let approval = IndirectSignedApprovalVote { + block_hash: hash, + candidate_index, + validator: validator_index, + signature: Default::default(), + }; + let msg = protocol_v1::ApprovalDistributionMessage::Approvals(vec![approval.clone()]); + 
send_message_from_peer(overseer, &peer_b, msg).await; + + expect_reputation_change(overseer, &peer_b, COST_UNEXPECTED_MESSAGE).await; + + // now import an assignment from peer_b + let assignments = vec![(cert.clone(), candidate_index)]; + let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments); + send_message_from_peer(overseer, &peer_b, msg).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment( + assignment, + tx, + )) => { + assert_eq!(assignment, cert); + tx.send(AssignmentCheckResult::Accepted).unwrap(); + } + ); + + expect_reputation_change(overseer, &peer_b, BENEFIT_VALID_MESSAGE_FIRST).await; + + // and try again + let msg = protocol_v1::ApprovalDistributionMessage::Approvals(vec![approval.clone()]); + send_message_from_peer(overseer, &peer_b, msg).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval( + vote, + tx, + )) => { + assert_eq!(vote, approval); + tx.send(ApprovalCheckResult::Bad).unwrap(); + } + ); + + expect_reputation_change(overseer, &peer_b, COST_INVALID_MESSAGE).await; + }); +} + +/// make sure we clean up the state on block finalized +#[test] +fn update_our_view() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash_a = Hash::repeat_byte(0xAA); + let hash_b = Hash::repeat_byte(0xBB); + let hash_c = Hash::repeat_byte(0xCC); + + let state = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // new block `hash_a` with 1 candidates + let meta_a = BlockApprovalMeta { + hash: hash_a, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot_number: 1, + }; + let meta_b = BlockApprovalMeta { + hash: hash_b, + parent_hash: hash_a, + number: 2, + candidates: vec![Default::default(); 1], + slot_number: 1, + }; + let meta_c = BlockApprovalMeta { + hash: hash_c, + parent_hash: hash_b, 
+ number: 3, + candidates: vec![Default::default(); 1], + slot_number: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); + overseer_send(overseer, msg).await; + }); + + assert!(state.blocks_by_number.get(&1).is_some()); + assert!(state.blocks_by_number.get(&2).is_some()); + assert!(state.blocks_by_number.get(&3).is_some()); + assert!(state.blocks.get(&hash_a).is_some()); + assert!(state.blocks.get(&hash_b).is_some()); + assert!(state.blocks.get(&hash_c).is_some()); + + let state = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // finalize a block + overseer_signal_block_finalized(overseer, 2).await; + }); + + assert!(state.blocks_by_number.get(&1).is_none()); + assert!(state.blocks_by_number.get(&2).is_none()); + assert!(state.blocks_by_number.get(&3).is_some()); + assert!(state.blocks.get(&hash_a).is_none()); + assert!(state.blocks.get(&hash_b).is_none()); + assert!(state.blocks.get(&hash_c).is_some()); + + let state = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // finalize a very high block + overseer_signal_block_finalized(overseer, 4_000_000_000).await; + }); + + assert!(state.blocks_by_number.get(&3).is_none()); + assert!(state.blocks.get(&hash_c).is_none()); +} + +/// make sure we unify with peers and clean up the state +#[test] +fn update_peer_view() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash_a = Hash::repeat_byte(0xAA); + let hash_b = Hash::repeat_byte(0xBB); + let hash_c = Hash::repeat_byte(0xCC); + let hash_d = Hash::repeat_byte(0xDD); + let peer_a = PeerId::random(); + let peer = &peer_a; + + let state = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // new block `hash_a` with 1 candidates + let meta_a = BlockApprovalMeta { + hash: hash_a, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot_number: 1, + }; + 
let meta_b = BlockApprovalMeta { + hash: hash_b, + parent_hash: hash_a, + number: 2, + candidates: vec![Default::default(); 1], + slot_number: 1, + }; + let meta_c = BlockApprovalMeta { + hash: hash_c, + parent_hash: hash_b, + number: 3, + candidates: vec![Default::default(); 1], + slot_number: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); + overseer_send(overseer, msg).await; + + let cert_a = fake_assignment_cert(hash_a, 0); + let cert_b = fake_assignment_cert(hash_b, 0); + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert_a, 0) + ).await; + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert_b, 0) + ).await; + + // connect a peer + setup_peer_with_view(overseer, peer, view![hash_a]).await; + + // we should send relevant assignments to the peer + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + ) + )) => { + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 1); + } + ); + }); + + assert_eq!(state.peer_views.get(peer).map(|v| v.finalized_number), Some(0)); + assert_eq!( + state.blocks + .get(&hash_a) + .unwrap() + .known_by + .get(peer) + .unwrap() + .known_messages + .len(), + 1, + ); + + let state = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // update peer's view + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View { heads: vec![hash_b, hash_c, hash_d], finalized_number: 2 }) + ) + ).await; + + let cert_c = fake_assignment_cert(hash_c, 0); + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert_c.clone(), 0) + ).await; + + // we should send relevant assignments 
to the peer + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + ) + )) => { + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 1); + assert_eq!(assignments[0].0, cert_c); + } + ); + }); + + assert_eq!(state.peer_views.get(peer).map(|v| v.finalized_number), Some(2)); + assert_eq!( + state.blocks + .get(&hash_c) + .unwrap() + .known_by + .get(peer) + .unwrap() + .known_messages + .len(), + 1, + ); + + let finalized_number = 4_000_000_000; + let state = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // update peer's view + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View { heads: vec![], finalized_number }) + ) + ).await; + }); + + assert_eq!(state.peer_views.get(peer).map(|v| v.finalized_number), Some(finalized_number)); + assert!( + state.blocks + .get(&hash_c) + .unwrap() + .known_by + .get(peer) + .is_none() + ); +} + +#[test] +fn import_remotely_then_locally() { + let peer_a = PeerId::random(); + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + let peer = &peer_a; + + let _ = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // setup the peer + setup_peer_with_view(overseer, peer, view![hash]).await; + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot_number: 1, + }; + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + // import the assignment remotely first + let validator_index = 0u32; + let candidate_index = 0u32; + let cert = fake_assignment_cert(hash, 
validator_index); + let assignments = vec![(cert.clone(), candidate_index)]; + let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone()); + send_message_from_peer(overseer, peer, msg).await; + + // send an `Accept` message from the Approval Voting subsystem + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment( + assignment, + tx, + )) => { + assert_eq!(assignment, cert); + tx.send(AssignmentCheckResult::Accepted).unwrap(); + } + ); + + expect_reputation_change(overseer, peer, BENEFIT_VALID_MESSAGE_FIRST).await; + + // import the same assignment locally + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index) + ).await; + + assert!(overseer + .recv() + .timeout(TIMEOUT) + .await + .is_none(), + "no message should be sent", + ); + + // send the approval remotely + let approval = IndirectSignedApprovalVote { + block_hash: hash, + candidate_index, + validator: validator_index, + signature: Default::default(), + }; + let msg = protocol_v1::ApprovalDistributionMessage::Approvals(vec![approval.clone()]); + send_message_from_peer(overseer, peer, msg).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval( + vote, + tx, + )) => { + assert_eq!(vote, approval); + tx.send(ApprovalCheckResult::Accepted).unwrap(); + } + ); + expect_reputation_change(overseer, peer, BENEFIT_VALID_MESSAGE_FIRST).await; + + // import the same approval locally + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeApproval(approval) + ).await; + + assert!(overseer + .recv() + .timeout(TIMEOUT) + .await + .is_none(), + "no message should be sent", + ); + }); +} diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index dc0f0903bd..f5bc89f137 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ 
b/polkadot/node/network/bridge/src/lib.rs @@ -30,7 +30,7 @@ use polkadot_subsystem::{ use polkadot_subsystem::messages::{ NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage, BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage, - CollatorProtocolMessage, + CollatorProtocolMessage, ApprovalDistributionMessage, }; use polkadot_primitives::v1::{Hash, BlockNumber}; use polkadot_node_network_protocol::{ @@ -401,7 +401,9 @@ async fn handle_peer_messages( for message in messages { outgoing_messages.push(match message { WireMessage::ViewUpdate(new_view) => { - if new_view.heads.len() > MAX_VIEW_HEADS { + if new_view.heads.len() > MAX_VIEW_HEADS || + new_view.finalized_number < peer_data.view.finalized_number + { net.report_peer( peer.clone(), MALFORMED_VIEW_COST, @@ -502,7 +504,11 @@ async fn dispatch_validation_events_to_all( StatementDistributionMessage::NetworkBridgeUpdateV1(m) ))); - a.chain(b).chain(p).chain(s).filter_map(|x| x) + let ap = std::iter::once(event.focus().ok().map(|m| AllMessages::ApprovalDistribution( + ApprovalDistributionMessage::NetworkBridgeUpdateV1(m) + ))); + + a.chain(b).chain(p).chain(s).chain(ap).filter_map(|x| x) }; ctx.send_messages(events.into_iter().flat_map(messages_for)).await @@ -545,8 +551,11 @@ mod tests { use sc_network::Event as NetworkEvent; - use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage}; use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal}; + use polkadot_subsystem::messages::{ + StatementDistributionMessage, BitfieldDistributionMessage, + ApprovalDistributionMessage, + }; use polkadot_node_subsystem_test_helpers::{ SingleItemSink, SingleItemStream, TestSubsystemContextHandle, }; @@ -742,6 +751,13 @@ mod tests { StatementDistributionMessage::NetworkBridgeUpdateV1(e) ) if e == event.focus().expect("could not focus message") ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::ApprovalDistribution( + 
ApprovalDistributionMessage::NetworkBridgeUpdateV1(e) + ) if e == event.focus().expect("could not focus message") + ); } async fn assert_sends_collation_event_to_all( @@ -1246,6 +1262,46 @@ mod tests { }); } + #[test] + fn view_finalized_number_can_not_go_down() { + test_harness(|test_harness| async move { + let TestHarness { mut network_handle, .. } = test_harness; + + let peer_a = PeerId::random(); + + network_handle.connect_peer( + peer_a.clone(), + PeerSet::Validation, + ObservedRole::Full, + ).await; + + network_handle.peer_message( + peer_a.clone(), + PeerSet::Validation, + WireMessage::::ViewUpdate( + View { heads: vec![Hash::repeat_byte(0x01)], finalized_number: 1 }, + ).encode(), + ).await; + + network_handle.peer_message( + peer_a.clone(), + PeerSet::Validation, + WireMessage::::ViewUpdate( + View { heads: vec![], finalized_number: 0 }, + ).encode(), + ).await; + + let actions = network_handle.next_network_actions(1).await; + assert_network_actions_contains( + &actions, + &NetworkAction::ReputationChange( + peer_a.clone(), + MALFORMED_VIEW_COST, + ), + ); + }); + } + #[test] fn send_messages_to_peers() { test_harness(|test_harness| async move { diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index e7dcf18d35..9969862050 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -282,15 +282,18 @@ impl View { pub mod v1 { use polkadot_primitives::v1::{ Hash, CollatorId, Id as ParaId, ErasureChunk, CandidateReceipt, - SignedAvailabilityBitfield, PoV, CandidateHash, ValidatorIndex, + SignedAvailabilityBitfield, PoV, CandidateHash, ValidatorIndex, CandidateIndex, + }; + use polkadot_node_primitives::{ + SignedFullStatement, + approval::{IndirectAssignmentCert, IndirectSignedApprovalVote}, }; - use polkadot_node_primitives::SignedFullStatement; use parity_scale_codec::{Encode, Decode}; use super::RequestId; use std::convert::TryFrom; /// Network messages used by 
the availability distribution subsystem - #[derive(Debug, Clone, Encode, Decode, PartialEq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum AvailabilityDistributionMessage { /// An erasure chunk for a given candidate hash. #[codec(index = "0")] @@ -298,7 +301,7 @@ pub mod v1 { } /// Network messages used by the availability recovery subsystem. - #[derive(Debug, Clone, Encode, Decode, PartialEq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum AvailabilityRecoveryMessage { /// Request a chunk for a given candidate hash and validator index. RequestChunk(RequestId, CandidateHash, ValidatorIndex), @@ -308,7 +311,7 @@ pub mod v1 { } /// Network messages used by the bitfield distribution subsystem. - #[derive(Debug, Clone, Encode, Decode, PartialEq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum BitfieldDistributionMessage { /// A signed availability bitfield for a given relay-parent hash. #[codec(index = "0")] @@ -316,7 +319,7 @@ pub mod v1 { } /// Network messages used by the PoV distribution subsystem. - #[derive(Debug, Clone, Encode, Decode, PartialEq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum PoVDistributionMessage { /// Notification that we are awaiting the given PoVs (by hash) against a /// specific relay-parent hash. @@ -329,14 +332,27 @@ pub mod v1 { } /// Network messages used by the statement distribution subsystem. - #[derive(Debug, Clone, Encode, Decode, PartialEq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum StatementDistributionMessage { /// A signed full statement under a given relay-parent. #[codec(index = "0")] Statement(Hash, SignedFullStatement) } - #[derive(Debug, Clone, Copy, PartialEq, thiserror::Error)] + /// Network messages used by the approval distribution subsystem. + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] + pub enum ApprovalDistributionMessage { + /// Assignments for candidates in recent, unfinalized blocks. 
+ /// + /// Actually checking the assignment may yield a different result. + #[codec(index = "0")] + Assignments(Vec<(IndirectAssignmentCert, CandidateIndex)>), + /// Approvals for candidates in some recent, unfinalized block. + #[codec(index = "1")] + Approvals(Vec), + } + + #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] #[allow(missing_docs)] pub enum CompressedPoVError { #[error("Failed to compress a PoV")] @@ -350,7 +366,7 @@ pub mod v1 { } /// SCALE and Zstd encoded [`PoV`]. - #[derive(Debug, Clone, Encode, Decode, PartialEq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub struct CompressedPoV(Vec); impl CompressedPoV { @@ -398,7 +414,7 @@ pub mod v1 { } /// Network messages used by the collator protocol subsystem - #[derive(Debug, Clone, Encode, Decode, PartialEq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum CollatorProtocolMessage { /// Declare the intent to advertise collations under a collator ID. #[codec(index = "0")] @@ -416,7 +432,7 @@ pub mod v1 { } /// All network messages on the validation peer-set. 
- #[derive(Debug, Clone, Encode, Decode, PartialEq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum ValidationProtocol { /// Availability distribution messages #[codec(index = "0")] @@ -433,15 +449,19 @@ pub mod v1 { /// Availability recovery messages #[codec(index = "4")] AvailabilityRecovery(AvailabilityRecoveryMessage), + /// Approval distribution messages + #[codec(index = "5")] + ApprovalDistribution(ApprovalDistributionMessage), } impl_try_from!(ValidationProtocol, AvailabilityDistribution, AvailabilityDistributionMessage); impl_try_from!(ValidationProtocol, BitfieldDistribution, BitfieldDistributionMessage); impl_try_from!(ValidationProtocol, PoVDistribution, PoVDistributionMessage); impl_try_from!(ValidationProtocol, StatementDistribution, StatementDistributionMessage); + impl_try_from!(ValidationProtocol, ApprovalDistribution, ApprovalDistributionMessage); /// All network messages on the collation peer-set. - #[derive(Debug, Clone, Encode, Decode, PartialEq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum CollationProtocol { /// Collator protocol messages #[codec(index = "0")] diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 2b2fb0e27f..f797ca4c9c 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -84,8 +84,8 @@ use polkadot_subsystem::messages::{ CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage, AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage, ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, - AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, CollatorProtocolMessage, - AvailabilityRecoveryMessage, + AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, + CollatorProtocolMessage, AvailabilityRecoveryMessage, ApprovalDistributionMessage, }; pub use polkadot_subsystem::{ Subsystem, 
SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, @@ -558,6 +558,9 @@ pub struct Overseer { /// A Collator Protocol subsystem. collator_protocol_subsystem: OverseenSubsystem, + /// An Approval Distribution subsystem. + approval_distribution_subsystem: OverseenSubsystem, + /// Spawner to spawn tasks to. s: S, @@ -598,7 +601,7 @@ pub struct Overseer { /// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`]. pub struct AllSubsystems< CV = (), CB = (), CS = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (), - PoVD = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), + PoVD = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), > { /// A candidate validation subsystem. pub candidate_validation: CV, @@ -632,10 +635,12 @@ pub struct AllSubsystems< pub collation_generation: CG, /// A Collator Protocol subsystem. pub collator_protocol: CP, + /// An Approval Distribution subsystem. + pub approval_distribution: ApD, } -impl - AllSubsystems +impl + AllSubsystems { /// Create a new instance of [`AllSubsystems`]. 
/// @@ -665,7 +670,8 @@ impl DummySubsystem, DummySubsystem, DummySubsystem, - DummySubsystem + DummySubsystem, + DummySubsystem, > { AllSubsystems { candidate_validation: DummySubsystem, @@ -684,6 +690,7 @@ impl chain_api: DummySubsystem, collation_generation: DummySubsystem, collator_protocol: DummySubsystem, + approval_distribution: DummySubsystem, } } @@ -691,7 +698,7 @@ impl pub fn replace_candidate_validation( self, candidate_validation: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation, candidate_backing: self.candidate_backing, @@ -709,6 +716,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -716,7 +724,7 @@ impl pub fn replace_candidate_backing( self, candidate_backing: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing, @@ -734,6 +742,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -741,7 +750,7 @@ impl pub fn replace_candidate_selection( self, candidate_selection: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -759,6 +768,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -766,7 +776,7 @@ impl pub fn replace_statement_distribution( self, statement_distribution: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -784,6 +794,7 @@ impl chain_api: self.chain_api, collation_generation: 
self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -791,7 +802,7 @@ impl pub fn replace_availability_distribution( self, availability_distribution: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -809,6 +820,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -816,7 +828,7 @@ impl pub fn replace_availability_recovery( self, availability_recovery: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -834,6 +846,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -841,7 +854,7 @@ impl pub fn replace_bitfield_signing( self, bitfield_signing: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -859,6 +872,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -866,7 +880,7 @@ impl pub fn replace_bitfield_distribution( self, bitfield_distribution: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -884,6 +898,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -891,7 +906,7 @@ impl pub fn replace_provisioner( self, 
provisioner: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -909,6 +924,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -916,7 +932,7 @@ impl pub fn replace_pov_distribution( self, pov_distribution: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -934,6 +950,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -941,7 +958,7 @@ impl pub fn replace_runtime_api( self, runtime_api: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -959,6 +976,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -966,7 +984,7 @@ impl pub fn replace_availability_store( self, availability_store: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -984,6 +1002,7 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -991,7 +1010,7 @@ impl pub fn replace_network_bridge( self, network_bridge: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1009,6 +1028,7 @@ impl chain_api: self.chain_api, 
collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -1016,7 +1036,7 @@ impl pub fn replace_chain_api( self, chain_api: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1034,6 +1054,7 @@ impl chain_api, collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -1041,7 +1062,7 @@ impl pub fn replace_collation_generation( self, collation_generation: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1059,6 +1080,7 @@ impl chain_api: self.chain_api, collation_generation, collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, } } @@ -1066,7 +1088,7 @@ impl pub fn replace_collator_protocol( self, collator_protocol: NEW, - ) -> AllSubsystems { + ) -> AllSubsystems { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1084,6 +1106,33 @@ impl chain_api: self.chain_api, collation_generation: self.collation_generation, collator_protocol, + approval_distribution: self.approval_distribution, + } + } + + /// Replace the `approval_distribution` instance in `self`. 
+ pub fn replace_approval_distribution( + self, + approval_distribution: NEW, + ) -> AllSubsystems { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + pov_distribution: self.pov_distribution, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution, } } } @@ -1294,9 +1343,9 @@ where /// # /// # }); } /// ``` - pub fn new( + pub fn new( leaves: impl IntoIterator, - all_subsystems: AllSubsystems, + all_subsystems: AllSubsystems, prometheus_registry: Option<&prometheus::Registry>, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> @@ -1317,6 +1366,7 @@ where CA: Subsystem> + Send, CG: Subsystem> + Send, CP: Subsystem> + Send, + ApD: Subsystem> + Send, { let (events_tx, events_rx) = metered::channel(CHANNEL_CAPACITY, "overseer_events"); @@ -1492,6 +1542,15 @@ where &mut seed, )?; + let approval_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + all_subsystems.approval_distribution, + &metrics, + &mut seed, + )?; + let leaves = leaves .into_iter() .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)) @@ -1517,6 +1576,7 @@ where chain_api_subsystem, collation_generation_subsystem, collator_protocol_subsystem, + approval_distribution_subsystem, s, running_subsystems, to_overseer_rx: to_overseer_rx.fuse(), @@ -1549,6 +1609,7 @@ where let _ = 
self.chain_api_subsystem.send_signal(OverseerSignal::Conclude).await; let _ = self.collator_protocol_subsystem.send_signal(OverseerSignal::Conclude).await; let _ = self.collation_generation_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.approval_distribution_subsystem.send_signal(OverseerSignal::Conclude).await; let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse(); @@ -1716,7 +1777,8 @@ where self.network_bridge_subsystem.send_signal(signal.clone()).await?; self.chain_api_subsystem.send_signal(signal.clone()).await?; self.collator_protocol_subsystem.send_signal(signal.clone()).await?; - self.collation_generation_subsystem.send_signal(signal).await?; + self.collation_generation_subsystem.send_signal(signal.clone()).await?; + self.approval_distribution_subsystem.send_signal(signal).await?; Ok(()) } @@ -1774,6 +1836,12 @@ where AllMessages::CollatorProtocol(msg) => { self.collator_protocol_subsystem.send_message(msg).await?; }, + AllMessages::ApprovalDistribution(msg) => { + let _ = self.approval_distribution_subsystem.send_message(msg).await; + }, + AllMessages::ApprovalVoting(_msg) => { + // FIXME: https://github.com/paritytech/polkadot/issues/1975 + }, } Ok(()) @@ -2624,6 +2692,10 @@ mod tests { NetworkBridgeMessage::ReportPeer(PeerId::random(), ReputationChange::new(42, "")) } + fn test_approval_distribution_msg() -> ApprovalDistributionMessage { + ApprovalDistributionMessage::NewBlocks(Default::default()) + } + // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. 
#[test] fn overseer_all_subsystems_receive_signals_and_messages() { @@ -2657,6 +2729,7 @@ mod tests { availability_store: subsystem.clone(), network_bridge: subsystem.clone(), chain_api: subsystem.clone(), + approval_distribution: subsystem.clone(), }; let (overseer, mut handler) = Overseer::new( vec![], @@ -2693,13 +2766,14 @@ mod tests { handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await; handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await; handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await; + handler.send_msg(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await; // send a stop signal to each subsystems handler.stop().await; select! { res = overseer_fut => { - const NUM_SUBSYSTEMS: usize = 16; + const NUM_SUBSYSTEMS: usize = 17; assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); // x2 because of broadcast_signal on startup diff --git a/polkadot/node/primitives/Cargo.toml b/polkadot/node/primitives/Cargo.toml index ecc6f0aceb..0bd5ef85b3 100644 --- a/polkadot/node/primitives/Cargo.toml +++ b/polkadot/node/primitives/Cargo.toml @@ -13,3 +13,4 @@ parity-scale-codec = { version = "1.3.6", default-features = false, features = [ runtime_primitives = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-consensus-vrf = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-consensus-slots = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } diff --git a/polkadot/node/primitives/src/approval.rs b/polkadot/node/primitives/src/approval.rs index 2783cb6602..37b72399bd 100644 --- a/polkadot/node/primitives/src/approval.rs +++ b/polkadot/node/primitives/src/approval.rs @@ -17,9 +17,11 @@ //! Types relevant for approval. 
pub use sp_consensus_vrf::schnorrkel::{VRFOutput, VRFProof}; +pub use sp_consensus_slots::SlotNumber; use polkadot_primitives::v1::{ CandidateHash, Hash, ValidatorIndex, Signed, ValidatorSignature, CoreIndex, + BlockNumber, CandidateIndex, }; use parity_scale_codec::{Encode, Decode}; @@ -28,10 +30,10 @@ use parity_scale_codec::{Encode, Decode}; pub type DelayTranche = u32; /// A static context used for all relay-vrf-modulo VRFs. -pub const RELAY_VRF_MODULO_CONTEXT: &str = "A&V MOD"; +pub const RELAY_VRF_MODULO_CONTEXT: &[u8] = b"A&V MOD"; /// A static context used for all relay-vrf-delay VRFs. -pub const RELAY_VRF_DELAY_CONTEXT: &str = "A&V TRANCHE"; +pub const RELAY_VRF_DELAY_CONTEXT: &[u8] = b"A&V TRANCHE"; /// random bytes derived from the VRF submitted within the block by the /// block author as a credential and used as input to approval assignment criteria. @@ -40,7 +42,7 @@ pub struct RelayVRF(pub [u8; 32]); /// Different kinds of input data or criteria that can prove a validator's assignment /// to check a particular parachain. -#[derive(Debug, Clone, Encode, Decode)] +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum AssignmentCertKind { /// An assignment story based on the VRF that authorized the relay-chain block where the /// candidate was included combined with a sample number. @@ -61,7 +63,7 @@ pub enum AssignmentCertKind { } /// A certification of assignment. -#[derive(Debug, Clone, Encode, Decode)] +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub struct AssignmentCert { /// The criterion which is claimed to be met by this cert. pub kind: AssignmentCertKind, @@ -71,7 +73,7 @@ pub struct AssignmentCert { /// An assignment crt which refers to the candidate under which the assignment is /// relevant by block hash. -#[derive(Debug, Clone, Encode, Decode)] +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub struct IndirectAssignmentCert { /// A block hash where the candidate appears. 
pub block_hash: Hash, @@ -92,14 +94,30 @@ pub type SignedApprovalVote = Signed; /// /// In practice, we have a look-up from block hash and candidate index to candidate hash, /// so this can be transformed into a `SignedApprovalVote`. -#[derive(Debug, Clone, Encode, Decode)] +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub struct IndirectSignedApprovalVote { /// A block hash where the candidate appears. pub block_hash: Hash, /// The index of the candidate in the list of candidates fully included as-of the block. - pub candidate_index: u32, + pub candidate_index: CandidateIndex, /// The validator index. pub validator: ValidatorIndex, /// The signature by the validator. pub signature: ValidatorSignature, } + +/// Metadata about a block which is now live in the approval protocol. +#[derive(Debug)] +pub struct BlockApprovalMeta { + /// The hash of the block. + pub hash: Hash, + /// The number of the block. + pub number: BlockNumber, + /// The hash of the parent block. + pub parent_hash: Hash, + /// The candidates included by the block. + /// Note that these are not the same as the candidates that appear within the block body. + pub candidates: Vec, + /// The consensus slot number of the block. 
+ pub slot_number: SlotNumber, +} diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index ded169a8b0..8fc064047b 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -94,6 +94,7 @@ polkadot-node-core-provisioner = { path = "../core/provisioner", optional = true polkadot-node-core-runtime-api = { path = "../core/runtime-api", optional = true } polkadot-pov-distribution = { path = "../network/pov-distribution", optional = true } polkadot-statement-distribution = { path = "../network/statement-distribution", optional = true } +polkadot-approval-distribution = { path = "../network/approval-distribution", optional = true } [dev-dependencies] polkadot-test-client = { path = "../test/client" } @@ -130,4 +131,5 @@ real-overseer = [ "polkadot-node-core-runtime-api", "polkadot-pov-distribution", "polkadot-statement-distribution", + "polkadot-approval-distribution", ] diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 8fb600ac5a..e630ccdd32 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -403,6 +403,7 @@ where use polkadot_node_core_runtime_api::RuntimeApiSubsystem; use polkadot_statement_distribution::StatementDistribution as StatementDistributionSubsystem; use polkadot_availability_recovery::AvailabilityRecoverySubsystem; + use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem; let all_subsystems = AllSubsystems { availability_distribution: AvailabilityDistributionSubsystem::new( @@ -474,6 +475,9 @@ where statement_distribution: StatementDistributionSubsystem::new( Metrics::register(registry)?, ), + approval_distribution: ApprovalDistributionSubsystem::new( + Metrics::register(registry)?, + ), }; Overseer::new( diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index da0488e869..e47ebf8326 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ 
b/polkadot/node/subsystem/src/messages.rs @@ -29,6 +29,7 @@ use polkadot_node_network_protocol::{ }; use polkadot_node_primitives::{ CollationGenerationConfig, MisbehaviorReport, SignedFullStatement, ValidationResult, + approval::{BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote}, }; use polkadot_primitives::v1::{ AuthorityDiscoveryId, AvailableData, BackedCandidate, BlockNumber, SessionInfo, @@ -38,6 +39,7 @@ use polkadot_primitives::v1::{ PersistedValidationData, PoV, SessionIndex, SignedAvailabilityBitfield, ValidationCode, ValidatorId, CandidateHash, ValidatorIndex, ValidatorSignature, InboundDownwardMessage, InboundHrmpMessage, + CandidateIndex, }; use std::{sync::Arc, collections::btree_map::BTreeMap}; @@ -544,7 +546,7 @@ impl BoundToRelayParent for ProvisionerMessage { } } -/// Message to the PoV Distribution Subsystem. +/// Message to the PoV Distribution subsystem. #[derive(Debug)] pub enum PoVDistributionMessage { /// Fetch a PoV from the network. @@ -570,7 +572,7 @@ impl PoVDistributionMessage { } } -/// Message to the Collation Generation Subsystem. +/// Message to the Collation Generation subsystem. #[derive(Debug)] pub enum CollationGenerationMessage { /// Initialize the collation generation subsystem @@ -584,6 +586,72 @@ impl CollationGenerationMessage { } } +/// The result type of [`ApprovalVotingMessage::CheckAndImportAssignment`] request. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AssignmentCheckResult { + /// The vote was accepted and should be propagated onwards. + Accepted, + /// The vote was valid but duplicate and should not be propagated onwards. + AcceptedDuplicate, + /// The vote was valid but too far in the future to accept right now. + TooFarInFuture, + /// The vote was bad and should be ignored, reporting the peer who propagated it. + Bad, +} + +/// The result type of [`ApprovalVotingMessage::CheckAndImportApproval`] request. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ApprovalCheckResult { + /// The vote was accepted and should be propagated onwards. + Accepted, + /// The vote was bad and should be ignored, reporting the peer who propagated it. + Bad, +} + +/// Message to the Approval Voting subsystem. +#[derive(Debug)] +pub enum ApprovalVotingMessage { + /// Check if the assignment is valid and can be accepted by our view of the protocol. + /// Should not be sent unless the block hash is known. + CheckAndImportAssignment( + IndirectAssignmentCert, + oneshot::Sender, + ), + /// Check if the approval vote is valid and can be accepted by our view of the + /// protocol. + /// + /// Should not be sent unless the block hash within the indirect vote is known. + CheckAndImportApproval( + IndirectSignedApprovalVote, + oneshot::Sender, + ), + /// Returns the highest possible ancestor hash of the provided block hash which is + /// acceptable to vote on finality for. + /// The `BlockNumber` provided is the number of the block's ancestor which is the + /// earliest possible vote. + /// + /// It can also return the same block hash, if that is acceptable to vote upon. + /// Return `None` if the input hash is unrecognized. + ApprovedAncestor(Hash, BlockNumber, oneshot::Sender>), +} + +/// Message to the Approval Distribution subsystem. +#[derive(Debug)] +pub enum ApprovalDistributionMessage { + /// Notify the `ApprovalDistribution` subsystem about new blocks + /// and the candidates contained within them. + NewBlocks(Vec), + /// Distribute an assignment cert from the local validator. The cert is assumed + /// to be valid, relevant, and for the given relay-parent and validator index. + DistributeAssignment(IndirectAssignmentCert, CandidateIndex), + /// Distribute an approval vote for the local validator. The approval vote is assumed to be + /// valid, relevant, and the corresponding approval already issued. + /// If not, the subsystem is free to drop the message. 
+ DistributeApproval(IndirectSignedApprovalVote), + /// An update from the network bridge. + NetworkBridgeUpdateV1(NetworkBridgeEvent), +} + /// A message type tying together all message types that are used across Subsystems. #[derive(Debug, derive_more::From)] pub enum AllMessages { @@ -617,6 +685,10 @@ pub enum AllMessages { AvailabilityStore(AvailabilityStoreMessage), /// Message for the network bridge subsystem. NetworkBridge(NetworkBridgeMessage), - /// Message for the Collation Generation subsystem + /// Message for the Collation Generation subsystem. CollationGeneration(CollationGenerationMessage), + /// Message for the Approval Voting subsystem. + ApprovalVoting(ApprovalVotingMessage), + /// Message for the Approval Distribution subsystem. + ApprovalDistribution(ApprovalDistributionMessage), } diff --git a/polkadot/primitives/src/v1.rs b/polkadot/primitives/src/v1.rs index cf5cc1533a..f2cbaa7d35 100644 --- a/polkadot/primitives/src/v1.rs +++ b/polkadot/primitives/src/v1.rs @@ -150,6 +150,9 @@ mod assigment_app { /// to approve included parachain candidates. pub type AssignmentId = assigment_app::Public; +/// The index of the candidate in the list of candidates fully included as-of the block. +pub type CandidateIndex = u32; + /// Get a collator signature payload on a relay-parent, block-data combo. pub fn collator_signature_payload>( relay_parent: &H, @@ -504,7 +507,7 @@ pub fn check_candidate_backing + Clone + Encode>( } /// The unique (during session) index of a core. 
-#[derive(Encode, Decode, Default, PartialOrd, Ord, Eq, PartialEq, Clone, Copy)] +#[derive(Encode, Decode, Default, PartialOrd, Ord, Eq, PartialEq, Clone, Copy, Hash)] #[cfg_attr(feature = "std", derive(Debug))] pub struct CoreIndex(pub u32); diff --git a/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md b/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md index cdbe8299ea..59f633e26a 100644 --- a/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md @@ -25,6 +25,18 @@ However, awareness on its own of a (block, candidate) pair would imply that even ## Protocol +Input: + - `ApprovalDistributionMessage::NewBlocks` + - `ApprovalDistributionMessage::DistributeAssignment` + - `ApprovalDistributionMessage::DistributeApproval` + - `ApprovalDistributionMessage::NetworkBridgeUpdateV1` + - `OverseerSignal::BlockFinalized` + +Output: + - `ApprovalVotingMessage::CheckAndImportAssignment` + - `ApprovalVotingMessage::CheckAndImportApproval` + - `NetworkBridgeMessage::SendValidationMessage::ApprovalDistribution` + ## Functionality ```rust @@ -34,10 +46,9 @@ type BlockScopedCandidate = (Hash, CandidateHash); /// /// It tracks metadata about our view of the unfinalized chain, which assignments and approvals we have seen, and our peers' views. struct State { - // These three fields are used in conjunction to construct a view over the unfinalized chain. + // These two fields are used in conjunction to construct a view over the unfinalized chain. 
blocks_by_number: BTreeMap>, blocks: HashMap, - finalized_number: BlockNumber, // Peer view data is partially stored here, and partially inline within the `BlockEntry`s peer_views: HashMap, @@ -103,10 +114,6 @@ We augment that by defining `constrain(x)` to output the x bounded by the first From there, we can loop backwards from `constrain(view.finalized_number)` until `constrain(last_view.finalized_number)` is reached, removing the `PeerId` from all `BlockEntry`s referenced at that height. We can break the loop early if we ever exit the bound supplied by the first block in `state.blocks_by_number`. -#### `NetworkBridgeEvent::OurViewChange` - -Prune all lists from `blocks_by_number` with number less than or equal to `view.finalized_number`. Prune all the `BlockEntry`s referenced by those lists. - #### `NetworkBridgeEvent::PeerMessage` If the message is of type `ApprovalDistributionV1Message::Assignment(assignment_cert, claimed_index)`, then call `import_and_circulate_assignment(MessageSource::Peer(sender), assignment_cert, claimed_index)` @@ -125,11 +132,16 @@ For all peers: #### `ApprovalDistributionMessage::DistributeAsignment` -Load the corresponding `BlockEntry`. Distribute to all peers in `known_by`. Add to the corresponding `CandidateEntry`. +Call `import_and_circulate_assignment` with `MessageSource::Local`. #### `ApprovalDistributionMessage::DistributeApproval` -Load the corresponding `BlockEntry`. Distribute to all peers in `known_by`. Add to the corresponding `CandidateEntry`. +Call `import_and_circulate_approval` with `MessageSource::Local`. + +#### `OverseerSignal::BlockFinalized` + +Prune all lists from `blocks_by_number` with number less than or equal to `finalized_number`. Prune all the `BlockEntry`s referenced by those lists. 
+ ### Utility @@ -140,19 +152,29 @@ enum MessageSource { } ``` -#### `import_and_circulate_assignment(source: MessageSource, assignment: IndirectAssignmentCert, claimed_candidate_index: u32)` +#### `import_and_circulate_assignment(source: MessageSource, assignment: IndirectAssignmentCert, claimed_candidate_index: CandidateIndex)` Imports an assignment cert referenced by block hash and candidate index. As a postcondition, if the cert is valid, it will have distributed the cert to all peers who have the block in their view, with the exclusion of the peer referenced by the `MessageSource`. +We maintain a few invariants: + * we only send an assignment to a peer after we add its fingerprint to our knowledge + * we add a fingerprint of an assignment to our knowledge only if it's valid and hasn't been added before + +The algorithm is the following: + * Load the BlockEntry using `assignment.block_hash`. If it does not exist, report the source if it is `MessageSource::Peer` and return. * Compute a fingerprint for the `assignment` using `claimed_candidate_index`. * If the source is `MessageSource::Peer(sender)`: * check if `peer` appears under `known_by` and whether the fingerprint is in the `known_messages` of the peer. If the peer does not know the block, report for providing data out-of-view and proceed. If the peer does know the block and the knowledge contains the fingerprint, report for providing replicate data and return. - * If the message fingerprint appears under the `BlockEntry`'s `Knowledge`, give the peer a small positive reputation boost and return. Note that we must do this after checking for out-of-view to avoid being spammed. If we did this check earlier, a peer could provide data out-of-view repeatedly and be rewarded for it. + * If the message fingerprint appears under the `BlockEntry`'s `Knowledge`, give the peer a small positive reputation boost, + add the fingerprint to the peer's knowledge only if it knows about the block and return.
+ Note that we must do this after checking for out-of-view and if the peer knows about the block to avoid being spammed. + If we did this check earlier, a peer could provide data out-of-view repeatedly and be rewarded for it. * Dispatch `ApprovalVotingMessage::CheckAndImportAssignment(assignment)` and wait for the response. - * If the result is `AssignmentCheckResult::Accepted` or `AssignmentCheckResult::AcceptedDuplicate` + * If the result is `AssignmentCheckResult::Accepted` * If the vote was accepted but not duplicate, give the peer a positive reputation boost * add the fingerprint to both our and the peer's knowledge in the `BlockEntry`. Note that we only doing this after making sure we have the right fingerprint. + * If the result is `AssignmentCheckResult::AcceptedDuplicate`, add the fingerprint to the peer's knowledge if it knows about the block and return. * If the result is `AssignmentCheckResult::TooFarInFuture`, mildly punish the peer and return. * If the result is `AssignmentCheckResult::Bad`, punish the peer and return. * If the source is `MessageSource::Local(CandidateIndex)` @@ -164,14 +186,16 @@ Imports an assignment cert referenced by block hash and candidate index. As a po #### `import_and_circulate_approval(source: MessageSource, approval: IndirectSignedApprovalVote)` -Imports an approval signature referenced by block hash and candidate index. +Imports an approval signature referenced by block hash and candidate index: * Load the BlockEntry using `approval.block_hash` and the candidate entry using `approval.candidate_entry`. If either does not exist, report the source if it is `MessageSource::Peer` and return. * Compute a fingerprint for the approval. * Compute a fingerprint for the corresponding assignment. If the `BlockEntry`'s knowledge does not contain that fingerprint, then report the source if it is `MessageSource::Peer` and return. All references to a fingerprint after this refer to the approval's, not the assignment's.
* If the source is `MessageSource::Peer(sender)`: * check if `peer` appears under `known_by` and whether the fingerprint is in the `known_messages` of the peer. If the peer does not know the block, report for providing data out-of-view and proceed. If the peer does know the block and the knowledge contains the fingerprint, report for providing replicate data and return. - * If the message fingerprint appears under the `BlockEntry`'s `Knowledge`, give the peer a small positive reputation boost and return. Note that we must do this after checking for out-of-view to avoid being spammed. If we did this check earlier, a peer could provide data out-of-view repeatedly and be rewarded for it. + * If the message fingerprint appears under the `BlockEntry`'s `Knowledge`, give the peer a small positive reputation boost, + add the fingerprint to the peer's knowledge only if it knows about the block and return. + Note that we must do this after checking for out-of-view to avoid being spammed. If we did this check earlier, a peer could provide data out-of-view repeatedly and be rewarded for it. * Dispatch `ApprovalVotingMessage::CheckAndImportApproval(approval)` and wait for the response. * If the result is `VoteCheckResult::Accepted(())`: * Give the peer a positive reputation boost and add the fingerprint to both our and the peer's knowledge. @@ -187,10 +211,12 @@ Imports an approval signature referenced by block hash and candidate index. #### `unify_with_peer(peer: PeerId, view)`: +1. Initialize a set `fresh_blocks = {}` + For each block in the view: - 1. Initialize a set `fresh_blocks = {}` - 2. Load the `BlockEntry` for the block. If the block is unknown, or the number is less than the view's finalized number, go to step 6. + 2. Load the `BlockEntry` for the block. If the block is unknown, or the number is less than or equal to the view's finalized number, go to step 6. 3. Inspect the `known_by` set of the `BlockEntry`. If the peer is already present, go to step 6. 4.
Add the peer to `known_by` with a cloned version of `block_entry.knowledge`. and add the hash of the block to `fresh_blocks`. 5. Return to step 2 with the ancestor of the block. - 6. For each block in `fresh_blocks`, send all assignments and approvals for all candidates in those blocks to the peer. + +6. For each block in `fresh_blocks`, send all assignments and approvals for all candidates in those blocks to the peer. diff --git a/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md b/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md index 1c673f0324..b598f90792 100644 --- a/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md +++ b/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md @@ -113,7 +113,7 @@ In-memory state: struct ApprovalVoteRequest { validator_index: ValidatorIndex, block_hash: Hash, - candidate_index: u32, + candidate_index: CandidateIndex, } struct State { @@ -121,7 +121,7 @@ struct State { session_info: Vec, keystore: KeyStorePtr, wakeups: BTreeMap>, // Tick -> [(Relay Block, Candidate Hash)] - + // These are connected to each other. approval_vote_tx: mpsc::Sender, approval_vote_rx: mpsc::Receiver, @@ -148,7 +148,7 @@ On receiving an `OverseerSignal::BlockFinalized(h)`, we fetch the block number ` #### `OverseerSignal::ActiveLeavesUpdate` On receiving an `OverseerSignal::ActiveLeavesUpdate(update)`: - * We determine the set of new blocks that were not in our previous view. This is done by querying the ancestry of all new items in the view and contrasting against the stored `BlockNumber`s. Typically, there will be only one new block. We fetch the headers and information on these blocks from the ChainApi subsystem. + * We determine the set of new blocks that were not in our previous view. This is done by querying the ancestry of all new items in the view and contrasting against the stored `BlockNumber`s. Typically, there will be only one new block. 
We fetch the headers and information on these blocks from the ChainApi subsystem. * We update the `StoredBlockRange` and the `BlockNumber` maps. * We use the RuntimeApiSubsystem to determine information about these blocks. It is generally safe to assume that runtime state is available for recent, unfinalized blocks. In the case that it isn't, it means that we are catching up to the head of the chain and needn't worry about assignments to those blocks anyway, as the security assumption of the protocol tolerates nodes being temporarily offline or out-of-date. * We fetch the set of candidates included by each block by dispatching a `RuntimeApiRequest::CandidateEvents` and checking the `CandidateIncluded` events. @@ -156,7 +156,7 @@ On receiving an `OverseerSignal::ActiveLeavesUpdate(update)`: * If the `session index - APPROVAL_SESSIONS > state.earliest_session`, then bump `state.earliest_sessions` to that amount and prune earlier sessions. * If the session isn't in our `state.session_info`, load the session info for it and for all sessions since the earliest-session, including the earliest-session, if that is missing. And it can be, just after pruning, if we've done a big jump forward, as is the case when we've just finished chain synchronization. * If any of the runtime API calls fail, we just warn and skip the block. - * We use the RuntimeApiSubsystem to determine the set of candidates included in these blocks and use BABE logic to determine the slot number and VRF of the blocks. + * We use the RuntimeApiSubsystem to determine the set of candidates included in these blocks and use BABE logic to determine the slot number and VRF of the blocks. * We also note how late we appear to have received the block. We create a `BlockEntry` for each block and a `CandidateEntry` for each candidate obtained from `CandidateIncluded` events after making a `RuntimeApiRequest::CandidateEvents` request. 
* Ensure that the `CandidateEntry` contains a `block_assignments` entry for the block, with the correct backing group set. * If a validator in this session, compute and assign `our_assignment` for the `block_assignments` @@ -222,7 +222,7 @@ On receiving an `ApprovedAncestor(Hash, BlockNumber, response_channel)`: ```rust enum RequiredTranches { - // All validators appear to be required, based on tranches already taken and remaining no-shows. + // All validators appear to be required, based on tranches already taken and remaining no-shows. All, // More tranches required - We're awaiting more assignments. The given `DelayTranche` indicates the // upper bound of tranches that should broadcast based on the last no-show. diff --git a/polkadot/roadmap/implementers-guide/src/types/approval.md b/polkadot/roadmap/implementers-guide/src/types/approval.md index 1db305e760..5cebd35b28 100644 --- a/polkadot/roadmap/implementers-guide/src/types/approval.md +++ b/polkadot/roadmap/implementers-guide/src/types/approval.md @@ -30,7 +30,7 @@ struct AssignmentCert { > TODO: RelayEquivocation cert. Probably can only be broadcast to chains that have handled an equivocation report. -## IndirectAssignmentCert +## IndirectAssignmentCert An assignment cert which refers to the candidate under which the assignment is relevant by block hash. @@ -74,7 +74,7 @@ struct IndirectSignedApprovalVote { // A block hash where the candidate appears. block_hash: Hash, // The index of the candidate in the list of candidates fully included as-of the block. 
- candidate_index: u32, + candidate_index: CandidateIndex, validator: ValidatorIndex, signature: ValidatorSignature, } @@ -98,4 +98,4 @@ struct CheckedAssignmentCert { ```rust type DelayTranche = u32; -``` \ No newline at end of file +``` diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 341b62326b..ebdfa37da2 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -95,6 +95,8 @@ struct BlockApprovalMeta { hash: Hash, /// The number of the block. number: BlockNumber, + /// The hash of the parent block. + parent_hash: Hash, /// The candidates included by the block. Note that these are not the same as the candidates that appear within the /// block body. candidates: Vec, @@ -108,9 +110,7 @@ enum ApprovalDistributionMessage { NewBlocks(Vec), /// Distribute an assignment cert from the local validator. The cert is assumed /// to be valid, relevant, and for the given relay-parent and validator index. - /// - /// The `u32` param is the candidate index in the fully-included list. - DistributeAssignment(IndirectAssignmentCert, u32), + DistributeAssignment(IndirectAssignmentCert, CandidateIndex), /// Distribute an approval vote for the local validator. The approval vote is assumed to be /// valid, relevant, and the corresponding approval already issued. If not, the subsystem is free to drop /// the message.