mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-18 09:31:02 +00:00
Remove all (non-dev) client references from frame, activate dependency enforcer (#4184)
* Move transaction pool to primitives * move backend, errors into primitives * remove unused client depencies * Move rpc-api into primitives * Move peerset back to client * Move rpc/api back to client, move palette/support/rpc into utils * move support-rpc into subfolder * move system-rpc into utils * move transaction-pool and -graph back into client * fix broken imports * Clean up test primitives * Make support test utils independent of frame * remove unnecessary node dependencies from service * Reactivate dependency script: - only enforce the now achieved status quo will remain - allow for primitives to depend on /client for now without failing - more discriptive error message so people understand, what it wants - minor fix to differentiative between ../client and /client (which may be a subfolder) - don't allow this to fail anylonger. * fix doc comment * 'Should not' rather than 'must not'. * Revert unwanted dependency changes * fix faulty import * fixup derive_more version * fix wrong import path
This commit is contained in:
committed by
GitHub
parent
b2aab98e69
commit
bd652793db
@@ -0,0 +1,652 @@
|
||||
// Copyright 2018-2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be
|
||||
//! connected to.
|
||||
|
||||
mod peersstate;
|
||||
|
||||
use std::{collections::{HashSet, HashMap}, collections::VecDeque, time::Instant};
|
||||
use futures::{prelude::*, channel::mpsc};
|
||||
use libp2p::PeerId;
|
||||
use log::{debug, error, trace};
|
||||
use serde_json::json;
|
||||
use std::{pin::Pin, task::Context, task::Poll};
|
||||
|
||||
/// We don't accept nodes whose reputation is under this value.
|
||||
const BANNED_THRESHOLD: i32 = 82 * (i32::min_value() / 100);
|
||||
/// Reputation change for a node when we get disconnected from it.
|
||||
const DISCONNECT_REPUTATION_CHANGE: i32 = -10;
|
||||
/// Reserved peers group ID
|
||||
const RESERVED_NODES: &'static str = "reserved";
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Action {
|
||||
AddReservedPeer(PeerId),
|
||||
RemoveReservedPeer(PeerId),
|
||||
SetReservedOnly(bool),
|
||||
ReportPeer(PeerId, i32),
|
||||
SetPriorityGroup(String, HashSet<PeerId>),
|
||||
AddToPriorityGroup(String, PeerId),
|
||||
RemoveFromPriorityGroup(String, PeerId),
|
||||
}
|
||||
|
||||
/// Shared handle to the peer set manager (PSM). Distributed around the code.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PeersetHandle {
|
||||
tx: mpsc::UnboundedSender<Action>,
|
||||
}
|
||||
|
||||
impl PeersetHandle {
|
||||
/// Adds a new reserved peer. The peerset will make an effort to always remain connected to
|
||||
/// this peer.
|
||||
///
|
||||
/// Has no effect if the node was already a reserved peer.
|
||||
///
|
||||
/// > **Note**: Keep in mind that the networking has to know an address for this node,
|
||||
/// > otherwise it will not be able to connect to it.
|
||||
pub fn add_reserved_peer(&self, peer_id: PeerId) {
|
||||
let _ = self.tx.unbounded_send(Action::AddReservedPeer(peer_id));
|
||||
}
|
||||
|
||||
/// Remove a previously-added reserved peer.
|
||||
///
|
||||
/// Has no effect if the node was not a reserved peer.
|
||||
pub fn remove_reserved_peer(&self, peer_id: PeerId) {
|
||||
let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(peer_id));
|
||||
}
|
||||
|
||||
/// Sets whether or not the peerset only has connections .
|
||||
pub fn set_reserved_only(&self, reserved: bool) {
|
||||
let _ = self.tx.unbounded_send(Action::SetReservedOnly(reserved));
|
||||
}
|
||||
|
||||
/// Reports an adjustment to the reputation of the given peer.
|
||||
pub fn report_peer(&self, peer_id: PeerId, score_diff: i32) {
|
||||
let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
|
||||
}
|
||||
|
||||
/// Modify a priority group.
|
||||
pub fn set_priority_group(&self, group_id: String, peers: HashSet<PeerId>) {
|
||||
let _ = self.tx.unbounded_send(Action::SetPriorityGroup(group_id, peers));
|
||||
}
|
||||
|
||||
/// Add a peer to a priority group.
|
||||
pub fn add_to_priority_group(&self, group_id: String, peer_id: PeerId) {
|
||||
let _ = self.tx.unbounded_send(Action::AddToPriorityGroup(group_id, peer_id));
|
||||
}
|
||||
|
||||
/// Remove a peer from a priority group.
|
||||
pub fn remove_from_priority_group(&self, group_id: String, peer_id: PeerId) {
|
||||
let _ = self.tx.unbounded_send(Action::RemoveFromPriorityGroup(group_id, peer_id));
|
||||
}
|
||||
}
|
||||
|
||||
/// Message that can be sent by the peer set manager (PSM).
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Message {
|
||||
/// Request to open a connection to the given peer. From the point of view of the PSM, we are
|
||||
/// immediately connected.
|
||||
Connect(PeerId),
|
||||
|
||||
/// Drop the connection to the given peer, or cancel the connection attempt after a `Connect`.
|
||||
Drop(PeerId),
|
||||
|
||||
/// Equivalent to `Connect` for the peer corresponding to this incoming index.
|
||||
Accept(IncomingIndex),
|
||||
|
||||
/// Equivalent to `Drop` for the peer corresponding to this incoming index.
|
||||
Reject(IncomingIndex),
|
||||
}
|
||||
|
||||
/// Opaque identifier for an incoming connection. Allocated by the network.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct IncomingIndex(pub u64);
|
||||
|
||||
impl From<u64> for IncomingIndex {
|
||||
fn from(val: u64) -> IncomingIndex {
|
||||
IncomingIndex(val)
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration to pass when creating the peer set manager.
|
||||
#[derive(Debug)]
|
||||
pub struct PeersetConfig {
|
||||
/// Maximum number of ingoing links to peers.
|
||||
pub in_peers: u32,
|
||||
|
||||
/// Maximum number of outgoing links to peers.
|
||||
pub out_peers: u32,
|
||||
|
||||
/// List of bootstrap nodes to initialize the peer with.
|
||||
///
|
||||
/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
|
||||
/// > otherwise it will not be able to connect to them.
|
||||
pub bootnodes: Vec<PeerId>,
|
||||
|
||||
/// If true, we only accept reserved nodes.
|
||||
pub reserved_only: bool,
|
||||
|
||||
/// List of nodes that we should always be connected to.
|
||||
///
|
||||
/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
|
||||
/// > otherwise it will not be able to connect to them.
|
||||
pub reserved_nodes: Vec<PeerId>,
|
||||
}
|
||||
|
||||
/// Side of the peer set manager owned by the network. In other words, the "receiving" side.
|
||||
///
|
||||
/// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never
|
||||
/// errors.
|
||||
#[derive(Debug)]
|
||||
pub struct Peerset {
|
||||
data: peersstate::PeersState,
|
||||
/// If true, we only accept reserved nodes.
|
||||
reserved_only: bool,
|
||||
/// Receiver for messages from the `PeersetHandle` and from `tx`.
|
||||
rx: mpsc::UnboundedReceiver<Action>,
|
||||
/// Sending side of `rx`.
|
||||
tx: mpsc::UnboundedSender<Action>,
|
||||
/// Queue of messages to be emitted when the `Peerset` is polled.
|
||||
message_queue: VecDeque<Message>,
|
||||
/// When the `Peerset` was created.
|
||||
created: Instant,
|
||||
/// Last time when we updated the reputations of connected nodes.
|
||||
latest_time_update: Instant,
|
||||
}
|
||||
|
||||
impl Peerset {
|
||||
/// Builds a new peerset from the given configuration.
|
||||
pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
||||
let handle = PeersetHandle {
|
||||
tx: tx.clone(),
|
||||
};
|
||||
|
||||
let mut peerset = Peerset {
|
||||
data: peersstate::PeersState::new(config.in_peers, config.out_peers, config.reserved_only),
|
||||
tx,
|
||||
rx,
|
||||
reserved_only: config.reserved_only,
|
||||
message_queue: VecDeque::new(),
|
||||
created: Instant::now(),
|
||||
latest_time_update: Instant::now(),
|
||||
};
|
||||
|
||||
peerset.data.set_priority_group(RESERVED_NODES, config.reserved_nodes.into_iter().collect());
|
||||
for peer_id in config.bootnodes {
|
||||
if let peersstate::Peer::Unknown(entry) = peerset.data.peer(&peer_id) {
|
||||
entry.discover();
|
||||
} else {
|
||||
debug!(target: "peerset", "Duplicate bootnode in config: {:?}", peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
peerset.alloc_slots();
|
||||
(peerset, handle)
|
||||
}
|
||||
|
||||
fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
|
||||
let mut reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default();
|
||||
reserved.insert(peer_id);
|
||||
self.data.set_priority_group(RESERVED_NODES, reserved);
|
||||
self.alloc_slots();
|
||||
}
|
||||
|
||||
fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
|
||||
let mut reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default();
|
||||
reserved.remove(&peer_id);
|
||||
self.data.set_priority_group(RESERVED_NODES, reserved);
|
||||
match self.data.peer(&peer_id) {
|
||||
peersstate::Peer::Connected(peer) => {
|
||||
if self.reserved_only {
|
||||
peer.disconnect();
|
||||
self.message_queue.push_back(Message::Drop(peer_id));
|
||||
}
|
||||
}
|
||||
peersstate::Peer::NotConnected(_) => {},
|
||||
peersstate::Peer::Unknown(_) => {},
|
||||
}
|
||||
}
|
||||
|
||||
fn on_set_reserved_only(&mut self, reserved_only: bool) {
|
||||
self.reserved_only = reserved_only;
|
||||
self.data.set_priority_only(reserved_only);
|
||||
|
||||
if self.reserved_only {
|
||||
// Disconnect non-reserved nodes.
|
||||
let reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default();
|
||||
for peer_id in self.data.connected_peers().cloned().collect::<Vec<_>>().into_iter() {
|
||||
let peer = self.data.peer(&peer_id).into_connected()
|
||||
.expect("We are enumerating connected peers, therefore the peer is connected; qed");
|
||||
if !reserved.contains(&peer_id) {
|
||||
peer.disconnect();
|
||||
self.message_queue.push_back(Message::Drop(peer_id));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.alloc_slots();
|
||||
}
|
||||
}
|
||||
|
||||
fn on_set_priority_group(&mut self, group_id: &str, peers: HashSet<PeerId>) {
|
||||
self.data.set_priority_group(group_id, peers);
|
||||
self.alloc_slots();
|
||||
}
|
||||
|
||||
fn on_add_to_priority_group(&mut self, group_id: &str, peer_id: PeerId) {
|
||||
self.data.add_to_priority_group(group_id, peer_id);
|
||||
self.alloc_slots();
|
||||
}
|
||||
|
||||
fn on_remove_from_priority_group(&mut self, group_id: &str, peer_id: PeerId) {
|
||||
self.data.remove_from_priority_group(group_id, &peer_id);
|
||||
self.alloc_slots();
|
||||
}
|
||||
|
||||
fn on_report_peer(&mut self, peer_id: PeerId, score_diff: i32) {
|
||||
// We want reputations to be up-to-date before adjusting them.
|
||||
self.update_time();
|
||||
|
||||
match self.data.peer(&peer_id) {
|
||||
peersstate::Peer::Connected(mut peer) => {
|
||||
peer.add_reputation(score_diff);
|
||||
if peer.reputation() < BANNED_THRESHOLD {
|
||||
peer.disconnect();
|
||||
self.message_queue.push_back(Message::Drop(peer_id));
|
||||
}
|
||||
},
|
||||
peersstate::Peer::NotConnected(mut peer) => peer.add_reputation(score_diff),
|
||||
peersstate::Peer::Unknown(peer) => peer.discover().add_reputation(score_diff),
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the value of `self.latest_time_update` and performs all the updates that happen
|
||||
/// over time, such as reputation increases for staying connected.
|
||||
fn update_time(&mut self) {
|
||||
// We basically do `(now - self.latest_update).as_secs()`, except that by the way we do it
|
||||
// we know that we're not going to miss seconds because of rounding to integers.
|
||||
let secs_diff = {
|
||||
let now = Instant::now();
|
||||
let elapsed_latest = self.latest_time_update - self.created;
|
||||
let elapsed_now = now - self.created;
|
||||
self.latest_time_update = now;
|
||||
elapsed_now.as_secs() - elapsed_latest.as_secs()
|
||||
};
|
||||
|
||||
// For each elapsed second, move the node reputation towards zero.
|
||||
// If we multiply each second the reputation by `k` (where `k` is between 0 and 1), it
|
||||
// takes `ln(0.5) / ln(k)` seconds to reduce the reputation by half. Use this formula to
|
||||
// empirically determine a value of `k` that looks correct.
|
||||
for _ in 0..secs_diff {
|
||||
for peer in self.data.peers().cloned().collect::<Vec<_>>() {
|
||||
// We use `k = 0.98`, so we divide by `50`. With that value, it takes 34.3 seconds
|
||||
// to reduce the reputation by half.
|
||||
fn reput_tick(reput: i32) -> i32 {
|
||||
let mut diff = reput / 50;
|
||||
if diff == 0 && reput < 0 {
|
||||
diff = -1;
|
||||
} else if diff == 0 && reput > 0 {
|
||||
diff = 1;
|
||||
}
|
||||
reput.saturating_sub(diff)
|
||||
}
|
||||
match self.data.peer(&peer) {
|
||||
peersstate::Peer::Connected(mut peer) =>
|
||||
peer.set_reputation(reput_tick(peer.reputation())),
|
||||
peersstate::Peer::NotConnected(mut peer) =>
|
||||
peer.set_reputation(reput_tick(peer.reputation())),
|
||||
peersstate::Peer::Unknown(_) => unreachable!("We iterate over known peers; qed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to fill available out slots with nodes.
|
||||
fn alloc_slots(&mut self) {
|
||||
self.update_time();
|
||||
|
||||
// Try to grab the next node to attempt to connect to.
|
||||
while let Some(next) = {
|
||||
if self.reserved_only {
|
||||
self.data.priority_not_connected_peer_from_group(RESERVED_NODES)
|
||||
} else {
|
||||
self.data.priority_not_connected_peer()
|
||||
}
|
||||
} {
|
||||
match next.try_outgoing() {
|
||||
Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())),
|
||||
Err(_) => break, // No more slots available.
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
if self.reserved_only {
|
||||
break
|
||||
}
|
||||
|
||||
// Try to grab the next node to attempt to connect to.
|
||||
let next = match self.data.highest_not_connected_peer() {
|
||||
Some(p) => p,
|
||||
None => break, // No known node to add.
|
||||
};
|
||||
|
||||
// Don't connect to nodes with an abysmal reputation.
|
||||
if next.reputation() < BANNED_THRESHOLD {
|
||||
break;
|
||||
}
|
||||
|
||||
match next.try_outgoing() {
|
||||
Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())),
|
||||
Err(_) => break, // No more slots available.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicate that we received an incoming connection. Must be answered either with
|
||||
/// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
|
||||
///
|
||||
/// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming
|
||||
/// connection implicitly means `Connect`, but incoming connections aren't cancelled by
|
||||
/// `dropped`.
|
||||
///
|
||||
// Implementation note: because of concurrency issues, it is possible that we push a `Connect`
|
||||
// message to the output channel with a `PeerId`, and that `incoming` gets called with the same
|
||||
// `PeerId` before that message has been read by the user. In this situation we must not answer.
|
||||
pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) {
|
||||
trace!(target: "peerset", "Incoming {:?}", peer_id);
|
||||
self.update_time();
|
||||
|
||||
let not_connected = match self.data.peer(&peer_id) {
|
||||
// If we're already connected, don't answer, as the docs mention.
|
||||
peersstate::Peer::Connected(_) => return,
|
||||
peersstate::Peer::NotConnected(entry) => entry,
|
||||
peersstate::Peer::Unknown(entry) => entry.discover(),
|
||||
};
|
||||
|
||||
if not_connected.reputation() < BANNED_THRESHOLD {
|
||||
self.message_queue.push_back(Message::Reject(index));
|
||||
return
|
||||
}
|
||||
|
||||
match not_connected.try_accept_incoming() {
|
||||
Ok(_) => self.message_queue.push_back(Message::Accept(index)),
|
||||
Err(_) => self.message_queue.push_back(Message::Reject(index)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicate that we dropped an active connection with a peer, or that we failed to connect.
|
||||
///
|
||||
/// Must only be called after the PSM has either generated a `Connect` message with this
|
||||
/// `PeerId`, or accepted an incoming connection with this `PeerId`.
|
||||
pub fn dropped(&mut self, peer_id: PeerId) {
|
||||
trace!(target: "peerset", "Dropping {:?}", peer_id);
|
||||
|
||||
// We want reputations to be up-to-date before adjusting them.
|
||||
self.update_time();
|
||||
|
||||
match self.data.peer(&peer_id) {
|
||||
peersstate::Peer::Connected(mut entry) => {
|
||||
// Decrease the node's reputation so that we don't try it again and again and again.
|
||||
entry.add_reputation(DISCONNECT_REPUTATION_CHANGE);
|
||||
entry.disconnect();
|
||||
}
|
||||
peersstate::Peer::NotConnected(_) | peersstate::Peer::Unknown(_) =>
|
||||
error!(target: "peerset", "Received dropped() for non-connected node"),
|
||||
}
|
||||
|
||||
self.alloc_slots();
|
||||
}
|
||||
|
||||
/// Adds discovered peer ids to the PSM.
|
||||
///
|
||||
/// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility
|
||||
/// > of the PSM to remove `PeerId`s that fail to dial too often.
|
||||
pub fn discovered<I: IntoIterator<Item = PeerId>>(&mut self, peer_ids: I) {
|
||||
let mut discovered_any = false;
|
||||
|
||||
for peer_id in peer_ids {
|
||||
if let peersstate::Peer::Unknown(entry) = self.data.peer(&peer_id) {
|
||||
entry.discover();
|
||||
discovered_any = true;
|
||||
}
|
||||
}
|
||||
|
||||
if discovered_any {
|
||||
self.alloc_slots();
|
||||
}
|
||||
}
|
||||
|
||||
/// Reports an adjustment to the reputation of the given peer.
|
||||
pub fn report_peer(&mut self, peer_id: PeerId, score_diff: i32) {
|
||||
// We don't immediately perform the adjustments in order to have state consistency. We
|
||||
// don't want the reporting here to take priority over messages sent using the
|
||||
// `PeersetHandle`.
|
||||
let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
|
||||
}
|
||||
|
||||
/// Produces a JSON object containing the state of the peerset manager, for debugging purposes.
|
||||
pub fn debug_info(&mut self) -> serde_json::Value {
|
||||
self.update_time();
|
||||
|
||||
json!({
|
||||
"nodes": self.data.peers().cloned().collect::<Vec<_>>().into_iter().map(|peer_id| {
|
||||
let state = match self.data.peer(&peer_id) {
|
||||
peersstate::Peer::Connected(entry) => json!({
|
||||
"connected": true,
|
||||
"reputation": entry.reputation()
|
||||
}),
|
||||
peersstate::Peer::NotConnected(entry) => json!({
|
||||
"connected": false,
|
||||
"reputation": entry.reputation()
|
||||
}),
|
||||
peersstate::Peer::Unknown(_) =>
|
||||
unreachable!("We iterate over the known peers; QED")
|
||||
};
|
||||
|
||||
(peer_id.to_base58(), state)
|
||||
}).collect::<HashMap<_, _>>(),
|
||||
"reserved_only": self.reserved_only,
|
||||
"message_queue": self.message_queue.len(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns priority group by id.
|
||||
pub fn get_priority_group(&self, group_id: &str) -> Option<HashSet<PeerId>> {
|
||||
self.data.get_priority_group(group_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Peerset {
|
||||
type Item = Message;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
if let Some(message) = self.message_queue.pop_front() {
|
||||
return Poll::Ready(Some(message));
|
||||
}
|
||||
|
||||
let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(Some(event)) => event,
|
||||
Poll::Ready(None) => return Poll::Pending,
|
||||
};
|
||||
|
||||
match action {
|
||||
Action::AddReservedPeer(peer_id) =>
|
||||
self.on_add_reserved_peer(peer_id),
|
||||
Action::RemoveReservedPeer(peer_id) =>
|
||||
self.on_remove_reserved_peer(peer_id),
|
||||
Action::SetReservedOnly(reserved) =>
|
||||
self.on_set_reserved_only(reserved),
|
||||
Action::ReportPeer(peer_id, score_diff) =>
|
||||
self.on_report_peer(peer_id, score_diff),
|
||||
Action::SetPriorityGroup(group_id, peers) =>
|
||||
self.on_set_priority_group(&group_id, peers),
|
||||
Action::AddToPriorityGroup(group_id, peer_id) =>
|
||||
self.on_add_to_priority_group(&group_id, peer_id),
|
||||
Action::RemoveFromPriorityGroup(group_id, peer_id) =>
|
||||
self.on_remove_from_priority_group(&group_id, peer_id),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use libp2p::PeerId;
|
||||
use futures::prelude::*;
|
||||
use super::{PeersetConfig, Peerset, Message, IncomingIndex, BANNED_THRESHOLD};
|
||||
use std::{pin::Pin, task::Poll, thread, time::Duration};
|
||||
|
||||
fn assert_messages(mut peerset: Peerset, messages: Vec<Message>) -> Peerset {
|
||||
for expected_message in messages {
|
||||
let (message, p) = next_message(peerset).expect("expected message");
|
||||
assert_eq!(message, expected_message);
|
||||
peerset = p;
|
||||
}
|
||||
assert!(peerset.message_queue.is_empty(), peerset.message_queue);
|
||||
peerset
|
||||
}
|
||||
|
||||
fn next_message(mut peerset: Peerset) -> Result<(Message, Peerset), ()> {
|
||||
let next = futures::executor::block_on_stream(&mut peerset).next();
|
||||
let message = next.ok_or_else(|| ())?;
|
||||
Ok((message, peerset))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_peerset_add_reserved_peer() {
|
||||
let bootnode = PeerId::random();
|
||||
let reserved_peer = PeerId::random();
|
||||
let reserved_peer2 = PeerId::random();
|
||||
let config = PeersetConfig {
|
||||
in_peers: 0,
|
||||
out_peers: 2,
|
||||
bootnodes: vec![bootnode],
|
||||
reserved_only: true,
|
||||
reserved_nodes: Vec::new(),
|
||||
};
|
||||
|
||||
let (peerset, handle) = Peerset::from_config(config);
|
||||
handle.add_reserved_peer(reserved_peer.clone());
|
||||
handle.add_reserved_peer(reserved_peer2.clone());
|
||||
|
||||
assert_messages(peerset, vec![
|
||||
Message::Connect(reserved_peer),
|
||||
Message::Connect(reserved_peer2)
|
||||
]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_peerset_incoming() {
|
||||
let bootnode = PeerId::random();
|
||||
let incoming = PeerId::random();
|
||||
let incoming2 = PeerId::random();
|
||||
let incoming3 = PeerId::random();
|
||||
let ii = IncomingIndex(1);
|
||||
let ii2 = IncomingIndex(2);
|
||||
let ii3 = IncomingIndex(3);
|
||||
let ii4 = IncomingIndex(3);
|
||||
let config = PeersetConfig {
|
||||
in_peers: 2,
|
||||
out_peers: 1,
|
||||
bootnodes: vec![bootnode.clone()],
|
||||
reserved_only: false,
|
||||
reserved_nodes: Vec::new(),
|
||||
};
|
||||
|
||||
let (mut peerset, _handle) = Peerset::from_config(config);
|
||||
peerset.incoming(incoming.clone(), ii);
|
||||
peerset.incoming(incoming.clone(), ii4);
|
||||
peerset.incoming(incoming2.clone(), ii2);
|
||||
peerset.incoming(incoming3.clone(), ii3);
|
||||
|
||||
assert_messages(peerset, vec![
|
||||
Message::Connect(bootnode.clone()),
|
||||
Message::Accept(ii),
|
||||
Message::Accept(ii2),
|
||||
Message::Reject(ii3),
|
||||
]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_peerset_discovered() {
|
||||
let bootnode = PeerId::random();
|
||||
let discovered = PeerId::random();
|
||||
let discovered2 = PeerId::random();
|
||||
let config = PeersetConfig {
|
||||
in_peers: 0,
|
||||
out_peers: 2,
|
||||
bootnodes: vec![bootnode.clone()],
|
||||
reserved_only: false,
|
||||
reserved_nodes: vec![],
|
||||
};
|
||||
|
||||
let (mut peerset, _handle) = Peerset::from_config(config);
|
||||
peerset.discovered(Some(discovered.clone()));
|
||||
peerset.discovered(Some(discovered.clone()));
|
||||
peerset.discovered(Some(discovered2));
|
||||
|
||||
assert_messages(peerset, vec![
|
||||
Message::Connect(bootnode),
|
||||
Message::Connect(discovered),
|
||||
]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_peerset_banned() {
|
||||
let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
|
||||
in_peers: 25,
|
||||
out_peers: 25,
|
||||
bootnodes: vec![],
|
||||
reserved_only: false,
|
||||
reserved_nodes: vec![],
|
||||
});
|
||||
|
||||
// We ban a node by setting its reputation under the threshold.
|
||||
let peer_id = PeerId::random();
|
||||
handle.report_peer(peer_id.clone(), BANNED_THRESHOLD - 1);
|
||||
|
||||
let fut = futures::future::poll_fn(move |cx| {
|
||||
// We need one polling for the message to be processed.
|
||||
assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending);
|
||||
|
||||
// Check that an incoming connection from that node gets refused.
|
||||
peerset.incoming(peer_id.clone(), IncomingIndex(1));
|
||||
if let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
|
||||
assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1)));
|
||||
} else {
|
||||
panic!()
|
||||
}
|
||||
|
||||
// Wait a bit for the node's reputation to go above the threshold.
|
||||
thread::sleep(Duration::from_millis(1500));
|
||||
|
||||
// Try again. This time the node should be accepted.
|
||||
peerset.incoming(peer_id.clone(), IncomingIndex(2));
|
||||
while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
|
||||
assert_eq!(msg.unwrap(), Message::Accept(IncomingIndex(2)));
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
});
|
||||
|
||||
futures::executor::block_on(fut);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,720 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Contains the state storage behind the peerset.
|
||||
|
||||
use libp2p::PeerId;
|
||||
use std::{borrow::Cow, collections::{HashSet, HashMap}};
|
||||
use log::warn;
|
||||
|
||||
/// State storage behind the peerset.
|
||||
///
|
||||
/// # Usage
|
||||
///
|
||||
/// This struct is nothing more but a data structure containing a list of nodes, where each node
|
||||
/// has a reputation and is either connected to us or not.
|
||||
///
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PeersState {
|
||||
/// List of nodes that we know about.
|
||||
///
|
||||
/// > **Note**: This list should really be ordered by decreasing reputation, so that we can
|
||||
/// easily select the best node to connect to. As a first draft, however, we don't
|
||||
/// sort, to make the logic easier.
|
||||
nodes: HashMap<PeerId, Node>,
|
||||
|
||||
/// Number of non-priority nodes for which the `ConnectionState` is `In`.
|
||||
num_in: u32,
|
||||
|
||||
/// Number of non-priority nodes for which the `ConnectionState` is `In`.
|
||||
num_out: u32,
|
||||
|
||||
/// Maximum allowed number of non-priority nodes for which the `ConnectionState` is `In`.
|
||||
max_in: u32,
|
||||
|
||||
/// Maximum allowed number of non-priority nodes for which the `ConnectionState` is `Out`.
|
||||
max_out: u32,
|
||||
|
||||
/// Priority groups. Each group is identified by a string ID and contains a set of peer IDs.
|
||||
priority_nodes: HashMap<String, HashSet<PeerId>>,
|
||||
|
||||
/// Only allow connections to/from peers in a priority group.
|
||||
priority_only: bool,
|
||||
}
|
||||
|
||||
/// State of a single node that we know about.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
struct Node {
|
||||
/// Whether we are connected to this node.
|
||||
connection_state: ConnectionState,
|
||||
|
||||
/// Reputation value of the node, between `i32::min_value` (we hate that node) and
|
||||
/// `i32::max_value` (we love that node).
|
||||
reputation: i32,
|
||||
}
|
||||
|
||||
impl Default for Node {
|
||||
fn default() -> Node {
|
||||
Node {
|
||||
connection_state: ConnectionState::NotConnected,
|
||||
reputation: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether we are connected to a node.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
enum ConnectionState {
|
||||
/// We are connected through an ingoing connection.
|
||||
In,
|
||||
/// We are connected through an outgoing connection.
|
||||
Out,
|
||||
/// We are not connected to this node.
|
||||
NotConnected,
|
||||
}
|
||||
|
||||
impl ConnectionState {
|
||||
/// Returns `true` for `In` and `Out`.
|
||||
fn is_connected(self) -> bool {
|
||||
match self {
|
||||
ConnectionState::In => true,
|
||||
ConnectionState::Out => true,
|
||||
ConnectionState::NotConnected => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PeersState {
|
||||
/// Builds a new empty `PeersState`.
|
||||
pub fn new(in_peers: u32, out_peers: u32, priority_only: bool) -> Self {
|
||||
PeersState {
|
||||
nodes: HashMap::new(),
|
||||
num_in: 0,
|
||||
num_out: 0,
|
||||
max_in: in_peers,
|
||||
max_out: out_peers,
|
||||
priority_nodes: HashMap::new(),
|
||||
priority_only,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an object that grants access to the state of a peer.
|
||||
pub fn peer<'a>(&'a mut self, peer_id: &'a PeerId) -> Peer<'a> {
|
||||
match self.nodes.get_mut(peer_id) {
|
||||
None => return Peer::Unknown(UnknownPeer {
|
||||
parent: self,
|
||||
peer_id: Cow::Borrowed(peer_id),
|
||||
}),
|
||||
Some(peer) => {
|
||||
if peer.connection_state.is_connected() {
|
||||
Peer::Connected(ConnectedPeer {
|
||||
state: self,
|
||||
peer_id: Cow::Borrowed(peer_id),
|
||||
})
|
||||
} else {
|
||||
Peer::NotConnected(NotConnectedPeer {
|
||||
state: self,
|
||||
peer_id: Cow::Borrowed(peer_id),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the list of all the peers we know of.
|
||||
// Note: this method could theoretically return a `Peer`, but implementing that
|
||||
// isn't simple.
|
||||
pub fn peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||
self.nodes.keys()
|
||||
}
|
||||
|
||||
/// Returns the list of peers we are connected to.
|
||||
// Note: this method could theoretically return a `ConnectedPeer`, but implementing that
|
||||
// isn't simple.
|
||||
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||
self.nodes.iter()
|
||||
.filter(|(_, p)| p.connection_state.is_connected())
|
||||
.map(|(p, _)| p)
|
||||
}
|
||||
|
||||
/// Returns the first priority peer that we are not connected to.
|
||||
///
|
||||
/// If multiple nodes are prioritized, which one is returned is unspecified.
|
||||
pub fn priority_not_connected_peer(&mut self) -> Option<NotConnectedPeer> {
|
||||
let id = self.priority_nodes.values()
|
||||
.flatten()
|
||||
.find(|id| self.nodes.get(id).map_or(false, |node| !node.connection_state.is_connected()))
|
||||
.cloned();
|
||||
id.map(move |id| NotConnectedPeer {
|
||||
state: self,
|
||||
peer_id: Cow::Owned(id),
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the first priority peer that we are not connected to.
|
||||
///
|
||||
/// If multiple nodes are prioritized, which one is returned is unspecified.
|
||||
pub fn priority_not_connected_peer_from_group(&mut self, group_id: &str) -> Option<NotConnectedPeer> {
|
||||
let id = self.priority_nodes.get(group_id)
|
||||
.and_then(|group| group.iter()
|
||||
.find(|id| self.nodes.get(id).map_or(false, |node| !node.connection_state.is_connected()))
|
||||
.cloned());
|
||||
id.map(move |id| NotConnectedPeer {
|
||||
state: self,
|
||||
peer_id: Cow::Owned(id),
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the peer with the highest reputation and that we are not connected to.
|
||||
///
|
||||
/// If multiple nodes have the same reputation, which one is returned is unspecified.
|
||||
pub fn highest_not_connected_peer(&mut self) -> Option<NotConnectedPeer> {
|
||||
let outcome = self.nodes
|
||||
.iter_mut()
|
||||
.filter(|(_, Node { connection_state, .. })| !connection_state.is_connected())
|
||||
.fold(None::<(&PeerId, &mut Node)>, |mut cur_node, to_try| {
|
||||
if let Some(cur_node) = cur_node.take() {
|
||||
if cur_node.1.reputation >= to_try.1.reputation {
|
||||
return Some(cur_node);
|
||||
}
|
||||
}
|
||||
Some(to_try)
|
||||
})
|
||||
.map(|(peer_id, _)| peer_id.clone());
|
||||
|
||||
if let Some(peer_id) = outcome {
|
||||
Some(NotConnectedPeer {
|
||||
state: self,
|
||||
peer_id: Cow::Owned(peer_id),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn disconnect(&mut self, peer_id: &PeerId) {
|
||||
let is_priority = self.is_priority(peer_id);
|
||||
if let Some(mut node) = self.nodes.get_mut(peer_id) {
|
||||
if !is_priority {
|
||||
match node.connection_state {
|
||||
ConnectionState::In => self.num_in -= 1,
|
||||
ConnectionState::Out => self.num_out -= 1,
|
||||
ConnectionState::NotConnected =>
|
||||
debug_assert!(false, "State inconsistency: disconnecting a disconnected node")
|
||||
}
|
||||
}
|
||||
node.connection_state = ConnectionState::NotConnected;
|
||||
} else {
|
||||
warn!(target: "peerset", "Attempting to disconnect unknown peer {}", peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the peer as connected with an outgoing connection.
|
||||
fn try_outgoing(&mut self, peer_id: &PeerId) -> bool {
|
||||
let is_priority = self.is_priority(peer_id);
|
||||
|
||||
// We are only accepting connections from priority nodes.
|
||||
if !is_priority && self.priority_only {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Note that it is possible for num_out to be strictly superior to the max, in case we were
|
||||
// connected to reserved node then marked them as not reserved.
|
||||
if self.num_out >= self.max_out && !is_priority {
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some(mut peer) = self.nodes.get_mut(peer_id) {
|
||||
peer.connection_state = ConnectionState::Out;
|
||||
if !is_priority {
|
||||
self.num_out += 1;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Tries to accept the peer as an incoming connection.
|
||||
///
|
||||
/// If there are enough slots available, switches the node to "connected" and returns `Ok`. If
|
||||
/// the slots are full, the node stays "not connected" and we return `Err`.
|
||||
///
|
||||
/// Note that reserved nodes don't count towards the number of slots.
|
||||
fn try_accept_incoming(&mut self, peer_id: &PeerId) -> bool {
|
||||
let is_priority = self.is_priority(peer_id);
|
||||
|
||||
// We are only accepting connections from priority nodes.
|
||||
if !is_priority && self.priority_only {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Note that it is possible for num_in to be strictly superior to the max, in case we were
|
||||
// connected to reserved node then marked them as not reserved.
|
||||
if self.num_in >= self.max_in && !is_priority {
|
||||
return false;
|
||||
}
|
||||
if let Some(mut peer) = self.nodes.get_mut(peer_id) {
|
||||
peer.connection_state = ConnectionState::In;
|
||||
if !is_priority {
|
||||
self.num_in += 1;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Sets priority group
|
||||
pub fn set_priority_group(&mut self, group_id: &str, peers: HashSet<PeerId>) {
|
||||
// update slot counters
|
||||
let all_other_groups: HashSet<_> = self.priority_nodes
|
||||
.iter()
|
||||
.filter(|(g, _)| *g != group_id)
|
||||
.flat_map(|(_, id)| id.clone())
|
||||
.collect();
|
||||
let existing_group = self.priority_nodes.remove(group_id).unwrap_or_default();
|
||||
for id in existing_group {
|
||||
// update slots for nodes that are no longer priority
|
||||
if !all_other_groups.contains(&id) {
|
||||
if let Some(peer) = self.nodes.get_mut(&id) {
|
||||
match peer.connection_state {
|
||||
ConnectionState::In => self.num_in += 1,
|
||||
ConnectionState::Out => self.num_out += 1,
|
||||
ConnectionState::NotConnected => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for id in &peers {
|
||||
// update slots for nodes that become priority
|
||||
if !all_other_groups.contains(&id) {
|
||||
let peer = self.nodes.entry(id.clone()).or_default();
|
||||
match peer.connection_state {
|
||||
ConnectionState::In => self.num_in -= 1,
|
||||
ConnectionState::Out => self.num_out -= 1,
|
||||
ConnectionState::NotConnected => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
self.priority_nodes.insert(group_id.into(), peers);
|
||||
}
|
||||
|
||||
/// Add a peer to a priority group.
|
||||
pub fn add_to_priority_group(&mut self, group_id: &str, peer_id: PeerId) {
|
||||
let mut peers = self.priority_nodes.get(group_id).cloned().unwrap_or_default();
|
||||
peers.insert(peer_id);
|
||||
self.set_priority_group(group_id, peers);
|
||||
}
|
||||
|
||||
/// Remove a peer from a priority group.
|
||||
pub fn remove_from_priority_group(&mut self, group_id: &str, peer_id: &PeerId) {
|
||||
let mut peers = self.priority_nodes.get(group_id).cloned().unwrap_or_default();
|
||||
peers.remove(&peer_id);
|
||||
self.set_priority_group(group_id, peers);
|
||||
}
|
||||
|
||||
/// Get priority group content.
|
||||
pub fn get_priority_group(&self, group_id: &str) -> Option<HashSet<PeerId>> {
|
||||
self.priority_nodes.get(group_id).cloned()
|
||||
}
|
||||
|
||||
/// Set whether to only allow connections to/from peers in a priority group.
|
||||
/// Calling this method does not affect any existing connection, e.g.
|
||||
/// enabling priority only will not disconnect from any non-priority peers
|
||||
/// we are already connected to, only future incoming/outgoing connection
|
||||
/// attempts will be affected.
|
||||
pub fn set_priority_only(&mut self, priority: bool) {
|
||||
self.priority_only = priority;
|
||||
}
|
||||
|
||||
/// Check that node is any priority group.
|
||||
fn is_priority(&self, peer_id: &PeerId) -> bool {
|
||||
self.priority_nodes.iter().any(|(_, group)| group.contains(peer_id))
|
||||
}
|
||||
|
||||
/// Returns the reputation value of the node.
|
||||
fn reputation(&self, peer_id: &PeerId) -> i32 {
|
||||
self.nodes.get(peer_id).map_or(0, |p| p.reputation)
|
||||
}
|
||||
|
||||
/// Sets the reputation of the peer.
|
||||
fn set_reputation(&mut self, peer_id: &PeerId, value: i32) {
|
||||
let node = self.nodes
|
||||
.entry(peer_id.clone())
|
||||
.or_default();
|
||||
node.reputation = value;
|
||||
}
|
||||
|
||||
/// Performs an arithmetic addition on the reputation score of that peer.
|
||||
///
|
||||
/// In case of overflow, the value will be capped.
|
||||
/// If the peer is unknown to us, we insert it and consider that it has a reputation of 0.
|
||||
fn add_reputation(&mut self, peer_id: &PeerId, modifier: i32) {
|
||||
let node = self.nodes
|
||||
.entry(peer_id.clone())
|
||||
.or_default();
|
||||
node.reputation = node.reputation.saturating_add(modifier);
|
||||
}
|
||||
}
|
||||
|
||||
/// Grants access to the state of a peer in the `PeersState`.
|
||||
pub enum Peer<'a> {
|
||||
/// We are connected to this node.
|
||||
Connected(ConnectedPeer<'a>),
|
||||
/// We are not connected to this node.
|
||||
NotConnected(NotConnectedPeer<'a>),
|
||||
/// We have never heard of this node.
|
||||
Unknown(UnknownPeer<'a>),
|
||||
}
|
||||
|
||||
impl<'a> Peer<'a> {
|
||||
/// If we are the `Connected` variant, returns the inner `ConnectedPeer`. Returns `None`
|
||||
/// otherwise.
|
||||
pub fn into_connected(self) -> Option<ConnectedPeer<'a>> {
|
||||
match self {
|
||||
Peer::Connected(peer) => Some(peer),
|
||||
Peer::NotConnected(_) => None,
|
||||
Peer::Unknown(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// If we are the `Unknown` variant, returns the inner `ConnectedPeer`. Returns `None`
|
||||
/// otherwise.
|
||||
#[cfg(test)] // Feel free to remove this if this function is needed outside of tests
|
||||
pub fn into_not_connected(self) -> Option<NotConnectedPeer<'a>> {
|
||||
match self {
|
||||
Peer::Connected(_) => None,
|
||||
Peer::NotConnected(peer) => Some(peer),
|
||||
Peer::Unknown(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// If we are the `Unknown` variant, returns the inner `ConnectedPeer`. Returns `None`
|
||||
/// otherwise.
|
||||
#[cfg(test)] // Feel free to remove this if this function is needed outside of tests
|
||||
pub fn into_unknown(self) -> Option<UnknownPeer<'a>> {
|
||||
match self {
|
||||
Peer::Connected(_) => None,
|
||||
Peer::NotConnected(_) => None,
|
||||
Peer::Unknown(peer) => Some(peer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A peer that is connected to us.
|
||||
pub struct ConnectedPeer<'a> {
|
||||
state: &'a mut PeersState,
|
||||
peer_id: Cow<'a, PeerId>,
|
||||
}
|
||||
|
||||
impl<'a> ConnectedPeer<'a> {
|
||||
/// Destroys this `ConnectedPeer` and returns the `PeerId` inside of it.
|
||||
pub fn into_peer_id(self) -> PeerId {
|
||||
self.peer_id.into_owned()
|
||||
}
|
||||
|
||||
/// Switches the peer to "not connected".
|
||||
pub fn disconnect(self) -> NotConnectedPeer<'a> {
|
||||
self.state.disconnect(&self.peer_id);
|
||||
NotConnectedPeer {
|
||||
state: self.state,
|
||||
peer_id: self.peer_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the reputation value of the node.
|
||||
pub fn reputation(&self) -> i32 {
|
||||
self.state.reputation(&self.peer_id)
|
||||
}
|
||||
|
||||
/// Sets the reputation of the peer.
|
||||
pub fn set_reputation(&mut self, value: i32) {
|
||||
self.state.set_reputation(&self.peer_id, value)
|
||||
}
|
||||
|
||||
/// Performs an arithmetic addition on the reputation score of that peer.
|
||||
///
|
||||
/// In case of overflow, the value will be capped.
|
||||
pub fn add_reputation(&mut self, modifier: i32) {
|
||||
self.state.add_reputation(&self.peer_id, modifier)
|
||||
}
|
||||
}
|
||||
|
||||
/// A peer that is not connected to us.
|
||||
#[derive(Debug)]
|
||||
pub struct NotConnectedPeer<'a> {
|
||||
state: &'a mut PeersState,
|
||||
peer_id: Cow<'a, PeerId>,
|
||||
}
|
||||
|
||||
impl<'a> NotConnectedPeer<'a> {
|
||||
/// Destroys this `NotConnectedPeer` and returns the `PeerId` inside of it.
|
||||
#[cfg(test)] // Feel free to remove this if this function is needed outside of tests
|
||||
pub fn into_peer_id(self) -> PeerId {
|
||||
self.peer_id.into_owned()
|
||||
}
|
||||
|
||||
/// Tries to set the peer as connected as an outgoing connection.
|
||||
///
|
||||
/// If there are enough slots available, switches the node to "connected" and returns `Ok`. If
|
||||
/// the slots are full, the node stays "not connected" and we return `Err`.
|
||||
///
|
||||
/// Note that priority nodes don't count towards the number of slots.
|
||||
pub fn try_outgoing(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> {
|
||||
if self.state.try_outgoing(&self.peer_id) {
|
||||
Ok(ConnectedPeer {
|
||||
state: self.state,
|
||||
peer_id: self.peer_id,
|
||||
})
|
||||
} else {
|
||||
Err(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to accept the peer as an incoming connection.
|
||||
///
|
||||
/// If there are enough slots available, switches the node to "connected" and returns `Ok`. If
|
||||
/// the slots are full, the node stays "not connected" and we return `Err`.
|
||||
///
|
||||
/// Note that priority nodes don't count towards the number of slots.
|
||||
pub fn try_accept_incoming(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> {
|
||||
if self.state.try_accept_incoming(&self.peer_id) {
|
||||
Ok(ConnectedPeer {
|
||||
state: self.state,
|
||||
peer_id: self.peer_id,
|
||||
})
|
||||
} else {
|
||||
Err(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the reputation value of the node.
|
||||
pub fn reputation(&self) -> i32 {
|
||||
self.state.reputation(&self.peer_id)
|
||||
}
|
||||
|
||||
/// Sets the reputation of the peer.
|
||||
pub fn set_reputation(&mut self, value: i32) {
|
||||
self.state.set_reputation(&self.peer_id, value)
|
||||
}
|
||||
|
||||
/// Performs an arithmetic addition on the reputation score of that peer.
|
||||
///
|
||||
/// In case of overflow, the value will be capped.
|
||||
/// If the peer is unknown to us, we insert it and consider that it has a reputation of 0.
|
||||
pub fn add_reputation(&mut self, modifier: i32) {
|
||||
self.state.add_reputation(&self.peer_id, modifier)
|
||||
}
|
||||
}
|
||||
|
||||
/// A peer that we have never heard of.
|
||||
pub struct UnknownPeer<'a> {
|
||||
parent: &'a mut PeersState,
|
||||
peer_id: Cow<'a, PeerId>,
|
||||
}
|
||||
|
||||
impl<'a> UnknownPeer<'a> {
|
||||
/// Inserts the peer identity in our list.
|
||||
///
|
||||
/// The node starts with a reputation of 0. You can adjust these default
|
||||
/// values using the `NotConnectedPeer` that this method returns.
|
||||
pub fn discover(self) -> NotConnectedPeer<'a> {
|
||||
self.parent.nodes.insert(self.peer_id.clone().into_owned(), Node {
|
||||
connection_state: ConnectionState::NotConnected,
|
||||
reputation: 0,
|
||||
});
|
||||
|
||||
let state = self.parent;
|
||||
NotConnectedPeer {
|
||||
state,
|
||||
peer_id: self.peer_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{PeersState, Peer};
|
||||
use libp2p::PeerId;
|
||||
|
||||
#[test]
|
||||
fn full_slots_in() {
|
||||
let mut peers_state = PeersState::new(1, 1, false);
|
||||
let id1 = PeerId::random();
|
||||
let id2 = PeerId::random();
|
||||
|
||||
if let Peer::Unknown(e) = peers_state.peer(&id1) {
|
||||
assert!(e.discover().try_accept_incoming().is_ok());
|
||||
}
|
||||
|
||||
if let Peer::Unknown(e) = peers_state.peer(&id2) {
|
||||
assert!(e.discover().try_accept_incoming().is_err());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn priority_node_doesnt_use_slot() {
|
||||
let mut peers_state = PeersState::new(1, 1, false);
|
||||
let id1 = PeerId::random();
|
||||
let id2 = PeerId::random();
|
||||
|
||||
peers_state.set_priority_group("test", vec![id1.clone()].into_iter().collect());
|
||||
if let Peer::NotConnected(p) = peers_state.peer(&id1) {
|
||||
assert!(p.try_accept_incoming().is_ok());
|
||||
} else { panic!() }
|
||||
|
||||
if let Peer::Unknown(e) = peers_state.peer(&id2) {
|
||||
assert!(e.discover().try_accept_incoming().is_ok());
|
||||
} else { panic!() }
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disconnecting_frees_slot() {
|
||||
let mut peers_state = PeersState::new(1, 1, false);
|
||||
let id1 = PeerId::random();
|
||||
let id2 = PeerId::random();
|
||||
|
||||
assert!(peers_state.peer(&id1).into_unknown().unwrap().discover().try_accept_incoming().is_ok());
|
||||
assert!(peers_state.peer(&id2).into_unknown().unwrap().discover().try_accept_incoming().is_err());
|
||||
peers_state.peer(&id1).into_connected().unwrap().disconnect();
|
||||
assert!(peers_state.peer(&id2).into_not_connected().unwrap().try_accept_incoming().is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn priority_not_connected_peer() {
|
||||
let mut peers_state = PeersState::new(25, 25, false);
|
||||
let id1 = PeerId::random();
|
||||
let id2 = PeerId::random();
|
||||
|
||||
assert!(peers_state.priority_not_connected_peer().is_none());
|
||||
peers_state.peer(&id1).into_unknown().unwrap().discover();
|
||||
peers_state.peer(&id2).into_unknown().unwrap().discover();
|
||||
|
||||
assert!(peers_state.priority_not_connected_peer().is_none());
|
||||
peers_state.set_priority_group("test", vec![id1.clone()].into_iter().collect());
|
||||
assert!(peers_state.priority_not_connected_peer().is_some());
|
||||
peers_state.set_priority_group("test", vec![id2.clone(), id2.clone()].into_iter().collect());
|
||||
assert!(peers_state.priority_not_connected_peer().is_some());
|
||||
peers_state.set_priority_group("test", vec![].into_iter().collect());
|
||||
assert!(peers_state.priority_not_connected_peer().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn highest_not_connected_peer() {
|
||||
let mut peers_state = PeersState::new(25, 25, false);
|
||||
let id1 = PeerId::random();
|
||||
let id2 = PeerId::random();
|
||||
|
||||
assert!(peers_state.highest_not_connected_peer().is_none());
|
||||
peers_state.peer(&id1).into_unknown().unwrap().discover().set_reputation(50);
|
||||
peers_state.peer(&id2).into_unknown().unwrap().discover().set_reputation(25);
|
||||
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone()));
|
||||
peers_state.peer(&id2).into_not_connected().unwrap().set_reputation(75);
|
||||
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id2.clone()));
|
||||
peers_state.peer(&id2).into_not_connected().unwrap().try_accept_incoming().unwrap();
|
||||
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone()));
|
||||
peers_state.peer(&id1).into_not_connected().unwrap().set_reputation(100);
|
||||
peers_state.peer(&id2).into_connected().unwrap().disconnect();
|
||||
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone()));
|
||||
peers_state.peer(&id1).into_not_connected().unwrap().set_reputation(-100);
|
||||
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id2.clone()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disconnect_priority_doesnt_panic() {
|
||||
let mut peers_state = PeersState::new(1, 1, false);
|
||||
let id = PeerId::random();
|
||||
peers_state.set_priority_group("test", vec![id.clone()].into_iter().collect());
|
||||
let peer = peers_state.peer(&id).into_not_connected().unwrap().try_outgoing().unwrap();
|
||||
peer.disconnect();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_priority_groups_slot_count() {
|
||||
let mut peers_state = PeersState::new(1, 1, false);
|
||||
let id = PeerId::random();
|
||||
|
||||
if let Peer::Unknown(p) = peers_state.peer(&id) {
|
||||
assert!(p.discover().try_accept_incoming().is_ok());
|
||||
} else { panic!() }
|
||||
|
||||
assert_eq!(peers_state.num_in, 1);
|
||||
peers_state.set_priority_group("test1", vec![id.clone()].into_iter().collect());
|
||||
assert_eq!(peers_state.num_in, 0);
|
||||
peers_state.set_priority_group("test2", vec![id.clone()].into_iter().collect());
|
||||
assert_eq!(peers_state.num_in, 0);
|
||||
peers_state.set_priority_group("test1", vec![].into_iter().collect());
|
||||
assert_eq!(peers_state.num_in, 0);
|
||||
peers_state.set_priority_group("test2", vec![].into_iter().collect());
|
||||
assert_eq!(peers_state.num_in, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn priority_only_mode_ignores_drops_unknown_nodes() {
|
||||
// test whether connection to/from given peer is allowed
|
||||
let test_connection = |peers_state: &mut PeersState, id| {
|
||||
if let Peer::Unknown(p) = peers_state.peer(id) {
|
||||
p.discover();
|
||||
}
|
||||
|
||||
let incoming = if let Peer::NotConnected(p) = peers_state.peer(id) {
|
||||
p.try_accept_incoming().is_ok()
|
||||
} else {
|
||||
panic!()
|
||||
};
|
||||
|
||||
if incoming {
|
||||
peers_state.peer(id).into_connected().map(|p| p.disconnect());
|
||||
}
|
||||
|
||||
let outgoing = if let Peer::NotConnected(p) = peers_state.peer(id) {
|
||||
p.try_outgoing().is_ok()
|
||||
} else {
|
||||
panic!()
|
||||
};
|
||||
|
||||
if outgoing {
|
||||
peers_state.peer(id).into_connected().map(|p| p.disconnect());
|
||||
}
|
||||
|
||||
incoming || outgoing
|
||||
};
|
||||
|
||||
let mut peers_state = PeersState::new(1, 1, true);
|
||||
let id = PeerId::random();
|
||||
|
||||
// this is an unknown peer and our peer state is set to only allow
|
||||
// priority peers so any connection attempt should be denied.
|
||||
assert!(!test_connection(&mut peers_state, &id));
|
||||
|
||||
// disabling priority only mode should allow the connection to go
|
||||
// through.
|
||||
peers_state.set_priority_only(false);
|
||||
assert!(test_connection(&mut peers_state, &id));
|
||||
|
||||
// re-enabling it we should again deny connections from the peer.
|
||||
peers_state.set_priority_only(true);
|
||||
assert!(!test_connection(&mut peers_state, &id));
|
||||
|
||||
// but if we add the peer to a priority group it should be accepted.
|
||||
peers_state.set_priority_group("TEST_GROUP", vec![id.clone()].into_iter().collect());
|
||||
assert!(test_connection(&mut peers_state, &id));
|
||||
|
||||
// and removing it will cause the connection to once again be denied.
|
||||
peers_state.remove_from_priority_group("TEST_GROUP", &id);
|
||||
assert!(!test_connection(&mut peers_state, &id));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user