mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-20 17:31:02 +00:00
Evict inactive peers from the collator protocol peer-set (#2680)
* malicious reputation cost is fatal * make ReportBad a malicious cost * futures control-flow for cleaning up inactive collator peers * guide: network bridge updates * add `PeerDisconnected` message * guide: update * reverse order * remember to match * implement disconnect peer in network bridge * implement disconnect_inactive_peers * test * remove println * don't hardcore policy * add fuse outside of loop * use default eviction policy
This commit is contained in:
committed by
GitHub
parent
0f8b6f2f6e
commit
b8867d71bc
Generated
+1
@@ -5365,6 +5365,7 @@ dependencies = [
|
|||||||
"assert_matches",
|
"assert_matches",
|
||||||
"env_logger 0.8.2",
|
"env_logger 0.8.2",
|
||||||
"futures 0.3.12",
|
"futures 0.3.12",
|
||||||
|
"futures-timer 3.0.2",
|
||||||
"log",
|
"log",
|
||||||
"polkadot-node-network-protocol",
|
"polkadot-node-network-protocol",
|
||||||
"polkadot-node-primitives",
|
"polkadot-node-primitives",
|
||||||
|
|||||||
@@ -57,6 +57,9 @@ pub(crate) enum Action {
|
|||||||
/// Report a peer to the network implementation (decreasing/increasing its reputation).
|
/// Report a peer to the network implementation (decreasing/increasing its reputation).
|
||||||
ReportPeer(PeerId, UnifiedReputationChange),
|
ReportPeer(PeerId, UnifiedReputationChange),
|
||||||
|
|
||||||
|
/// Disconnect a peer from the given peer-set without affecting their reputation.
|
||||||
|
DisconnectPeer(PeerId, PeerSet),
|
||||||
|
|
||||||
/// A subsystem updates us on the relay chain leaves we consider active.
|
/// A subsystem updates us on the relay chain leaves we consider active.
|
||||||
///
|
///
|
||||||
/// Implementation will send `WireMessage::ViewUpdate` message to peers as appropriate to the
|
/// Implementation will send `WireMessage::ViewUpdate` message to peers as appropriate to the
|
||||||
@@ -119,6 +122,9 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
|
|||||||
}
|
}
|
||||||
Ok(FromOverseer::Communication { msg }) => match msg {
|
Ok(FromOverseer::Communication { msg }) => match msg {
|
||||||
NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
|
NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
|
||||||
|
NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => {
|
||||||
|
Action::DisconnectPeer(peer, peer_set)
|
||||||
|
}
|
||||||
NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
|
NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
|
||||||
Action::SendValidationMessages(vec![(peers, msg)])
|
Action::SendValidationMessages(vec![(peers, msg)])
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -297,6 +297,16 @@ where
|
|||||||
bridge.network_service.report_peer(peer, rep).await?
|
bridge.network_service.report_peer(peer, rep).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Action::DisconnectPeer(peer, peer_set) => {
|
||||||
|
tracing::debug!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
action = "DisconnectPeer",
|
||||||
|
?peer,
|
||||||
|
peer_set = ?peer_set,
|
||||||
|
);
|
||||||
|
bridge.network_service.disconnect_peer(peer, peer_set);
|
||||||
|
}
|
||||||
|
|
||||||
Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
|
Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
target: LOG_TARGET,
|
target: LOG_TARGET,
|
||||||
|
|||||||
@@ -90,6 +90,8 @@ where
|
|||||||
pub enum NetworkAction {
|
pub enum NetworkAction {
|
||||||
/// Note a change in reputation for a peer.
|
/// Note a change in reputation for a peer.
|
||||||
ReputationChange(PeerId, Rep),
|
ReputationChange(PeerId, Rep),
|
||||||
|
/// Disconnect a peer from the given peer-set.
|
||||||
|
DisconnectPeer(PeerId, PeerSet),
|
||||||
/// Write a notification to a given peer on the given peer-set.
|
/// Write a notification to a given peer on the given peer-set.
|
||||||
WriteNotification(PeerId, PeerSet, Vec<u8>),
|
WriteNotification(PeerId, PeerSet, Vec<u8>),
|
||||||
}
|
}
|
||||||
@@ -130,6 +132,20 @@ pub trait Network: Send + 'static {
|
|||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Disconnect a given peer from the peer set specified without harming reputation.
|
||||||
|
fn disconnect_peer(
|
||||||
|
&mut self,
|
||||||
|
who: PeerId,
|
||||||
|
peer_set: PeerSet,
|
||||||
|
) -> BoxFuture<SubsystemResult<()>> {
|
||||||
|
async move {
|
||||||
|
self.action_sink()
|
||||||
|
.send(NetworkAction::DisconnectPeer(who, peer_set))
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
/// Write a notification to a peer on the given peer-set's protocol.
|
/// Write a notification to a peer on the given peer-set's protocol.
|
||||||
fn write_notification(
|
fn write_notification(
|
||||||
&mut self,
|
&mut self,
|
||||||
@@ -179,6 +195,9 @@ impl Network for Arc<NetworkService<Block, Hash>> {
|
|||||||
);
|
);
|
||||||
self.0.report_peer(peer, cost_benefit.into_base_rep())
|
self.0.report_peer(peer, cost_benefit.into_base_rep())
|
||||||
}
|
}
|
||||||
|
NetworkAction::DisconnectPeer(peer, peer_set) => self
|
||||||
|
.0
|
||||||
|
.disconnect_peer(peer, peer_set.into_protocol_name()),
|
||||||
NetworkAction::WriteNotification(peer, peer_set, message) => self
|
NetworkAction::WriteNotification(peer, peer_set, message) => self
|
||||||
.0
|
.0
|
||||||
.write_notification(peer, peer_set.into_protocol_name(), message),
|
.write_notification(peer, peer_set.into_protocol_name(), message),
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ edition = "2018"
|
|||||||
futures = "0.3.12"
|
futures = "0.3.12"
|
||||||
tracing = "0.1.25"
|
tracing = "0.1.25"
|
||||||
thiserror = "1.0.23"
|
thiserror = "1.0.23"
|
||||||
|
futures-timer = "3"
|
||||||
|
|
||||||
polkadot-primitives = { path = "../../../primitives" }
|
polkadot-primitives = { path = "../../../primitives" }
|
||||||
polkadot-node-network-protocol = { path = "../../network/protocol" }
|
polkadot-node-network-protocol = { path = "../../network/protocol" }
|
||||||
|
|||||||
@@ -20,6 +20,8 @@
|
|||||||
#![deny(missing_docs, unused_crate_dependencies)]
|
#![deny(missing_docs, unused_crate_dependencies)]
|
||||||
#![recursion_limit="256"]
|
#![recursion_limit="256"]
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures::{channel::oneshot, FutureExt, TryFutureExt};
|
use futures::{channel::oneshot, FutureExt, TryFutureExt};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
@@ -60,10 +62,20 @@ enum Error {
|
|||||||
|
|
||||||
type Result<T> = std::result::Result<T, Error>;
|
type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
|
/// A collator eviction policy - how fast to evict collators which are inactive.
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct CollatorEvictionPolicy(pub Duration);
|
||||||
|
|
||||||
|
impl Default for CollatorEvictionPolicy {
|
||||||
|
fn default() -> Self {
|
||||||
|
CollatorEvictionPolicy(Duration::from_secs(24))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// What side of the collator protocol is being engaged
|
/// What side of the collator protocol is being engaged
|
||||||
pub enum ProtocolSide {
|
pub enum ProtocolSide {
|
||||||
/// Validators operate on the relay chain.
|
/// Validators operate on the relay chain.
|
||||||
Validator(validator_side::Metrics),
|
Validator(CollatorEvictionPolicy, validator_side::Metrics),
|
||||||
/// Collators operate on a parachain.
|
/// Collators operate on a parachain.
|
||||||
Collator(CollatorId, collator_side::Metrics),
|
Collator(CollatorId, collator_side::Metrics),
|
||||||
}
|
}
|
||||||
@@ -90,8 +102,9 @@ impl CollatorProtocolSubsystem {
|
|||||||
Context: SubsystemContext<Message = CollatorProtocolMessage>,
|
Context: SubsystemContext<Message = CollatorProtocolMessage>,
|
||||||
{
|
{
|
||||||
match self.protocol_side {
|
match self.protocol_side {
|
||||||
ProtocolSide::Validator(metrics) => validator_side::run(
|
ProtocolSide::Validator(policy, metrics) => validator_side::run(
|
||||||
ctx,
|
ctx,
|
||||||
|
policy,
|
||||||
metrics,
|
metrics,
|
||||||
).await,
|
).await,
|
||||||
ProtocolSide::Collator(id, metrics) => collator_side::run(
|
ProtocolSide::Collator(id, metrics) => collator_side::run(
|
||||||
|
|||||||
@@ -15,8 +15,11 @@
|
|||||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use std::{collections::{HashMap, HashSet}, sync::Arc, task::Poll};
|
use std::{collections::{HashMap, HashSet}, sync::Arc, task::Poll};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use futures::{FutureExt, channel::oneshot, future::{Fuse, FusedFuture, BoxFuture}};
|
use futures::{FutureExt, channel::oneshot, future::{Fuse, FusedFuture, BoxFuture, Either}};
|
||||||
|
use futures::StreamExt;
|
||||||
|
use futures_timer::Delay;
|
||||||
use always_assert::never;
|
use always_assert::never;
|
||||||
|
|
||||||
use polkadot_primitives::v1::{
|
use polkadot_primitives::v1::{
|
||||||
@@ -32,6 +35,7 @@ use polkadot_subsystem::{
|
|||||||
};
|
};
|
||||||
use polkadot_node_network_protocol::{
|
use polkadot_node_network_protocol::{
|
||||||
OurView, PeerId, UnifiedReputationChange as Rep, View,
|
OurView, PeerId, UnifiedReputationChange as Rep, View,
|
||||||
|
peer_set::PeerSet,
|
||||||
request_response::{OutgoingRequest, Requests, request::{Recipient, RequestError}}, v1 as protocol_v1
|
request_response::{OutgoingRequest, Requests, request::{Recipient, RequestError}}, v1 as protocol_v1
|
||||||
};
|
};
|
||||||
use polkadot_node_network_protocol::request_response::v1::{CollationFetchingRequest, CollationFetchingResponse};
|
use polkadot_node_network_protocol::request_response::v1::{CollationFetchingRequest, CollationFetchingResponse};
|
||||||
@@ -47,13 +51,19 @@ const COST_CORRUPTED_MESSAGE: Rep = Rep::CostMinor("Message was corrupt");
|
|||||||
/// Network errors that originated at the remote host should have same cost as timeout.
|
/// Network errors that originated at the remote host should have same cost as timeout.
|
||||||
const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
|
const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
|
||||||
const COST_REQUEST_TIMED_OUT: Rep = Rep::CostMinor("A collation request has timed out");
|
const COST_REQUEST_TIMED_OUT: Rep = Rep::CostMinor("A collation request has timed out");
|
||||||
const COST_REPORT_BAD: Rep = Rep::CostMajor("A collator was reported by another subsystem");
|
const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported by another subsystem");
|
||||||
const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem");
|
const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem");
|
||||||
|
|
||||||
|
// How often to check all peers with activity.
|
||||||
|
#[cfg(not(test))]
|
||||||
|
const ACTIVITY_POLL: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
const ACTIVITY_POLL: Duration = Duration::from_millis(10);
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
pub struct Metrics(Option<MetricsInner>);
|
pub struct Metrics(Option<MetricsInner>);
|
||||||
|
|
||||||
|
|
||||||
impl Metrics {
|
impl Metrics {
|
||||||
fn on_request(&self, succeeded: std::result::Result<(), ()>) {
|
fn on_request(&self, succeeded: std::result::Result<(), ()>) {
|
||||||
if let Some(metrics) = &self.0 {
|
if let Some(metrics) = &self.0 {
|
||||||
@@ -130,14 +140,42 @@ struct PerRequest {
|
|||||||
span: Option<jaeger::Span>,
|
span: Option<jaeger::Span>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct PeerData {
|
||||||
|
view: View,
|
||||||
|
last_active: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PeerData {
|
||||||
|
fn new(view: View) -> Self {
|
||||||
|
PeerData {
|
||||||
|
view,
|
||||||
|
last_active: Instant::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn note_active(&mut self) {
|
||||||
|
self.last_active = Instant::now();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn active_since(&self, instant: Instant) -> bool {
|
||||||
|
self.last_active >= instant
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for PeerData {
|
||||||
|
fn default() -> Self {
|
||||||
|
PeerData::new(Default::default())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// All state relevant for the validator side of the protocol lives here.
|
/// All state relevant for the validator side of the protocol lives here.
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct State {
|
struct State {
|
||||||
/// Our own view.
|
/// Our own view.
|
||||||
view: OurView,
|
view: OurView,
|
||||||
|
|
||||||
/// Track all active collators and their views.
|
/// Track all active collators and their data.
|
||||||
peer_views: HashMap<PeerId, View>,
|
peer_data: HashMap<PeerId, PeerData>,
|
||||||
|
|
||||||
/// Peers that have declared themselves as collators.
|
/// Peers that have declared themselves as collators.
|
||||||
known_collators: HashMap<PeerId, CollatorId>,
|
known_collators: HashMap<PeerId, CollatorId>,
|
||||||
@@ -263,18 +301,18 @@ async fn handle_peer_view_change(
|
|||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
view: View,
|
view: View,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let current = state.peer_views.entry(peer_id.clone()).or_default();
|
let current = state.peer_data.entry(peer_id.clone()).or_default();
|
||||||
|
|
||||||
let removed: Vec<_> = current.difference(&view).cloned().collect();
|
let removed: Vec<_> = current.view.difference(&view).cloned().collect();
|
||||||
|
|
||||||
*current = view;
|
current.view = view;
|
||||||
|
|
||||||
if let Some(advertisements) = state.advertisements.get_mut(&peer_id) {
|
if let Some(advertisements) = state.advertisements.get_mut(&peer_id) {
|
||||||
advertisements.retain(|(_, relay_parent)| !removed.contains(relay_parent));
|
advertisements.retain(|(_, relay_parent)| !removed.contains(relay_parent));
|
||||||
}
|
}
|
||||||
|
|
||||||
for removed in removed.into_iter() {
|
for removed in removed.into_iter() {
|
||||||
state.requested_collations.retain(|k, _| k.0 != removed);
|
state.requested_collations.retain(|k, _| k.0 != removed || k.2 != peer_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -385,6 +423,10 @@ where
|
|||||||
{
|
{
|
||||||
use protocol_v1::CollatorProtocolMessage::*;
|
use protocol_v1::CollatorProtocolMessage::*;
|
||||||
|
|
||||||
|
if let Some(d) = state.peer_data.get_mut(&origin) {
|
||||||
|
d.note_active();
|
||||||
|
}
|
||||||
|
|
||||||
match msg {
|
match msg {
|
||||||
Declare(id) => {
|
Declare(id) => {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
@@ -392,12 +434,39 @@ where
|
|||||||
peer_id = ?origin,
|
peer_id = ?origin,
|
||||||
"Declared as collator",
|
"Declared as collator",
|
||||||
);
|
);
|
||||||
state.known_collators.insert(origin.clone(), id);
|
|
||||||
state.peer_views.entry(origin).or_default();
|
if state.known_collators.insert(origin.clone(), id).is_some() {
|
||||||
|
modify_reputation(ctx, origin.clone(), COST_UNEXPECTED_MESSAGE).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
AdvertiseCollation(relay_parent, para_id) => {
|
AdvertiseCollation(relay_parent, para_id) => {
|
||||||
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("advertise-collation"));
|
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("advertise-collation"));
|
||||||
state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent));
|
|
||||||
|
if !state.view.contains(&relay_parent) {
|
||||||
|
tracing::debug!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
peer_id = ?origin,
|
||||||
|
%para_id,
|
||||||
|
?relay_parent,
|
||||||
|
"Advertise collation out of view",
|
||||||
|
);
|
||||||
|
|
||||||
|
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent)) {
|
||||||
|
tracing::debug!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
peer_id = ?origin,
|
||||||
|
%para_id,
|
||||||
|
?relay_parent,
|
||||||
|
"Multiple collations for same relay-parent advertised",
|
||||||
|
);
|
||||||
|
|
||||||
|
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(collator) = state.known_collators.get(&origin) {
|
if let Some(collator) = state.known_collators.get(&origin) {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
@@ -488,13 +557,12 @@ where
|
|||||||
use NetworkBridgeEvent::*;
|
use NetworkBridgeEvent::*;
|
||||||
|
|
||||||
match bridge_message {
|
match bridge_message {
|
||||||
PeerConnected(_id, _role) => {
|
PeerConnected(peer_id, _role) => {
|
||||||
// A peer has connected. Until it issues a `Declare` message we do not
|
state.peer_data.entry(peer_id).or_default();
|
||||||
// want to track it's view or take any other actions.
|
|
||||||
},
|
},
|
||||||
PeerDisconnected(peer_id) => {
|
PeerDisconnected(peer_id) => {
|
||||||
state.known_collators.remove(&peer_id);
|
state.known_collators.remove(&peer_id);
|
||||||
state.peer_views.remove(&peer_id);
|
state.peer_data.remove(&peer_id);
|
||||||
},
|
},
|
||||||
PeerViewChange(peer_id, view) => {
|
PeerViewChange(peer_id, view) => {
|
||||||
handle_peer_view_change(state, peer_id, view).await?;
|
handle_peer_view_change(state, peer_id, view).await?;
|
||||||
@@ -573,14 +641,26 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wait until next inactivity check. returns the instant for the following check.
|
||||||
|
async fn wait_until_next_check(last_poll: Instant) -> Instant {
|
||||||
|
let now = Instant::now();
|
||||||
|
let next_poll = last_poll + ACTIVITY_POLL;
|
||||||
|
|
||||||
|
if next_poll > now {
|
||||||
|
Delay::new(next_poll - now).await
|
||||||
|
}
|
||||||
|
|
||||||
|
Instant::now()
|
||||||
|
}
|
||||||
|
|
||||||
/// The main run loop.
|
/// The main run loop.
|
||||||
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
|
||||||
pub(crate) async fn run<Context>(
|
pub(crate) async fn run<Context>(
|
||||||
mut ctx: Context,
|
mut ctx: Context,
|
||||||
|
eviction_policy: crate::CollatorEvictionPolicy,
|
||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where Context: SubsystemContext<Message = CollatorProtocolMessage>
|
||||||
Context: SubsystemContext<Message = CollatorProtocolMessage>
|
|
||||||
{
|
{
|
||||||
use FromOverseer::*;
|
use FromOverseer::*;
|
||||||
use OverseerSignal::*;
|
use OverseerSignal::*;
|
||||||
@@ -590,23 +670,50 @@ where
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
let next_inactivity_stream = futures::stream::unfold(
|
||||||
if let Poll::Ready(msg) = futures::poll!(ctx.recv()) {
|
Instant::now() + ACTIVITY_POLL,
|
||||||
let msg = msg?;
|
|next_check| async move { Some(((), wait_until_next_check(next_check).await)) }
|
||||||
tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message");
|
).fuse();
|
||||||
|
|
||||||
match msg {
|
futures::pin_mut!(next_inactivity_stream);
|
||||||
Communication { msg } => process_msg(&mut ctx, msg, &mut state).await,
|
|
||||||
Signal(BlockFinalized(..)) => {}
|
loop {
|
||||||
Signal(ActiveLeaves(_)) => {}
|
let res = {
|
||||||
Signal(Conclude) => { break }
|
let s = futures::future::select(ctx.recv().fuse(), next_inactivity_stream.next().fuse());
|
||||||
|
|
||||||
|
if let Poll::Ready(res) = futures::poll!(s) {
|
||||||
|
Some(match res {
|
||||||
|
Either::Left((msg, _)) => Either::Left(msg?),
|
||||||
|
Either::Right((_, _)) => Either::Right(()),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
}
|
}
|
||||||
continue;
|
};
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Some(Either::Left(msg)) => {
|
||||||
|
tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message");
|
||||||
|
|
||||||
|
match msg {
|
||||||
|
Communication { msg } => process_msg(&mut ctx, msg, &mut state).await,
|
||||||
|
Signal(BlockFinalized(..)) => {}
|
||||||
|
Signal(ActiveLeaves(_)) => {}
|
||||||
|
Signal(Conclude) => { break }
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
Some(Either::Right(())) => {
|
||||||
|
disconnect_inactive_peers(&mut ctx, eviction_policy, &state.peer_data).await;
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
None => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut retained_requested = HashSet::new();
|
let mut retained_requested = HashSet::new();
|
||||||
for ((hash, para_id, peer_id), per_req) in state.requested_collations.iter_mut() {
|
for ((hash, para_id, peer_id), per_req) in state.requested_collations.iter_mut() {
|
||||||
// Despite the await, this won't block:
|
// Despite the await, this won't block on the response itself.
|
||||||
let finished = poll_collation_response(
|
let finished = poll_collation_response(
|
||||||
&mut ctx, &state.metrics, &state.span_per_relay_parent,
|
&mut ctx, &state.metrics, &state.span_per_relay_parent,
|
||||||
hash, para_id, peer_id, per_req
|
hash, para_id, peer_id, per_req
|
||||||
@@ -621,6 +728,28 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This issues `NetworkBridge` notifications to disconnect from all inactive peers at the
|
||||||
|
// earliest possible point. This does not yet clean up any metadata, as that will be done upon
|
||||||
|
// receipt of the `PeerDisconnected` event.
|
||||||
|
async fn disconnect_inactive_peers(
|
||||||
|
ctx: &mut impl SubsystemContext,
|
||||||
|
eviction_policy: crate::CollatorEvictionPolicy,
|
||||||
|
peers: &HashMap<PeerId, PeerData>,
|
||||||
|
) {
|
||||||
|
let cutoff = match Instant::now().checked_sub(eviction_policy.0) {
|
||||||
|
None => return,
|
||||||
|
Some(i) => i,
|
||||||
|
};
|
||||||
|
|
||||||
|
for (peer, peer_data) in peers {
|
||||||
|
if !peer_data.active_since(cutoff) {
|
||||||
|
ctx.send_message(
|
||||||
|
NetworkBridgeMessage::DisconnectPeer(peer.clone(), PeerSet::Collation).into()
|
||||||
|
).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Poll collation response, return immediately if there is none.
|
/// Poll collation response, return immediately if there is none.
|
||||||
///
|
///
|
||||||
/// Ready responses are handled, by logging and decreasing peer's reputation on error and by
|
/// Ready responses are handled, by logging and decreasing peer's reputation on error and by
|
||||||
@@ -761,10 +890,12 @@ mod tests {
|
|||||||
|
|
||||||
use polkadot_primitives::v1::{BlockData, CollatorPair, CompressedPoV};
|
use polkadot_primitives::v1::{BlockData, CollatorPair, CompressedPoV};
|
||||||
use polkadot_subsystem_testhelpers as test_helpers;
|
use polkadot_subsystem_testhelpers as test_helpers;
|
||||||
use polkadot_node_network_protocol::{our_view,
|
use polkadot_node_network_protocol::{our_view, ObservedRole,
|
||||||
request_response::Requests
|
request_response::Requests
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(50);
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct TestState {
|
struct TestState {
|
||||||
chain_ids: Vec<ParaId>,
|
chain_ids: Vec<ParaId>,
|
||||||
@@ -813,7 +944,11 @@ mod tests {
|
|||||||
|
|
||||||
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
|
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
|
||||||
|
|
||||||
let subsystem = run(context, Metrics::default());
|
let subsystem = run(
|
||||||
|
context,
|
||||||
|
crate::CollatorEvictionPolicy(ACTIVITY_TIMEOUT),
|
||||||
|
Metrics::default(),
|
||||||
|
);
|
||||||
|
|
||||||
let test_fut = test(TestHarness { virtual_overseer });
|
let test_fut = test(TestHarness { virtual_overseer });
|
||||||
|
|
||||||
@@ -823,7 +958,7 @@ mod tests {
|
|||||||
executor::block_on(future::select(test_fut, subsystem));
|
executor::block_on(future::select(test_fut, subsystem));
|
||||||
}
|
}
|
||||||
|
|
||||||
const TIMEOUT: Duration = Duration::from_millis(100);
|
const TIMEOUT: Duration = Duration::from_millis(200);
|
||||||
|
|
||||||
async fn overseer_send(
|
async fn overseer_send(
|
||||||
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
|
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
|
||||||
@@ -1191,4 +1326,208 @@ mod tests {
|
|||||||
assert_eq!(collation_1.0, candidate_b);
|
assert_eq!(collation_1.0, candidate_b);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn inactive_disconnected() {
|
||||||
|
let test_state = TestState::default();
|
||||||
|
|
||||||
|
test_harness(|test_harness| async move {
|
||||||
|
let TestHarness {
|
||||||
|
mut virtual_overseer,
|
||||||
|
} = test_harness;
|
||||||
|
|
||||||
|
let pair = CollatorPair::generate().0;
|
||||||
|
tracing::trace!("activating");
|
||||||
|
|
||||||
|
let hash_a = test_state.relay_parent;
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::OurViewChange(our_view![hash_a])
|
||||||
|
)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
|
||||||
|
let peer_b = PeerId::random();
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::PeerConnected(
|
||||||
|
peer_b.clone(),
|
||||||
|
ObservedRole::Full,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::PeerMessage(
|
||||||
|
peer_b.clone(),
|
||||||
|
protocol_v1::CollatorProtocolMessage::Declare(pair.public()),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::PeerMessage(
|
||||||
|
peer_b.clone(),
|
||||||
|
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
|
||||||
|
test_state.relay_parent,
|
||||||
|
test_state.chain_ids[0],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::CandidateSelection(CandidateSelectionMessage::Collation(
|
||||||
|
relay_parent,
|
||||||
|
para_id,
|
||||||
|
collator,
|
||||||
|
)
|
||||||
|
) => {
|
||||||
|
assert_eq!(relay_parent, test_state.relay_parent);
|
||||||
|
assert_eq!(para_id, test_state.chain_ids[0]);
|
||||||
|
assert_eq!(collator, pair.public());
|
||||||
|
});
|
||||||
|
|
||||||
|
Delay::new(ACTIVITY_TIMEOUT * 2).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(
|
||||||
|
peer,
|
||||||
|
peer_set,
|
||||||
|
)) => {
|
||||||
|
assert_eq!(peer, peer_b);
|
||||||
|
assert_eq!(peer_set, PeerSet::Collation);
|
||||||
|
}
|
||||||
|
)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn activity_extends_life() {
|
||||||
|
let test_state = TestState::default();
|
||||||
|
|
||||||
|
test_harness(|test_harness| async move {
|
||||||
|
let TestHarness {
|
||||||
|
mut virtual_overseer,
|
||||||
|
} = test_harness;
|
||||||
|
|
||||||
|
let pair = CollatorPair::generate().0;
|
||||||
|
tracing::trace!("activating");
|
||||||
|
|
||||||
|
let hash_a = test_state.relay_parent;
|
||||||
|
let hash_b = Hash::repeat_byte(1);
|
||||||
|
let hash_c = Hash::repeat_byte(2);
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::OurViewChange(our_view![hash_a, hash_b, hash_c])
|
||||||
|
)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
|
||||||
|
let peer_b = PeerId::random();
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::PeerConnected(
|
||||||
|
peer_b.clone(),
|
||||||
|
ObservedRole::Full,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await;
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::PeerMessage(
|
||||||
|
peer_b.clone(),
|
||||||
|
protocol_v1::CollatorProtocolMessage::Declare(pair.public()),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await;
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::PeerMessage(
|
||||||
|
peer_b.clone(),
|
||||||
|
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
|
||||||
|
test_state.relay_parent,
|
||||||
|
test_state.chain_ids[0],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::CandidateSelection(CandidateSelectionMessage::Collation(
|
||||||
|
relay_parent,
|
||||||
|
para_id,
|
||||||
|
collator,
|
||||||
|
)
|
||||||
|
) => {
|
||||||
|
assert_eq!(relay_parent, test_state.relay_parent);
|
||||||
|
assert_eq!(para_id, test_state.chain_ids[0]);
|
||||||
|
assert_eq!(collator, pair.public());
|
||||||
|
});
|
||||||
|
|
||||||
|
Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await;
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::PeerMessage(
|
||||||
|
peer_b.clone(),
|
||||||
|
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
|
||||||
|
hash_b,
|
||||||
|
test_state.chain_ids[1],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::CandidateSelection(CandidateSelectionMessage::Collation(
|
||||||
|
relay_parent,
|
||||||
|
para_id,
|
||||||
|
collator,
|
||||||
|
)
|
||||||
|
) => {
|
||||||
|
assert_eq!(relay_parent, hash_b);
|
||||||
|
assert_eq!(para_id, test_state.chain_ids[1]);
|
||||||
|
assert_eq!(collator, pair.public());
|
||||||
|
});
|
||||||
|
|
||||||
|
Delay::new(ACTIVITY_TIMEOUT * 3 / 2).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(
|
||||||
|
peer,
|
||||||
|
peer_set,
|
||||||
|
)) => {
|
||||||
|
assert_eq!(peer, peer_b);
|
||||||
|
assert_eq!(peer_set, PeerSet::Collation);
|
||||||
|
}
|
||||||
|
)
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ impl UnifiedReputationChange {
|
|||||||
Self::CostMajor(_) => -300_000,
|
Self::CostMajor(_) => -300_000,
|
||||||
Self::CostMinorRepeated(_) => -200_000,
|
Self::CostMinorRepeated(_) => -200_000,
|
||||||
Self::CostMajorRepeated(_) => -600_000,
|
Self::CostMajorRepeated(_) => -600_000,
|
||||||
Self::Malicious(_) => -1_000_000,
|
Self::Malicious(_) => i32::min_value(),
|
||||||
Self::BenefitMajorFirst(_) => 300_000,
|
Self::BenefitMajorFirst(_) => 300_000,
|
||||||
Self::BenefitMajor(_) => 200_000,
|
Self::BenefitMajor(_) => 200_000,
|
||||||
Self::BenefitMinorFirst(_) => 15_000,
|
Self::BenefitMinorFirst(_) => 15_000,
|
||||||
|
|||||||
@@ -503,7 +503,7 @@ where
|
|||||||
collator_protocol: {
|
collator_protocol: {
|
||||||
let side = match is_collator {
|
let side = match is_collator {
|
||||||
IsCollator::Yes(id) => ProtocolSide::Collator(id, Metrics::register(registry)?),
|
IsCollator::Yes(id) => ProtocolSide::Collator(id, Metrics::register(registry)?),
|
||||||
IsCollator::No => ProtocolSide::Validator(Metrics::register(registry)?),
|
IsCollator::No => ProtocolSide::Validator(Default::default(),Metrics::register(registry)?),
|
||||||
};
|
};
|
||||||
CollatorProtocolSubsystem::new(
|
CollatorProtocolSubsystem::new(
|
||||||
side,
|
side,
|
||||||
|
|||||||
@@ -211,6 +211,9 @@ pub enum NetworkBridgeMessage {
|
|||||||
/// Report a peer for their actions.
|
/// Report a peer for their actions.
|
||||||
ReportPeer(PeerId, UnifiedReputationChange),
|
ReportPeer(PeerId, UnifiedReputationChange),
|
||||||
|
|
||||||
|
/// Disconnect a peer from the given peer-set without affecting their reputation.
|
||||||
|
DisconnectPeer(PeerId, PeerSet),
|
||||||
|
|
||||||
/// Send a message to one or more peers on the validation peer-set.
|
/// Send a message to one or more peers on the validation peer-set.
|
||||||
SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol),
|
SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol),
|
||||||
|
|
||||||
@@ -249,6 +252,7 @@ impl NetworkBridgeMessage {
|
|||||||
pub fn relay_parent(&self) -> Option<Hash> {
|
pub fn relay_parent(&self) -> Option<Hash> {
|
||||||
match self {
|
match self {
|
||||||
Self::ReportPeer(_, _) => None,
|
Self::ReportPeer(_, _) => None,
|
||||||
|
Self::DisconnectPeer(_, _) => None,
|
||||||
Self::SendValidationMessage(_, _) => None,
|
Self::SendValidationMessage(_, _) => None,
|
||||||
Self::SendCollationMessage(_, _) => None,
|
Self::SendCollationMessage(_, _) => None,
|
||||||
Self::SendValidationMessages(_) => None,
|
Self::SendValidationMessages(_) => None,
|
||||||
|
|||||||
@@ -86,6 +86,10 @@ Map the message onto the corresponding [Event Handler](#event-handlers) based on
|
|||||||
|
|
||||||
- Adjust peer reputation according to cost or benefit provided
|
- Adjust peer reputation according to cost or benefit provided
|
||||||
|
|
||||||
|
### DisconnectPeer
|
||||||
|
|
||||||
|
- Disconnect the peer from the peer-set requested, if connected.
|
||||||
|
|
||||||
### SendValidationMessage / SendValidationMessages
|
### SendValidationMessage / SendValidationMessages
|
||||||
|
|
||||||
- Issue a corresponding `ProtocolMessage` to each listed peer on the validation peer-set.
|
- Issue a corresponding `ProtocolMessage` to each listed peer on the validation peer-set.
|
||||||
|
|||||||
@@ -328,7 +328,9 @@ enum PeerSet {
|
|||||||
|
|
||||||
enum NetworkBridgeMessage {
|
enum NetworkBridgeMessage {
|
||||||
/// Report a cost or benefit of a peer. Negative values are costs, positive are benefits.
|
/// Report a cost or benefit of a peer. Negative values are costs, positive are benefits.
|
||||||
ReportPeer(PeerSet, PeerId, cost_benefit: i32),
|
ReportPeer(PeerId, cost_benefit: i32),
|
||||||
|
/// Disconnect a peer from the given peer-set without affecting their reputation.
|
||||||
|
DisconnectPeer(PeerId, PeerSet),
|
||||||
/// Send a message to one or more peers on the validation peerset.
|
/// Send a message to one or more peers on the validation peerset.
|
||||||
SendValidationMessage([PeerId], ValidationProtocolV1),
|
SendValidationMessage([PeerId], ValidationProtocolV1),
|
||||||
/// Send a message to one or more peers on the collation peerset.
|
/// Send a message to one or more peers on the collation peerset.
|
||||||
|
|||||||
Reference in New Issue
Block a user