Peerset: reuse Slots data structure for discovered peers (#2227)

* Reuse slots data structure also for peerset discovered

* updated peerset docs
This commit is contained in:
Marek Kotewicz
2019-04-10 16:12:43 +02:00
committed by GitHub
parent 19d77cbc23
commit e6d5438b57
2 changed files with 145 additions and 128 deletions
+16 -74
View File
@@ -22,77 +22,19 @@ mod slots;
use std::collections::VecDeque;
use futures::{prelude::*, sync::mpsc, try_ready};
use libp2p::PeerId;
use linked_hash_map::LinkedHashMap;
use log::trace;
use lru_cache::LruCache;
use slots::{SlotType, SlotState, Slots};
pub use serde_json::Value;
const PEERSET_SCORES_CACHE_SIZE: usize = 1000;
/// FIFO-ordered list of nodes that we know exist, but we are not connected to.
#[derive(Debug, Default)]
struct Discovered {
/// Nodes we should connect to first.
reserved: LinkedHashMap<PeerId, ()>,
/// All remaining nodes.
common: LinkedHashMap<PeerId, ()>,
}
impl Discovered {
/// Returns true if we already know given node.
fn contains(&self, peer_id: &PeerId) -> bool {
self.reserved.contains_key(peer_id) || self.common.contains_key(peer_id)
}
/// Returns true if given node is reserved.
fn is_reserved(&self, peer_id: &PeerId) -> bool {
self.reserved.contains_key(peer_id)
}
/// Adds new peer of a given type.
fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) {
if !self.contains(&peer_id) {
match slot_type {
SlotType::Common => self.common.insert(peer_id, ()),
SlotType::Reserved => self.reserved.insert(peer_id, ()),
};
}
}
/// Pops the oldest peer from the list.
fn pop_peer(&mut self, reserved_only: bool) -> Option<(PeerId, SlotType)> {
if let Some((peer_id, _)) = self.reserved.pop_front() {
return Some((peer_id, SlotType::Reserved));
}
if reserved_only {
return None;
}
self.common.pop_front()
.map(|(peer_id, _)| (peer_id, SlotType::Common))
}
/// Marks the node as not reserved.
fn mark_not_reserved(&mut self, peer_id: &PeerId) {
if let Some(_) = self.reserved.remove(peer_id) {
self.common.insert(peer_id.clone(), ());
}
}
/// Removes the node from the list.
fn remove_peer(&mut self, peer_id: &PeerId) {
self.reserved.remove(peer_id);
self.common.remove(peer_id);
}
}
const DISCOVERED_NODES_LIMIT: u32 = 1000;
#[derive(Debug)]
struct PeersetData {
/// List of nodes that we know exist, but we are not connected to.
/// Elements in this list must never be in `out_slots` or `in_slots`.
discovered: Discovered,
discovered: Slots,
/// If true, we only accept reserved nodes.
reserved_only: bool,
/// Node slots for outgoing connections.
@@ -216,7 +158,7 @@ impl Peerset {
let (tx, rx) = mpsc::unbounded();
let data = PeersetData {
discovered: Default::default(),
discovered: Slots::new(DISCOVERED_NODES_LIMIT),
reserved_only: config.reserved_only,
out_slots: Slots::new(config.out_peers),
in_slots: Slots::new(config.in_peers),
@@ -270,10 +212,10 @@ impl Peerset {
self.message_queue.push_back(Message::Drop(removed));
self.message_queue.push_back(Message::Connect(added));
}
SlotState::AlreadyConnected(_) | SlotState::Upgraded(_) => {
SlotState::AlreadyExists(_) | SlotState::Upgraded(_) => {
return;
}
SlotState::MaxConnections(peer_id) => {
SlotState::MaxCapacity(peer_id) => {
self.data.discovered.add_peer(peer_id, SlotType::Reserved);
return;
}
@@ -285,7 +227,7 @@ impl Peerset {
self.data.out_slots.mark_not_reserved(&peer_id);
self.data.discovered.mark_not_reserved(&peer_id);
if self.data.reserved_only {
if self.data.in_slots.clear_slot(&peer_id) || self.data.out_slots.clear_slot(&peer_id) {
if self.data.in_slots.remove_peer(&peer_id) || self.data.out_slots.remove_peer(&peer_id) {
// insert peer back into discovered list
self.data.discovered.add_peer(peer_id.clone(), SlotType::Common);
self.message_queue.push_back(Message::Drop(peer_id));
@@ -325,15 +267,15 @@ impl Peerset {
if score < 0 {
// peer will be removed from `in_slots` or `out_slots` in `on_dropped` method
if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) {
self.data.in_slots.clear_slot(&peer_id);
self.data.out_slots.clear_slot(&peer_id);
self.data.in_slots.remove_peer(&peer_id);
self.data.out_slots.remove_peer(&peer_id);
self.message_queue.push_back(Message::Drop(peer_id));
}
}
}
fn alloc_slots(&mut self) {
while let Some((peer_id, slot_type)) = self.data.discovered.pop_peer(self.data.reserved_only) {
while let Some((peer_id, slot_type)) = self.data.discovered.pop_most_important_peer(self.data.reserved_only) {
match self.data.out_slots.add_peer(peer_id, slot_type) {
SlotState::Added(peer_id) => {
self.message_queue.push_back(Message::Connect(peer_id));
@@ -344,10 +286,10 @@ impl Peerset {
self.message_queue.push_back(Message::Drop(removed));
self.message_queue.push_back(Message::Connect(added));
}
SlotState::Upgraded(_) | SlotState::AlreadyConnected(_) => {
SlotState::Upgraded(_) | SlotState::AlreadyExists(_) => {
// TODO: we should never reach this point
},
SlotState::MaxConnections(peer_id) => {
SlotState::MaxCapacity(peer_id) => {
self.data.discovered.add_peer(peer_id, slot_type);
break;
},
@@ -404,11 +346,11 @@ impl Peerset {
self.message_queue.push_back(Message::Drop(removed));
self.message_queue.push_back(Message::Accept(index));
},
SlotState::AlreadyConnected(_) | SlotState::Upgraded(_) => {
SlotState::AlreadyExists(_) | SlotState::Upgraded(_) => {
// we are already connected. in this case we do not answer
return;
},
SlotState::MaxConnections(peer_id) => {
SlotState::MaxCapacity(peer_id) => {
self.data.discovered.add_peer(peer_id, slot_type);
self.message_queue.push_back(Message::Reject(index));
return;
@@ -427,14 +369,14 @@ impl Peerset {
peer_id, self.data.in_slots, self.data.out_slots
);
// Automatically connect back if reserved.
if self.data.in_slots.is_connected_and_reserved(&peer_id) || self.data.out_slots.is_connected_and_reserved(&peer_id) {
if self.data.in_slots.is_reserved(&peer_id) || self.data.out_slots.is_reserved(&peer_id) {
self.message_queue.push_back(Message::Connect(peer_id));
return;
}
// Otherwise, free the slot.
self.data.in_slots.clear_slot(&peer_id);
self.data.out_slots.clear_slot(&peer_id);
self.data.in_slots.remove_peer(&peer_id);
self.data.out_slots.remove_peer(&peer_id);
// Note: in this dummy implementation we consider that peers never expire. As soon as we
// are disconnected from a peer, we try again.
+129 -54
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::mem;
use std::{fmt, mem};
use libp2p::PeerId;
use linked_hash_map::LinkedHashMap;
@@ -32,28 +32,49 @@ pub enum SlotType {
pub enum SlotState {
/// Returned when `add_peer` successfully adds a peer to the slot.
Added(PeerId),
/// Returned we already have a connection to a given peer, but it is upgraded from
/// Returned when we already have given peer in our list, but it is upgraded from
/// `Common` to `Reserved`.
Upgraded(PeerId),
/// Returned when we should removed a common peer to make space for a reserved peer.
Swaped {
/// Peer we should disconnect from.
/// Peer was removed from the list.
removed: PeerId,
/// Peer we should connect to.
/// Peer was added to the list.
added: PeerId,
},
/// Error returned when we are already connected to this peer.
AlreadyConnected(PeerId),
/// Error returned when max number of connections has been already established.
MaxConnections(PeerId),
/// Error returned when we are already know about given peer.
AlreadyExists(PeerId),
/// Error returned when list is full and no more peers can be added.
MaxCapacity(PeerId),
}
/// Contains all information about group of slots.
#[derive(Debug)]
pub struct Slots {
/// Maximum number of slots. Total number of reserved and common slots must be always
/// smaller or equal to `max_slots`.
max_slots: usize,
/// Nodes and their type. We use `LinkedHashMap` to make this data structure more predictable
slots: LinkedHashMap<PeerId, SlotType>,
/// Reserved slots.
reserved: LinkedHashMap<PeerId,()>,
/// Common slots.
common: LinkedHashMap<PeerId, ()>,
}
impl fmt::Debug for Slots {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
struct ListFormatter<'a>(&'a LinkedHashMap<PeerId, ()>);
impl<'a> fmt::Debug for ListFormatter<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list().entries(self.0.keys()).finish()
}
}
f.debug_struct("Slots")
.field("max_slots", &self.max_slots)
.field("reserved", &ListFormatter(&self.reserved))
.field("common", &ListFormatter(&self.common))
.finish()
}
}
impl Slots {
@@ -62,87 +83,141 @@ impl Slots {
let max_slots = max_slots as usize;
Slots {
max_slots,
slots: LinkedHashMap::with_capacity(max_slots),
reserved: LinkedHashMap::new(),
common: LinkedHashMap::new(),
}
}
/// Returns true if one of the slots contains given peer.
pub fn contains(&self, peer_id: &PeerId) -> bool {
self.slots.contains_key(peer_id)
self.common.contains_key(peer_id) || self.reserved.contains_key(peer_id)
}
/// Tries to find a slot for a given peer and returns `SlotState`.
///
/// - If a peer is already inserted into reserved list or inserted or
/// inserted into common list and readded with the same `SlotType`,
/// the function returns `SlotState::AlreadyExists`
/// - If a peer is already inserted common list returns `SlotState::Upgraded`
/// - If there is no slot for a reserved peer, we try to drop one common peer
/// and it a new reserved one in it's place, function returns `SlotState::Swaped`
/// - If there is no place for a peer, function returns `SlotState::MaxCapacity`
/// - If the peer was simply added, `SlotState::Added` is returned
pub fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) -> SlotState {
if let Some(st) = self.slots.get_mut(&peer_id) {
if *st == SlotType::Common && slot_type == SlotType::Reserved {
*st = SlotType::Reserved;
if self.reserved.contains_key(&peer_id) {
return SlotState::AlreadyExists(peer_id);
}
if self.common.contains_key(&peer_id) {
if slot_type == SlotType::Reserved {
self.common.remove(&peer_id);
self.reserved.insert(peer_id.clone(), ());
return SlotState::Upgraded(peer_id);
} else {
return SlotState::AlreadyConnected(peer_id);
return SlotState::AlreadyExists(peer_id);
}
}
if self.slots.len() == self.max_slots {
if self.max_slots == (self.common.len() + self.reserved.len()) {
if let SlotType::Reserved = slot_type {
// if we are trying to insert a reserved peer, but we all of our slots are full,
// we need to remove one of the existing common connections
let to_remove = self.slots.iter()
.find(|(_, &slot_type)| slot_type == SlotType::Common)
.map(|(to_remove, _)| to_remove)
.cloned();
if let Some(to_remove) = to_remove {
self.slots.remove(&to_remove);
self.slots.insert(peer_id.clone(), slot_type);
if let Some((to_remove, _)) = self.common.pop_front() {
self.reserved.insert(peer_id.clone(), ());
return SlotState::Swaped {
removed: to_remove,
added: peer_id,
};
}
}
return SlotState::MaxConnections(peer_id);
return SlotState::MaxCapacity(peer_id);
}
self.slots.insert(peer_id.clone(), slot_type);
match slot_type {
SlotType::Common => self.common.insert(peer_id.clone(), ()),
SlotType::Reserved => self.reserved.insert(peer_id.clone(), ()),
};
SlotState::Added(peer_id)
}
pub fn clear_common_slots(&mut self) -> Vec<PeerId> {
let slots = mem::replace(&mut self.slots, LinkedHashMap::with_capacity(self.max_slots));
let mut common_peers = Vec::new();
for (peer_id, slot_type) in slots {
match slot_type {
SlotType::Common => {
common_peers.push(peer_id);
},
SlotType::Reserved => {
self.slots.insert(peer_id, slot_type);
},
}
/// Pops the oldest reserved peer. If none exists and `reserved_only = false` pops a common peer.
pub fn pop_most_important_peer(&mut self, reserved_only: bool) -> Option<(PeerId, SlotType)> {
if let Some((peer_id, _)) = self.reserved.pop_front() {
return Some((peer_id, SlotType::Reserved));
}
common_peers
if reserved_only {
return None;
}
self.common.pop_front()
.map(|(peer_id, _)| (peer_id, SlotType::Common))
}
/// Removes all common peers from the list and returns an iterator over them.
pub fn clear_common_slots(&mut self) -> impl Iterator<Item = PeerId> {
let slots = mem::replace(&mut self.common, LinkedHashMap::new());
slots.into_iter().map(|(peer_id, _)| peer_id)
}
/// Marks given peer as a reserved one.
pub fn mark_reserved(&mut self, peer_id: &PeerId) {
if let Some(slot_type) = self.slots.get_mut(peer_id) {
*slot_type = SlotType::Reserved;
if let Some(_) = self.common.remove(peer_id) {
self.reserved.insert(peer_id.clone(), ());
}
}
/// Marks given peer as not reserved one.
pub fn mark_not_reserved(&mut self, peer_id: &PeerId) {
if let Some(slot_type) = self.slots.get_mut(peer_id) {
*slot_type = SlotType::Common;
if let Some(_) = self.reserved.remove(peer_id) {
self.common.insert(peer_id.clone(), ());
}
}
pub fn clear_slot(&mut self, peer_id: &PeerId) -> bool {
self.slots.remove(peer_id).is_some()
/// Removes a peer from a list and returns true if it existed.
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
self.common.remove(peer_id).is_some() || self.reserved.remove(peer_id).is_some()
}
pub fn is_connected_and_reserved(&self, peer_id: &PeerId) -> bool {
self.slots.get(peer_id)
.map(|slot_type| *slot_type == SlotType::Reserved)
.unwrap_or_else(|| false)
/// Returns true if given peer is reserved.
pub fn is_reserved(&self, peer_id: &PeerId) -> bool {
self.reserved.contains_key(peer_id)
}
}
#[cfg(test)]
mod tests {
use libp2p::PeerId;
use super::{Slots, SlotType};
#[test]
fn test_slots_debug() {
let reserved_peer = PeerId::random();
let reserved_peer2 = PeerId::random();
let common_peer = PeerId::random();
let mut slots = Slots::new(10);
slots.add_peer(reserved_peer.clone(), SlotType::Reserved);
slots.add_peer(reserved_peer2.clone(), SlotType::Reserved);
slots.add_peer(common_peer.clone(), SlotType::Common);
let expected = format!("Slots {{
max_slots: 10,
reserved: [
PeerId(
{:?}
),
PeerId(
{:?}
)
],
common: [
PeerId(
{:?}
)
]
}}", reserved_peer.to_base58(), reserved_peer2.to_base58(), common_peer.to_base58());
let s = format!("{:#?}", slots);
assert_eq!(expected, s);
}
}