mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-29 12:37:57 +00:00
impl approval distribution (#2160)
* initial impl approval distribution * initial tests and fixes * batching seems difficult: different peers have different needs * bridge: fix test after merge * some guide updates * only send assignments to peers who know about the block * fix a test, add approvals test * simplify * do not send assignment to peers for finalized blocks * guide: protocol input and output * one more test * more comments, logs, initial metrics * fix a typo * one more thing: early return when reimporting a thing locally
This commit is contained in:
@@ -0,0 +1,889 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! [`ApprovalDistributionSubsystem`] implementation.
|
||||
//!
|
||||
//! https://w3f.github.io/parachain-implementers-guide/node/approval/approval-distribution.html
|
||||
|
||||
#![warn(missing_docs)]
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
|
||||
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
|
||||
use futures::{channel::oneshot, FutureExt as _};
|
||||
use polkadot_primitives::v1::{
|
||||
Hash, BlockNumber, ValidatorIndex, ValidatorSignature, CandidateIndex,
|
||||
};
|
||||
use polkadot_node_primitives::{
|
||||
approval::{AssignmentCert, BlockApprovalMeta, IndirectSignedApprovalVote, IndirectAssignmentCert},
|
||||
};
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{
|
||||
AllMessages, ApprovalDistributionMessage, ApprovalVotingMessage, NetworkBridgeMessage,
|
||||
AssignmentCheckResult, ApprovalCheckResult,
|
||||
},
|
||||
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext,
|
||||
};
|
||||
use polkadot_node_subsystem_util::metrics::{self, prometheus};
|
||||
use polkadot_node_network_protocol::{
|
||||
PeerId, View, NetworkBridgeEvent, v1 as protocol_v1, ReputationChange as Rep,
|
||||
};
|
||||
|
||||
const LOG_TARGET: &str = "approval_distribution";
|
||||
|
||||
const COST_UNEXPECTED_MESSAGE: Rep = Rep::new(-100, "Peer sent an out-of-view assignment or approval");
|
||||
const COST_DUPLICATE_MESSAGE: Rep = Rep::new(-100, "Peer sent identical messages");
|
||||
const COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE: Rep = Rep::new(-30, "The vote was valid but too far in the future");
|
||||
const COST_INVALID_MESSAGE: Rep = Rep::new(-1000, "The vote was bad");
|
||||
|
||||
const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Peer sent a valid message");
|
||||
const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information");
|
||||
|
||||
|
||||
/// The Approval Distribution subsystem.
|
||||
pub struct ApprovalDistribution {
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
/// The [`State`] struct is responsible for tracking the overall state of the subsystem.
|
||||
///
|
||||
/// It tracks metadata about our view of the unfinalized chain,
|
||||
/// which assignments and approvals we have seen, and our peers' views.
|
||||
#[derive(Default)]
|
||||
struct State {
|
||||
/// These two fields are used in conjunction to construct a view over the unfinalized chain.
|
||||
blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
|
||||
blocks: HashMap<Hash, BlockEntry>,
|
||||
|
||||
/// Peer view data is partially stored here, and partially inline within the [`BlockEntry`]s
|
||||
peer_views: HashMap<PeerId, View>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
||||
enum MessageFingerprint {
|
||||
Assignment(Hash, CandidateIndex, ValidatorIndex),
|
||||
Approval(Hash, CandidateIndex, ValidatorIndex),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct Knowledge {
|
||||
known_messages: HashSet<MessageFingerprint>,
|
||||
}
|
||||
|
||||
/// Information about blocks in our current view as well as whether peers know of them.
|
||||
struct BlockEntry {
|
||||
/// Peers who we know are aware of this block and thus, the candidates within it.
|
||||
/// This maps to their knowledge of messages.
|
||||
known_by: HashMap<PeerId, Knowledge>,
|
||||
/// The number of the block.
|
||||
number: BlockNumber,
|
||||
/// The parent hash of the block.
|
||||
parent_hash: Hash,
|
||||
/// Our knowledge of messages.
|
||||
knowledge: Knowledge,
|
||||
/// A votes entry for each candidate indexed by [`CandidateIndex`].
|
||||
candidates: Vec<CandidateEntry>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum ApprovalState {
|
||||
Assigned(AssignmentCert),
|
||||
Approved(AssignmentCert, ValidatorSignature),
|
||||
}
|
||||
|
||||
/// Information about candidates in the context of a particular block they are included in.
|
||||
/// In other words, multiple `CandidateEntry`s may exist for the same candidate,
|
||||
/// if it is included by multiple blocks - this is likely the case when there are forks.
|
||||
#[derive(Debug, Default)]
|
||||
struct CandidateEntry {
|
||||
approvals: HashMap<ValidatorIndex, ApprovalState>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum MessageSource {
|
||||
Peer(PeerId),
|
||||
Local,
|
||||
}
|
||||
|
||||
impl MessageSource {
|
||||
fn peer_id(&self) -> Option<PeerId> {
|
||||
match self {
|
||||
Self::Peer(id) => Some(id.clone()),
|
||||
Self::Local => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl State {
|
||||
async fn handle_network_msg(
|
||||
&mut self,
|
||||
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
metrics: &Metrics,
|
||||
event: NetworkBridgeEvent<protocol_v1::ApprovalDistributionMessage>,
|
||||
) {
|
||||
match event {
|
||||
NetworkBridgeEvent::PeerConnected(peer_id, _role) => {
|
||||
// insert a blank view if none already present
|
||||
self.peer_views.entry(peer_id).or_default();
|
||||
}
|
||||
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
|
||||
self.peer_views.remove(&peer_id);
|
||||
self.blocks.iter_mut().for_each(|(_hash, entry)| {
|
||||
entry.known_by.remove(&peer_id);
|
||||
})
|
||||
}
|
||||
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
|
||||
self.handle_peer_view_change(ctx, peer_id, view).await;
|
||||
}
|
||||
NetworkBridgeEvent::OurViewChange(_view) => {
|
||||
// handled by `BlockFinalized` notification
|
||||
}
|
||||
NetworkBridgeEvent::PeerMessage(peer_id, msg) => {
|
||||
self.process_incoming_peer_message(ctx, metrics, peer_id, msg).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_new_blocks(
|
||||
&mut self,
|
||||
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
metas: Vec<BlockApprovalMeta>,
|
||||
) {
|
||||
let mut new_hashes = HashSet::new();
|
||||
for meta in metas.into_iter() {
|
||||
match self.blocks.entry(meta.hash.clone()) {
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
let candidates_count = meta.candidates.len();
|
||||
let mut candidates = Vec::with_capacity(candidates_count);
|
||||
candidates.resize_with(candidates_count, Default::default);
|
||||
|
||||
entry.insert(BlockEntry {
|
||||
known_by: HashMap::new(),
|
||||
number: meta.number,
|
||||
parent_hash: meta.parent_hash.clone(),
|
||||
knowledge: Knowledge::default(),
|
||||
candidates,
|
||||
});
|
||||
new_hashes.insert(meta.hash.clone());
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
self.blocks_by_number.entry(meta.number).or_default().push(meta.hash);
|
||||
}
|
||||
for (peer_id, view) in self.peer_views.iter() {
|
||||
let intersection = view.heads.iter().filter(|h| new_hashes.contains(h));
|
||||
let view_intersection = View {
|
||||
heads: intersection.cloned().collect(),
|
||||
finalized_number: view.finalized_number,
|
||||
};
|
||||
Self::unify_with_peer(
|
||||
&mut self.blocks,
|
||||
ctx,
|
||||
peer_id.clone(),
|
||||
view_intersection,
|
||||
).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_incoming_peer_message(
|
||||
&mut self,
|
||||
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
metrics: &Metrics,
|
||||
peer_id: PeerId,
|
||||
msg: protocol_v1::ApprovalDistributionMessage,
|
||||
) {
|
||||
match msg {
|
||||
protocol_v1::ApprovalDistributionMessage::Assignments(assignments) => {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
peer_id = %peer_id,
|
||||
num = assignments.len(),
|
||||
"Processing assignments from a peer",
|
||||
);
|
||||
for (assignment, claimed_index) in assignments.into_iter() {
|
||||
self.import_and_circulate_assignment(
|
||||
ctx,
|
||||
metrics,
|
||||
MessageSource::Peer(peer_id.clone()),
|
||||
assignment,
|
||||
claimed_index,
|
||||
).await;
|
||||
}
|
||||
}
|
||||
protocol_v1::ApprovalDistributionMessage::Approvals(approvals) => {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
peer_id = %peer_id,
|
||||
num = approvals.len(),
|
||||
"Processing approvals from a peer",
|
||||
);
|
||||
for approval_vote in approvals.into_iter() {
|
||||
self.import_and_circulate_approval(
|
||||
ctx,
|
||||
metrics,
|
||||
MessageSource::Peer(peer_id.clone()),
|
||||
approval_vote,
|
||||
).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_peer_view_change(
|
||||
&mut self,
|
||||
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
peer_id: PeerId,
|
||||
view: View,
|
||||
) {
|
||||
Self::unify_with_peer(&mut self.blocks, ctx, peer_id.clone(), view.clone()).await;
|
||||
let finalized_number = view.finalized_number;
|
||||
let old_view = self.peer_views.insert(peer_id.clone(), view);
|
||||
let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0);
|
||||
|
||||
// we want to prune every block known_by peer up to (including) view.finalized_number
|
||||
let blocks = &mut self.blocks;
|
||||
// the `BTreeMap::range` is constrained by stored keys
|
||||
// so the loop won't take ages if the new finalized_number skyrockets
|
||||
// but we need to make sure the range is not empty, otherwise it will panic
|
||||
// it shouldn't be, we make sure of this in the network bridge
|
||||
let range = old_finalized_number..=finalized_number;
|
||||
if !range.is_empty() {
|
||||
self.blocks_by_number
|
||||
.range(range)
|
||||
.map(|(_number, hashes)| hashes)
|
||||
.flatten()
|
||||
.for_each(|hash| {
|
||||
if let Some(entry) = blocks.get_mut(hash) {
|
||||
entry.known_by.remove(&peer_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_block_finalized(
|
||||
&mut self,
|
||||
finalized_number: BlockNumber,
|
||||
) {
|
||||
// we want to prune every block up to (including) finalized_number
|
||||
// why +1 here?
|
||||
// split_off returns everything after the given key, including the key
|
||||
let split_point = finalized_number.saturating_add(1);
|
||||
let mut old_blocks = self.blocks_by_number.split_off(&split_point);
|
||||
// after split_off old_blocks actually contains new blocks, we need to swap
|
||||
std::mem::swap(&mut self.blocks_by_number, &mut old_blocks);
|
||||
|
||||
// now that we pruned `self.blocks_by_number`, let's clean up `self.blocks` too
|
||||
old_blocks.values()
|
||||
.flatten()
|
||||
.for_each(|h| {
|
||||
self.blocks.remove(h);
|
||||
});
|
||||
}
|
||||
|
||||
async fn import_and_circulate_assignment(
|
||||
&mut self,
|
||||
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
metrics: &Metrics,
|
||||
source: MessageSource,
|
||||
assignment: IndirectAssignmentCert,
|
||||
claimed_candidate_index: CandidateIndex,
|
||||
) {
|
||||
let block_hash = assignment.block_hash.clone();
|
||||
let validator_index = assignment.validator;
|
||||
|
||||
let entry = match self.blocks.get_mut(&block_hash) {
|
||||
Some(entry) => entry,
|
||||
None => {
|
||||
if let Some(peer_id) = source.peer_id() {
|
||||
modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// compute a fingerprint of the assignment
|
||||
let fingerprint = MessageFingerprint::Assignment(
|
||||
block_hash,
|
||||
claimed_candidate_index,
|
||||
validator_index,
|
||||
);
|
||||
|
||||
if let Some(peer_id) = source.peer_id() {
|
||||
// check if our knowledge of the peer already contains this assignment
|
||||
match entry.known_by.entry(peer_id.clone()) {
|
||||
hash_map::Entry::Occupied(knowledge) => {
|
||||
if knowledge.get().known_messages.contains(&fingerprint) {
|
||||
modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(_) => {
|
||||
modify_reputation(ctx, peer_id.clone(), COST_UNEXPECTED_MESSAGE).await;
|
||||
}
|
||||
}
|
||||
|
||||
// if the assignment is known to be valid, reward the peer
|
||||
if entry.knowledge.known_messages.contains(&fingerprint) {
|
||||
modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE).await;
|
||||
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
|
||||
peer_knowledge.known_messages.insert(fingerprint.clone());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
ctx.send_message(AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment(
|
||||
assignment.clone(),
|
||||
tx,
|
||||
))).await;
|
||||
|
||||
let result = match rx.await {
|
||||
Ok(result) => result,
|
||||
Err(_) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
"The approval voting subsystem is down",
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match result {
|
||||
AssignmentCheckResult::Accepted => {
|
||||
modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;
|
||||
entry.knowledge.known_messages.insert(fingerprint.clone());
|
||||
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
|
||||
peer_knowledge.known_messages.insert(fingerprint.clone());
|
||||
}
|
||||
}
|
||||
AssignmentCheckResult::AcceptedDuplicate => {
|
||||
// "duplicate" assignments aren't necessarily equal.
|
||||
// There is more than one way each validator can be assigned to each core.
|
||||
// cf. https://github.com/paritytech/polkadot/pull/2160#discussion_r557628699
|
||||
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
|
||||
peer_knowledge.known_messages.insert(fingerprint);
|
||||
}
|
||||
return;
|
||||
}
|
||||
AssignmentCheckResult::TooFarInFuture => {
|
||||
modify_reputation(ctx, peer_id, COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE).await;
|
||||
return;
|
||||
}
|
||||
AssignmentCheckResult::Bad => {
|
||||
modify_reputation(ctx, peer_id, COST_INVALID_MESSAGE).await;
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
peer = ?peer_id,
|
||||
"Got a bad assignment from peer",
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !entry.knowledge.known_messages.insert(fingerprint.clone()) {
|
||||
// if we already imported an assignment, there is no need to distribute it again
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Invariant: none of the peers except for the `source` know about the assignment.
|
||||
metrics.on_assignment_imported();
|
||||
|
||||
match entry.candidates.get_mut(claimed_candidate_index as usize) {
|
||||
Some(candidate_entry) => {
|
||||
// set the approval state for validator_index to Assigned
|
||||
// unless the approval state is set already
|
||||
candidate_entry.approvals
|
||||
.entry(validator_index)
|
||||
.or_insert_with(|| ApprovalState::Assigned(assignment.cert.clone()));
|
||||
}
|
||||
None => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
hash = ?block_hash,
|
||||
?claimed_candidate_index,
|
||||
"Expected a candidate entry on import_and_circulate_assignment",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch a ApprovalDistributionV1Message::Assignment(assignment, candidate_index)
|
||||
// to all peers in the BlockEntry's known_by set who know about the block,
|
||||
// excluding the peer in the source, if source has kind MessageSource::Peer.
|
||||
let maybe_peer_id = source.peer_id();
|
||||
let peers = entry
|
||||
.known_by
|
||||
.keys()
|
||||
.cloned()
|
||||
.filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let assignments = vec![(assignment, claimed_candidate_index)];
|
||||
|
||||
// Add the fingerprint of the assignment to the knowledge of each peer.
|
||||
for peer in peers.iter() {
|
||||
// we already filtered peers above, so this should always be Some
|
||||
if let Some(entry) = entry.known_by.get_mut(peer) {
|
||||
entry.known_messages.insert(fingerprint.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if !peers.is_empty() {
|
||||
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
|
||||
peers,
|
||||
protocol_v1::ValidationProtocol::ApprovalDistribution(
|
||||
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
|
||||
),
|
||||
).into()).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn import_and_circulate_approval(
|
||||
&mut self,
|
||||
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
metrics: &Metrics,
|
||||
source: MessageSource,
|
||||
vote: IndirectSignedApprovalVote,
|
||||
) {
|
||||
let block_hash = vote.block_hash.clone();
|
||||
let validator_index = vote.validator;
|
||||
let candidate_index = vote.candidate_index;
|
||||
|
||||
let entry = match self.blocks.get_mut(&block_hash) {
|
||||
Some(entry) if entry.candidates.get(candidate_index as usize).is_some() => entry,
|
||||
_ => {
|
||||
if let Some(peer_id) = source.peer_id() {
|
||||
modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// compute a fingerprint of the approval
|
||||
let fingerprint = MessageFingerprint::Approval(
|
||||
block_hash.clone(),
|
||||
candidate_index,
|
||||
validator_index,
|
||||
);
|
||||
|
||||
if let Some(peer_id) = source.peer_id() {
|
||||
let assignment_fingerprint = MessageFingerprint::Assignment(
|
||||
block_hash.clone(),
|
||||
candidate_index,
|
||||
validator_index,
|
||||
);
|
||||
|
||||
if !entry.knowledge.known_messages.contains(&assignment_fingerprint) {
|
||||
modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
|
||||
return;
|
||||
}
|
||||
|
||||
// check if our knowledge of the peer already contains this approval
|
||||
match entry.known_by.entry(peer_id.clone()) {
|
||||
hash_map::Entry::Occupied(knowledge) => {
|
||||
if knowledge.get().known_messages.contains(&fingerprint) {
|
||||
modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(_) => {
|
||||
modify_reputation(ctx, peer_id.clone(), COST_UNEXPECTED_MESSAGE).await;
|
||||
}
|
||||
}
|
||||
|
||||
// if the approval is known to be valid, reward the peer
|
||||
if entry.knowledge.known_messages.contains(&fingerprint) {
|
||||
modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE).await;
|
||||
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
|
||||
peer_knowledge.known_messages.insert(fingerprint.clone());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
ctx.send_message(AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval(
|
||||
vote.clone(),
|
||||
tx,
|
||||
))).await;
|
||||
|
||||
let result = match rx.await {
|
||||
Ok(result) => result,
|
||||
Err(_) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
"The approval voting subsystem is down",
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match result {
|
||||
ApprovalCheckResult::Accepted => {
|
||||
modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;
|
||||
|
||||
entry.knowledge.known_messages.insert(fingerprint.clone());
|
||||
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
|
||||
peer_knowledge.known_messages.insert(fingerprint.clone());
|
||||
}
|
||||
}
|
||||
ApprovalCheckResult::Bad => {
|
||||
modify_reputation(ctx, peer_id, COST_INVALID_MESSAGE).await;
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
peer = ?peer_id,
|
||||
"Got a bad approval from peer",
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !entry.knowledge.known_messages.insert(fingerprint.clone()) {
|
||||
// if we already imported an approval, there is no need to distribute it again
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Invariant: none of the peers except for the `source` know about the approval.
|
||||
metrics.on_approval_imported();
|
||||
|
||||
match entry.candidates.get_mut(candidate_index as usize) {
|
||||
Some(candidate_entry) => {
|
||||
// set the approval state for validator_index to Approved
|
||||
// it should be in assigned state already
|
||||
match candidate_entry.approvals.remove(&validator_index) {
|
||||
Some(ApprovalState::Assigned(cert)) => {
|
||||
candidate_entry.approvals.insert(
|
||||
validator_index,
|
||||
ApprovalState::Approved(cert, vote.signature.clone()),
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
hash = ?block_hash,
|
||||
?candidate_index,
|
||||
"Expected a candidate entry with `ApprovalState::Assigned`",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
hash = ?block_hash,
|
||||
?candidate_index,
|
||||
"Expected a candidate entry on import_and_circulate_approval",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch a ApprovalDistributionV1Message::Approval(vote)
|
||||
// to all peers in the BlockEntry's known_by set who know about the block,
|
||||
// excluding the peer in the source, if source has kind MessageSource::Peer.
|
||||
let maybe_peer_id = source.peer_id();
|
||||
let peers = entry
|
||||
.known_by
|
||||
.keys()
|
||||
.cloned()
|
||||
.filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Add the fingerprint of the assignment to the knowledge of each peer.
|
||||
for peer in peers.iter() {
|
||||
// we already filtered peers above, so this should always be Some
|
||||
if let Some(entry) = entry.known_by.get_mut(peer) {
|
||||
entry.known_messages.insert(fingerprint.clone());
|
||||
}
|
||||
}
|
||||
|
||||
let approvals = vec![vote];
|
||||
if !peers.is_empty() {
|
||||
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
|
||||
peers,
|
||||
protocol_v1::ValidationProtocol::ApprovalDistribution(
|
||||
protocol_v1::ApprovalDistributionMessage::Approvals(approvals)
|
||||
),
|
||||
).into()).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn unify_with_peer(
|
||||
entries: &mut HashMap<Hash, BlockEntry>,
|
||||
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
peer_id: PeerId,
|
||||
view: View,
|
||||
) {
|
||||
let mut to_send = HashSet::new();
|
||||
|
||||
let view_finalized_number = view.finalized_number;
|
||||
for head in view.heads.into_iter() {
|
||||
let mut block = head;
|
||||
let interesting_blocks = std::iter::from_fn(|| {
|
||||
// step 2.
|
||||
let entry = match entries.get_mut(&block) {
|
||||
Some(entry) if entry.number > view_finalized_number => entry,
|
||||
_ => return None,
|
||||
};
|
||||
let interesting_block = match entry.known_by.entry(peer_id.clone()) {
|
||||
// step 3.
|
||||
hash_map::Entry::Occupied(_) => return None,
|
||||
// step 4.
|
||||
hash_map::Entry::Vacant(vacant) => {
|
||||
vacant.insert(entry.knowledge.clone());
|
||||
block
|
||||
}
|
||||
};
|
||||
// step 5.
|
||||
block = entry.parent_hash.clone();
|
||||
Some(interesting_block)
|
||||
});
|
||||
to_send.extend(interesting_blocks);
|
||||
}
|
||||
// step 6.
|
||||
// send all assignments and approvals for all candidates in those blocks to the peer
|
||||
Self::send_gossip_messages_to_peer(
|
||||
entries,
|
||||
ctx,
|
||||
peer_id,
|
||||
to_send
|
||||
).await;
|
||||
}
|
||||
|
||||
async fn send_gossip_messages_to_peer(
|
||||
entries: &HashMap<Hash, BlockEntry>,
|
||||
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
peer_id: PeerId,
|
||||
blocks: HashSet<Hash>,
|
||||
) {
|
||||
let mut assignments = Vec::new();
|
||||
let mut approvals = Vec::new();
|
||||
|
||||
for block in blocks.into_iter() {
|
||||
let entry = match entries.get(&block) {
|
||||
Some(entry) => entry,
|
||||
None => continue, // should be unreachable
|
||||
};
|
||||
for (candidate_index, candidate_entry) in entry.candidates.iter().enumerate() {
|
||||
let candidate_index = candidate_index as u32;
|
||||
for (validator_index, approval_state) in candidate_entry.approvals.iter() {
|
||||
match approval_state {
|
||||
ApprovalState::Assigned(cert) => {
|
||||
assignments.push((IndirectAssignmentCert {
|
||||
block_hash: block.clone(),
|
||||
validator: validator_index.clone(),
|
||||
cert: cert.clone(),
|
||||
}, candidate_index.clone()));
|
||||
}
|
||||
ApprovalState::Approved(_, signature) => {
|
||||
approvals.push(IndirectSignedApprovalVote {
|
||||
block_hash: block.clone(),
|
||||
validator: validator_index.clone(),
|
||||
candidate_index: candidate_index.clone(),
|
||||
signature: signature.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !assignments.is_empty() {
|
||||
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
|
||||
vec![peer_id.clone()],
|
||||
protocol_v1::ValidationProtocol::ApprovalDistribution(
|
||||
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
|
||||
),
|
||||
).into()).await;
|
||||
}
|
||||
|
||||
if !approvals.is_empty() {
|
||||
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
|
||||
vec![peer_id],
|
||||
protocol_v1::ValidationProtocol::ApprovalDistribution(
|
||||
protocol_v1::ApprovalDistributionMessage::Approvals(approvals)
|
||||
),
|
||||
).into()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Modify the reputation of a peer based on its behavior.
|
||||
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
|
||||
async fn modify_reputation(
|
||||
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
peer_id: PeerId,
|
||||
rep: Rep,
|
||||
) {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
reputation = ?rep,
|
||||
?peer_id,
|
||||
"Reputation change for peer",
|
||||
);
|
||||
|
||||
ctx.send_message(AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ReportPeer(peer_id, rep),
|
||||
)).await;
|
||||
}
|
||||
|
||||
impl ApprovalDistribution {
|
||||
/// Create a new instance of the [`ApprovalDistribution`] subsystem.
|
||||
pub fn new(metrics: Metrics) -> Self {
|
||||
Self { metrics }
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
|
||||
async fn run<Context>(self, ctx: Context)
|
||||
where
|
||||
Context: SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
{
|
||||
let mut state = State::default();
|
||||
self.run_inner(ctx, &mut state).await
|
||||
}
|
||||
|
||||
/// Used for testing.
|
||||
#[tracing::instrument(skip(self, ctx, state), fields(subsystem = LOG_TARGET))]
|
||||
async fn run_inner<Context>(self, mut ctx: Context, state: &mut State)
|
||||
where
|
||||
Context: SubsystemContext<Message = ApprovalDistributionMessage>,
|
||||
{
|
||||
loop {
|
||||
let message = match ctx.recv().await {
|
||||
Ok(message) => message,
|
||||
Err(e) => {
|
||||
tracing::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
|
||||
return;
|
||||
},
|
||||
};
|
||||
match message {
|
||||
FromOverseer::Communication {
|
||||
msg: ApprovalDistributionMessage::NetworkBridgeUpdateV1(event),
|
||||
} => {
|
||||
tracing::debug!(target: LOG_TARGET, "Processing network message");
|
||||
state.handle_network_msg(&mut ctx, &self.metrics, event).await;
|
||||
}
|
||||
FromOverseer::Communication {
|
||||
msg: ApprovalDistributionMessage::NewBlocks(metas),
|
||||
} => {
|
||||
tracing::debug!(target: LOG_TARGET, "Processing NewBlocks");
|
||||
state.handle_new_blocks(&mut ctx, metas).await;
|
||||
}
|
||||
FromOverseer::Communication {
|
||||
msg: ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index),
|
||||
} => {
|
||||
tracing::debug!(target: LOG_TARGET, "Processing DistributeAssignment");
|
||||
state.import_and_circulate_assignment(
|
||||
&mut ctx,
|
||||
&self.metrics,
|
||||
MessageSource::Local,
|
||||
cert,
|
||||
candidate_index,
|
||||
).await;
|
||||
}
|
||||
FromOverseer::Communication {
|
||||
msg: ApprovalDistributionMessage::DistributeApproval(vote),
|
||||
} => {
|
||||
tracing::debug!(target: LOG_TARGET, "Processing DistributeApproval");
|
||||
state.import_and_circulate_approval(
|
||||
&mut ctx,
|
||||
&self.metrics,
|
||||
MessageSource::Local,
|
||||
vote,
|
||||
).await;
|
||||
}
|
||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { .. })) => {
|
||||
tracing::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
|
||||
// handled by NewBlocks
|
||||
}
|
||||
FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
|
||||
tracing::trace!(target: LOG_TARGET, number = %number, "finalized signal");
|
||||
state.handle_block_finalized(number);
|
||||
},
|
||||
FromOverseer::Signal(OverseerSignal::Conclude) => {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Subsystem<C> for ApprovalDistribution
|
||||
where
|
||||
C: SubsystemContext<Message = ApprovalDistributionMessage> + Sync + Send,
|
||||
{
|
||||
fn start(self, ctx: C) -> SpawnedSubsystem {
|
||||
let future = self.run(ctx)
|
||||
.map(|_| Ok(()))
|
||||
.boxed();
|
||||
|
||||
SpawnedSubsystem {
|
||||
name: "approval-distribution-subsystem",
|
||||
future,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Approval Distribution metrics.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct Metrics(Option<MetricsInner>);
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MetricsInner {
|
||||
assignments_imported_total: prometheus::Counter<prometheus::U64>,
|
||||
approvals_imported_total: prometheus::Counter<prometheus::U64>,
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
fn on_assignment_imported(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.assignments_imported_total.inc();
|
||||
}
|
||||
}
|
||||
|
||||
fn on_approval_imported(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.approvals_imported_total.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl metrics::Metrics for Metrics {
|
||||
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
|
||||
let metrics = MetricsInner {
|
||||
assignments_imported_total: prometheus::register(
|
||||
prometheus::Counter::new(
|
||||
"parachain_assignments_imported_total",
|
||||
"Number of valid assignments imported locally or from other peers.",
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
approvals_imported_total: prometheus::register(
|
||||
prometheus::Counter::new(
|
||||
"parachain_approvals_imported_total",
|
||||
"Number of valid approvals imported locally or from other peers.",
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
};
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,822 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::time::Duration;
|
||||
use futures::{future, Future, executor};
|
||||
use assert_matches::assert_matches;
|
||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||
use polkadot_node_subsystem_util::TimeoutExt as _;
|
||||
use polkadot_node_network_protocol::{view, ObservedRole};
|
||||
use polkadot_node_primitives::approval::{
|
||||
AssignmentCertKind, RELAY_VRF_MODULO_CONTEXT, VRFOutput, VRFProof,
|
||||
};
|
||||
use super::*;
|
||||
|
||||
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<ApprovalDistributionMessage>;
|
||||
|
||||
fn test_harness<T: Future<Output = ()>>(
|
||||
mut state: State,
|
||||
test_fn: impl FnOnce(VirtualOverseer) -> T,
|
||||
) -> State {
|
||||
let _ = env_logger::builder()
|
||||
.is_test(true)
|
||||
.filter(
|
||||
Some(LOG_TARGET),
|
||||
log::LevelFilter::Trace,
|
||||
)
|
||||
.try_init();
|
||||
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
|
||||
|
||||
let subsystem = ApprovalDistribution::new(Default::default());
|
||||
{
|
||||
let subsystem = subsystem.run_inner(context, &mut state);
|
||||
|
||||
let test_fut = test_fn(virtual_overseer);
|
||||
|
||||
futures::pin_mut!(test_fut);
|
||||
futures::pin_mut!(subsystem);
|
||||
|
||||
executor::block_on(future::select(test_fut, subsystem));
|
||||
}
|
||||
|
||||
state
|
||||
}
|
||||
|
||||
const TIMEOUT: Duration = Duration::from_millis(100);
|
||||
|
||||
async fn overseer_send(
|
||||
overseer: &mut VirtualOverseer,
|
||||
msg: ApprovalDistributionMessage,
|
||||
) {
|
||||
tracing::trace!(msg = ?msg, "Sending message");
|
||||
overseer
|
||||
.send(FromOverseer::Communication { msg })
|
||||
.timeout(TIMEOUT)
|
||||
.await
|
||||
.expect("msg send timeout");
|
||||
}
|
||||
|
||||
async fn overseer_signal_block_finalized(
|
||||
overseer: &mut VirtualOverseer,
|
||||
number: BlockNumber,
|
||||
) {
|
||||
tracing::trace!(
|
||||
?number,
|
||||
"Sending a finalized signal",
|
||||
);
|
||||
// we don't care about the block hash
|
||||
overseer
|
||||
.send(FromOverseer::Signal(OverseerSignal::BlockFinalized(Hash::zero(), number)))
|
||||
.timeout(TIMEOUT)
|
||||
.await
|
||||
.expect("signal send timeout");
|
||||
}
|
||||
|
||||
async fn overseer_recv(
|
||||
overseer: &mut VirtualOverseer,
|
||||
) -> AllMessages {
|
||||
tracing::trace!("Waiting for a message");
|
||||
let msg = overseer
|
||||
.recv()
|
||||
.timeout(TIMEOUT)
|
||||
.await
|
||||
.expect("msg recv timeout");
|
||||
|
||||
tracing::trace!(msg = ?msg, "Received message");
|
||||
|
||||
msg
|
||||
}
|
||||
|
||||
async fn setup_peer_with_view(
|
||||
virtual_overseer: &mut VirtualOverseer,
|
||||
peer_id: &PeerId,
|
||||
view: View,
|
||||
) {
|
||||
overseer_send(
|
||||
virtual_overseer,
|
||||
ApprovalDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerConnected(peer_id.clone(), ObservedRole::Full)
|
||||
)
|
||||
).await;
|
||||
overseer_send(
|
||||
virtual_overseer,
|
||||
ApprovalDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerViewChange(peer_id.clone(), view)
|
||||
)
|
||||
).await;
|
||||
}
|
||||
|
||||
async fn send_message_from_peer(
|
||||
virtual_overseer: &mut VirtualOverseer,
|
||||
peer_id: &PeerId,
|
||||
msg: protocol_v1::ApprovalDistributionMessage,
|
||||
) {
|
||||
overseer_send(
|
||||
virtual_overseer,
|
||||
ApprovalDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerMessage(peer_id.clone(), msg)
|
||||
)
|
||||
).await;
|
||||
}
|
||||
|
||||
fn fake_assignment_cert(
|
||||
block_hash: Hash,
|
||||
validator: ValidatorIndex,
|
||||
) -> IndirectAssignmentCert {
|
||||
let ctx = schnorrkel::signing_context(RELAY_VRF_MODULO_CONTEXT);
|
||||
let msg = b"WhenParachains?";
|
||||
let mut prng = rand_core::OsRng;
|
||||
let keypair = schnorrkel::Keypair::generate_with(&mut prng);
|
||||
let (inout, proof, _) = keypair.vrf_sign(ctx.bytes(msg));
|
||||
let out = inout.to_output();
|
||||
|
||||
IndirectAssignmentCert {
|
||||
block_hash,
|
||||
validator,
|
||||
cert: AssignmentCert {
|
||||
kind: AssignmentCertKind::RelayVRFModulo {
|
||||
sample: 1,
|
||||
},
|
||||
vrf: (VRFOutput(out), VRFProof(proof)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn expect_reputation_change(
|
||||
virtual_overseer: &mut VirtualOverseer,
|
||||
peer_id: &PeerId,
|
||||
expected_reputation_change: Rep,
|
||||
) {
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ReportPeer(
|
||||
rep_peer,
|
||||
rep,
|
||||
)
|
||||
) => {
|
||||
assert_eq!(peer_id, &rep_peer);
|
||||
assert_eq!(expected_reputation_change, rep);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
/// import an assignment
|
||||
/// connect a new peer
|
||||
/// the new peer sends us the same assignment
|
||||
#[test]
|
||||
fn try_import_the_same_assignment() {
|
||||
let peer_a = PeerId::random();
|
||||
let peer_b = PeerId::random();
|
||||
let peer_c = PeerId::random();
|
||||
let peer_d = PeerId::random();
|
||||
let parent_hash = Hash::repeat_byte(0xFF);
|
||||
let hash = Hash::repeat_byte(0xAA);
|
||||
|
||||
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
// setup peers
|
||||
setup_peer_with_view(overseer, &peer_a, view![]).await;
|
||||
setup_peer_with_view(overseer, &peer_b, view![hash]).await;
|
||||
setup_peer_with_view(overseer, &peer_c, view![hash]).await;
|
||||
|
||||
// new block `hash_a` with 1 candidates
|
||||
let meta = BlockApprovalMeta {
|
||||
hash,
|
||||
parent_hash,
|
||||
number: 2,
|
||||
candidates: vec![Default::default(); 1],
|
||||
slot_number: 1,
|
||||
};
|
||||
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]);
|
||||
overseer_send(overseer, msg).await;
|
||||
|
||||
// send the assignment related to `hash`
|
||||
let validator_index = 0u32;
|
||||
let cert = fake_assignment_cert(hash, validator_index);
|
||||
let assignments = vec![(cert.clone(), 0u32)];
|
||||
|
||||
let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone());
|
||||
send_message_from_peer(overseer, &peer_a, msg).await;
|
||||
|
||||
expect_reputation_change(overseer, &peer_a, COST_UNEXPECTED_MESSAGE).await;
|
||||
|
||||
// send an `Accept` message from the Approval Voting subsystem
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment(
|
||||
assignment,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(assignment, cert);
|
||||
tx.send(AssignmentCheckResult::Accepted).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
expect_reputation_change(overseer, &peer_a, BENEFIT_VALID_MESSAGE_FIRST).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
|
||||
peers,
|
||||
protocol_v1::ValidationProtocol::ApprovalDistribution(
|
||||
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
|
||||
)
|
||||
)) => {
|
||||
assert_eq!(peers.len(), 2);
|
||||
assert_eq!(assignments.len(), 1);
|
||||
}
|
||||
);
|
||||
|
||||
// setup new peer
|
||||
setup_peer_with_view(overseer, &peer_d, view![]).await;
|
||||
|
||||
// send the same assignment from peer_d
|
||||
let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments);
|
||||
send_message_from_peer(overseer, &peer_d, msg).await;
|
||||
|
||||
expect_reputation_change(overseer, &peer_d, COST_UNEXPECTED_MESSAGE).await;
|
||||
expect_reputation_change(overseer, &peer_d, BENEFIT_VALID_MESSAGE).await;
|
||||
|
||||
assert!(overseer
|
||||
.recv()
|
||||
.timeout(TIMEOUT)
|
||||
.await
|
||||
.is_none(),
|
||||
"no message should be sent",
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/// https://github.com/paritytech/polkadot/pull/2160#discussion_r547594835
|
||||
///
|
||||
/// 1. Send a view update that removes block B from their view.
|
||||
/// 2. Send a message from B that they incur COST_UNEXPECTED_MESSAGE for,
|
||||
/// but then they receive BENEFIT_VALID_MESSAGE.
|
||||
/// 3. Send all other messages related to B.
|
||||
#[test]
|
||||
fn spam_attack_results_in_negative_reputation_change() {
|
||||
let parent_hash = Hash::repeat_byte(0xFF);
|
||||
let peer_a = PeerId::random();
|
||||
let hash_b = Hash::repeat_byte(0xBB);
|
||||
|
||||
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
let peer = &peer_a;
|
||||
setup_peer_with_view(overseer, peer, view![]).await;
|
||||
|
||||
// new block `hash_b` with 20 candidates
|
||||
let candidates_count = 20;
|
||||
let meta = BlockApprovalMeta {
|
||||
hash: hash_b.clone(),
|
||||
parent_hash,
|
||||
number: 2,
|
||||
candidates: vec![Default::default(); candidates_count],
|
||||
slot_number: 1,
|
||||
};
|
||||
|
||||
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]);
|
||||
overseer_send(overseer, msg).await;
|
||||
|
||||
// send 20 assignments related to `hash_b`
|
||||
// to populate our knowledge
|
||||
let assignments: Vec<_> = (0..candidates_count)
|
||||
.map(|candidate_index| {
|
||||
let validator_index = candidate_index as u32;
|
||||
let cert = fake_assignment_cert(hash_b, validator_index);
|
||||
(cert, candidate_index as u32)
|
||||
}).collect();
|
||||
|
||||
let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone());
|
||||
send_message_from_peer(overseer, peer, msg.clone()).await;
|
||||
|
||||
for i in 0..candidates_count {
|
||||
expect_reputation_change(overseer, peer, COST_UNEXPECTED_MESSAGE).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment(
|
||||
assignment,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(assignment, assignments[i].0);
|
||||
tx.send(AssignmentCheckResult::Accepted).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
expect_reputation_change(overseer, peer, BENEFIT_VALID_MESSAGE_FIRST).await;
|
||||
}
|
||||
|
||||
// send a view update that removes block B from peer's view by bumping the finalized_number
|
||||
overseer_send(
|
||||
overseer,
|
||||
ApprovalDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerViewChange(peer.clone(), View { heads: Default::default(), finalized_number: 2 })
|
||||
)
|
||||
).await;
|
||||
|
||||
// send the assignments again
|
||||
send_message_from_peer(overseer, peer, msg.clone()).await;
|
||||
|
||||
// each of them will incur `COST_UNEXPECTED_MESSAGE`, not only the first one
|
||||
for _ in 0..candidates_count {
|
||||
expect_reputation_change(overseer, peer, COST_UNEXPECTED_MESSAGE).await;
|
||||
expect_reputation_change(overseer, peer, BENEFIT_VALID_MESSAGE).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_approval_happy_path() {
|
||||
let peer_a = PeerId::random();
|
||||
let peer_b = PeerId::random();
|
||||
let peer_c = PeerId::random();
|
||||
let parent_hash = Hash::repeat_byte(0xFF);
|
||||
let hash = Hash::repeat_byte(0xAA);
|
||||
|
||||
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
// setup peers
|
||||
setup_peer_with_view(overseer, &peer_a, view![]).await;
|
||||
setup_peer_with_view(overseer, &peer_b, view![hash]).await;
|
||||
setup_peer_with_view(overseer, &peer_c, view![hash]).await;
|
||||
|
||||
// new block `hash_a` with 1 candidates
|
||||
let meta = BlockApprovalMeta {
|
||||
hash,
|
||||
parent_hash,
|
||||
number: 1,
|
||||
candidates: vec![Default::default(); 1],
|
||||
slot_number: 1,
|
||||
};
|
||||
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]);
|
||||
overseer_send(overseer, msg).await;
|
||||
|
||||
// import an assignment related to `hash` locally
|
||||
let validator_index = 0u32;
|
||||
let candidate_index = 0u32;
|
||||
let cert = fake_assignment_cert(hash, validator_index);
|
||||
overseer_send(
|
||||
overseer,
|
||||
ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index)
|
||||
).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
|
||||
peers,
|
||||
protocol_v1::ValidationProtocol::ApprovalDistribution(
|
||||
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
|
||||
)
|
||||
)) => {
|
||||
assert_eq!(peers.len(), 2);
|
||||
assert_eq!(assignments.len(), 1);
|
||||
}
|
||||
);
|
||||
|
||||
// send the an approval from peer_b
|
||||
let approval = IndirectSignedApprovalVote {
|
||||
block_hash: hash,
|
||||
candidate_index,
|
||||
validator: validator_index,
|
||||
signature: Default::default(),
|
||||
};
|
||||
let msg = protocol_v1::ApprovalDistributionMessage::Approvals(vec![approval.clone()]);
|
||||
send_message_from_peer(overseer, &peer_b, msg).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval(
|
||||
vote,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(vote, approval);
|
||||
tx.send(ApprovalCheckResult::Accepted).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
expect_reputation_change(overseer, &peer_b, BENEFIT_VALID_MESSAGE_FIRST).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
|
||||
peers,
|
||||
protocol_v1::ValidationProtocol::ApprovalDistribution(
|
||||
protocol_v1::ApprovalDistributionMessage::Approvals(approvals)
|
||||
)
|
||||
)) => {
|
||||
assert_eq!(peers.len(), 1);
|
||||
assert_eq!(approvals.len(), 1);
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_approval_bad() {
|
||||
let peer_a = PeerId::random();
|
||||
let peer_b = PeerId::random();
|
||||
let parent_hash = Hash::repeat_byte(0xFF);
|
||||
let hash = Hash::repeat_byte(0xAA);
|
||||
|
||||
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
// setup peers
|
||||
setup_peer_with_view(overseer, &peer_a, view![]).await;
|
||||
setup_peer_with_view(overseer, &peer_b, view![hash]).await;
|
||||
|
||||
// new block `hash_a` with 1 candidates
|
||||
let meta = BlockApprovalMeta {
|
||||
hash,
|
||||
parent_hash,
|
||||
number: 1,
|
||||
candidates: vec![Default::default(); 1],
|
||||
slot_number: 1,
|
||||
};
|
||||
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]);
|
||||
overseer_send(overseer, msg).await;
|
||||
|
||||
let validator_index = 0u32;
|
||||
let candidate_index = 0u32;
|
||||
let cert = fake_assignment_cert(hash, validator_index);
|
||||
|
||||
// send the an approval from peer_b, we don't have an assignment yet
|
||||
let approval = IndirectSignedApprovalVote {
|
||||
block_hash: hash,
|
||||
candidate_index,
|
||||
validator: validator_index,
|
||||
signature: Default::default(),
|
||||
};
|
||||
let msg = protocol_v1::ApprovalDistributionMessage::Approvals(vec![approval.clone()]);
|
||||
send_message_from_peer(overseer, &peer_b, msg).await;
|
||||
|
||||
expect_reputation_change(overseer, &peer_b, COST_UNEXPECTED_MESSAGE).await;
|
||||
|
||||
// now import an assignment from peer_b
|
||||
let assignments = vec![(cert.clone(), candidate_index)];
|
||||
let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments);
|
||||
send_message_from_peer(overseer, &peer_b, msg).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment(
|
||||
assignment,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(assignment, cert);
|
||||
tx.send(AssignmentCheckResult::Accepted).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
expect_reputation_change(overseer, &peer_b, BENEFIT_VALID_MESSAGE_FIRST).await;
|
||||
|
||||
// and try again
|
||||
let msg = protocol_v1::ApprovalDistributionMessage::Approvals(vec![approval.clone()]);
|
||||
send_message_from_peer(overseer, &peer_b, msg).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval(
|
||||
vote,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(vote, approval);
|
||||
tx.send(ApprovalCheckResult::Bad).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
expect_reputation_change(overseer, &peer_b, COST_INVALID_MESSAGE).await;
|
||||
});
|
||||
}
|
||||
|
||||
/// make sure we clean up the state on block finalized
|
||||
#[test]
|
||||
fn update_our_view() {
|
||||
let parent_hash = Hash::repeat_byte(0xFF);
|
||||
let hash_a = Hash::repeat_byte(0xAA);
|
||||
let hash_b = Hash::repeat_byte(0xBB);
|
||||
let hash_c = Hash::repeat_byte(0xCC);
|
||||
|
||||
let state = test_harness(State::default(), |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
// new block `hash_a` with 1 candidates
|
||||
let meta_a = BlockApprovalMeta {
|
||||
hash: hash_a,
|
||||
parent_hash,
|
||||
number: 1,
|
||||
candidates: vec![Default::default(); 1],
|
||||
slot_number: 1,
|
||||
};
|
||||
let meta_b = BlockApprovalMeta {
|
||||
hash: hash_b,
|
||||
parent_hash: hash_a,
|
||||
number: 2,
|
||||
candidates: vec![Default::default(); 1],
|
||||
slot_number: 1,
|
||||
};
|
||||
let meta_c = BlockApprovalMeta {
|
||||
hash: hash_c,
|
||||
parent_hash: hash_b,
|
||||
number: 3,
|
||||
candidates: vec![Default::default(); 1],
|
||||
slot_number: 1,
|
||||
};
|
||||
|
||||
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]);
|
||||
overseer_send(overseer, msg).await;
|
||||
});
|
||||
|
||||
assert!(state.blocks_by_number.get(&1).is_some());
|
||||
assert!(state.blocks_by_number.get(&2).is_some());
|
||||
assert!(state.blocks_by_number.get(&3).is_some());
|
||||
assert!(state.blocks.get(&hash_a).is_some());
|
||||
assert!(state.blocks.get(&hash_b).is_some());
|
||||
assert!(state.blocks.get(&hash_c).is_some());
|
||||
|
||||
let state = test_harness(state, |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
// finalize a block
|
||||
overseer_signal_block_finalized(overseer, 2).await;
|
||||
});
|
||||
|
||||
assert!(state.blocks_by_number.get(&1).is_none());
|
||||
assert!(state.blocks_by_number.get(&2).is_none());
|
||||
assert!(state.blocks_by_number.get(&3).is_some());
|
||||
assert!(state.blocks.get(&hash_a).is_none());
|
||||
assert!(state.blocks.get(&hash_b).is_none());
|
||||
assert!(state.blocks.get(&hash_c).is_some());
|
||||
|
||||
let state = test_harness(state, |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
// finalize a very high block
|
||||
overseer_signal_block_finalized(overseer, 4_000_000_000).await;
|
||||
});
|
||||
|
||||
assert!(state.blocks_by_number.get(&3).is_none());
|
||||
assert!(state.blocks.get(&hash_c).is_none());
|
||||
}
|
||||
|
||||
/// make sure we unify with peers and clean up the state
|
||||
#[test]
|
||||
fn update_peer_view() {
|
||||
let parent_hash = Hash::repeat_byte(0xFF);
|
||||
let hash_a = Hash::repeat_byte(0xAA);
|
||||
let hash_b = Hash::repeat_byte(0xBB);
|
||||
let hash_c = Hash::repeat_byte(0xCC);
|
||||
let hash_d = Hash::repeat_byte(0xDD);
|
||||
let peer_a = PeerId::random();
|
||||
let peer = &peer_a;
|
||||
|
||||
let state = test_harness(State::default(), |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
// new block `hash_a` with 1 candidates
|
||||
let meta_a = BlockApprovalMeta {
|
||||
hash: hash_a,
|
||||
parent_hash,
|
||||
number: 1,
|
||||
candidates: vec![Default::default(); 1],
|
||||
slot_number: 1,
|
||||
};
|
||||
let meta_b = BlockApprovalMeta {
|
||||
hash: hash_b,
|
||||
parent_hash: hash_a,
|
||||
number: 2,
|
||||
candidates: vec![Default::default(); 1],
|
||||
slot_number: 1,
|
||||
};
|
||||
let meta_c = BlockApprovalMeta {
|
||||
hash: hash_c,
|
||||
parent_hash: hash_b,
|
||||
number: 3,
|
||||
candidates: vec![Default::default(); 1],
|
||||
slot_number: 1,
|
||||
};
|
||||
|
||||
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]);
|
||||
overseer_send(overseer, msg).await;
|
||||
|
||||
let cert_a = fake_assignment_cert(hash_a, 0);
|
||||
let cert_b = fake_assignment_cert(hash_b, 0);
|
||||
|
||||
overseer_send(
|
||||
overseer,
|
||||
ApprovalDistributionMessage::DistributeAssignment(cert_a, 0)
|
||||
).await;
|
||||
|
||||
overseer_send(
|
||||
overseer,
|
||||
ApprovalDistributionMessage::DistributeAssignment(cert_b, 0)
|
||||
).await;
|
||||
|
||||
// connect a peer
|
||||
setup_peer_with_view(overseer, peer, view![hash_a]).await;
|
||||
|
||||
// we should send relevant assignments to the peer
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
|
||||
peers,
|
||||
protocol_v1::ValidationProtocol::ApprovalDistribution(
|
||||
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
|
||||
)
|
||||
)) => {
|
||||
assert_eq!(peers.len(), 1);
|
||||
assert_eq!(assignments.len(), 1);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
assert_eq!(state.peer_views.get(peer).map(|v| v.finalized_number), Some(0));
|
||||
assert_eq!(
|
||||
state.blocks
|
||||
.get(&hash_a)
|
||||
.unwrap()
|
||||
.known_by
|
||||
.get(peer)
|
||||
.unwrap()
|
||||
.known_messages
|
||||
.len(),
|
||||
1,
|
||||
);
|
||||
|
||||
let state = test_harness(state, |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
// update peer's view
|
||||
overseer_send(
|
||||
overseer,
|
||||
ApprovalDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerViewChange(peer.clone(), View { heads: vec![hash_b, hash_c, hash_d], finalized_number: 2 })
|
||||
)
|
||||
).await;
|
||||
|
||||
let cert_c = fake_assignment_cert(hash_c, 0);
|
||||
|
||||
overseer_send(
|
||||
overseer,
|
||||
ApprovalDistributionMessage::DistributeAssignment(cert_c.clone(), 0)
|
||||
).await;
|
||||
|
||||
// we should send relevant assignments to the peer
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
|
||||
peers,
|
||||
protocol_v1::ValidationProtocol::ApprovalDistribution(
|
||||
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
|
||||
)
|
||||
)) => {
|
||||
assert_eq!(peers.len(), 1);
|
||||
assert_eq!(assignments.len(), 1);
|
||||
assert_eq!(assignments[0].0, cert_c);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
assert_eq!(state.peer_views.get(peer).map(|v| v.finalized_number), Some(2));
|
||||
assert_eq!(
|
||||
state.blocks
|
||||
.get(&hash_c)
|
||||
.unwrap()
|
||||
.known_by
|
||||
.get(peer)
|
||||
.unwrap()
|
||||
.known_messages
|
||||
.len(),
|
||||
1,
|
||||
);
|
||||
|
||||
let finalized_number = 4_000_000_000;
|
||||
let state = test_harness(state, |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
// update peer's view
|
||||
overseer_send(
|
||||
overseer,
|
||||
ApprovalDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerViewChange(peer.clone(), View { heads: vec![], finalized_number })
|
||||
)
|
||||
).await;
|
||||
});
|
||||
|
||||
assert_eq!(state.peer_views.get(peer).map(|v| v.finalized_number), Some(finalized_number));
|
||||
assert!(
|
||||
state.blocks
|
||||
.get(&hash_c)
|
||||
.unwrap()
|
||||
.known_by
|
||||
.get(peer)
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_remotely_then_locally() {
|
||||
let peer_a = PeerId::random();
|
||||
let parent_hash = Hash::repeat_byte(0xFF);
|
||||
let hash = Hash::repeat_byte(0xAA);
|
||||
let peer = &peer_a;
|
||||
|
||||
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
|
||||
let overseer = &mut virtual_overseer;
|
||||
// setup the peer
|
||||
setup_peer_with_view(overseer, peer, view![hash]).await;
|
||||
|
||||
// new block `hash_a` with 1 candidates
|
||||
let meta = BlockApprovalMeta {
|
||||
hash,
|
||||
parent_hash,
|
||||
number: 1,
|
||||
candidates: vec![Default::default(); 1],
|
||||
slot_number: 1,
|
||||
};
|
||||
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]);
|
||||
overseer_send(overseer, msg).await;
|
||||
|
||||
// import the assignment remotely first
|
||||
let validator_index = 0u32;
|
||||
let candidate_index = 0u32;
|
||||
let cert = fake_assignment_cert(hash, validator_index);
|
||||
let assignments = vec![(cert.clone(), candidate_index)];
|
||||
let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone());
|
||||
send_message_from_peer(overseer, peer, msg).await;
|
||||
|
||||
// send an `Accept` message from the Approval Voting subsystem
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment(
|
||||
assignment,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(assignment, cert);
|
||||
tx.send(AssignmentCheckResult::Accepted).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
expect_reputation_change(overseer, peer, BENEFIT_VALID_MESSAGE_FIRST).await;
|
||||
|
||||
// import the same assignment locally
|
||||
overseer_send(
|
||||
overseer,
|
||||
ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index)
|
||||
).await;
|
||||
|
||||
assert!(overseer
|
||||
.recv()
|
||||
.timeout(TIMEOUT)
|
||||
.await
|
||||
.is_none(),
|
||||
"no message should be sent",
|
||||
);
|
||||
|
||||
// send the approval remotely
|
||||
let approval = IndirectSignedApprovalVote {
|
||||
block_hash: hash,
|
||||
candidate_index,
|
||||
validator: validator_index,
|
||||
signature: Default::default(),
|
||||
};
|
||||
let msg = protocol_v1::ApprovalDistributionMessage::Approvals(vec![approval.clone()]);
|
||||
send_message_from_peer(overseer, peer, msg).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval(
|
||||
vote,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(vote, approval);
|
||||
tx.send(ApprovalCheckResult::Accepted).unwrap();
|
||||
}
|
||||
);
|
||||
expect_reputation_change(overseer, peer, BENEFIT_VALID_MESSAGE_FIRST).await;
|
||||
|
||||
// import the same approval locally
|
||||
overseer_send(
|
||||
overseer,
|
||||
ApprovalDistributionMessage::DistributeApproval(approval)
|
||||
).await;
|
||||
|
||||
assert!(overseer
|
||||
.recv()
|
||||
.timeout(TIMEOUT)
|
||||
.await
|
||||
.is_none(),
|
||||
"no message should be sent",
|
||||
);
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user