mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-20 15:11:02 +00:00
Rewrite the PSM (#2440)
* Rewrite the PSM * Fix disconnecting from reserved peers * Minor adjustements * Address review * Reputation changes adjustements * More adjustements * Adjust all reputations * More fixes and adjustments * Improve proof * Remove the possible panic * Make sure reputation reaches 0
This commit is contained in:
committed by
Gavin Wood
parent
18ca0170c3
commit
4aa44ab280
@@ -159,7 +159,7 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the state of the peerset manager, for debugging purposes.
|
/// Returns the state of the peerset manager, for debugging purposes.
|
||||||
pub fn peerset_debug_info(&self) -> serde_json::Value {
|
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
|
||||||
self.custom_protocols.peerset_debug_info()
|
self.custom_protocols.peerset_debug_info()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -357,7 +357,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the state of the peerset manager, for debugging purposes.
|
/// Returns the state of the peerset manager, for debugging purposes.
|
||||||
pub fn peerset_debug_info(&self) -> serde_json::Value {
|
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
|
||||||
self.peerset.debug_info()
|
self.peerset.debug_info()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+178
-348
@@ -17,33 +17,16 @@
|
|||||||
//! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be
|
//! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be
|
||||||
//! connected to.
|
//! connected to.
|
||||||
|
|
||||||
mod slots;
|
mod peersstate;
|
||||||
|
|
||||||
use std::collections::VecDeque;
|
use std::{collections::HashMap, collections::VecDeque, time::Instant};
|
||||||
use futures::{prelude::*, sync::mpsc, try_ready};
|
use futures::{prelude::*, sync::mpsc, try_ready};
|
||||||
use libp2p::PeerId;
|
use libp2p::PeerId;
|
||||||
use log::trace;
|
use log::{debug, error, trace};
|
||||||
use lru_cache::LruCache;
|
|
||||||
use slots::{SlotType, SlotState, Slots};
|
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
const PEERSET_SCORES_CACHE_SIZE: usize = 1000;
|
/// Reputation change for a node when we get disconnected from it.
|
||||||
const DISCOVERED_NODES_LIMIT: u32 = 1000;
|
const DISCONNECT_REPUTATION_CHANGE: i32 = -10;
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct PeersetData {
|
|
||||||
/// List of nodes that we know exist, but we are not connected to.
|
|
||||||
/// Elements in this list must never be in `out_slots` or `in_slots`.
|
|
||||||
discovered: Slots,
|
|
||||||
/// If true, we only accept reserved nodes.
|
|
||||||
reserved_only: bool,
|
|
||||||
/// Node slots for outgoing connections.
|
|
||||||
out_slots: Slots,
|
|
||||||
/// Node slots for incoming connections.
|
|
||||||
in_slots: Slots,
|
|
||||||
/// List of node scores.
|
|
||||||
scores: LruCache<PeerId, i32>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum Action {
|
enum Action {
|
||||||
@@ -147,9 +130,15 @@ pub struct PeersetConfig {
|
|||||||
/// errors.
|
/// errors.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Peerset {
|
pub struct Peerset {
|
||||||
data: PeersetData,
|
data: peersstate::PeersState,
|
||||||
|
/// If true, we only accept reserved nodes.
|
||||||
|
reserved_only: bool,
|
||||||
rx: mpsc::UnboundedReceiver<Action>,
|
rx: mpsc::UnboundedReceiver<Action>,
|
||||||
message_queue: VecDeque<Message>,
|
message_queue: VecDeque<Message>,
|
||||||
|
/// When the `Peerset` was created.
|
||||||
|
created: Instant,
|
||||||
|
/// Last time when we updated the reputations of connected nodes.
|
||||||
|
latest_time_update: Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Peerset {
|
impl Peerset {
|
||||||
@@ -157,30 +146,33 @@ impl Peerset {
|
|||||||
pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) {
|
pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) {
|
||||||
let (tx, rx) = mpsc::unbounded();
|
let (tx, rx) = mpsc::unbounded();
|
||||||
|
|
||||||
let data = PeersetData {
|
|
||||||
discovered: Slots::new(DISCOVERED_NODES_LIMIT),
|
|
||||||
reserved_only: config.reserved_only,
|
|
||||||
out_slots: Slots::new(config.out_peers),
|
|
||||||
in_slots: Slots::new(config.in_peers),
|
|
||||||
scores: LruCache::new(PEERSET_SCORES_CACHE_SIZE),
|
|
||||||
};
|
|
||||||
|
|
||||||
let handle = PeersetHandle {
|
let handle = PeersetHandle {
|
||||||
tx,
|
tx,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut peerset = Peerset {
|
let mut peerset = Peerset {
|
||||||
data,
|
data: peersstate::PeersState::new(config.in_peers, config.out_peers),
|
||||||
rx,
|
rx,
|
||||||
|
reserved_only: config.reserved_only,
|
||||||
message_queue: VecDeque::new(),
|
message_queue: VecDeque::new(),
|
||||||
|
created: Instant::now(),
|
||||||
|
latest_time_update: Instant::now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
for peer_id in config.reserved_nodes {
|
for peer_id in config.reserved_nodes {
|
||||||
peerset.data.discovered.add_peer(peer_id, SlotType::Reserved);
|
if let peersstate::Peer::Unknown(entry) = peerset.data.peer(&peer_id) {
|
||||||
|
entry.discover().set_reserved(true);
|
||||||
|
} else {
|
||||||
|
debug!(target: "peerset", "Duplicate reserved node in config: {:?}", peer_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for peer_id in config.bootnodes {
|
for peer_id in config.bootnodes {
|
||||||
peerset.data.discovered.add_peer(peer_id, SlotType::Common);
|
if let peersstate::Peer::Unknown(entry) = peerset.data.peer(&peer_id) {
|
||||||
|
entry.discover();
|
||||||
|
} else {
|
||||||
|
debug!(target: "peerset", "Duplicate bootnode in config: {:?}", peer_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
peerset.alloc_slots();
|
peerset.alloc_slots();
|
||||||
@@ -188,115 +180,131 @@ impl Peerset {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
|
fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
|
||||||
// Nothing more to do if we're already connected.
|
let mut entry = match self.data.peer(&peer_id) {
|
||||||
if self.data.in_slots.contains(&peer_id) {
|
peersstate::Peer::Connected(mut connected) => {
|
||||||
self.data.in_slots.mark_reserved(&peer_id);
|
connected.set_reserved(true);
|
||||||
return;
|
return
|
||||||
}
|
}
|
||||||
|
peersstate::Peer::NotConnected(entry) => entry,
|
||||||
|
peersstate::Peer::Unknown(entry) => entry.discover(),
|
||||||
|
};
|
||||||
|
|
||||||
match self.data.out_slots.add_peer(peer_id, SlotType::Reserved) {
|
// We reach this point if and only if we were not connected to the node.
|
||||||
SlotState::Added(peer_id) => {
|
entry.set_reserved(true);
|
||||||
// reserved node may have been previously stored as normal node in discovered list
|
entry.force_outgoing();
|
||||||
self.data.discovered.remove_peer(&peer_id);
|
self.message_queue.push_back(Message::Connect(peer_id));
|
||||||
|
|
||||||
// notify that connection has been made
|
|
||||||
trace!(target: "peerset", "Connecting to new reserved peer {}", peer_id);
|
|
||||||
self.message_queue.push_back(Message::Connect(peer_id));
|
|
||||||
return;
|
|
||||||
},
|
|
||||||
SlotState::Swaped { removed, added } => {
|
|
||||||
// reserved node may have been previously stored as normal node in discovered list
|
|
||||||
self.data.discovered.remove_peer(&added);
|
|
||||||
// let's add the peer we disconnected from to the discovered list again
|
|
||||||
self.data.discovered.add_peer(removed.clone(), SlotType::Common);
|
|
||||||
// swap connections
|
|
||||||
trace!(target: "peerset", "Connecting to new reserved peer {}, dropping {}", added, removed);
|
|
||||||
self.message_queue.push_back(Message::Drop(removed));
|
|
||||||
self.message_queue.push_back(Message::Connect(added));
|
|
||||||
}
|
|
||||||
SlotState::AlreadyExists(_) | SlotState::Upgraded(_) => {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SlotState::MaxCapacity(peer_id) => {
|
|
||||||
self.data.discovered.add_peer(peer_id, SlotType::Reserved);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
|
fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
|
||||||
self.data.in_slots.mark_not_reserved(&peer_id);
|
match self.data.peer(&peer_id) {
|
||||||
self.data.out_slots.mark_not_reserved(&peer_id);
|
peersstate::Peer::Connected(mut peer) => {
|
||||||
self.data.discovered.mark_not_reserved(&peer_id);
|
peer.set_reserved(false);
|
||||||
if self.data.reserved_only {
|
if self.reserved_only {
|
||||||
if self.data.in_slots.remove_peer(&peer_id) || self.data.out_slots.remove_peer(&peer_id) {
|
peer.disconnect();
|
||||||
// insert peer back into discovered list
|
self.message_queue.push_back(Message::Drop(peer_id));
|
||||||
self.data.discovered.add_peer(peer_id.clone(), SlotType::Common);
|
}
|
||||||
self.message_queue.push_back(Message::Drop(peer_id));
|
|
||||||
// call alloc_slots again, cause we may have some reserved peers in discovered list
|
|
||||||
// waiting for the slot that was just cleared
|
|
||||||
self.alloc_slots();
|
|
||||||
}
|
}
|
||||||
|
peersstate::Peer::NotConnected(mut peer) => peer.set_reserved(false),
|
||||||
|
peersstate::Peer::Unknown(_) => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_set_reserved_only(&mut self, reserved_only: bool) {
|
fn on_set_reserved_only(&mut self, reserved_only: bool) {
|
||||||
// Disconnect non-reserved nodes.
|
// Disconnect non-reserved nodes.
|
||||||
self.data.reserved_only = reserved_only;
|
self.reserved_only = reserved_only;
|
||||||
if self.data.reserved_only {
|
if self.reserved_only {
|
||||||
for peer_id in self.data.in_slots.clear_common_slots().into_iter().chain(self.data.out_slots.clear_common_slots().into_iter()) {
|
for peer_id in self.data.connected_peers().cloned().collect::<Vec<_>>().into_iter() {
|
||||||
// insert peer back into discovered list
|
let peer = self.data.peer(&peer_id).into_connected()
|
||||||
self.data.discovered.add_peer(peer_id.clone(), SlotType::Common);
|
.expect("We are enumerating connected peers, therefore the peer is connected; qed");
|
||||||
self.message_queue.push_back(Message::Drop(peer_id));
|
if !peer.is_reserved() {
|
||||||
|
peer.disconnect();
|
||||||
|
self.message_queue.push_back(Message::Drop(peer_id));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
self.alloc_slots();
|
self.alloc_slots();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_report_peer(&mut self, peer_id: PeerId, score_diff: i32) {
|
fn on_report_peer(&mut self, peer_id: PeerId, score_diff: i32) {
|
||||||
let score = match self.data.scores.get_mut(&peer_id) {
|
// We want reputations to be up-to-date before adjusting them.
|
||||||
Some(score) => {
|
self.update_time();
|
||||||
*score = score.saturating_add(score_diff);
|
|
||||||
*score
|
match self.data.peer(&peer_id) {
|
||||||
},
|
peersstate::Peer::Connected(mut peer) => peer.add_reputation(score_diff),
|
||||||
None => {
|
peersstate::Peer::NotConnected(mut peer) => peer.add_reputation(score_diff),
|
||||||
self.data.scores.insert(peer_id.clone(), score_diff);
|
peersstate::Peer::Unknown(peer) => peer.discover().add_reputation(score_diff),
|
||||||
score_diff
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Updates the value of `self.latest_time_update` and performs all the updates that happen
|
||||||
|
/// over time, such as reputation increases for staying connected.
|
||||||
|
fn update_time(&mut self) {
|
||||||
|
// We basically do `(now - self.latest_update).as_secs()`, except that by the way we do it
|
||||||
|
// we know that we're not going to miss seconds because of rounding to integers.
|
||||||
|
let secs_diff = {
|
||||||
|
let now = Instant::now();
|
||||||
|
let elapsed_latest = self.latest_time_update - self.created;
|
||||||
|
let elapsed_now = now - self.created;
|
||||||
|
self.latest_time_update = now;
|
||||||
|
elapsed_now.as_secs() - elapsed_latest.as_secs()
|
||||||
};
|
};
|
||||||
|
|
||||||
if score < 0 {
|
// For each elapsed second, move the node reputation towards zero.
|
||||||
// peer will be removed from `in_slots` or `out_slots` in `on_dropped` method
|
// If we multiply each second the reputation by `k` (where `k` is between 0 and 1), it
|
||||||
if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) {
|
// takes `ln(0.5) / ln(k)` seconds to reduce the reputation by half. Use this formula to
|
||||||
self.data.in_slots.remove_peer(&peer_id);
|
// empirically determine a value of `k` that looks correct.
|
||||||
self.data.out_slots.remove_peer(&peer_id);
|
for _ in 0..secs_diff {
|
||||||
self.message_queue.push_back(Message::Drop(peer_id));
|
for peer in self.data.peers().cloned().collect::<Vec<_>>() {
|
||||||
|
// We use `k = 0.98`, so we divide by `50`. With that value, it takes 34.3 seconds
|
||||||
|
// to reduce the reputation by half.
|
||||||
|
fn reput_tick(reput: i32) -> i32 {
|
||||||
|
let mut diff = -reput / 50;
|
||||||
|
if diff == 0 && reput < 0 {
|
||||||
|
diff = 1;
|
||||||
|
} else if diff == 0 && reput > 0 {
|
||||||
|
diff = -1;
|
||||||
|
}
|
||||||
|
reput.saturating_add(diff)
|
||||||
|
}
|
||||||
|
match self.data.peer(&peer) {
|
||||||
|
peersstate::Peer::Connected(mut peer) =>
|
||||||
|
peer.set_reputation(reput_tick(peer.reputation())),
|
||||||
|
peersstate::Peer::NotConnected(mut peer) =>
|
||||||
|
peer.set_reputation(reput_tick(peer.reputation())),
|
||||||
|
peersstate::Peer::Unknown(_) => unreachable!("We iterate over known peers; qed")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Try to fill available out slots with nodes.
|
||||||
fn alloc_slots(&mut self) {
|
fn alloc_slots(&mut self) {
|
||||||
while let Some((peer_id, slot_type)) = self.data.discovered.pop_most_important_peer(self.data.reserved_only) {
|
self.update_time();
|
||||||
match self.data.out_slots.add_peer(peer_id, slot_type) {
|
|
||||||
SlotState::Added(peer_id) => {
|
loop {
|
||||||
trace!(target: "peerset", "Connecting to new peer {}", peer_id);
|
// Try to grab the next node to attempt to connect to.
|
||||||
self.message_queue.push_back(Message::Connect(peer_id));
|
let next = match self.data.reserved_not_connected_peer() {
|
||||||
},
|
Some(p) => p,
|
||||||
SlotState::Swaped { removed, added } => {
|
None => if self.reserved_only {
|
||||||
// insert peer back into discovered list
|
break // No known node to add.
|
||||||
trace!(target: "peerset", "Connecting to new peer {}, dropping {}", added, removed);
|
} else {
|
||||||
self.data.discovered.add_peer(removed.clone(), SlotType::Common);
|
match self.data.highest_not_connected_peer() {
|
||||||
self.message_queue.push_back(Message::Drop(removed));
|
Some(p) => p,
|
||||||
self.message_queue.push_back(Message::Connect(added));
|
None => break, // No known node to add.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
SlotState::Upgraded(_) | SlotState::AlreadyExists(_) => {
|
};
|
||||||
// TODO: we should never reach this point
|
|
||||||
},
|
// Don't connect to nodes with an abysmal reputation.
|
||||||
SlotState::MaxCapacity(peer_id) => {
|
if next.reputation() == i32::min_value() {
|
||||||
self.data.discovered.add_peer(peer_id, slot_type);
|
break;
|
||||||
break;
|
}
|
||||||
},
|
|
||||||
|
match next.try_outgoing() {
|
||||||
|
Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())),
|
||||||
|
Err(_) => break, // No more slots available.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -305,60 +313,24 @@ impl Peerset {
|
|||||||
/// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
|
/// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
|
||||||
///
|
///
|
||||||
/// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming
|
/// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming
|
||||||
/// connection implicitely means `Accept`, but incoming connections aren't cancelled by
|
/// connection implicitely means `Connect`, but incoming connections aren't cancelled by
|
||||||
/// `dropped`.
|
/// `dropped`.
|
||||||
///
|
///
|
||||||
/// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the
|
/// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the
|
||||||
/// peerset is already connected to, in which case it must not answer.
|
/// peerset is already connected to, in which case it must not answer.
|
||||||
pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) {
|
pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) {
|
||||||
trace!(
|
trace!(target: "peerset", "Incoming {:?}", peer_id);
|
||||||
target: "peerset",
|
|
||||||
"Incoming {:?}\nin_slots={:?}\nout_slots={:?}",
|
|
||||||
peer_id, self.data.in_slots, self.data.out_slots
|
|
||||||
);
|
|
||||||
// if `reserved_only` is set, but this peer is not a part of our discovered list,
|
|
||||||
// a) it is not reserved, so we reject the connection
|
|
||||||
// b) we are already connected to it, so we reject the connection
|
|
||||||
if self.data.reserved_only && !self.data.discovered.is_reserved(&peer_id) {
|
|
||||||
self.message_queue.push_back(Message::Reject(index));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if we are already connected to this peer
|
let not_connected = match self.data.peer(&peer_id) {
|
||||||
if self.data.out_slots.contains(&peer_id) {
|
// If we're already connected, don't answer, as the docs mention.
|
||||||
// we are already connected. in this case we do not answer
|
peersstate::Peer::Connected(_) => return,
|
||||||
return;
|
peersstate::Peer::NotConnected(entry) => entry,
|
||||||
}
|
peersstate::Peer::Unknown(entry) => entry.discover(),
|
||||||
|
|
||||||
let slot_type = if self.data.reserved_only {
|
|
||||||
SlotType::Reserved
|
|
||||||
} else {
|
|
||||||
SlotType::Common
|
|
||||||
};
|
};
|
||||||
|
|
||||||
match self.data.in_slots.add_peer(peer_id, slot_type) {
|
match not_connected.try_accept_incoming() {
|
||||||
SlotState::Added(peer_id) => {
|
Ok(_) => self.message_queue.push_back(Message::Accept(index)),
|
||||||
// reserved node may have been previously stored as normal node in discovered list
|
Err(_) => self.message_queue.push_back(Message::Reject(index)),
|
||||||
self.data.discovered.remove_peer(&peer_id);
|
|
||||||
self.message_queue.push_back(Message::Accept(index));
|
|
||||||
return;
|
|
||||||
},
|
|
||||||
SlotState::Swaped { removed, added } => {
|
|
||||||
// reserved node may have been previously stored as normal node in discovered list
|
|
||||||
self.data.discovered.remove_peer(&added);
|
|
||||||
// swap connections.
|
|
||||||
self.message_queue.push_back(Message::Drop(removed));
|
|
||||||
self.message_queue.push_back(Message::Accept(index));
|
|
||||||
},
|
|
||||||
SlotState::AlreadyExists(_) | SlotState::Upgraded(_) => {
|
|
||||||
// we are already connected. in this case we do not answer
|
|
||||||
return;
|
|
||||||
},
|
|
||||||
SlotState::MaxCapacity(peer_id) => {
|
|
||||||
self.data.discovered.add_peer(peer_id, slot_type);
|
|
||||||
self.message_queue.push_back(Message::Reject(index));
|
|
||||||
return;
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -367,24 +339,21 @@ impl Peerset {
|
|||||||
/// Must only be called after the PSM has either generated a `Connect` message with this
|
/// Must only be called after the PSM has either generated a `Connect` message with this
|
||||||
/// `PeerId`, or accepted an incoming connection with this `PeerId`.
|
/// `PeerId`, or accepted an incoming connection with this `PeerId`.
|
||||||
pub fn dropped(&mut self, peer_id: PeerId) {
|
pub fn dropped(&mut self, peer_id: PeerId) {
|
||||||
trace!(
|
trace!(target: "peerset", "Dropping {:?}", peer_id);
|
||||||
target: "peerset",
|
|
||||||
"Dropping {:?}\nin_slots={:?}\nout_slots={:?}",
|
// We want reputations to be up-to-date before adjusting them.
|
||||||
peer_id, self.data.in_slots, self.data.out_slots
|
self.update_time();
|
||||||
);
|
|
||||||
// Automatically connect back if reserved.
|
match self.data.peer(&peer_id) {
|
||||||
if self.data.in_slots.is_reserved(&peer_id) || self.data.out_slots.is_reserved(&peer_id) {
|
peersstate::Peer::Connected(mut entry) => {
|
||||||
self.message_queue.push_back(Message::Connect(peer_id));
|
// Decrease the node's reputation so that we don't try it again and again and again.
|
||||||
return;
|
entry.add_reputation(DISCONNECT_REPUTATION_CHANGE);
|
||||||
|
entry.disconnect();
|
||||||
|
}
|
||||||
|
peersstate::Peer::NotConnected(_) | peersstate::Peer::Unknown(_) =>
|
||||||
|
error!(target: "peerset", "Received dropped() for non-connected node"),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise, free the slot.
|
|
||||||
self.data.in_slots.remove_peer(&peer_id);
|
|
||||||
self.data.out_slots.remove_peer(&peer_id);
|
|
||||||
|
|
||||||
// Note: in this dummy implementation we consider that peers never expire. As soon as we
|
|
||||||
// are disconnected from a peer, we try again.
|
|
||||||
self.data.discovered.add_peer(peer_id, SlotType::Common);
|
|
||||||
self.alloc_slots();
|
self.alloc_slots();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -393,28 +362,42 @@ impl Peerset {
|
|||||||
/// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility
|
/// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility
|
||||||
/// > of the PSM to remove `PeerId`s that fail to dial too often.
|
/// > of the PSM to remove `PeerId`s that fail to dial too often.
|
||||||
pub fn discovered<I: IntoIterator<Item = PeerId>>(&mut self, peer_ids: I) {
|
pub fn discovered<I: IntoIterator<Item = PeerId>>(&mut self, peer_ids: I) {
|
||||||
|
let mut discovered_any = false;
|
||||||
|
|
||||||
for peer_id in peer_ids {
|
for peer_id in peer_ids {
|
||||||
if !self.data.in_slots.contains(&peer_id) && !self.data.out_slots.contains(&peer_id) && !self.data.discovered.contains(&peer_id) {
|
if let peersstate::Peer::Unknown(entry) = self.data.peer(&peer_id) {
|
||||||
trace!(target: "peerset", "Discovered new peer: {:?}", peer_id);
|
entry.discover();
|
||||||
self.data.discovered.add_peer(peer_id, SlotType::Common);
|
discovered_any = true;
|
||||||
} else {
|
|
||||||
trace!(target: "peerset", "Discovered known peer: {:?}", peer_id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.alloc_slots();
|
if discovered_any {
|
||||||
|
self.alloc_slots();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Produces a JSON object containing the state of the peerset manager, for debugging purposes.
|
/// Produces a JSON object containing the state of the peerset manager, for debugging purposes.
|
||||||
pub fn debug_info(&self) -> serde_json::Value {
|
pub fn debug_info(&mut self) -> serde_json::Value {
|
||||||
|
self.update_time();
|
||||||
|
|
||||||
json!({
|
json!({
|
||||||
"data": {
|
"nodes": self.data.peers().cloned().collect::<Vec<_>>().into_iter().map(|peer_id| {
|
||||||
// add scores
|
let state = match self.data.peer(&peer_id) {
|
||||||
"discovered": self.data.discovered.debug_info(),
|
peersstate::Peer::Connected(entry) => json!({
|
||||||
"reserved_only": self.data.reserved_only,
|
"connected": true,
|
||||||
"out_slots": self.data.out_slots.debug_info(),
|
"reputation": entry.reputation()
|
||||||
"in_slots": self.data.in_slots.debug_info()
|
}),
|
||||||
},
|
peersstate::Peer::NotConnected(entry) => json!({
|
||||||
|
"connected": false,
|
||||||
|
"reputation": entry.reputation()
|
||||||
|
}),
|
||||||
|
peersstate::Peer::Unknown(_) =>
|
||||||
|
unreachable!("We iterate over the known peers; QED")
|
||||||
|
};
|
||||||
|
|
||||||
|
(peer_id.to_base58(), state)
|
||||||
|
}).collect::<HashMap<_, _>>(),
|
||||||
|
"reserved_only": self.reserved_only,
|
||||||
"message_queue": self.message_queue.len(),
|
"message_queue": self.message_queue.len(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -454,7 +437,7 @@ mod tests {
|
|||||||
assert_eq!(message, expected_message);
|
assert_eq!(message, expected_message);
|
||||||
peerset = p;
|
peerset = p;
|
||||||
}
|
}
|
||||||
assert!(peerset.message_queue.is_empty());
|
assert!(peerset.message_queue.is_empty(), peerset.message_queue);
|
||||||
peerset
|
peerset
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -466,49 +449,6 @@ mod tests {
|
|||||||
Ok((message, peerset))
|
Ok((message, peerset))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_peerset_from_config_with_bootnodes() {
|
|
||||||
let bootnode = PeerId::random();
|
|
||||||
let bootnode2 = PeerId::random();
|
|
||||||
let config = PeersetConfig {
|
|
||||||
in_peers: 0,
|
|
||||||
out_peers: 2,
|
|
||||||
bootnodes: vec![bootnode.clone(), bootnode2.clone()],
|
|
||||||
reserved_only: false,
|
|
||||||
reserved_nodes: Vec::new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let (peerset, _handle) = Peerset::from_config(config);
|
|
||||||
|
|
||||||
assert_messages(peerset, vec![
|
|
||||||
Message::Connect(bootnode),
|
|
||||||
Message::Connect(bootnode2),
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_peerset_from_config_with_reserved_nodes() {
|
|
||||||
let bootnode = PeerId::random();
|
|
||||||
let bootnode2 = PeerId::random();
|
|
||||||
let reserved_peer = PeerId::random();
|
|
||||||
let reserved_peer2 = PeerId::random();
|
|
||||||
let config = PeersetConfig {
|
|
||||||
in_peers: 0,
|
|
||||||
out_peers: 3,
|
|
||||||
bootnodes: vec![bootnode.clone(), bootnode2.clone()],
|
|
||||||
reserved_only: false,
|
|
||||||
reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()],
|
|
||||||
};
|
|
||||||
|
|
||||||
let (peerset, _handle) = Peerset::from_config(config);
|
|
||||||
|
|
||||||
assert_messages(peerset, vec![
|
|
||||||
Message::Connect(reserved_peer),
|
|
||||||
Message::Connect(reserved_peer2),
|
|
||||||
Message::Connect(bootnode)
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_peerset_add_reserved_peer() {
|
fn test_peerset_add_reserved_peer() {
|
||||||
let bootnode = PeerId::random();
|
let bootnode = PeerId::random();
|
||||||
@@ -532,87 +472,6 @@ mod tests {
|
|||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_peerset_remove_reserved_peer() {
|
|
||||||
let reserved_peer = PeerId::random();
|
|
||||||
let reserved_peer2 = PeerId::random();
|
|
||||||
let config = PeersetConfig {
|
|
||||||
in_peers: 0,
|
|
||||||
out_peers: 2,
|
|
||||||
bootnodes: vec![],
|
|
||||||
reserved_only: false,
|
|
||||||
reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()],
|
|
||||||
};
|
|
||||||
|
|
||||||
let (peerset, handle) = Peerset::from_config(config);
|
|
||||||
handle.remove_reserved_peer(reserved_peer.clone());
|
|
||||||
|
|
||||||
let peerset = assert_messages(peerset, vec![
|
|
||||||
Message::Connect(reserved_peer.clone()),
|
|
||||||
Message::Connect(reserved_peer2.clone()),
|
|
||||||
]);
|
|
||||||
|
|
||||||
handle.set_reserved_only(true);
|
|
||||||
handle.remove_reserved_peer(reserved_peer2.clone());
|
|
||||||
|
|
||||||
assert_messages(peerset, vec![
|
|
||||||
Message::Drop(reserved_peer),
|
|
||||||
Message::Drop(reserved_peer2),
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_peerset_set_reserved_only() {
|
|
||||||
let bootnode = PeerId::random();
|
|
||||||
let bootnode2 = PeerId::random();
|
|
||||||
let reserved_peer = PeerId::random();
|
|
||||||
let reserved_peer2 = PeerId::random();
|
|
||||||
let config = PeersetConfig {
|
|
||||||
in_peers: 0,
|
|
||||||
out_peers: 4,
|
|
||||||
bootnodes: vec![bootnode.clone(), bootnode2.clone()],
|
|
||||||
reserved_only: false,
|
|
||||||
reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()],
|
|
||||||
};
|
|
||||||
|
|
||||||
let (peerset, handle) = Peerset::from_config(config);
|
|
||||||
handle.set_reserved_only(true);
|
|
||||||
handle.set_reserved_only(false);
|
|
||||||
|
|
||||||
assert_messages(peerset, vec![
|
|
||||||
Message::Connect(reserved_peer),
|
|
||||||
Message::Connect(reserved_peer2),
|
|
||||||
Message::Connect(bootnode.clone()),
|
|
||||||
Message::Connect(bootnode2.clone()),
|
|
||||||
Message::Drop(bootnode.clone()),
|
|
||||||
Message::Drop(bootnode2.clone()),
|
|
||||||
Message::Connect(bootnode),
|
|
||||||
Message::Connect(bootnode2),
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_peerset_report_peer() {
|
|
||||||
let bootnode = PeerId::random();
|
|
||||||
let bootnode2 = PeerId::random();
|
|
||||||
let config = PeersetConfig {
|
|
||||||
in_peers: 0,
|
|
||||||
out_peers: 1,
|
|
||||||
bootnodes: vec![bootnode.clone(), bootnode2.clone()],
|
|
||||||
reserved_only: false,
|
|
||||||
reserved_nodes: Vec::new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let (peerset, handle) = Peerset::from_config(config);
|
|
||||||
handle.report_peer(bootnode2, -1);
|
|
||||||
handle.report_peer(bootnode.clone(), -1);
|
|
||||||
|
|
||||||
assert_messages(peerset, vec![
|
|
||||||
Message::Connect(bootnode.clone()),
|
|
||||||
Message::Drop(bootnode)
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_peerset_incoming() {
|
fn test_peerset_incoming() {
|
||||||
let bootnode = PeerId::random();
|
let bootnode = PeerId::random();
|
||||||
@@ -645,35 +504,6 @@ mod tests {
|
|||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_peerset_dropped() {
|
|
||||||
let bootnode = PeerId::random();
|
|
||||||
let bootnode2 = PeerId::random();
|
|
||||||
let reserved_peer = PeerId::random();
|
|
||||||
let config = PeersetConfig {
|
|
||||||
in_peers: 0,
|
|
||||||
out_peers: 2,
|
|
||||||
bootnodes: vec![bootnode.clone(), bootnode2.clone()],
|
|
||||||
reserved_only: false,
|
|
||||||
reserved_nodes: vec![reserved_peer.clone()],
|
|
||||||
};
|
|
||||||
|
|
||||||
let (peerset, _handle) = Peerset::from_config(config);
|
|
||||||
|
|
||||||
let mut peerset = assert_messages(peerset, vec![
|
|
||||||
Message::Connect(reserved_peer.clone()),
|
|
||||||
Message::Connect(bootnode.clone()),
|
|
||||||
]);
|
|
||||||
|
|
||||||
peerset.dropped(reserved_peer.clone());
|
|
||||||
peerset.dropped(bootnode);
|
|
||||||
|
|
||||||
let _peerset = assert_messages(peerset, vec![
|
|
||||||
Message::Connect(reserved_peer),
|
|
||||||
Message::Connect(bootnode2),
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_peerset_discovered() {
|
fn test_peerset_discovered() {
|
||||||
let bootnode = PeerId::random();
|
let bootnode = PeerId::random();
|
||||||
|
|||||||
@@ -0,0 +1,586 @@
|
|||||||
|
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Substrate.
|
||||||
|
|
||||||
|
// Substrate is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Substrate is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Contains the state storage behind the peerset.
|
||||||
|
|
||||||
|
use libp2p::PeerId;
|
||||||
|
use std::{borrow::Cow, collections::HashMap};
|
||||||
|
|
||||||
|
/// State storage behind the peerset.
|
||||||
|
///
|
||||||
|
/// # Usage
|
||||||
|
///
|
||||||
|
/// This struct is nothing more but a data structure containing a list of nodes, where each node
|
||||||
|
/// has a reputation and is either connected to us or not.
|
||||||
|
///
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct PeersState {
|
||||||
|
/// List of nodes that we know about.
|
||||||
|
///
|
||||||
|
/// > **Note**: This list should really be ordered by decreasing reputation, so that we can
|
||||||
|
/// easily select the best node to connect to. As a first draft, however, we don't
|
||||||
|
/// sort, to make the logic easier.
|
||||||
|
nodes: HashMap<PeerId, Node>,
|
||||||
|
|
||||||
|
/// Number of non-reserved nodes for which the `ConnectionState` is `In`.
|
||||||
|
num_in: u32,
|
||||||
|
|
||||||
|
/// Number of non-reserved nodes for which the `ConnectionState` is `In`.
|
||||||
|
num_out: u32,
|
||||||
|
|
||||||
|
/// Maximum allowed number of non-reserved nodes for which the `ConnectionState` is `In`.
|
||||||
|
max_in: u32,
|
||||||
|
|
||||||
|
/// Maximum allowed number of non-reserved nodes for which the `ConnectionState` is `Out`.
|
||||||
|
max_out: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State of a single node that we know about.
|
||||||
|
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||||
|
struct Node {
|
||||||
|
/// Whether we are connected to this node.
|
||||||
|
connection_state: ConnectionState,
|
||||||
|
|
||||||
|
/// If true, this node is reserved and should always be connected to.
|
||||||
|
reserved: bool,
|
||||||
|
|
||||||
|
/// Reputation value of the node, between `i32::min_value` (we hate that node) and
|
||||||
|
/// `i32::max_value` (we love that node).
|
||||||
|
reputation: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Whether we are connected to a node.
|
||||||
|
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||||
|
enum ConnectionState {
|
||||||
|
/// We are connected through an ingoing connection.
|
||||||
|
In,
|
||||||
|
/// We are connected through an outgoing connection.
|
||||||
|
Out,
|
||||||
|
/// We are not connected to this node.
|
||||||
|
NotConnected,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionState {
|
||||||
|
/// Returns `true` for `In` and `Out`.
|
||||||
|
fn is_connected(self) -> bool {
|
||||||
|
match self {
|
||||||
|
ConnectionState::In => true,
|
||||||
|
ConnectionState::Out => true,
|
||||||
|
ConnectionState::NotConnected => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PeersState {
|
||||||
|
/// Builds a new empty `PeersState`.
|
||||||
|
pub fn new(in_peers: u32, out_peers: u32) -> Self {
|
||||||
|
PeersState {
|
||||||
|
nodes: HashMap::new(),
|
||||||
|
num_in: 0,
|
||||||
|
num_out: 0,
|
||||||
|
max_in: in_peers,
|
||||||
|
max_out: out_peers,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns an object that grants access to the state of a peer.
|
||||||
|
pub fn peer<'a>(&'a mut self, peer_id: &'a PeerId) -> Peer<'a> {
|
||||||
|
// Note: the Rust borrow checker still has some issues. In particular, we can't put this
|
||||||
|
// block as an `else` below (as the obvious solution would be here), or it will complain
|
||||||
|
// that we borrow `self` while it is already borrowed.
|
||||||
|
if !self.nodes.contains_key(peer_id) {
|
||||||
|
return Peer::Unknown(UnknownPeer {
|
||||||
|
parent: self,
|
||||||
|
peer_id: Cow::Borrowed(peer_id),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let state = self.nodes.get_mut(peer_id)
|
||||||
|
.expect("We check that the value is present right above; QED");
|
||||||
|
|
||||||
|
if state.connection_state.is_connected() {
|
||||||
|
Peer::Connected(ConnectedPeer {
|
||||||
|
state,
|
||||||
|
peer_id: Cow::Borrowed(peer_id),
|
||||||
|
num_in: &mut self.num_in,
|
||||||
|
num_out: &mut self.num_out,
|
||||||
|
max_in: self.max_in,
|
||||||
|
max_out: self.max_out,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Peer::NotConnected(NotConnectedPeer {
|
||||||
|
state,
|
||||||
|
peer_id: Cow::Borrowed(peer_id),
|
||||||
|
num_in: &mut self.num_in,
|
||||||
|
num_out: &mut self.num_out,
|
||||||
|
max_in: self.max_in,
|
||||||
|
max_out: self.max_out,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the list of all the peers we know of.
|
||||||
|
// Note: this method could theoretically return a `Peer`, but implementing that
|
||||||
|
// isn't simple.
|
||||||
|
pub fn peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||||
|
self.nodes.keys()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the list of peers we are connected to.
|
||||||
|
// Note: this method could theoretically return a `ConnectedPeer`, but implementing that
|
||||||
|
// isn't simple.
|
||||||
|
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||||
|
self.nodes.iter()
|
||||||
|
.filter(|(_, p)| p.connection_state.is_connected())
|
||||||
|
.map(|(p, _)| p)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the first reserved peer that we are not connected to.
|
||||||
|
///
|
||||||
|
/// If multiple nodes are reserved, which one is returned is unspecified.
|
||||||
|
pub fn reserved_not_connected_peer(&mut self) -> Option<NotConnectedPeer> {
|
||||||
|
let outcome = self.nodes.iter_mut()
|
||||||
|
.find(|(_, &mut Node { connection_state, reserved, .. })| {
|
||||||
|
reserved && !connection_state.is_connected()
|
||||||
|
})
|
||||||
|
.map(|(peer_id, node)| (peer_id.clone(), node));
|
||||||
|
|
||||||
|
if let Some((peer_id, state)) = outcome {
|
||||||
|
Some(NotConnectedPeer {
|
||||||
|
state,
|
||||||
|
peer_id: Cow::Owned(peer_id),
|
||||||
|
num_in: &mut self.num_in,
|
||||||
|
num_out: &mut self.num_out,
|
||||||
|
max_in: self.max_in,
|
||||||
|
max_out: self.max_out,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the peer with the highest reputation and that we are not connected to.
|
||||||
|
///
|
||||||
|
/// If multiple nodes have the same reputation, which one is returned is unspecified.
|
||||||
|
pub fn highest_not_connected_peer(&mut self) -> Option<NotConnectedPeer> {
|
||||||
|
let outcome = self.nodes
|
||||||
|
.iter_mut()
|
||||||
|
.filter(|(_, Node { connection_state, .. })| !connection_state.is_connected())
|
||||||
|
.fold(None::<(&PeerId, &mut Node)>, |mut cur_node, to_try| {
|
||||||
|
if let Some(cur_node) = cur_node.take() {
|
||||||
|
if cur_node.1.reputation >= to_try.1.reputation {
|
||||||
|
return Some(cur_node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(to_try)
|
||||||
|
})
|
||||||
|
.map(|(peer_id, state)| (peer_id.clone(), state));
|
||||||
|
|
||||||
|
if let Some((peer_id, state)) = outcome {
|
||||||
|
Some(NotConnectedPeer {
|
||||||
|
state,
|
||||||
|
peer_id: Cow::Owned(peer_id),
|
||||||
|
num_in: &mut self.num_in,
|
||||||
|
num_out: &mut self.num_out,
|
||||||
|
max_in: self.max_in,
|
||||||
|
max_out: self.max_out,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Grants access to the state of a peer in the `PeersState`.
|
||||||
|
pub enum Peer<'a> {
|
||||||
|
/// We are connected to this node.
|
||||||
|
Connected(ConnectedPeer<'a>),
|
||||||
|
/// We are not connected to this node.
|
||||||
|
NotConnected(NotConnectedPeer<'a>),
|
||||||
|
/// We have never heard of this node.
|
||||||
|
Unknown(UnknownPeer<'a>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Peer<'a> {
|
||||||
|
/// If we are the `Connected` variant, returns the inner `ConnectedPeer`. Returns `None`
|
||||||
|
/// otherwise.
|
||||||
|
pub fn into_connected(self) -> Option<ConnectedPeer<'a>> {
|
||||||
|
match self {
|
||||||
|
Peer::Connected(peer) => Some(peer),
|
||||||
|
Peer::NotConnected(_) => None,
|
||||||
|
Peer::Unknown(_) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If we are the `Unknown` variant, returns the inner `ConnectedPeer`. Returns `None`
|
||||||
|
/// otherwise.
|
||||||
|
#[cfg(test)] // Feel free to remove this if this function is needed outside of tests
|
||||||
|
pub fn into_not_connected(self) -> Option<NotConnectedPeer<'a>> {
|
||||||
|
match self {
|
||||||
|
Peer::Connected(_) => None,
|
||||||
|
Peer::NotConnected(peer) => Some(peer),
|
||||||
|
Peer::Unknown(_) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If we are the `Unknown` variant, returns the inner `ConnectedPeer`. Returns `None`
|
||||||
|
/// otherwise.
|
||||||
|
#[cfg(test)] // Feel free to remove this if this function is needed outside of tests
|
||||||
|
pub fn into_unknown(self) -> Option<UnknownPeer<'a>> {
|
||||||
|
match self {
|
||||||
|
Peer::Connected(_) => None,
|
||||||
|
Peer::NotConnected(_) => None,
|
||||||
|
Peer::Unknown(peer) => Some(peer),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A peer that is connected to us.
|
||||||
|
pub struct ConnectedPeer<'a> {
|
||||||
|
state: &'a mut Node,
|
||||||
|
peer_id: Cow<'a, PeerId>,
|
||||||
|
num_in: &'a mut u32,
|
||||||
|
num_out: &'a mut u32,
|
||||||
|
max_in: u32,
|
||||||
|
max_out: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> ConnectedPeer<'a> {
|
||||||
|
/// Destroys this `ConnectedPeer` and returns the `PeerId` inside of it.
|
||||||
|
pub fn into_peer_id(self) -> PeerId {
|
||||||
|
self.peer_id.into_owned()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Switches the peer to "not connected".
|
||||||
|
pub fn disconnect(self) -> NotConnectedPeer<'a> {
|
||||||
|
let connec_state = &mut self.state.connection_state;
|
||||||
|
|
||||||
|
match *connec_state {
|
||||||
|
ConnectionState::In => *self.num_in -= 1,
|
||||||
|
ConnectionState::Out => *self.num_out -= 1,
|
||||||
|
ConnectionState::NotConnected =>
|
||||||
|
debug_assert!(false, "State inconsistency: disconnecting a disconnected node")
|
||||||
|
}
|
||||||
|
|
||||||
|
*connec_state = ConnectionState::NotConnected;
|
||||||
|
|
||||||
|
NotConnectedPeer {
|
||||||
|
state: self.state,
|
||||||
|
peer_id: self.peer_id,
|
||||||
|
num_in: self.num_in,
|
||||||
|
num_out: self.num_out,
|
||||||
|
max_in: self.max_in,
|
||||||
|
max_out: self.max_out,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets whether or not the node is reserved.
|
||||||
|
pub fn set_reserved(&mut self, reserved: bool) {
|
||||||
|
if reserved {
|
||||||
|
self.state.reserved = true;
|
||||||
|
match self.state.connection_state {
|
||||||
|
ConnectionState::In => *self.num_in -= 1,
|
||||||
|
ConnectionState::Out => *self.num_out -= 1,
|
||||||
|
ConnectionState::NotConnected => debug_assert!(false, "State inconsistency: \
|
||||||
|
connected node is in fact not connected"),
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
self.state.reserved = false;
|
||||||
|
match self.state.connection_state {
|
||||||
|
ConnectionState::In => *self.num_in += 1,
|
||||||
|
ConnectionState::Out => *self.num_out += 1,
|
||||||
|
ConnectionState::NotConnected => debug_assert!(false, "State inconsistency: \
|
||||||
|
connected node is in fact not connected"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns whether or not the node is reserved.
|
||||||
|
pub fn is_reserved(&self) -> bool {
|
||||||
|
self.state.reserved
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the reputation value of the node.
|
||||||
|
pub fn reputation(&self) -> i32 {
|
||||||
|
self.state.reputation
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets the reputation of the peer.
|
||||||
|
pub fn set_reputation(&mut self, value: i32) {
|
||||||
|
self.state.reputation = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Performs an arithmetic addition on the reputation score of that peer.
|
||||||
|
///
|
||||||
|
/// In case of overflow, the value will be capped.
|
||||||
|
pub fn add_reputation(&mut self, modifier: i32) {
|
||||||
|
let reputation = &mut self.state.reputation;
|
||||||
|
*reputation = reputation.saturating_add(modifier);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A peer that is not connected to us.
|
||||||
|
pub struct NotConnectedPeer<'a> {
|
||||||
|
state: &'a mut Node,
|
||||||
|
peer_id: Cow<'a, PeerId>,
|
||||||
|
num_in: &'a mut u32,
|
||||||
|
num_out: &'a mut u32,
|
||||||
|
max_in: u32,
|
||||||
|
max_out: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> NotConnectedPeer<'a> {
|
||||||
|
/// Destroys this `NotConnectedPeer` and returns the `PeerId` inside of it.
|
||||||
|
#[cfg(test)] // Feel free to remove this if this function is needed outside of tests
|
||||||
|
pub fn into_peer_id(self) -> PeerId {
|
||||||
|
self.peer_id.into_owned()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tries to set the peer as connected as an outgoing connection.
|
||||||
|
///
|
||||||
|
/// If there are enough slots available, switches the node to "connected" and returns `Ok`. If
|
||||||
|
/// the slots are full, the node stays "not connected" and we return `Err`.
|
||||||
|
/// If the node is reserved, this method always succeeds.
|
||||||
|
///
|
||||||
|
/// Note that reserved nodes don't count towards the number of slots.
|
||||||
|
pub fn try_outgoing(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> {
|
||||||
|
if self.is_reserved() {
|
||||||
|
return Ok(self.force_outgoing())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note that it is possible for num_out to be strictly superior to the max, in case we were
|
||||||
|
// connected to reserved node then marked them as not reserved, or if the user used
|
||||||
|
// `force_outgoing`.
|
||||||
|
if *self.num_out >= self.max_out {
|
||||||
|
return Err(self);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(self.force_outgoing())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets the peer as connected as an outgoing connection.
|
||||||
|
pub fn force_outgoing(self) -> ConnectedPeer<'a> {
|
||||||
|
let connec_state = &mut self.state.connection_state;
|
||||||
|
debug_assert!(!connec_state.is_connected());
|
||||||
|
*connec_state = ConnectionState::Out;
|
||||||
|
|
||||||
|
if !self.state.reserved {
|
||||||
|
*self.num_out += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ConnectedPeer {
|
||||||
|
state: self.state,
|
||||||
|
peer_id: self.peer_id,
|
||||||
|
num_in: self.num_in,
|
||||||
|
num_out: self.num_out,
|
||||||
|
max_in: self.max_in,
|
||||||
|
max_out: self.max_out,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tries to accept the peer as an incoming connection.
|
||||||
|
///
|
||||||
|
/// If there are enough slots available, switches the node to "connected" and returns `Ok`. If
|
||||||
|
/// the slots are full, the node stays "not connected" and we return `Err`.
|
||||||
|
///
|
||||||
|
/// Note that reserved nodes don't count towards the number of slots.
|
||||||
|
pub fn try_accept_incoming(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> {
|
||||||
|
if self.is_reserved() {
|
||||||
|
return Ok(self.force_ingoing())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note that it is possible for num_in to be strictly superior to the max, in case we were
|
||||||
|
// connected to reserved node then marked them as not reserved.
|
||||||
|
if *self.num_in >= self.max_in {
|
||||||
|
return Err(self);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(self.force_ingoing())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets the peer as connected as an ingoing connection.
|
||||||
|
pub fn force_ingoing(self) -> ConnectedPeer<'a> {
|
||||||
|
let connec_state = &mut self.state.connection_state;
|
||||||
|
debug_assert!(!connec_state.is_connected());
|
||||||
|
*connec_state = ConnectionState::In;
|
||||||
|
|
||||||
|
if !self.state.reserved {
|
||||||
|
*self.num_in += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ConnectedPeer {
|
||||||
|
state: self.state,
|
||||||
|
peer_id: self.peer_id,
|
||||||
|
num_in: self.num_in,
|
||||||
|
num_out: self.num_out,
|
||||||
|
max_in: self.max_in,
|
||||||
|
max_out: self.max_out,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets whether or not the node is reserved.
|
||||||
|
pub fn set_reserved(&mut self, reserved: bool) {
|
||||||
|
self.state.reserved = reserved;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if the the node is reserved.
|
||||||
|
pub fn is_reserved(&self) -> bool {
|
||||||
|
self.state.reserved
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the reputation value of the node.
|
||||||
|
pub fn reputation(&self) -> i32 {
|
||||||
|
self.state.reputation
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets the reputation of the peer.
|
||||||
|
pub fn set_reputation(&mut self, value: i32) {
|
||||||
|
self.state.reputation = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Performs an arithmetic addition on the reputation score of that peer.
|
||||||
|
///
|
||||||
|
/// In case of overflow, the value will be capped.
|
||||||
|
/// If the peer is unknown to us, we insert it and consider that it has a reputation of 0.
|
||||||
|
pub fn add_reputation(&mut self, modifier: i32) {
|
||||||
|
let reputation = &mut self.state.reputation;
|
||||||
|
*reputation = reputation.saturating_add(modifier);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A peer that we have never heard of.
|
||||||
|
pub struct UnknownPeer<'a> {
|
||||||
|
parent: &'a mut PeersState,
|
||||||
|
peer_id: Cow<'a, PeerId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> UnknownPeer<'a> {
|
||||||
|
/// Inserts the peer identity in our list.
|
||||||
|
///
|
||||||
|
/// The node is not reserved and starts with a reputation of 0. You can adjust these default
|
||||||
|
/// values using the `NotConnectedPeer` that this method returns.
|
||||||
|
pub fn discover(self) -> NotConnectedPeer<'a> {
|
||||||
|
self.parent.nodes.insert(self.peer_id.clone().into_owned(), Node {
|
||||||
|
connection_state: ConnectionState::NotConnected,
|
||||||
|
reputation: 0,
|
||||||
|
reserved: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
let state = self.parent.nodes.get_mut(&self.peer_id)
|
||||||
|
.expect("We insert that key into the HashMap right above; QED");
|
||||||
|
|
||||||
|
NotConnectedPeer {
|
||||||
|
state,
|
||||||
|
peer_id: self.peer_id,
|
||||||
|
num_in: &mut self.parent.num_in,
|
||||||
|
num_out: &mut self.parent.num_out,
|
||||||
|
max_in: self.parent.max_in,
|
||||||
|
max_out: self.parent.max_out,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::{PeersState, Peer};
|
||||||
|
use libp2p::PeerId;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn full_slots_in() {
|
||||||
|
let mut peers_state = PeersState::new(1, 1);
|
||||||
|
let id1 = PeerId::random();
|
||||||
|
let id2 = PeerId::random();
|
||||||
|
|
||||||
|
if let Peer::Unknown(e) = peers_state.peer(&id1) {
|
||||||
|
assert!(e.discover().try_accept_incoming().is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Peer::Unknown(e) = peers_state.peer(&id2) {
|
||||||
|
assert!(e.discover().try_accept_incoming().is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn reserved_node_doesnt_use_slot() {
|
||||||
|
let mut peers_state = PeersState::new(1, 1);
|
||||||
|
let id1 = PeerId::random();
|
||||||
|
let id2 = PeerId::random();
|
||||||
|
|
||||||
|
if let Peer::Unknown(e) = peers_state.peer(&id1) {
|
||||||
|
let mut p = e.discover();
|
||||||
|
p.set_reserved(true);
|
||||||
|
assert!(p.try_accept_incoming().is_ok());
|
||||||
|
} else { panic!() }
|
||||||
|
|
||||||
|
if let Peer::Unknown(e) = peers_state.peer(&id2) {
|
||||||
|
assert!(e.discover().try_accept_incoming().is_ok());
|
||||||
|
} else { panic!() }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn disconnecting_frees_slot() {
|
||||||
|
let mut peers_state = PeersState::new(1, 1);
|
||||||
|
let id1 = PeerId::random();
|
||||||
|
let id2 = PeerId::random();
|
||||||
|
|
||||||
|
assert!(peers_state.peer(&id1).into_unknown().unwrap().discover().try_accept_incoming().is_ok());
|
||||||
|
assert!(peers_state.peer(&id2).into_unknown().unwrap().discover().try_accept_incoming().is_err());
|
||||||
|
peers_state.peer(&id1).into_connected().unwrap().disconnect();
|
||||||
|
assert!(peers_state.peer(&id2).into_not_connected().unwrap().try_accept_incoming().is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn reserved_not_connected_peer() {
|
||||||
|
let mut peers_state = PeersState::new(25, 25);
|
||||||
|
let id1 = PeerId::random();
|
||||||
|
let id2 = PeerId::random();
|
||||||
|
|
||||||
|
assert!(peers_state.reserved_not_connected_peer().is_none());
|
||||||
|
peers_state.peer(&id1).into_unknown().unwrap().discover();
|
||||||
|
peers_state.peer(&id2).into_unknown().unwrap().discover();
|
||||||
|
|
||||||
|
assert!(peers_state.reserved_not_connected_peer().is_none());
|
||||||
|
peers_state.peer(&id1).into_not_connected().unwrap().set_reserved(true);
|
||||||
|
assert!(peers_state.reserved_not_connected_peer().is_some());
|
||||||
|
peers_state.peer(&id2).into_not_connected().unwrap().set_reserved(true);
|
||||||
|
peers_state.peer(&id1).into_not_connected().unwrap().set_reserved(false);
|
||||||
|
assert!(peers_state.reserved_not_connected_peer().is_some());
|
||||||
|
peers_state.peer(&id2).into_not_connected().unwrap().set_reserved(false);
|
||||||
|
assert!(peers_state.reserved_not_connected_peer().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn highest_not_connected_peer() {
|
||||||
|
let mut peers_state = PeersState::new(25, 25);
|
||||||
|
let id1 = PeerId::random();
|
||||||
|
let id2 = PeerId::random();
|
||||||
|
|
||||||
|
assert!(peers_state.highest_not_connected_peer().is_none());
|
||||||
|
peers_state.peer(&id1).into_unknown().unwrap().discover().set_reputation(50);
|
||||||
|
peers_state.peer(&id2).into_unknown().unwrap().discover().set_reputation(25);
|
||||||
|
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone()));
|
||||||
|
peers_state.peer(&id2).into_not_connected().unwrap().set_reputation(75);
|
||||||
|
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id2.clone()));
|
||||||
|
peers_state.peer(&id2).into_not_connected().unwrap().force_ingoing();
|
||||||
|
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone()));
|
||||||
|
peers_state.peer(&id1).into_not_connected().unwrap().set_reputation(100);
|
||||||
|
peers_state.peer(&id2).into_connected().unwrap().disconnect();
|
||||||
|
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone()));
|
||||||
|
peers_state.peer(&id1).into_not_connected().unwrap().set_reputation(-100);
|
||||||
|
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id2.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,222 +0,0 @@
|
|||||||
// Copyright 2018-2019 Parity Technologies (UK) Ltd.
|
|
||||||
// This file is part of Substrate.
|
|
||||||
|
|
||||||
// Substrate is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// Substrate is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU General Public License
|
|
||||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
use std::{fmt, mem};
|
|
||||||
use libp2p::PeerId;
|
|
||||||
use linked_hash_map::LinkedHashMap;
|
|
||||||
use serde_json::json;
|
|
||||||
|
|
||||||
/// Describes the nature of connection with a given peer.
|
|
||||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
|
||||||
pub enum SlotType {
|
|
||||||
/// Reserved peer is a peer we should always stay connected to.
|
|
||||||
Reserved,
|
|
||||||
/// Common peer is a type of peer that we stay connected to only if it's
|
|
||||||
/// useful for us.
|
|
||||||
Common,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Descibes the result of `add_peer` action.
|
|
||||||
pub enum SlotState {
|
|
||||||
/// Returned when `add_peer` successfully adds a peer to the slot.
|
|
||||||
Added(PeerId),
|
|
||||||
/// Returned when we already have given peer in our list, but it is upgraded from
|
|
||||||
/// `Common` to `Reserved`.
|
|
||||||
Upgraded(PeerId),
|
|
||||||
/// Returned when we should removed a common peer to make space for a reserved peer.
|
|
||||||
Swaped {
|
|
||||||
/// Peer was removed from the list.
|
|
||||||
removed: PeerId,
|
|
||||||
/// Peer was added to the list.
|
|
||||||
added: PeerId,
|
|
||||||
},
|
|
||||||
/// Error returned when we are already know about given peer.
|
|
||||||
AlreadyExists(PeerId),
|
|
||||||
/// Error returned when list is full and no more peers can be added.
|
|
||||||
MaxCapacity(PeerId),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Contains all information about group of slots.
|
|
||||||
pub struct Slots {
|
|
||||||
/// Maximum number of slots. Total number of reserved and common slots must be always
|
|
||||||
/// smaller or equal to `max_slots`.
|
|
||||||
max_slots: usize,
|
|
||||||
/// Reserved slots.
|
|
||||||
reserved: LinkedHashMap<PeerId,()>,
|
|
||||||
/// Common slots.
|
|
||||||
common: LinkedHashMap<PeerId, ()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for Slots {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
struct ListFormatter<'a>(&'a LinkedHashMap<PeerId, ()>);
|
|
||||||
|
|
||||||
impl<'a> fmt::Debug for ListFormatter<'a> {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
f.debug_list().entries(self.0.keys()).finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
f.debug_struct("Slots")
|
|
||||||
.field("max_slots", &self.max_slots)
|
|
||||||
.field("reserved", &ListFormatter(&self.reserved))
|
|
||||||
.field("common", &ListFormatter(&self.common))
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Slots {
|
|
||||||
/// Creates a group of slots with a limited size.
|
|
||||||
pub fn new(max_slots: u32) -> Self {
|
|
||||||
let max_slots = max_slots as usize;
|
|
||||||
Slots {
|
|
||||||
max_slots,
|
|
||||||
reserved: LinkedHashMap::new(),
|
|
||||||
common: LinkedHashMap::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns true if one of the slots contains given peer.
|
|
||||||
pub fn contains(&self, peer_id: &PeerId) -> bool {
|
|
||||||
self.common.contains_key(peer_id) || self.reserved.contains_key(peer_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Tries to find a slot for a given peer and returns `SlotState`.
|
|
||||||
///
|
|
||||||
/// - If a peer is already inserted into reserved list or inserted or
|
|
||||||
/// inserted into common list and readded with the same `SlotType`,
|
|
||||||
/// the function returns `SlotState::AlreadyExists`
|
|
||||||
/// - If a peer is already inserted common list returns `SlotState::Upgraded`
|
|
||||||
/// - If there is no slot for a reserved peer, we try to drop one common peer
|
|
||||||
/// and it a new reserved one in it's place, function returns `SlotState::Swaped`
|
|
||||||
/// - If there is no place for a peer, function returns `SlotState::MaxCapacity`
|
|
||||||
/// - If the peer was simply added, `SlotState::Added` is returned
|
|
||||||
pub fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) -> SlotState {
|
|
||||||
if self.reserved.contains_key(&peer_id) {
|
|
||||||
return SlotState::AlreadyExists(peer_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.common.contains_key(&peer_id) {
|
|
||||||
if slot_type == SlotType::Reserved {
|
|
||||||
self.common.remove(&peer_id);
|
|
||||||
self.reserved.insert(peer_id.clone(), ());
|
|
||||||
return SlotState::Upgraded(peer_id);
|
|
||||||
} else {
|
|
||||||
return SlotState::AlreadyExists(peer_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.max_slots == (self.common.len() + self.reserved.len()) {
|
|
||||||
if let SlotType::Reserved = slot_type {
|
|
||||||
if let Some((to_remove, _)) = self.common.pop_front() {
|
|
||||||
self.reserved.insert(peer_id.clone(), ());
|
|
||||||
return SlotState::Swaped {
|
|
||||||
removed: to_remove,
|
|
||||||
added: peer_id,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return SlotState::MaxCapacity(peer_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
match slot_type {
|
|
||||||
SlotType::Common => self.common.insert(peer_id.clone(), ()),
|
|
||||||
SlotType::Reserved => self.reserved.insert(peer_id.clone(), ()),
|
|
||||||
};
|
|
||||||
|
|
||||||
SlotState::Added(peer_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Pops the oldest reserved peer. If none exists and `reserved_only = false` pops a common peer.
|
|
||||||
pub fn pop_most_important_peer(&mut self, reserved_only: bool) -> Option<(PeerId, SlotType)> {
|
|
||||||
if let Some((peer_id, _)) = self.reserved.pop_front() {
|
|
||||||
return Some((peer_id, SlotType::Reserved));
|
|
||||||
}
|
|
||||||
|
|
||||||
if reserved_only {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.common.pop_front()
|
|
||||||
.map(|(peer_id, _)| (peer_id, SlotType::Common))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Removes all common peers from the list and returns an iterator over them.
|
|
||||||
pub fn clear_common_slots(&mut self) -> impl Iterator<Item = PeerId> {
|
|
||||||
let slots = mem::replace(&mut self.common, LinkedHashMap::new());
|
|
||||||
slots.into_iter().map(|(peer_id, _)| peer_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Marks given peer as a reserved one.
|
|
||||||
pub fn mark_reserved(&mut self, peer_id: &PeerId) {
|
|
||||||
if self.common.remove(peer_id).is_some() {
|
|
||||||
self.reserved.insert(peer_id.clone(), ());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Marks given peer as not reserved one.
|
|
||||||
pub fn mark_not_reserved(&mut self, peer_id: &PeerId) {
|
|
||||||
if self.reserved.remove(peer_id).is_some() {
|
|
||||||
self.common.insert(peer_id.clone(), ());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Removes a peer from a list and returns true if it existed.
|
|
||||||
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
|
|
||||||
self.common.remove(peer_id).is_some() || self.reserved.remove(peer_id).is_some()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns true if given peer is reserved.
|
|
||||||
pub fn is_reserved(&self, peer_id: &PeerId) -> bool {
|
|
||||||
self.reserved.contains_key(peer_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Produces a JSON object containing the state of slots, for debugging purposes.
|
|
||||||
pub fn debug_info(&self) -> serde_json::Value {
|
|
||||||
json!({
|
|
||||||
"max_slots": self.max_slots,
|
|
||||||
"reserved": self.reserved.keys().map(|peer_id| peer_id.to_base58()).collect::<Vec<_>>(),
|
|
||||||
"common": self.common.keys().map(|peer_id| peer_id.to_base58()).collect::<Vec<_>>()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use libp2p::PeerId;
|
|
||||||
use serde_json::json;
|
|
||||||
use super::{Slots, SlotType};
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_slots_debug_info() {
|
|
||||||
let reserved_peer = PeerId::random();
|
|
||||||
let reserved_peer2 = PeerId::random();
|
|
||||||
let common_peer = PeerId::random();
|
|
||||||
let mut slots = Slots::new(10);
|
|
||||||
|
|
||||||
slots.add_peer(reserved_peer.clone(), SlotType::Reserved);
|
|
||||||
slots.add_peer(reserved_peer2.clone(), SlotType::Reserved);
|
|
||||||
slots.add_peer(common_peer.clone(), SlotType::Common);
|
|
||||||
|
|
||||||
let expected = json!({
|
|
||||||
"max_slots": 10,
|
|
||||||
"reserved": vec![reserved_peer.to_base58(), reserved_peer2.to_base58()],
|
|
||||||
"common": vec![common_peer.to_base58()],
|
|
||||||
});
|
|
||||||
|
|
||||||
assert_eq!(expected, slots.debug_info());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user