Delay reputation updates (#7214)

* Add futures-timer

* Make cost_or_benefit public

* Update ReportPeer message format

* Add delay to reputation updates (dirtywork)

* Update ReputationAggregator

* Update tests

* Fix flucky tests

* Move reputation to state

* Use the main loop for handling reputation sendings

* Update

* Move reputation to utils

* Update reputation sending

* Fix arguments order

* Update state

* Remove new from state

* Add constant

* Add failing test for delay

* Change mocking approach

* Fix type errors

* Fix comments

* Add message handling to select

* Fix bitfields-distribution tests

* Add docs to reputation aggregator

* Replace .into_base_rep

* Use one REPUTATION_CHANGE_INTERVAL by default

* Add reputation change to statement-distribution

* Update polkadot-availability-bitfield-distribution

* Update futures selecting in subsystems

* Update reputation adding

* Send malicious changes right away without adding to state

* Add reputation to StatementDistributionSubsystem

* Handle reputation in statement distribution

* Add delay test for polkadot-statement-distribution

* Fix collator-protocol tests before applying reputation delay

* Remove into_base_rep

* Add reputation to State

* Fix failed tests

* Add reputation delay

* Update tests

* Add batched network message for peer reporting

* Update approval-distribution tests

* Update bitfield-distribution tests

* Update statement-distribution tests

* Update collator-protocol tests

* Remove levels in matching

* Address clippy errors

* Fix overseer test

* Add a metric for original count of rep changes

* Update Reputation

* Revert "Add a metric for original count of rep changes"

This reverts commit 6c9b0c1ec34491d16e562bdcba8db6b9dcf484db.

* Update node/subsystem-util/src/reputation.rs

Co-authored-by: Vsevolod Stakhov <vsevolod.stakhov@parity.io>

* Remove redundant vec

---------

Co-authored-by: Vsevolod Stakhov <vsevolod.stakhov@parity.io>
This commit is contained in:
Andrei Eres
2023-06-15 15:46:06 +02:00
committed by GitHub
parent d3d9d4ae66
commit 0a1bc654d9
27 changed files with 2231 additions and 805 deletions
+2
View File
@@ -6905,6 +6905,7 @@ dependencies = [
"assert_matches",
"env_logger 0.9.0",
"futures",
"futures-timer",
"log",
"polkadot-node-jaeger",
"polkadot-node-metrics",
@@ -6932,6 +6933,7 @@ dependencies = [
"bitvec",
"env_logger 0.9.0",
"futures",
"futures-timer",
"log",
"maplit",
"polkadot-node-network-protocol",
@@ -9,18 +9,19 @@ polkadot-node-metrics = { path = "../../metrics" }
polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-jaeger = { path = "../../jaeger" }
rand = "0.8"
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
[dev-dependencies]
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
polkadot-primitives-test-helpers = { path = "../../../primitives/test-helpers" }
@@ -20,7 +20,7 @@
#![warn(missing_docs)]
use futures::{channel::oneshot, FutureExt as _};
use futures::{channel::oneshot, select, FutureExt as _};
use polkadot_node_jaeger as jaeger;
use polkadot_node_network_protocol::{
self as net_protocol,
@@ -38,11 +38,15 @@ use polkadot_node_subsystem::{
},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL};
use polkadot_primitives::{
BlockNumber, CandidateIndex, Hash, SessionIndex, ValidatorIndex, ValidatorSignature,
};
use rand::{CryptoRng, Rng, SeedableRng};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque};
use std::{
collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque},
time::Duration,
};
use self::metrics::Metrics;
@@ -187,6 +191,9 @@ struct State {
/// Current approval checking finality lag.
approval_checking_lag: BlockNumber,
/// Aggregated reputation change
reputation: ReputationAggregator,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -755,7 +762,13 @@ impl State {
"Unexpected assignment",
);
if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
}
}
return
@@ -780,7 +793,13 @@ impl State {
?message_subject,
"Duplicate assignment",
);
modify_reputation(ctx.sender(), peer_id, COST_DUPLICATE_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
}
return
}
@@ -792,13 +811,25 @@ impl State {
?message_subject,
"Assignment from a peer is out of view",
);
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
},
}
// if the assignment is known to be valid, reward the peer
if entry.knowledge.contains(&message_subject, message_kind) {
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE,
)
.await;
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment");
peer_knowledge.received.insert(message_subject, message_kind);
@@ -834,7 +865,13 @@ impl State {
);
match result {
AssignmentCheckResult::Accepted => {
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE_FIRST).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE_FIRST,
)
.await;
entry.knowledge.known_messages.insert(message_subject.clone(), message_kind);
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.received.insert(message_subject.clone(), message_kind);
@@ -862,8 +899,13 @@ impl State {
?peer_id,
"Got an assignment too far in the future",
);
modify_reputation(ctx.sender(), peer_id, COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE)
.await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE,
)
.await;
return
},
AssignmentCheckResult::Bad(error) => {
@@ -874,7 +916,13 @@ impl State {
%error,
"Got a bad assignment from peer",
);
modify_reputation(ctx.sender(), peer_id, COST_INVALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_INVALID_MESSAGE,
)
.await;
return
},
}
@@ -1024,7 +1072,13 @@ impl State {
_ => {
if let Some(peer_id) = source.peer_id() {
if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
}
}
return
@@ -1043,7 +1097,13 @@ impl State {
?message_subject,
"Unknown approval assignment",
);
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
return
}
@@ -1060,7 +1120,13 @@ impl State {
"Duplicate approval",
);
modify_reputation(ctx.sender(), peer_id, COST_DUPLICATE_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
}
return
}
@@ -1072,14 +1138,26 @@ impl State {
?message_subject,
"Approval from a peer is out of view",
);
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
},
}
// if the approval is known to be valid, reward the peer
if entry.knowledge.contains(&message_subject, message_kind) {
gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known approval");
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE,
)
.await;
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.received.insert(message_subject.clone(), message_kind);
}
@@ -1110,7 +1188,13 @@ impl State {
);
match result {
ApprovalCheckResult::Accepted => {
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE_FIRST).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE_FIRST,
)
.await;
entry.knowledge.insert(message_subject.clone(), message_kind);
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
@@ -1118,7 +1202,13 @@ impl State {
}
},
ApprovalCheckResult::Bad(error) => {
modify_reputation(ctx.sender(), peer_id, COST_INVALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_INVALID_MESSAGE,
)
.await;
gum::info!(
target: LOG_TARGET,
?peer_id,
@@ -1669,6 +1759,7 @@ async fn adjust_required_routing_and_propagate<Context, BlockFilter, RoutingModi
/// Modify the reputation of a peer based on its behavior.
async fn modify_reputation(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::ApprovalDistributionSenderTrait,
peer_id: PeerId,
rep: Rep,
@@ -1679,8 +1770,7 @@ async fn modify_reputation(
?peer_id,
"Reputation change for peer",
);
sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer_id, rep)).await;
reputation.modify(sender, peer_id, rep).await;
}
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
@@ -1696,7 +1786,7 @@ impl ApprovalDistribution {
// According to the docs of `rand`, this is a ChaCha12 RNG in practice
// and will always be chosen for strong performance and security properties.
let mut rng = rand::rngs::StdRng::from_entropy();
self.run_inner(ctx, &mut state, &mut rng).await
self.run_inner(ctx, &mut state, REPUTATION_CHANGE_INTERVAL, &mut rng).await
}
/// Used for testing.
@@ -1704,37 +1794,49 @@ impl ApprovalDistribution {
self,
mut ctx: Context,
state: &mut State,
reputation_interval: Duration,
rng: &mut (impl CryptoRng + Rng),
) {
let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
let mut reputation_delay = new_reputation_delay();
loop {
let message = match ctx.recv().await {
Ok(message) => message,
Err(e) => {
gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
return
select! {
_ = reputation_delay => {
state.reputation.send(ctx.sender()).await;
reputation_delay = new_reputation_delay();
},
};
match message {
FromOrchestra::Communication { msg } =>
Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await,
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
// the relay chain blocks relevant to the approval subsystems
// are those that are available, but not finalized yet
// actived and deactivated heads hence are irrelevant to this subsystem, other than
// for tracing purposes.
if let Some(activated) = update.activated {
let head = activated.hash;
let approval_distribution_span =
jaeger::PerLeafSpan::new(activated.span, "approval-distribution");
state.spans.insert(head, approval_distribution_span);
message = ctx.recv().fuse() => {
let message = match message {
Ok(message) => message,
Err(e) => {
gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
return
},
};
match message {
FromOrchestra::Communication { msg } =>
Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await,
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
// the relay chain blocks relevant to the approval subsystems
// are those that are available, but not finalized yet
// actived and deactivated heads hence are irrelevant to this subsystem, other than
// for tracing purposes.
if let Some(activated) = update.activated {
let head = activated.hash;
let approval_distribution_span =
jaeger::PerLeafSpan::new(activated.span, "approval-distribution");
state.spans.insert(head, approval_distribution_span);
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
state.handle_block_finalized(&mut ctx, &self.metrics, number).await;
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return,
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
state.handle_block_finalized(&mut ctx, &self.metrics, number).await;
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return,
}
}
}
@@ -26,9 +26,11 @@ use polkadot_node_network_protocol::{
use polkadot_node_primitives::approval::{
AssignmentCertKind, VrfOutput, VrfProof, VrfSignature, RELAY_VRF_MODULO_CONTEXT,
};
use polkadot_node_subsystem::messages::{network_bridge_event, AllMessages, ApprovalCheckError};
use polkadot_node_subsystem::messages::{
network_bridge_event, AllMessages, ApprovalCheckError, ReportPeerMessage,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_node_subsystem_util::TimeoutExt as _;
use polkadot_node_subsystem_util::{reputation::add_reputation, TimeoutExt as _};
use polkadot_primitives::{AuthorityDiscoveryId, BlakeTwo256, HashT};
use polkadot_primitives_test_helpers::dummy_signature;
use rand::SeedableRng;
@@ -54,7 +56,8 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
{
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(12345);
let subsystem = subsystem.run_inner(context, &mut state, &mut rng);
let subsystem =
subsystem.run_inner(context, &mut state, REPUTATION_CHANGE_TEST_INTERVAL, &mut rng);
let test_fut = test_fn(virtual_overseer);
@@ -78,6 +81,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
}
const TIMEOUT: Duration = Duration::from_millis(200);
const REPUTATION_CHANGE_TEST_INTERVAL: Duration = Duration::from_millis(1);
async fn overseer_send(overseer: &mut VirtualOverseer, msg: ApprovalDistributionMessage) {
gum::trace!(msg = ?msg, "Sending message");
@@ -273,22 +277,46 @@ fn fake_assignment_cert(block_hash: Hash, validator: ValidatorIndex) -> Indirect
async fn expect_reputation_change(
virtual_overseer: &mut VirtualOverseer,
peer_id: &PeerId,
expected_reputation_change: Rep,
rep: Rep,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(
rep_peer,
rep,
)
) => {
assert_eq!(peer_id, &rep_peer);
assert_eq!(expected_reputation_change, rep);
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(
ReportPeerMessage::Single(p, r),
)) => {
assert_eq!(p, *peer_id);
assert_eq!(r, rep.into());
}
);
}
async fn expect_reputation_changes(
virtual_overseer: &mut VirtualOverseer,
peer_id: &PeerId,
reps: Vec<Rep>,
) {
let mut acc = HashMap::new();
for rep in reps {
add_reputation(&mut acc, *peer_id, rep);
}
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(
ReportPeerMessage::Batch(v),
)) => {
assert_eq!(v, acc);
}
);
}
fn state_without_reputation_delay() -> State {
State { reputation: ReputationAggregator::new(|_| true), ..Default::default() }
}
fn state_with_reputation_delay() -> State {
State { reputation: ReputationAggregator::new(|_| false), ..Default::default() }
}
/// import an assignment
/// connect a new peer
/// the new peer sends us the same assignment
@@ -301,7 +329,7 @@ fn try_import_the_same_assignment() {
let parent_hash = Hash::repeat_byte(0xFF);
let hash = Hash::repeat_byte(0xAA);
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
let _ = test_harness(state_without_reputation_delay(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
// setup peers
setup_peer_with_view(overseer, &peer_a, view![]).await;
@@ -373,6 +401,65 @@ fn try_import_the_same_assignment() {
});
}
/// import an assignment
/// connect a new peer
/// state sends aggregated reputation change
#[test]
fn delay_reputation_change() {
let peer = PeerId::random();
let parent_hash = Hash::repeat_byte(0xFF);
let hash = Hash::repeat_byte(0xAA);
let _ = test_harness(state_with_reputation_delay(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
// Setup peers
setup_peer_with_view(overseer, &peer, view![]).await;
// new block `hash_a` with 1 candidates
let meta = BlockApprovalMeta {
hash,
parent_hash,
number: 2,
candidates: vec![Default::default(); 1],
slot: 1.into(),
session: 1,
};
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]);
overseer_send(overseer, msg).await;
// send the assignment related to `hash`
let validator_index = ValidatorIndex(0);
let cert = fake_assignment_cert(hash, validator_index);
let assignments = vec![(cert.clone(), 0u32)];
let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone());
send_message_from_peer(overseer, &peer, msg).await;
// send an `Accept` message from the Approval Voting subsystem
assert_matches!(
overseer_recv(overseer).await,
AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment(
assignment,
0u32,
tx,
)) => {
assert_eq!(assignment, cert);
tx.send(AssignmentCheckResult::Accepted).unwrap();
}
);
expect_reputation_changes(
overseer,
&peer,
vec![COST_UNEXPECTED_MESSAGE, BENEFIT_VALID_MESSAGE_FIRST],
)
.await;
assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent");
virtual_overseer
});
}
/// <https://github.com/paritytech/polkadot/pull/2160#discussion_r547594835>
///
/// 1. Send a view update that removes block B from their view.
@@ -385,7 +472,7 @@ fn spam_attack_results_in_negative_reputation_change() {
let peer_a = PeerId::random();
let hash_b = Hash::repeat_byte(0xBB);
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
let _ = test_harness(state_without_reputation_delay(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
let peer = &peer_a;
setup_peer_with_view(overseer, peer, view![]).await;
@@ -469,7 +556,7 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() {
let peer_a = PeerId::random();
let hash = Hash::repeat_byte(0xAA);
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
let _ = test_harness(state_without_reputation_delay(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
let peer = &peer_a;
setup_peer_with_view(overseer, peer, view![]).await;
@@ -545,7 +632,7 @@ fn import_approval_happy_path() {
let parent_hash = Hash::repeat_byte(0xFF);
let hash = Hash::repeat_byte(0xAA);
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
let _ = test_harness(state_without_reputation_delay(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
// setup peers
setup_peer_with_view(overseer, &peer_a, view![]).await;
@@ -633,7 +720,7 @@ fn import_approval_bad() {
let parent_hash = Hash::repeat_byte(0xFF);
let hash = Hash::repeat_byte(0xAA);
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
let _ = test_harness(state_without_reputation_delay(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
// setup peers
setup_peer_with_view(overseer, &peer_a, view![]).await;
@@ -942,7 +1029,7 @@ fn import_remotely_then_locally() {
let hash = Hash::repeat_byte(0xAA);
let peer = &peer_a;
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
let _ = test_harness(state_without_reputation_delay(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
// setup the peer
setup_peer_with_view(overseer, peer, view![hash]).await;
@@ -1114,7 +1201,7 @@ fn race_condition_in_local_vs_remote_view_update() {
let peer_a = PeerId::random();
let hash_b = Hash::repeat_byte(0xBB);
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
let _ = test_harness(state_without_reputation_delay(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
let peer = &peer_a;
@@ -1294,7 +1381,7 @@ fn propagates_assignments_along_unshared_dimension() {
let peers = make_peers_and_authority_ids(100);
let _ = test_harness(State::default(), |mut virtual_overseer| async move {
let _ = test_harness(state_without_reputation_delay(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
// Connect all peers.
@@ -1883,7 +1970,7 @@ fn non_originator_aggression_l1() {
let peers = make_peers_and_authority_ids(100);
let mut state = State::default();
let mut state = state_without_reputation_delay();
state.aggression_config.resend_unfinalized_period = None;
let aggression_l1_threshold = state.aggression_config.l1_threshold.clone().unwrap();
@@ -1987,7 +2074,7 @@ fn non_originator_aggression_l2() {
let peers = make_peers_and_authority_ids(100);
let mut state = State::default();
let mut state = state_without_reputation_delay();
state.aggression_config.resend_unfinalized_period = None;
let aggression_l1_threshold = state.aggression_config.l1_threshold.clone().unwrap();
@@ -2154,7 +2241,7 @@ fn resends_messages_periodically() {
let peers = make_peers_and_authority_ids(100);
let mut state = State::default();
let mut state = state_without_reputation_delay();
state.aggression_config.l1_threshold = None;
state.aggression_config.l2_threshold = None;
state.aggression_config.resend_unfinalized_period = Some(2);
@@ -2298,7 +2385,8 @@ fn batch_test_round(message_count: usize) {
let subsystem = ApprovalDistribution::new(Default::default());
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(12345);
let mut sender = context.sender().clone();
let subsystem = subsystem.run_inner(context, &mut state, &mut rng);
let subsystem =
subsystem.run_inner(context, &mut state, REPUTATION_CHANGE_TEST_INTERVAL, &mut rng);
let test_fut = async move {
let overseer = &mut virtual_overseer;
@@ -6,6 +6,7 @@ edition.workspace = true
[dependencies]
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem = {path = "../../subsystem" }
@@ -35,11 +35,18 @@ use polkadot_node_subsystem::{
jaeger, messages::*, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, PerLeafSpan,
SpawnedSubsystem, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{self as util};
use polkadot_node_subsystem_util::{
self as util,
reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
};
use futures::select;
use polkadot_primitives::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use rand::{CryptoRng, Rng, SeedableRng};
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use self::metrics::Metrics;
@@ -97,6 +104,9 @@ struct ProtocolState {
/// Additional data particular to a relay parent.
per_relay_parent: HashMap<Hash, PerRelayParentData>,
/// Aggregated reputation change
reputation: ReputationAggregator,
}
/// Data for a particular relay parent.
@@ -178,95 +188,107 @@ impl BitfieldDistribution {
async fn run<Context>(self, ctx: Context) {
let mut state = ProtocolState::default();
let mut rng = rand::rngs::StdRng::from_entropy();
self.run_inner(ctx, &mut state, &mut rng).await
self.run_inner(ctx, &mut state, REPUTATION_CHANGE_INTERVAL, &mut rng).await
}
async fn run_inner<Context>(
self,
mut ctx: Context,
state: &mut ProtocolState,
reputation_interval: Duration,
rng: &mut (impl CryptoRng + Rng),
) {
// work: process incoming messages from the overseer and process accordingly.
let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
let mut reputation_delay = new_reputation_delay();
loop {
let message = match ctx.recv().await {
Ok(message) => message,
Err(err) => {
gum::error!(
target: LOG_TARGET,
?err,
"Failed to receive a message from Overseer, exiting"
);
return
select! {
_ = reputation_delay => {
state.reputation.send(ctx.sender()).await;
reputation_delay = new_reputation_delay();
},
};
match message {
FromOrchestra::Communication {
msg:
BitfieldDistributionMessage::DistributeBitfield(
relay_parent,
signed_availability,
),
} => {
gum::trace!(target: LOG_TARGET, ?relay_parent, "Processing DistributeBitfield");
handle_bitfield_distribution(
&mut ctx,
state,
&self.metrics,
relay_parent,
signed_availability,
rng,
)
.await;
},
FromOrchestra::Communication {
msg: BitfieldDistributionMessage::NetworkBridgeUpdate(event),
} => {
gum::trace!(target: LOG_TARGET, "Processing NetworkMessage");
// a network message was received
handle_network_msg(&mut ctx, state, &self.metrics, event, rng).await;
},
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated,
..
})) => {
let _timer = self.metrics.time_active_leaves_update();
if let Some(activated) = activated {
let relay_parent = activated.hash;
gum::trace!(target: LOG_TARGET, ?relay_parent, "activated");
let span = PerLeafSpan::new(activated.span, "bitfield-distribution");
let _span = span.child("query-basics");
// query validator set and signing context per relay_parent once only
match query_basics(&mut ctx, relay_parent).await {
Ok(Some((validator_set, signing_context))) => {
// If our runtime API fails, we don't take down the node,
// but we might alter peers' reputations erroneously as a result
// of not having the correct bookkeeping. If we have lost a race
// with state pruning, it is unlikely that peers will be sending
// us anything to do with this relay-parent anyway.
let _ = state.per_relay_parent.insert(
message = ctx.recv().fuse() => {
let message = match message {
Ok(message) => message,
Err(err) => {
gum::error!(
target: LOG_TARGET,
?err,
"Failed to receive a message from Overseer, exiting"
);
return
},
};
match message {
FromOrchestra::Communication {
msg:
BitfieldDistributionMessage::DistributeBitfield(
relay_parent,
PerRelayParentData::new(signing_context, validator_set, span),
);
},
Err(err) => {
gum::warn!(target: LOG_TARGET, ?err, "query_basics has failed");
},
_ => {},
}
signed_availability,
),
} => {
gum::trace!(target: LOG_TARGET, ?relay_parent, "Processing DistributeBitfield");
handle_bitfield_distribution(
&mut ctx,
state,
&self.metrics,
relay_parent,
signed_availability,
rng,
)
.await;
},
FromOrchestra::Communication {
msg: BitfieldDistributionMessage::NetworkBridgeUpdate(event),
} => {
gum::trace!(target: LOG_TARGET, "Processing NetworkMessage");
// a network message was received
handle_network_msg(&mut ctx, state, &self.metrics, event, rng).await;
},
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated,
..
})) => {
let _timer = self.metrics.time_active_leaves_update();
if let Some(activated) = activated {
let relay_parent = activated.hash;
gum::trace!(target: LOG_TARGET, ?relay_parent, "activated");
let span = PerLeafSpan::new(activated.span, "bitfield-distribution");
let _span = span.child("query-basics");
// query validator set and signing context per relay_parent once only
match query_basics(&mut ctx, relay_parent).await {
Ok(Some((validator_set, signing_context))) => {
// If our runtime API fails, we don't take down the node,
// but we might alter peers' reputations erroneously as a result
// of not having the correct bookkeeping. If we have lost a race
// with state pruning, it is unlikely that peers will be sending
// us anything to do with this relay-parent anyway.
let _ = state.per_relay_parent.insert(
relay_parent,
PerRelayParentData::new(signing_context, validator_set, span),
);
},
Err(err) => {
gum::warn!(target: LOG_TARGET, ?err, "query_basics has failed");
},
_ => {},
}
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
gum::trace!(target: LOG_TARGET, ?hash, %number, "block finalized");
},
FromOrchestra::Signal(OverseerSignal::Conclude) => {
gum::info!(target: LOG_TARGET, "Conclude");
return
},
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
gum::trace!(target: LOG_TARGET, ?hash, %number, "block finalized");
},
FromOrchestra::Signal(OverseerSignal::Conclude) => {
gum::info!(target: LOG_TARGET, "Conclude");
return
},
}
}
}
}
@@ -274,6 +296,7 @@ impl BitfieldDistribution {
/// Modify the reputation of a peer based on its behavior.
async fn modify_reputation(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::BitfieldDistributionSenderTrait,
relay_parent: Hash,
peer: PeerId,
@@ -281,7 +304,7 @@ async fn modify_reputation(
) {
gum::trace!(target: LOG_TARGET, ?relay_parent, ?rep, %peer, "reputation change");
sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await
reputation.modify(sender, peer, rep).await;
}
/// Distribute a given valid and signature checked bitfield message.
///
@@ -454,7 +477,14 @@ async fn process_incoming_peer_message<Context>(
);
// we don't care about this, not part of our view.
if !state.view.contains(&relay_parent) {
modify_reputation(ctx.sender(), relay_parent, origin, COST_NOT_IN_VIEW).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
relay_parent,
origin,
COST_NOT_IN_VIEW,
)
.await;
return
}
@@ -463,7 +493,14 @@ async fn process_incoming_peer_message<Context>(
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
job_data
} else {
modify_reputation(ctx.sender(), relay_parent, origin, COST_NOT_IN_VIEW).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
relay_parent,
origin,
COST_NOT_IN_VIEW,
)
.await;
return
};
@@ -480,7 +517,14 @@ async fn process_incoming_peer_message<Context>(
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
gum::trace!(target: LOG_TARGET, ?relay_parent, ?origin, "Validator set is empty",);
modify_reputation(ctx.sender(), relay_parent, origin, COST_MISSING_PEER_SESSION_KEY).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
relay_parent,
origin,
COST_MISSING_PEER_SESSION_KEY,
)
.await;
return
}
@@ -490,7 +534,14 @@ async fn process_incoming_peer_message<Context>(
let validator = if let Some(validator) = validator_set.get(validator_index.0 as usize) {
validator.clone()
} else {
modify_reputation(ctx.sender(), relay_parent, origin, COST_VALIDATOR_INDEX_INVALID).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
relay_parent,
origin,
COST_VALIDATOR_INDEX_INVALID,
)
.await;
return
};
@@ -503,7 +554,14 @@ async fn process_incoming_peer_message<Context>(
received_set.insert(validator.clone());
} else {
gum::trace!(target: LOG_TARGET, ?validator_index, ?origin, "Duplicate message");
modify_reputation(ctx.sender(), relay_parent, origin, COST_PEER_DUPLICATE_MESSAGE).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
relay_parent,
origin,
COST_PEER_DUPLICATE_MESSAGE,
)
.await;
return
};
@@ -517,13 +575,27 @@ async fn process_incoming_peer_message<Context>(
"already received a message for validator",
);
if old_message.signed_availability.as_unchecked() == &bitfield {
modify_reputation(ctx.sender(), relay_parent, origin, BENEFIT_VALID_MESSAGE).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
relay_parent,
origin,
BENEFIT_VALID_MESSAGE,
)
.await;
}
return
}
let signed_availability = match bitfield.try_into_checked(&signing_context, &validator) {
Err(_) => {
modify_reputation(ctx.sender(), relay_parent, origin, COST_SIGNATURE_INVALID).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
relay_parent,
origin,
COST_SIGNATURE_INVALID,
)
.await;
return
},
Ok(bitfield) => bitfield,
@@ -552,7 +624,14 @@ async fn process_incoming_peer_message<Context>(
)
.await;
modify_reputation(ctx.sender(), relay_parent, origin, BENEFIT_VALID_MESSAGE_FIRST).await
modify_reputation(
&mut state.reputation,
ctx.sender(),
relay_parent,
origin,
BENEFIT_VALID_MESSAGE_FIRST,
)
.await
}
/// Deal with network bridge updates and track what needs to be tracked
@@ -28,9 +28,10 @@ use polkadot_node_network_protocol::{
use polkadot_node_subsystem::{
jaeger,
jaeger::{PerLeafSpan, Span},
messages::ReportPeerMessage,
};
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_node_subsystem_util::{reputation::add_reputation, TimeoutExt};
use polkadot_primitives::{AvailabilityBitfield, Signed, ValidatorIndex};
use rand_chacha::ChaCha12Rng;
use sp_application_crypto::AppCrypto;
@@ -87,14 +88,16 @@ fn prewarmed_state(
peer_views: peers.iter().cloned().map(|peer| (peer, view!(relay_parent))).collect(),
topologies,
view: our_view!(relay_parent),
reputation: ReputationAggregator::new(|_| true),
}
}
fn state_with_view(
view: OurView,
relay_parent: Hash,
reputation: ReputationAggregator,
) -> (ProtocolState, SigningContext, KeystorePtr, ValidatorId) {
let mut state = ProtocolState::default();
let mut state = ProtocolState { reputation, ..Default::default() };
let signing_context = SigningContext { session_index: 1, parent_hash: relay_parent.clone() };
@@ -234,10 +237,10 @@ fn receive_invalid_signature() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep))
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_SIGNATURE_INVALID)
assert_eq!(rep.value, COST_SIGNATURE_INVALID.cost_or_benefit())
}
);
});
@@ -258,8 +261,11 @@ fn receive_invalid_validator_index() {
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) =
state_with_view(our_view![hash_a, hash_b], hash_a.clone());
let (mut state, signing_context, keystore, validator) = state_with_view(
our_view![hash_a, hash_b],
hash_a.clone(),
ReputationAggregator::new(|_| true),
);
state.peer_views.insert(peer_b.clone(), view![hash_a]);
@@ -295,10 +301,10 @@ fn receive_invalid_validator_index() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep))
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_VALIDATOR_INDEX_INVALID)
assert_eq!(rep.value, COST_VALIDATOR_INDEX_INVALID.cost_or_benefit())
}
);
});
@@ -319,8 +325,11 @@ fn receive_duplicate_messages() {
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) =
state_with_view(our_view![hash_a, hash_b], hash_a.clone());
let (mut state, signing_context, keystore, validator) = state_with_view(
our_view![hash_a, hash_b],
hash_a.clone(),
ReputationAggregator::new(|_| true),
);
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]);
@@ -371,10 +380,10 @@ fn receive_duplicate_messages() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep))
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST)
assert_eq!(rep.value, BENEFIT_VALID_MESSAGE_FIRST.cost_or_benefit())
}
);
@@ -390,10 +399,10 @@ fn receive_duplicate_messages() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep))
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, BENEFIT_VALID_MESSAGE)
assert_eq!(rep.value, BENEFIT_VALID_MESSAGE.cost_or_benefit())
}
);
@@ -409,15 +418,126 @@ fn receive_duplicate_messages() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep))
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_PEER_DUPLICATE_MESSAGE)
assert_eq!(rep.value, COST_PEER_DUPLICATE_MESSAGE.cost_or_benefit())
}
);
});
}
#[test]
fn delay_reputation_change() {
let _ = env_logger::builder()
.filter(None, log::LevelFilter::Trace)
.is_test(true)
.try_init();
let hash_a: Hash = [0; 32].into();
let hash_b: Hash = [1; 32].into();
let peer = PeerId::random();
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(
our_view![hash_a, hash_b],
hash_a.clone(),
ReputationAggregator::new(|_| false),
);
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]);
let signed_bitfield = Signed::<AvailabilityBitfield>::sign(
&keystore,
payload,
&signing_context,
ValidatorIndex(0),
&validator,
)
.ok()
.flatten()
.expect("should be signed");
let msg = BitfieldGossipMessage {
relay_parent: hash_a.clone(),
signed_availability: signed_bitfield.clone(),
};
let pool = sp_core::testing::TaskExecutor::new();
let (ctx, mut handle) = make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
let mut rng = dummy_rng();
let reputation_interval = Duration::from_millis(1);
let bg = async move {
let subsystem = BitfieldDistribution::new(Default::default());
subsystem.run_inner(ctx, &mut state, reputation_interval, &mut rng).await;
};
let test_fut = async move {
// send a first message
handle
.send(FromOrchestra::Communication {
msg: BitfieldDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(
peer.clone(),
msg.clone().into_network_message(),
),
),
})
.await;
// none of our peers has any interest in any messages
// so we do not receive a network send type message here
// but only the one for the next subsystem
assert_matches!(
handle.recv().await,
AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::Bitfield(hash, signed)
)) => {
assert_eq!(hash, hash_a);
assert_eq!(signed, signed_bitfield)
}
);
// let peer send the initial message again
handle
.send(FromOrchestra::Communication {
msg: BitfieldDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(
peer.clone(),
msg.clone().into_network_message(),
),
),
})
.await;
// Wait enough to fire reputation delay
futures_timer::Delay::new(reputation_interval).await;
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Batch(v))
) => {
let mut expected_change = HashMap::new();
for rep in vec![BENEFIT_VALID_MESSAGE_FIRST, COST_PEER_DUPLICATE_MESSAGE] {
add_reputation(&mut expected_change, peer, rep)
}
assert_eq!(v, expected_change)
}
);
handle.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
};
futures::pin_mut!(bg);
futures::pin_mut!(test_fut);
executor::block_on(futures::future::join(bg, test_fut));
}
#[test]
fn do_not_relay_message_twice() {
let _ = env_logger::builder()
@@ -433,7 +553,7 @@ fn do_not_relay_message_twice() {
// validator 0 key pair
let (mut state, signing_context, keystore, validator) =
state_with_view(our_view![hash], hash.clone());
state_with_view(our_view![hash], hash.clone(), ReputationAggregator::new(|_| true));
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]);
@@ -543,8 +663,11 @@ fn changing_view() {
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) =
state_with_view(our_view![hash_a, hash_b], hash_a.clone());
let (mut state, signing_context, keystore, validator) = state_with_view(
our_view![hash_a, hash_b],
hash_a.clone(),
ReputationAggregator::new(|_| true),
);
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]);
@@ -618,10 +741,10 @@ fn changing_view() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep))
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST)
assert_eq!(rep.value, BENEFIT_VALID_MESSAGE_FIRST.cost_or_benefit())
}
);
@@ -650,10 +773,10 @@ fn changing_view() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep))
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_PEER_DUPLICATE_MESSAGE)
assert_eq!(rep.value, COST_PEER_DUPLICATE_MESSAGE.cost_or_benefit())
}
);
@@ -682,10 +805,10 @@ fn changing_view() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep))
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, COST_NOT_IN_VIEW)
assert_eq!(rep.value, COST_NOT_IN_VIEW.cost_or_benefit())
}
);
});
@@ -705,7 +828,8 @@ fn do_not_send_message_back_to_origin() {
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash], hash);
let (mut state, signing_context, keystore, validator) =
state_with_view(our_view![hash], hash, ReputationAggregator::new(|_| true));
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]);
@@ -767,10 +891,10 @@ fn do_not_send_message_back_to_origin() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep))
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST)
assert_eq!(rep.value, BENEFIT_VALID_MESSAGE_FIRST.cost_or_benefit())
}
);
});
@@ -786,7 +910,8 @@ fn topology_test() {
let hash: Hash = [0; 32].into();
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash], hash);
let (mut state, signing_context, keystore, validator) =
state_with_view(our_view![hash], hash, ReputationAggregator::new(|_| true));
// Create a simple grid without any shuffling. We occupy position 1.
let topology_peer_info: Vec<_> = (0..49)
@@ -888,10 +1013,10 @@ fn topology_test() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep))
) => {
assert_eq!(peer, peers_x[0]);
assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST)
assert_eq!(rep.value, BENEFIT_VALID_MESSAGE_FIRST.cost_or_benefit())
}
);
});
+5 -5
View File
@@ -24,13 +24,13 @@ use parity_scale_codec::Encode;
use sc_network::{
config::parse_addr, multiaddr::Multiaddr, types::ProtocolName, Event as NetworkEvent,
IfDisconnected, NetworkEventStream, NetworkNotification, NetworkPeers, NetworkRequest,
NetworkService, OutboundFailure, RequestFailure,
NetworkService, OutboundFailure, ReputationChange, RequestFailure,
};
use polkadot_node_network_protocol::{
peer_set::{PeerSet, PeerSetProtocolNames, ProtocolVersion},
request_response::{OutgoingRequest, Recipient, ReqProtocolNames, Requests},
PeerId, UnifiedReputationChange as Rep,
PeerId,
};
use polkadot_primitives::{AuthorityDiscoveryId, Block, Hash};
@@ -106,7 +106,7 @@ pub trait Network: Clone + Send + 'static {
);
/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
fn report_peer(&self, who: PeerId, cost_benefit: Rep);
fn report_peer(&self, who: PeerId, rep: ReputationChange);
/// Disconnect a given peer from the protocol specified without harming reputation.
fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName);
@@ -133,8 +133,8 @@ impl Network for Arc<NetworkService<Block, Hash>> {
NetworkService::remove_peers_from_reserved_set(&**self, protocol, peers);
}
fn report_peer(&self, who: PeerId, cost_benefit: Rep) {
NetworkService::report_peer(&**self, who, cost_benefit.into_base_rep());
fn report_peer(&self, who: PeerId, rep: ReputationChange) {
NetworkService::report_peer(&**self, who, rep);
}
fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
+4 -4
View File
@@ -364,7 +364,7 @@ where
let v_messages = match v_messages {
Err(rep) => {
gum::debug!(target: LOG_TARGET, action = "ReportPeer");
network_service.report_peer(remote, rep);
network_service.report_peer(remote, rep.into());
continue
},
@@ -395,7 +395,7 @@ where
let c_messages = match c_messages {
Err(rep) => {
gum::debug!(target: LOG_TARGET, action = "ReportPeer");
network_service.report_peer(remote, rep);
network_service.report_peer(remote, rep.into());
continue
},
@@ -441,7 +441,7 @@ where
};
for report in reports {
network_service.report_peer(remote, report);
network_service.report_peer(remote, report.into());
}
dispatch_validation_events_to_all(events, &mut sender).await;
@@ -474,7 +474,7 @@ where
};
for report in reports {
network_service.report_peer(remote, report);
network_service.report_peer(remote, report.into());
}
dispatch_collation_events_to_all(events, &mut sender).await;
+7 -7
View File
@@ -27,7 +27,7 @@ use std::{
sync::atomic::{AtomicBool, Ordering},
};
use sc_network::{Event as NetworkEvent, IfDisconnected, ProtocolName};
use sc_network::{Event as NetworkEvent, IfDisconnected, ProtocolName, ReputationChange};
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
@@ -51,12 +51,12 @@ use polkadot_primitives::{AuthorityDiscoveryId, Hash};
use sc_network::Multiaddr;
use sp_keyring::Sr25519Keyring;
use crate::{network::Network, validator_discovery::AuthorityDiscovery, Rep};
use crate::{network::Network, validator_discovery::AuthorityDiscovery};
#[derive(Debug, PartialEq)]
pub enum NetworkAction {
/// Note a change in reputation for a peer.
ReputationChange(PeerId, Rep),
ReputationChange(PeerId, ReputationChange),
/// Disconnect a peer from the given peer-set.
DisconnectPeer(PeerId, PeerSet),
/// Write a notification to a given peer on the given peer-set.
@@ -128,10 +128,10 @@ impl Network for TestNetwork {
) {
}
fn report_peer(&self, who: PeerId, cost_benefit: Rep) {
fn report_peer(&self, who: PeerId, rep: ReputationChange) {
self.action_tx
.lock()
.unbounded_send(NetworkAction::ReputationChange(who, cost_benefit))
.unbounded_send(NetworkAction::ReputationChange(who, rep))
.unwrap();
}
@@ -947,7 +947,7 @@ fn relays_collation_protocol_messages() {
let actions = network_handle.next_network_actions(3).await;
assert_network_actions_contains(
&actions,
&NetworkAction::ReputationChange(peer_a.clone(), UNCONNECTED_PEERSET_COST),
&NetworkAction::ReputationChange(peer_a.clone(), UNCONNECTED_PEERSET_COST.into()),
);
// peer B has the message relayed.
@@ -1142,7 +1142,7 @@ fn view_finalized_number_can_not_go_down() {
let actions = network_handle.next_network_actions(2).await;
assert_network_actions_contains(
&actions,
&NetworkAction::ReputationChange(peer_a.clone(), MALFORMED_VIEW_COST),
&NetworkAction::ReputationChange(peer_a.clone(), MALFORMED_VIEW_COST.into()),
);
virtual_overseer
});
+17 -4
View File
@@ -24,14 +24,16 @@ use polkadot_node_network_protocol::{
};
use polkadot_node_subsystem::{
errors::SubsystemError, messages::NetworkBridgeTxMessage, overseer, FromOrchestra,
OverseerSignal, SpawnedSubsystem,
errors::SubsystemError,
messages::{NetworkBridgeTxMessage, ReportPeerMessage},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem,
};
/// Peer set info for network initialization.
///
/// To be added to [`NetworkConfiguration::extra_sets`].
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use sc_network::ReputationChange;
use crate::validator_discovery;
@@ -148,14 +150,25 @@ where
AD: validator_discovery::AuthorityDiscovery + Clone,
{
match msg {
NetworkBridgeTxMessage::ReportPeer(peer, rep) => {
if !rep.is_benefit() {
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep)) => {
if !rep.value.is_positive() {
gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer");
}
metrics.on_report_event();
network_service.report_peer(peer, rep);
},
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Batch(batch)) => {
for (peer, score) in batch {
let rep = ReputationChange::new(score, "Aggregated reputation change");
if !rep.value.is_positive() {
gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer");
}
metrics.on_report_event();
network_service.report_peer(peer, rep);
}
},
NetworkBridgeTxMessage::DisconnectPeer(peer, peer_set) => {
gum::trace!(
target: LOG_TARGET,
+5 -5
View File
@@ -22,7 +22,7 @@ use async_trait::async_trait;
use parking_lot::Mutex;
use std::collections::HashSet;
use sc_network::{Event as NetworkEvent, IfDisconnected, ProtocolName};
use sc_network::{Event as NetworkEvent, IfDisconnected, ProtocolName, ReputationChange};
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
@@ -39,12 +39,12 @@ use sp_keyring::Sr25519Keyring;
const TIMEOUT: std::time::Duration = polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle::<NetworkBridgeTxMessage>::TIMEOUT;
use crate::{network::Network, validator_discovery::AuthorityDiscovery, Rep};
use crate::{network::Network, validator_discovery::AuthorityDiscovery};
#[derive(Debug, PartialEq)]
pub enum NetworkAction {
/// Note a change in reputation for a peer.
ReputationChange(PeerId, Rep),
ReputationChange(PeerId, ReputationChange),
/// Disconnect a peer from the given peer-set.
DisconnectPeer(PeerId, PeerSet),
/// Write a notification to a given peer on the given peer-set.
@@ -116,10 +116,10 @@ impl Network for TestNetwork {
) {
}
fn report_peer(&self, who: PeerId, cost_benefit: Rep) {
fn report_peer(&self, who: PeerId, rep: ReputationChange) {
self.action_tx
.lock()
.unbounded_send(NetworkAction::ReputationChange(who, cost_benefit))
.unbounded_send(NetworkAction::ReputationChange(who, rep))
.unwrap();
}
@@ -174,7 +174,7 @@ mod tests {
PeerId,
};
use polkadot_primitives::Hash;
use sc_network::{Event as NetworkEvent, IfDisconnected, ProtocolName};
use sc_network::{Event as NetworkEvent, IfDisconnected, ProtocolName, ReputationChange};
use sp_keyring::Sr25519Keyring;
use std::collections::{HashMap, HashSet};
@@ -249,7 +249,7 @@ mod tests {
) {
}
fn report_peer(&self, _: PeerId, _: crate::Rep) {
fn report_peer(&self, _: PeerId, _: ReputationChange) {
panic!()
}
@@ -44,6 +44,7 @@ use polkadot_node_subsystem::{
overseer, FromOrchestra, OverseerSignal, PerLeafSpan,
};
use polkadot_node_subsystem_util::{
reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
runtime::{get_availability_cores, get_group_rotation_info, RuntimeInfo},
TimeoutExt,
};
@@ -53,7 +54,10 @@ use polkadot_primitives::{
};
use super::LOG_TARGET;
use crate::error::{log_error, Error, FatalError, Result};
use crate::{
error::{log_error, Error, FatalError, Result},
modify_reputation,
};
use fatality::Split;
mod metrics;
@@ -245,12 +249,20 @@ struct State {
///
/// Each future returns the relay parent of the finished collation fetch.
active_collation_fetches: ActiveCollationFetches,
/// Aggregated reputation change
reputation: ReputationAggregator,
}
impl State {
/// Creates a new `State` instance with the given parameters and setting all remaining
/// state fields to their default values (i.e. empty).
fn new(local_peer_id: PeerId, collator_pair: CollatorPair, metrics: Metrics) -> State {
fn new(
local_peer_id: PeerId,
collator_pair: CollatorPair,
metrics: Metrics,
reputation: ReputationAggregator,
) -> State {
State {
local_peer_id,
collator_pair,
@@ -267,6 +279,7 @@ impl State {
last_connected_at: None,
waiting_collation_fetches: Default::default(),
active_collation_fetches: Default::default(),
reputation,
}
}
@@ -707,7 +720,7 @@ async fn handle_incoming_peer_message<Context>(
"AdvertiseCollation message is not expected on the collator side of the protocol",
);
ctx.send_message(NetworkBridgeTxMessage::ReportPeer(origin, COST_UNEXPECTED_MESSAGE))
modify_reputation(&mut state.reputation, ctx.sender(), origin, COST_UNEXPECTED_MESSAGE)
.await;
// If we are advertised to, this is another collator, and we should disconnect.
@@ -794,8 +807,13 @@ async fn handle_incoming_request<Context>(
target: LOG_TARGET,
"Dropping incoming request as peer has a request in flight already."
);
ctx.send_message(NetworkBridgeTxMessage::ReportPeer(req.peer, COST_APPARENT_FLOOD))
.await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
req.peer,
COST_APPARENT_FLOOD.into(),
)
.await;
return Ok(())
}
@@ -940,15 +958,40 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()>
/// The collator protocol collator side main loop.
#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
pub(crate) async fn run<Context>(
ctx: Context,
local_peer_id: PeerId,
collator_pair: CollatorPair,
req_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
metrics: Metrics,
) -> std::result::Result<(), FatalError> {
run_inner(
ctx,
local_peer_id,
collator_pair,
req_receiver,
metrics,
ReputationAggregator::default(),
REPUTATION_CHANGE_INTERVAL,
)
.await
}
#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
async fn run_inner<Context>(
mut ctx: Context,
local_peer_id: PeerId,
collator_pair: CollatorPair,
mut req_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
metrics: Metrics,
reputation: ReputationAggregator,
reputation_interval: Duration,
) -> std::result::Result<(), FatalError> {
use OverseerSignal::*;
let mut state = State::new(local_peer_id, collator_pair, metrics);
let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
let mut reputation_delay = new_reputation_delay();
let mut state = State::new(local_peer_id, collator_pair, metrics, reputation);
let mut runtime = RuntimeInfo::new(None);
let reconnect_stream = super::tick_stream(RECONNECT_POLL);
@@ -958,6 +1001,10 @@ pub(crate) async fn run<Context>(
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
pin_mut!(recv_req);
select! {
_ = reputation_delay => {
state.reputation.send(ctx.sender()).await;
reputation_delay = new_reputation_delay();
},
msg = ctx.recv().fuse() => match msg.map_err(FatalError::SubsystemReceive)? {
FromOrchestra::Communication { msg } => {
log_error(
File diff suppressed because it is too large Load Diff
@@ -28,6 +28,7 @@ use futures::{
FutureExt, TryFutureExt,
};
use polkadot_node_subsystem_util::reputation::ReputationAggregator;
use sp_keystore::KeystorePtr;
use polkadot_node_network_protocol::{
@@ -36,9 +37,7 @@ use polkadot_node_network_protocol::{
};
use polkadot_primitives::CollatorPair;
use polkadot_node_subsystem::{
errors::SubsystemError, messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem,
};
use polkadot_node_subsystem::{errors::SubsystemError, overseer, SpawnedSubsystem};
mod error;
@@ -124,6 +123,7 @@ impl<Context> CollatorProtocolSubsystem {
/// Modify the reputation of a peer based on its behavior.
async fn modify_reputation(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::CollatorProtocolSenderTrait,
peer: PeerId,
rep: Rep,
@@ -135,7 +135,7 @@ async fn modify_reputation(
"reputation change for peer",
);
sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await;
reputation.modify(sender, peer, rep).await;
}
/// Wait until tick and return the timestamp for the following one.
@@ -52,7 +52,10 @@ use polkadot_node_subsystem::{
},
overseer, FromOrchestra, OverseerSignal, PerLeafSpan, SubsystemSender,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
};
use polkadot_primitives::{CandidateReceipt, CollatorId, Hash, Id as ParaId};
use crate::error::Result;
@@ -612,6 +615,9 @@ struct State {
/// Keep track of all pending candidate collations
pending_candidates: HashMap<Hash, CollationEvent>,
/// Aggregated reputation change
reputation: ReputationAggregator,
}
// O(n) search for collator ID by iterating through the peers map. This should be fast enough
@@ -675,28 +681,31 @@ async fn fetch_collation(
/// Report a collator for some malicious actions.
async fn report_collator(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::CollatorProtocolSenderTrait,
peer_data: &HashMap<PeerId, PeerData>,
id: CollatorId,
) {
if let Some(peer_id) = collator_peer_id(peer_data, &id) {
modify_reputation(sender, peer_id, COST_REPORT_BAD).await;
modify_reputation(reputation, sender, peer_id, COST_REPORT_BAD).await;
}
}
/// Some other subsystem has reported a collator as a good one, bump reputation.
async fn note_good_collation(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::CollatorProtocolSenderTrait,
peer_data: &HashMap<PeerId, PeerData>,
id: CollatorId,
) {
if let Some(peer_id) = collator_peer_id(peer_data, &id) {
modify_reputation(sender, peer_id, BENEFIT_NOTIFY_GOOD).await;
modify_reputation(reputation, sender, peer_id, BENEFIT_NOTIFY_GOOD).await;
}
}
/// Notify a collator that its collation got seconded.
async fn notify_collation_seconded(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::CollatorProtocolSenderTrait,
peer_id: PeerId,
relay_parent: Hash,
@@ -711,7 +720,7 @@ async fn notify_collation_seconded(
))
.await;
modify_reputation(sender, peer_id, BENEFIT_NOTIFY_GOOD).await;
modify_reputation(reputation, sender, peer_id, BENEFIT_NOTIFY_GOOD).await;
}
/// A peer's view has changed. A number of things should be done:
@@ -813,7 +822,13 @@ async fn process_incoming_peer_message<Context>(
match msg {
Declare(collator_id, para_id, signature) => {
if collator_peer_id(&state.peer_data, &collator_id).is_some() {
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
origin,
COST_UNEXPECTED_MESSAGE,
)
.await;
return
}
@@ -826,7 +841,13 @@ async fn process_incoming_peer_message<Context>(
?para_id,
"Unknown peer",
);
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
origin,
COST_UNEXPECTED_MESSAGE,
)
.await;
return
},
};
@@ -838,7 +859,13 @@ async fn process_incoming_peer_message<Context>(
?para_id,
"Peer is not in the collating state",
);
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
origin,
COST_UNEXPECTED_MESSAGE,
)
.await;
return
}
@@ -849,7 +876,13 @@ async fn process_incoming_peer_message<Context>(
?para_id,
"Signature verification failure",
);
modify_reputation(ctx.sender(), origin, COST_INVALID_SIGNATURE).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
origin,
COST_INVALID_SIGNATURE,
)
.await;
return
}
@@ -872,7 +905,13 @@ async fn process_incoming_peer_message<Context>(
"Declared as collator for unneeded para",
);
modify_reputation(ctx.sender(), origin, COST_UNNEEDED_COLLATOR).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
origin,
COST_UNNEEDED_COLLATOR,
)
.await;
gum::trace!(target: LOG_TARGET, "Disconnecting unneeded collator");
disconnect_peer(ctx.sender(), origin).await;
}
@@ -890,7 +929,13 @@ async fn process_incoming_peer_message<Context>(
"Advertise collation out of view",
);
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
origin,
COST_UNEXPECTED_MESSAGE,
)
.await;
return
}
@@ -902,7 +947,13 @@ async fn process_incoming_peer_message<Context>(
?relay_parent,
"Advertise collation message has been received from an unknown peer",
);
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
origin,
COST_UNEXPECTED_MESSAGE,
)
.await;
return
},
Some(p) => p,
@@ -961,7 +1012,13 @@ async fn process_incoming_peer_message<Context>(
"Invalid advertisement",
);
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut state.reputation,
ctx.sender(),
origin,
COST_UNEXPECTED_MESSAGE,
)
.await;
},
}
},
@@ -1106,7 +1163,7 @@ async fn process_msg<Context>(
);
},
ReportCollator(id) => {
report_collator(ctx.sender(), &state.peer_data, id).await;
report_collator(&mut state.reputation, ctx.sender(), &state.peer_data, id).await;
},
NetworkBridgeUpdate(event) => {
if let Err(e) = handle_network_msg(ctx, state, keystore, event).await {
@@ -1121,8 +1178,21 @@ async fn process_msg<Context>(
if let Some(collation_event) = state.pending_candidates.remove(&parent) {
let (collator_id, pending_collation) = collation_event;
let PendingCollation { relay_parent, peer_id, .. } = pending_collation;
note_good_collation(ctx.sender(), &state.peer_data, collator_id).await;
notify_collation_seconded(ctx.sender(), peer_id, relay_parent, stmt).await;
note_good_collation(
&mut state.reputation,
ctx.sender(),
&state.peer_data,
collator_id,
)
.await;
notify_collation_seconded(
&mut state.reputation,
ctx.sender(),
peer_id,
relay_parent,
stmt,
)
.await;
if let Some(collations) = state.collations_per_relay_parent.get_mut(&parent) {
collations.status = CollationStatus::Seconded;
@@ -1153,7 +1223,8 @@ async fn process_msg<Context>(
Entry::Vacant(_) => return,
};
report_collator(ctx.sender(), &state.peer_data, id.clone()).await;
report_collator(&mut state.reputation, ctx.sender(), &state.peer_data, id.clone())
.await;
dequeue_next_collation_and_fetch(ctx, state, parent, id).await;
},
@@ -1163,12 +1234,35 @@ async fn process_msg<Context>(
/// The main run loop.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
pub(crate) async fn run<Context>(
mut ctx: Context,
ctx: Context,
keystore: KeystorePtr,
eviction_policy: crate::CollatorEvictionPolicy,
metrics: Metrics,
) -> std::result::Result<(), crate::error::FatalError> {
let mut state = State { metrics, ..Default::default() };
run_inner(
ctx,
keystore,
eviction_policy,
metrics,
ReputationAggregator::default(),
REPUTATION_CHANGE_INTERVAL,
)
.await
}
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn run_inner<Context>(
mut ctx: Context,
keystore: KeystorePtr,
eviction_policy: crate::CollatorEvictionPolicy,
metrics: Metrics,
reputation: ReputationAggregator,
reputation_interval: Duration,
) -> std::result::Result<(), crate::error::FatalError> {
let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
let mut reputation_delay = new_reputation_delay();
let mut state = State { metrics, reputation, ..Default::default() };
let next_inactivity_stream = tick_stream(ACTIVITY_POLL);
futures::pin_mut!(next_inactivity_stream);
@@ -1178,6 +1272,10 @@ pub(crate) async fn run<Context>(
loop {
select! {
_ = reputation_delay => {
state.reputation.send(ctx.sender()).await;
reputation_delay = new_reputation_delay();
},
res = ctx.recv().fuse() => {
match res {
Ok(FromOrchestra::Communication { msg }) => {
@@ -1217,7 +1315,7 @@ pub(crate) async fn run<Context>(
).await;
for (peer_id, rep) in reputation_changes {
modify_reputation(ctx.sender(), peer_id, rep).await;
modify_reputation(&mut state.reputation,ctx.sender(), peer_id, rep).await;
}
},
}
@@ -29,9 +29,11 @@ use polkadot_node_network_protocol::{
ObservedRole,
};
use polkadot_node_primitives::BlockData;
use polkadot_node_subsystem::messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest};
use polkadot_node_subsystem::messages::{
AllMessages, ReportPeerMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_node_subsystem_util::{reputation::add_reputation, TimeoutExt};
use polkadot_primitives::{
CollatorPair, CoreState, GroupIndex, GroupRotationInfo, OccupiedCore, ScheduledCore,
ValidatorId, ValidatorIndex,
@@ -42,6 +44,7 @@ use polkadot_primitives_test_helpers::{
const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(500);
const DECLARE_TIMEOUT: Duration = Duration::from_millis(25);
const REPUTATION_CHANGE_TEST_INTERVAL: Duration = Duration::from_millis(10);
#[derive(Clone)]
struct TestState {
@@ -119,7 +122,10 @@ struct TestHarness {
virtual_overseer: VirtualOverseer,
}
fn test_harness<T: Future<Output = VirtualOverseer>>(test: impl FnOnce(TestHarness) -> T) {
fn test_harness<T: Future<Output = VirtualOverseer>>(
reputation: ReputationAggregator,
test: impl FnOnce(TestHarness) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(Some("polkadot_collator_protocol"), log::LevelFilter::Trace)
@@ -138,7 +144,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(test: impl FnOnce(TestHarne
)
.unwrap();
let subsystem = run(
let subsystem = run_inner(
context,
Arc::new(keystore),
crate::CollatorEvictionPolicy {
@@ -146,6 +152,8 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(test: impl FnOnce(TestHarne
undeclared: DECLARE_TIMEOUT,
},
Metrics::default(),
reputation,
REPUTATION_CHANGE_TEST_INTERVAL,
);
let test_fut = test(TestHarness { virtual_overseer });
@@ -348,7 +356,7 @@ async fn advertise_collation(
fn act_on_advertisement() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let pair = CollatorPair::generate().0;
@@ -392,7 +400,7 @@ fn act_on_advertisement() {
fn collator_reporting_works() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_send(
@@ -433,10 +441,10 @@ fn collator_reporting_works() {
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep),
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep)),
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_REPORT_BAD);
assert_eq!(rep.value, COST_REPORT_BAD.cost_or_benefit());
}
);
@@ -449,7 +457,7 @@ fn collator_reporting_works() {
fn collator_authentication_verification_works() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let peer_b = PeerId::random();
@@ -483,10 +491,10 @@ fn collator_authentication_verification_works() {
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(peer, rep),
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep)),
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_INVALID_SIGNATURE);
assert_eq!(rep.value, COST_INVALID_SIGNATURE.cost_or_benefit());
}
);
virtual_overseer
@@ -500,7 +508,7 @@ fn collator_authentication_verification_works() {
fn fetch_one_collation_at_a_time() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let second = Hash::random();
@@ -585,7 +593,7 @@ fn fetch_one_collation_at_a_time() {
fn fetches_next_collation() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let second = Hash::random();
@@ -683,7 +691,7 @@ fn fetches_next_collation() {
fn reject_connection_to_next_group() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_send(
@@ -709,11 +717,10 @@ fn reject_connection_to_next_group() {
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(
peer,
rep,
ReportPeerMessage::Single(peer, rep),
)) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_UNNEEDED_COLLATOR);
assert_eq!(rep.value, COST_UNNEEDED_COLLATOR.cost_or_benefit());
}
);
@@ -728,7 +735,7 @@ fn reject_connection_to_next_group() {
fn fetch_next_collation_on_invalid_collation() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let second = Hash::random();
@@ -802,11 +809,10 @@ fn fetch_next_collation_on_invalid_collation() {
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(
peer,
rep,
ReportPeerMessage::Single(peer, rep),
)) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_REPORT_BAD);
assert_eq!(rep.value, COST_REPORT_BAD.cost_or_benefit());
}
);
@@ -826,7 +832,7 @@ fn fetch_next_collation_on_invalid_collation() {
fn inactive_disconnected() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let pair = CollatorPair::generate().0;
@@ -872,7 +878,7 @@ fn inactive_disconnected() {
fn activity_extends_life() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let pair = CollatorPair::generate().0;
@@ -937,7 +943,7 @@ fn activity_extends_life() {
fn disconnect_if_no_declare() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_send(
@@ -973,7 +979,7 @@ fn disconnect_if_no_declare() {
fn disconnect_if_wrong_declare() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let pair = CollatorPair::generate().0;
@@ -1017,11 +1023,10 @@ fn disconnect_if_wrong_declare() {
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(
peer,
rep,
ReportPeerMessage::Single(peer, rep),
)) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_UNNEEDED_COLLATOR);
assert_eq!(rep.value, COST_UNNEEDED_COLLATOR.cost_or_benefit());
}
);
@@ -1031,11 +1036,96 @@ fn disconnect_if_wrong_declare() {
})
}
#[test]
fn delay_reputation_change() {
let test_state = TestState::default();
test_harness(ReputationAggregator::new(|_| false), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let pair = CollatorPair::generate().0;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange(
our_view![test_state.relay_parent],
)),
)
.await;
respond_to_core_info_queries(&mut virtual_overseer, &test_state).await;
let peer_b = PeerId::random();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
CollationVersion::V1.into(),
None,
)),
)
.await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
Versioned::V1(protocol_v1::CollatorProtocolMessage::Declare(
pair.public(),
ParaId::from(69),
pair.sign(&protocol_v1::declare_signature_payload(&peer_b)),
)),
)),
)
.await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
Versioned::V1(protocol_v1::CollatorProtocolMessage::Declare(
pair.public(),
ParaId::from(69),
pair.sign(&protocol_v1::declare_signature_payload(&peer_b)),
)),
)),
)
.await;
// Wait enough to fire reputation delay
futures_timer::Delay::new(REPUTATION_CHANGE_TEST_INTERVAL).await;
loop {
match overseer_recv(&mut virtual_overseer).await {
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::DisconnectPeer(_, _)) => {
gum::trace!("`Disconnecting inactive peer` message skipped");
continue
},
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(
ReportPeerMessage::Batch(v),
)) => {
let mut expected_change = HashMap::new();
for rep in vec![COST_UNNEEDED_COLLATOR, COST_UNNEEDED_COLLATOR] {
add_reputation(&mut expected_change, peer_b, rep);
}
assert_eq!(v, expected_change);
break
},
_ => panic!("Message should be either `DisconnectPeer` or `ReportPeer`"),
}
}
virtual_overseer
})
}
#[test]
fn view_change_clears_old_collators() {
let mut test_state = TestState::default();
test_harness(|test_harness| async move {
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let pair = CollatorPair::generate().0;
@@ -42,7 +42,7 @@ impl UnifiedReputationChange {
///
/// The whole range of an `i32` should be used, so order of magnitude of
/// something malicious should be `1<<20` (give or take).
const fn cost_or_benefit(&self) -> i32 {
pub const fn cost_or_benefit(&self) -> i32 {
match self {
Self::CostMinor(_) => -100_000,
Self::CostMajor(_) => -300_000,
@@ -81,9 +81,10 @@ impl UnifiedReputationChange {
_ => false,
}
}
}
/// Convert into a base reputation as used with substrate.
pub const fn into_base_rep(self) -> ReputationChange {
ReputationChange::new(self.cost_or_benefit(), self.description())
impl From<UnifiedReputationChange> for ReputationChange {
fn from(value: UnifiedReputationChange) -> Self {
ReputationChange::new(value.cost_or_benefit(), value.description())
}
}
@@ -88,8 +88,7 @@ where
let payload = match Req::decode(&mut payload.as_ref()) {
Ok(payload) => payload,
Err(err) => {
let reputation_changes =
reputation_changes.into_iter().map(|r| r.into_base_rep()).collect();
let reputation_changes = reputation_changes.into_iter().map(|r| r.into()).collect();
let response = sc_network::config::OutgoingResponse {
result: Err(()),
reputation_changes,
@@ -175,7 +174,7 @@ where
let response = netconfig::OutgoingResponse {
result: result.map(|v| v.encode()),
reputation_changes: reputation_changes.into_iter().map(|c| c.into_base_rep()).collect(),
reputation_changes: reputation_changes.into_iter().map(|c| c.into()).collect(),
sent_feedback,
};
@@ -7,6 +7,7 @@ edition.workspace = true
[dependencies]
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
polkadot-primitives = { path = "../../../primitives" }
sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
@@ -55,12 +55,19 @@ use futures::{
channel::{mpsc, oneshot},
future::RemoteHandle,
prelude::*,
select,
};
use indexmap::{map::Entry as IEntry, IndexMap};
use sp_keystore::KeystorePtr;
use util::runtime::RuntimeInfo;
use util::{
reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
runtime::RuntimeInfo,
};
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
time::Duration,
};
use fatality::Nested;
@@ -126,6 +133,8 @@ pub struct StatementDistributionSubsystem<R> {
metrics: Metrics,
/// Pseudo-random generator for peers selection logic
rng: R,
/// Aggregated reputation change
reputation: ReputationAggregator,
}
#[overseer::subsystem(StatementDistribution, error=SubsystemError, prefix=self::overseer)]
@@ -1167,12 +1176,14 @@ async fn send_statements<Context>(
}
}
async fn report_peer(
/// Modify the reputation of a peer based on its behavior.
async fn modify_reputation(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::StatementDistributionSenderTrait,
peer: PeerId,
rep: Rep,
) {
sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await
reputation.modify(sender, peer, rep).await;
}
/// If message contains a statement, then retrieve it, otherwise fork task to fetch it.
@@ -1319,6 +1330,7 @@ async fn handle_incoming_message_and_circulate<'a, Context, R>(
metrics: &Metrics,
runtime: &mut RuntimeInfo,
rng: &mut R,
reputation: &mut ReputationAggregator,
) where
R: rand::Rng,
{
@@ -1333,6 +1345,7 @@ async fn handle_incoming_message_and_circulate<'a, Context, R>(
message,
req_sender,
metrics,
reputation,
)
.await,
None => None,
@@ -1397,6 +1410,7 @@ async fn handle_incoming_message<'a, Context>(
message: protocol_v1::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>,
metrics: &Metrics,
reputation: &mut ReputationAggregator,
) -> Option<(Hash, StoredStatement<'a>)> {
let relay_parent = message.get_relay_parent();
let _ = metrics.time_network_bridge_update_v1("handle_incoming_message");
@@ -1411,7 +1425,7 @@ async fn handle_incoming_message<'a, Context>(
);
if !recent_outdated_heads.is_recent_outdated(&relay_parent) {
report_peer(ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
}
return None
@@ -1421,7 +1435,7 @@ async fn handle_incoming_message<'a, Context>(
if let protocol_v1::StatementDistributionMessage::LargeStatement(_) = message {
if let Err(rep) = peer_data.receive_large_statement(&relay_parent) {
gum::debug!(target: LOG_TARGET, ?peer, ?message, ?rep, "Unexpected large statement.",);
report_peer(ctx.sender(), peer, rep).await;
modify_reputation(reputation, ctx.sender(), peer, rep).await;
return None
}
}
@@ -1462,16 +1476,16 @@ async fn handle_incoming_message<'a, Context>(
// Report peer merely if this is not a duplicate out-of-view statement that
// was caused by a missing Seconded statement from this peer
if unexpected_count == 0_usize {
report_peer(ctx.sender(), peer, rep).await;
modify_reputation(reputation, ctx.sender(), peer, rep).await;
}
},
// This happens when we have an unexpected remote peer that announced Seconded
COST_UNEXPECTED_STATEMENT_REMOTE => {
metrics.on_unexpected_statement_seconded();
report_peer(ctx.sender(), peer, rep).await;
modify_reputation(reputation, ctx.sender(), peer, rep).await;
},
_ => {
report_peer(ctx.sender(), peer, rep).await;
modify_reputation(reputation, ctx.sender(), peer, rep).await;
},
}
@@ -1492,7 +1506,7 @@ async fn handle_incoming_message<'a, Context>(
peer_data
.receive(&relay_parent, &fingerprint, max_message_count)
.expect("checked in `check_can_receive` above; qed");
report_peer(ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await;
modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await;
return None
},
@@ -1502,7 +1516,7 @@ async fn handle_incoming_message<'a, Context>(
match check_statement_signature(&active_head, relay_parent, unchecked_compact) {
Err(statement) => {
gum::debug!(target: LOG_TARGET, ?peer, ?statement, "Invalid statement signature");
report_peer(ctx.sender(), peer, COST_INVALID_SIGNATURE).await;
modify_reputation(reputation, ctx.sender(), peer, COST_INVALID_SIGNATURE).await;
return None
},
Ok(statement) => statement,
@@ -1528,7 +1542,7 @@ async fn handle_incoming_message<'a, Context>(
is_large_statement,
"Full statement had bad payload."
);
report_peer(ctx.sender(), peer, COST_WRONG_HASH).await;
modify_reputation(reputation, ctx.sender(), peer, COST_WRONG_HASH).await;
return None
},
Ok(statement) => statement,
@@ -1567,7 +1581,7 @@ async fn handle_incoming_message<'a, Context>(
unreachable!("checked in `is_useful_or_unknown` above; qed");
},
NotedStatement::Fresh(statement) => {
report_peer(ctx.sender(), peer, BENEFIT_VALID_STATEMENT_FIRST).await;
modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT_FIRST).await;
let mut _span = handle_incoming_span.child("notify-backing");
@@ -1641,6 +1655,7 @@ async fn handle_network_update<Context, R>(
metrics: &Metrics,
runtime: &mut RuntimeInfo,
rng: &mut R,
reputation: &mut ReputationAggregator,
) where
R: rand::Rng,
{
@@ -1713,6 +1728,7 @@ async fn handle_network_update<Context, R>(
metrics,
runtime,
rng,
reputation,
)
.await;
},
@@ -1750,10 +1766,27 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
metrics: Metrics,
rng: R,
) -> Self {
Self { keystore, req_receiver: Some(req_receiver), metrics, rng }
Self {
keystore,
req_receiver: Some(req_receiver),
metrics,
rng,
reputation: Default::default(),
}
}
async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), FatalError> {
async fn run<Context>(self, ctx: Context) -> std::result::Result<(), FatalError> {
self.run_inner(ctx, REPUTATION_CHANGE_INTERVAL).await
}
async fn run_inner<Context>(
mut self,
mut ctx: Context,
reputation_interval: Duration,
) -> std::result::Result<(), FatalError> {
let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
let mut reputation_delay = new_reputation_delay();
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut topology_storage: SessionBoundGridTopologyStorage = Default::default();
let mut authorities: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new();
@@ -1778,55 +1811,61 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
.map_err(FatalError::SpawnTask)?;
loop {
let message =
MuxedMessage::receive(&mut ctx, &mut req_receiver, &mut res_receiver).await;
match message {
MuxedMessage::Subsystem(result) => {
let result = self
.handle_subsystem_message(
&mut ctx,
&mut runtime,
&mut peers,
&mut topology_storage,
&mut authorities,
&mut active_heads,
&mut recent_outdated_heads,
&req_sender,
result?,
)
.await;
match result.into_nested()? {
Ok(true) => break,
Ok(false) => {},
Err(jfyi) => gum::debug!(target: LOG_TARGET, error = ?jfyi),
}
select! {
_ = reputation_delay => {
self.reputation.send(ctx.sender()).await;
reputation_delay = new_reputation_delay();
},
MuxedMessage::Requester(result) => {
let result = self
.handle_requester_message(
&mut ctx,
&topology_storage,
&mut peers,
&mut active_heads,
&recent_outdated_heads,
&req_sender,
&mut runtime,
result.ok_or(FatalError::RequesterReceiverFinished)?,
)
.await;
log_error(result.map_err(From::from), "handle_requester_message")?;
},
MuxedMessage::Responder(result) => {
let result = self
.handle_responder_message(
&peers,
&mut active_heads,
result.ok_or(FatalError::ResponderReceiverFinished)?,
)
.await;
log_error(result.map_err(From::from), "handle_responder_message")?;
},
};
message = MuxedMessage::receive(&mut ctx, &mut req_receiver, &mut res_receiver).fuse() => {
match message {
MuxedMessage::Subsystem(result) => {
let result = self
.handle_subsystem_message(
&mut ctx,
&mut runtime,
&mut peers,
&mut topology_storage,
&mut authorities,
&mut active_heads,
&mut recent_outdated_heads,
&req_sender,
result?,
)
.await;
match result.into_nested()? {
Ok(true) => break,
Ok(false) => {},
Err(jfyi) => gum::debug!(target: LOG_TARGET, error = ?jfyi),
}
},
MuxedMessage::Requester(result) => {
let result = self
.handle_requester_message(
&mut ctx,
&topology_storage,
&mut peers,
&mut active_heads,
&recent_outdated_heads,
&req_sender,
&mut runtime,
result.ok_or(FatalError::RequesterReceiverFinished)?,
)
.await;
log_error(result.map_err(From::from), "handle_requester_message")?;
},
MuxedMessage::Responder(result) => {
let result = self
.handle_responder_message(
&peers,
&mut active_heads,
result.ok_or(FatalError::ResponderReceiverFinished)?,
)
.await;
log_error(result.map_err(From::from), "handle_responder_message")?;
},
};
}
}
}
Ok(())
}
@@ -1890,9 +1929,16 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
bad_peers,
} => {
for bad in bad_peers {
report_peer(ctx.sender(), bad, COST_FETCH_FAIL).await;
modify_reputation(&mut self.reputation, ctx.sender(), bad, COST_FETCH_FAIL)
.await;
}
report_peer(ctx.sender(), from_peer, BENEFIT_VALID_RESPONSE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
from_peer,
BENEFIT_VALID_RESPONSE,
)
.await;
let active_head = active_heads
.get_mut(&relay_parent)
@@ -1932,6 +1978,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
&self.metrics,
runtime,
&mut self.rng,
&mut self.reputation,
)
.await;
}
@@ -1975,7 +2022,8 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
}
}
},
RequesterMessage::ReportPeer(peer, rep) => report_peer(ctx.sender(), peer, rep).await,
RequesterMessage::ReportPeer(peer, rep) =>
modify_reputation(&mut self.reputation, ctx.sender(), peer, rep).await,
}
Ok(())
}
@@ -2113,6 +2161,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
metrics,
runtime,
&mut self.rng,
&mut self.reputation,
)
.await;
},
@@ -31,7 +31,9 @@ use polkadot_node_network_protocol::{
use polkadot_node_primitives::{Statement, UncheckedSignedFullStatement};
use polkadot_node_subsystem::{
jaeger,
messages::{network_bridge_event, AllMessages, RuntimeApiMessage, RuntimeApiRequest},
messages::{
network_bridge_event, AllMessages, ReportPeerMessage, RuntimeApiMessage, RuntimeApiRequest,
},
ActivatedLeaf, LeafStatus,
};
use polkadot_node_subsystem_test_helpers::mock::make_ferdie_keystore;
@@ -47,6 +49,7 @@ use sp_authority_discovery::AuthorityPair;
use sp_keyring::Sr25519Keyring;
use sp_keystore::{Keystore, KeystorePtr};
use std::{iter::FromIterator as _, sync::Arc, time::Duration};
use util::reputation::add_reputation;
// Some deterministic genesis hash for protocol names
const GENESIS_HASH: Hash = Hash::repeat_byte(0xff);
@@ -733,12 +736,13 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() {
let (statement_req_receiver, _) = IncomingRequest::get_config_receiver(&req_protocol_names);
let bg = async move {
let s = StatementDistributionSubsystem::new(
Arc::new(LocalKeystore::in_memory()),
statement_req_receiver,
Default::default(),
AlwaysZeroRng,
);
let s = StatementDistributionSubsystem {
keystore: Arc::new(LocalKeystore::in_memory()),
req_receiver: Some(statement_req_receiver),
metrics: Default::default(),
rng: AlwaysZeroRng,
reputation: ReputationAggregator::new(|_| true),
};
s.run(ctx).await.unwrap();
};
@@ -862,8 +866,8 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(p, r)
) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {}
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))
) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST.into() => {}
);
assert_matches!(
@@ -936,12 +940,13 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing(
IncomingRequest::get_config_receiver(&req_protocol_names);
let bg = async move {
let s = StatementDistributionSubsystem::new(
make_ferdie_keystore(),
statement_req_receiver,
Default::default(),
AlwaysZeroRng,
);
let s = StatementDistributionSubsystem {
keystore: make_ferdie_keystore(),
req_receiver: Some(statement_req_receiver),
metrics: Default::default(),
rng: AlwaysZeroRng,
reputation: ReputationAggregator::new(|_| true),
};
s.run(ctx).await.unwrap();
};
@@ -1226,8 +1231,8 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing(
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(p, r)
) if p == peer_bad && r == COST_WRONG_HASH => {}
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))
) if p == peer_bad && r == COST_WRONG_HASH.into() => {}
);
// a is tried again (retried in reverse order):
@@ -1277,22 +1282,22 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing(
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(p, r)
) if p == peer_a && r == COST_FETCH_FAIL => {}
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))
) if p == peer_a && r == COST_FETCH_FAIL.into() => {}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(p, r)
) if p == peer_c && r == BENEFIT_VALID_RESPONSE => {}
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))
) if p == peer_c && r == BENEFIT_VALID_RESPONSE.into() => {}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(p, r)
) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {}
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))
) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST.into() => {}
);
assert_matches!(
@@ -1392,6 +1397,441 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing(
executor::block_on(future::join(test_fut, bg));
}
#[test]
fn delay_reputation_changes() {
sp_tracing::try_init_simple();
let hash_a = Hash::repeat_byte(1);
let candidate = {
let mut c = dummy_committed_candidate_receipt(dummy_hash());
c.descriptor.relay_parent = hash_a;
c.descriptor.para_id = 1.into();
c.commitments.new_validation_code = Some(ValidationCode(vec![1, 2, 3]));
c
};
let peer_a = PeerId::random(); // Alice
let peer_b = PeerId::random(); // Bob
let peer_c = PeerId::random(); // Charlie
let peer_bad = PeerId::random(); // No validator
let validators = vec![
Sr25519Keyring::Alice.pair(),
Sr25519Keyring::Bob.pair(),
Sr25519Keyring::Charlie.pair(),
// We:
Sr25519Keyring::Ferdie.pair(),
];
let session_info = make_session_info(validators, vec![vec![0, 1, 2, 4], vec![3]]);
let session_index = 1;
let pool = sp_core::testing::TaskExecutor::new();
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let req_protocol_names = ReqProtocolNames::new(&GENESIS_HASH, None);
let (statement_req_receiver, _) = IncomingRequest::get_config_receiver(&req_protocol_names);
let reputation_interval = Duration::from_millis(100);
let bg = async move {
let s = StatementDistributionSubsystem {
keystore: make_ferdie_keystore(),
req_receiver: Some(statement_req_receiver),
metrics: Default::default(),
rng: AlwaysZeroRng,
reputation: ReputationAggregator::new(|_| false),
};
s.run_inner(ctx, reputation_interval).await.unwrap();
};
let test_fut = async move {
// register our active heads.
handle
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: hash_a,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
)
if r == hash_a
=> {
let _ = tx.send(Ok(session_index));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
)
if r == hash_a && sess_index == session_index
=> {
let _ = tx.send(Ok(Some(session_info)));
}
);
// notify of peers and view
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerConnected(
peer_a.clone(),
ObservedRole::Full,
ValidationVersion::V1.into(),
Some(HashSet::from([Sr25519Keyring::Alice.public().into()])),
),
),
})
.await;
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
ValidationVersion::V1.into(),
Some(HashSet::from([Sr25519Keyring::Bob.public().into()])),
),
),
})
.await;
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerConnected(
peer_c.clone(),
ObservedRole::Full,
ValidationVersion::V1.into(),
Some(HashSet::from([Sr25519Keyring::Charlie.public().into()])),
),
),
})
.await;
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerConnected(
peer_bad.clone(),
ObservedRole::Full,
ValidationVersion::V1.into(),
None,
),
),
})
.await;
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a]),
),
})
.await;
// receive a seconded statement from peer A, which does not provide the request data,
// then get that data from peer C. It should be propagated onwards to peer B and to
// candidate backing.
let statement = {
let signing_context = SigningContext { parent_hash: hash_a, session_index };
let keystore: KeystorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = Keystore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate.clone()),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
)
.ok()
.flatten()
.expect("should be signed")
};
let metadata = derive_metadata_assuming_seconded(hash_a, statement.clone().into());
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
Versioned::V1(protocol_v1::StatementDistributionMessage::LargeStatement(
metadata.clone(),
)),
),
),
})
.await;
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetchingV1(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
// Just drop request - should trigger error.
}
);
// There is a race between request handler asking for more peers and processing of the
// coming `PeerMessage`s, we want the request handler to ask first here for better test
// coverage:
Delay::new(Duration::from_millis(20)).await;
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(
peer_c.clone(),
Versioned::V1(protocol_v1::StatementDistributionMessage::LargeStatement(
metadata.clone(),
)),
),
),
})
.await;
// Malicious peer:
handle
.send(FromOrchestra::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(
peer_bad.clone(),
Versioned::V1(protocol_v1::StatementDistributionMessage::LargeStatement(
metadata.clone(),
)),
),
),
})
.await;
// Let c fail once too:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetchingV1(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
}
);
// a fails again:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetchingV1(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
}
);
// Send invalid response (all other peers have been tried now):
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetchingV1(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_bad));
let bad_candidate = {
let mut bad = candidate.clone();
bad.descriptor.para_id = 0xeadbeaf.into();
bad
};
let response = StatementFetchingResponse::Statement(bad_candidate);
outgoing.pending_response.send(Ok(response.encode())).unwrap();
}
);
// a is tried again (retried in reverse order):
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetchingV1(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
}
);
// c succeeds now:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetchingV1(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
let response = StatementFetchingResponse::Statement(candidate.clone());
outgoing.pending_response.send(Ok(response.encode())).unwrap();
}
);
assert_matches!(
handle.recv().await,
AllMessages::CandidateBacking(
CandidateBackingMessage::Statement(r, s)
) if r == hash_a && s == statement => {}
);
// Now messages should go out:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::SendValidationMessage(
mut recipients,
Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution(
protocol_v1::StatementDistributionMessage::LargeStatement(meta)
)),
)
) => {
gum::debug!(
target: LOG_TARGET,
?recipients,
"Recipients received"
);
recipients.sort();
let mut expected = vec![peer_b, peer_c, peer_bad];
expected.sort();
assert_eq!(recipients, expected);
assert_eq!(meta.relay_parent, hash_a);
assert_eq!(meta.candidate_hash, statement.payload().candidate_hash());
assert_eq!(meta.signed_by, statement.validator_index());
assert_eq!(&meta.signature, statement.signature());
}
);
// Wait enough to fire reputation delay
futures_timer::Delay::new(reputation_interval).await;
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Batch(v))
) => {
let mut expected_change = HashMap::new();
for rep in vec![COST_FETCH_FAIL, BENEFIT_VALID_STATEMENT_FIRST] {
add_reputation(&mut expected_change, peer_a, rep)
}
for rep in vec![BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT] {
add_reputation(&mut expected_change, peer_c, rep)
}
for rep in vec![COST_WRONG_HASH, BENEFIT_VALID_STATEMENT] {
add_reputation(&mut expected_change, peer_bad, rep)
}
assert_eq!(v, expected_change);
}
);
handle.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
};
futures::pin_mut!(test_fut);
futures::pin_mut!(bg);
executor::block_on(future::join(test_fut, bg));
}
#[test]
fn share_prioritizes_backing_group() {
sp_tracing::try_init_simple();
@@ -1448,12 +1888,13 @@ fn share_prioritizes_backing_group() {
IncomingRequest::get_config_receiver(&req_protocol_names);
let bg = async move {
let s = StatementDistributionSubsystem::new(
make_ferdie_keystore(),
statement_req_receiver,
Default::default(),
AlwaysZeroRng,
);
let s = StatementDistributionSubsystem {
keystore: make_ferdie_keystore(),
req_receiver: Some(statement_req_receiver),
metrics: Default::default(),
rng: AlwaysZeroRng,
reputation: ReputationAggregator::new(|_| true),
};
s.run(ctx).await.unwrap();
};
@@ -1741,12 +2182,13 @@ fn peer_cant_flood_with_large_statements() {
let req_protocol_names = ReqProtocolNames::new(&GENESIS_HASH, None);
let (statement_req_receiver, _) = IncomingRequest::get_config_receiver(&req_protocol_names);
let bg = async move {
let s = StatementDistributionSubsystem::new(
make_ferdie_keystore(),
statement_req_receiver,
Default::default(),
AlwaysZeroRng,
);
let s = StatementDistributionSubsystem {
keystore: make_ferdie_keystore(),
req_receiver: Some(statement_req_receiver),
metrics: Default::default(),
rng: AlwaysZeroRng,
reputation: ReputationAggregator::new(|_| true),
};
s.run(ctx).await.unwrap();
};
@@ -1873,9 +2315,9 @@ fn peer_cant_flood_with_large_statements() {
requested = true;
},
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(p, r))
if p == peer_a && r == COST_APPARENT_FLOOD =>
{
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(
ReportPeerMessage::Single(p, r),
)) if p == peer_a && r == COST_APPARENT_FLOOD.into() => {
punished = true;
},
@@ -1945,12 +2387,13 @@ fn handle_multiple_seconded_statements() {
let (statement_req_receiver, _) = IncomingRequest::get_config_receiver(&req_protocol_names);
let virtual_overseer_fut = async move {
let s = StatementDistributionSubsystem::new(
Arc::new(LocalKeystore::in_memory()),
statement_req_receiver,
Default::default(),
AlwaysZeroRng,
);
let s = StatementDistributionSubsystem {
keystore: Arc::new(LocalKeystore::in_memory()),
req_receiver: Some(statement_req_receiver),
metrics: Default::default(),
rng: AlwaysZeroRng,
reputation: ReputationAggregator::new(|_| true),
};
s.run(ctx).await.unwrap();
};
@@ -2136,10 +2579,10 @@ fn handle_multiple_seconded_statements() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(p, r)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))
) => {
assert_eq!(p, peer_a);
assert_eq!(r, BENEFIT_VALID_STATEMENT_FIRST);
assert_eq!(r, BENEFIT_VALID_STATEMENT_FIRST.into());
}
);
@@ -2188,10 +2631,10 @@ fn handle_multiple_seconded_statements() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(p, r)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))
) => {
assert_eq!(p, peer_b);
assert_eq!(r, BENEFIT_VALID_STATEMENT);
assert_eq!(r, BENEFIT_VALID_STATEMENT.into());
}
);
@@ -2237,10 +2680,10 @@ fn handle_multiple_seconded_statements() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(p, r)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))
) => {
assert_eq!(p, peer_a);
assert_eq!(r, BENEFIT_VALID_STATEMENT_FIRST);
assert_eq!(r, BENEFIT_VALID_STATEMENT_FIRST.into());
}
);
@@ -2290,10 +2733,10 @@ fn handle_multiple_seconded_statements() {
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(p, r)
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))
) => {
assert_eq!(p, peer_b);
assert_eq!(r, BENEFIT_VALID_STATEMENT);
assert_eq!(r, BENEFIT_VALID_STATEMENT.into());
}
);
+5 -2
View File
@@ -26,7 +26,7 @@ use polkadot_node_primitives::{
};
use polkadot_node_subsystem_types::{
jaeger,
messages::{NetworkBridgeEvent, RuntimeApiRequest},
messages::{NetworkBridgeEvent, ReportPeerMessage, RuntimeApiRequest},
ActivatedLeaf, LeafStatus,
};
use polkadot_primitives::{
@@ -855,7 +855,10 @@ fn test_availability_store_msg() -> AvailabilityStoreMessage {
}
fn test_network_bridge_tx_msg() -> NetworkBridgeTxMessage {
NetworkBridgeTxMessage::ReportPeer(PeerId::random(), UnifiedReputationChange::BenefitMinor(""))
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(
PeerId::random(),
UnifiedReputationChange::BenefitMinor("").into(),
))
}
fn test_network_bridge_rx_msg() -> NetworkBridgeRxMessage {
+11 -3
View File
@@ -23,14 +23,13 @@
//! Subsystems' APIs are defined separately from their implementation, leading to easier mocking.
use futures::channel::oneshot;
use sc_network::Multiaddr;
use sc_network::{Multiaddr, ReputationChange};
use thiserror::Error;
pub use sc_network::IfDisconnected;
use polkadot_node_network_protocol::{
self as net_protocol, peer_set::PeerSet, request_response::Requests, PeerId,
UnifiedReputationChange,
};
use polkadot_node_primitives::{
approval::{BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote},
@@ -305,11 +304,20 @@ pub enum NetworkBridgeRxMessage {
},
}
/// Type of peer reporting
#[derive(Debug)]
pub enum ReportPeerMessage {
/// Single peer report about malicious actions which should be sent right away
Single(PeerId, ReputationChange),
/// Delayed report for other actions.
Batch(HashMap<PeerId, i32>),
}
/// Messages received from other subsystems by the network bridge subsystem.
#[derive(Debug)]
pub enum NetworkBridgeTxMessage {
/// Report a peer for their actions.
ReportPeer(PeerId, UnifiedReputationChange),
ReportPeer(ReportPeerMessage),
/// Disconnect a peer from the given peer-set without affecting their reputation.
DisconnectPeer(PeerId, PeerSet),
+2
View File
@@ -77,6 +77,8 @@ pub mod database;
/// tasks, sending messages back.
pub mod nesting_sender;
pub mod reputation;
mod determine_new_blocks;
#[cfg(test)]
@@ -0,0 +1,116 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A utility abstraction to collect and send reputation changes.
use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange};
use polkadot_node_subsystem::{
messages::{NetworkBridgeTxMessage, ReportPeerMessage},
overseer,
};
use std::{collections::HashMap, time::Duration};
/// Default delay for sending reputation changes
pub const REPUTATION_CHANGE_INTERVAL: Duration = Duration::from_secs(30);
type BatchReputationChange = HashMap<PeerId, i32>;
/// Collects reputation changes and sends them in one batch to relieve network channels
#[derive(Debug, Clone)]
pub struct ReputationAggregator {
send_immediately_if: fn(UnifiedReputationChange) -> bool,
by_peer: Option<BatchReputationChange>,
}
impl Default for ReputationAggregator {
fn default() -> Self {
Self::new(|rep| matches!(rep, UnifiedReputationChange::Malicious(_)))
}
}
impl ReputationAggregator {
/// New ReputationAggregator
///
/// # Arguments
///
/// * `send_immediately_if` - A function, takes `UnifiedReputationChange`,
/// results shows if we need to send the changes right away.
/// By default, it is used for sending `UnifiedReputationChange::Malicious` changes immediately and for testing.
pub fn new(send_immediately_if: fn(UnifiedReputationChange) -> bool) -> Self {
Self { by_peer: Default::default(), send_immediately_if }
}
/// Sends collected reputation changes in a batch,
/// removing them from inner state
pub async fn send(
&mut self,
sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
) {
if let Some(by_peer) = self.by_peer.take() {
sender
.send_message(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Batch(by_peer)))
.await;
}
}
/// Adds reputation change to inner state
/// or sends it right away if the change is dangerous
pub async fn modify(
&mut self,
sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
peer_id: PeerId,
rep: UnifiedReputationChange,
) {
if (self.send_immediately_if)(rep) {
self.single_send(sender, peer_id, rep).await;
} else {
self.add(peer_id, rep);
}
}
async fn single_send(
&self,
sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
peer_id: PeerId,
rep: UnifiedReputationChange,
) {
sender
.send_message(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(
peer_id,
rep.into(),
)))
.await;
}
fn add(&mut self, peer_id: PeerId, rep: UnifiedReputationChange) {
if self.by_peer.is_none() {
self.by_peer = Some(HashMap::new());
}
if let Some(ref mut by_peer) = self.by_peer {
add_reputation(by_peer, peer_id, rep)
}
}
}
/// Add a reputation change to an existing collection.
pub fn add_reputation(
acc: &mut BatchReputationChange,
peer_id: PeerId,
rep: UnifiedReputationChange,
) {
let cost = rep.cost_or_benefit();
acc.entry(peer_id).and_modify(|v| *v = v.saturating_add(cost)).or_insert(cost);
}