diff --git a/substrate/core/peerset/src/lib.rs b/substrate/core/peerset/src/lib.rs index 98287bf7a6..15633f09d0 100644 --- a/substrate/core/peerset/src/lib.rs +++ b/substrate/core/peerset/src/lib.rs @@ -22,77 +22,19 @@ mod slots; use std::collections::VecDeque; use futures::{prelude::*, sync::mpsc, try_ready}; use libp2p::PeerId; -use linked_hash_map::LinkedHashMap; use log::trace; use lru_cache::LruCache; use slots::{SlotType, SlotState, Slots}; pub use serde_json::Value; const PEERSET_SCORES_CACHE_SIZE: usize = 1000; - -/// FIFO-ordered list of nodes that we know exist, but we are not connected to. -#[derive(Debug, Default)] -struct Discovered { - /// Nodes we should connect to first. - reserved: LinkedHashMap, - /// All remaining nodes. - common: LinkedHashMap, -} - -impl Discovered { - /// Returns true if we already know given node. - fn contains(&self, peer_id: &PeerId) -> bool { - self.reserved.contains_key(peer_id) || self.common.contains_key(peer_id) - } - - /// Returns true if given node is reserved. - fn is_reserved(&self, peer_id: &PeerId) -> bool { - self.reserved.contains_key(peer_id) - } - - /// Adds new peer of a given type. - fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) { - if !self.contains(&peer_id) { - match slot_type { - SlotType::Common => self.common.insert(peer_id, ()), - SlotType::Reserved => self.reserved.insert(peer_id, ()), - }; - } - } - - /// Pops the oldest peer from the list. - fn pop_peer(&mut self, reserved_only: bool) -> Option<(PeerId, SlotType)> { - if let Some((peer_id, _)) = self.reserved.pop_front() { - return Some((peer_id, SlotType::Reserved)); - } - - if reserved_only { - return None; - } - - self.common.pop_front() - .map(|(peer_id, _)| (peer_id, SlotType::Common)) - } - - /// Marks the node as not reserved. - fn mark_not_reserved(&mut self, peer_id: &PeerId) { - if let Some(_) = self.reserved.remove(peer_id) { - self.common.insert(peer_id.clone(), ()); - } - } - - /// Removes the node from the list. - fn remove_peer(&mut self, peer_id: &PeerId) { - self.reserved.remove(peer_id); - self.common.remove(peer_id); - } -} +const DISCOVERED_NODES_LIMIT: u32 = 1000; #[derive(Debug)] struct PeersetData { /// List of nodes that we know exist, but we are not connected to. /// Elements in this list must never be in `out_slots` or `in_slots`. - discovered: Discovered, + discovered: Slots, /// If true, we only accept reserved nodes. reserved_only: bool, /// Node slots for outgoing connections. @@ -216,7 +158,7 @@ impl Peerset { let (tx, rx) = mpsc::unbounded(); let data = PeersetData { - discovered: Default::default(), + discovered: Slots::new(DISCOVERED_NODES_LIMIT), reserved_only: config.reserved_only, out_slots: Slots::new(config.out_peers), in_slots: Slots::new(config.in_peers), @@ -270,10 +212,10 @@ impl Peerset { self.message_queue.push_back(Message::Drop(removed)); self.message_queue.push_back(Message::Connect(added)); } - SlotState::AlreadyConnected(_) | SlotState::Upgraded(_) => { + SlotState::AlreadyExists(_) | SlotState::Upgraded(_) => { return; } - SlotState::MaxConnections(peer_id) => { + SlotState::MaxCapacity(peer_id) => { self.data.discovered.add_peer(peer_id, SlotType::Reserved); return; } @@ -285,7 +227,7 @@ impl Peerset { self.data.out_slots.mark_not_reserved(&peer_id); self.data.discovered.mark_not_reserved(&peer_id); if self.data.reserved_only { - if self.data.in_slots.clear_slot(&peer_id) || self.data.out_slots.clear_slot(&peer_id) { + if self.data.in_slots.remove_peer(&peer_id) || self.data.out_slots.remove_peer(&peer_id) { // insert peer back into discovered list self.data.discovered.add_peer(peer_id.clone(), SlotType::Common); self.message_queue.push_back(Message::Drop(peer_id)); @@ -325,15 +267,15 @@ impl Peerset { if score < 0 { // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { - self.data.in_slots.clear_slot(&peer_id); - self.data.out_slots.clear_slot(&peer_id); + self.data.in_slots.remove_peer(&peer_id); + self.data.out_slots.remove_peer(&peer_id); self.message_queue.push_back(Message::Drop(peer_id)); } } } fn alloc_slots(&mut self) { - while let Some((peer_id, slot_type)) = self.data.discovered.pop_peer(self.data.reserved_only) { + while let Some((peer_id, slot_type)) = self.data.discovered.pop_most_important_peer(self.data.reserved_only) { match self.data.out_slots.add_peer(peer_id, slot_type) { SlotState::Added(peer_id) => { self.message_queue.push_back(Message::Connect(peer_id)); @@ -344,10 +286,10 @@ impl Peerset { self.message_queue.push_back(Message::Drop(removed)); self.message_queue.push_back(Message::Connect(added)); } - SlotState::Upgraded(_) | SlotState::AlreadyConnected(_) => { + SlotState::Upgraded(_) | SlotState::AlreadyExists(_) => { // TODO: we should never reach this point }, - SlotState::MaxConnections(peer_id) => { + SlotState::MaxCapacity(peer_id) => { self.data.discovered.add_peer(peer_id, slot_type); break; }, @@ -404,11 +346,11 @@ impl Peerset { self.message_queue.push_back(Message::Drop(removed)); self.message_queue.push_back(Message::Accept(index)); }, - SlotState::AlreadyConnected(_) | SlotState::Upgraded(_) => { + SlotState::AlreadyExists(_) | SlotState::Upgraded(_) => { // we are already connected. in this case we do not answer return; }, - SlotState::MaxConnections(peer_id) => { + SlotState::MaxCapacity(peer_id) => { self.data.discovered.add_peer(peer_id, slot_type); self.message_queue.push_back(Message::Reject(index)); return; @@ -427,14 +369,14 @@ impl Peerset { peer_id, self.data.in_slots, self.data.out_slots ); // Automatically connect back if reserved. - if self.data.in_slots.is_connected_and_reserved(&peer_id) || self.data.out_slots.is_connected_and_reserved(&peer_id) { + if self.data.in_slots.is_reserved(&peer_id) || self.data.out_slots.is_reserved(&peer_id) { self.message_queue.push_back(Message::Connect(peer_id)); return; } // Otherwise, free the slot. - self.data.in_slots.clear_slot(&peer_id); - self.data.out_slots.clear_slot(&peer_id); + self.data.in_slots.remove_peer(&peer_id); + self.data.out_slots.remove_peer(&peer_id); // Note: in this dummy implementation we consider that peers never expire. As soon as we // are disconnected from a peer, we try again. diff --git a/substrate/core/peerset/src/slots.rs b/substrate/core/peerset/src/slots.rs index 7fa655d7ff..2ea9e5199b 100644 --- a/substrate/core/peerset/src/slots.rs +++ b/substrate/core/peerset/src/slots.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use std::mem; +use std::{fmt, mem}; use libp2p::PeerId; use linked_hash_map::LinkedHashMap; @@ -32,28 +32,49 @@ pub enum SlotType { pub enum SlotState { /// Returned when `add_peer` successfully adds a peer to the slot. Added(PeerId), - /// Returned we already have a connection to a given peer, but it is upgraded from + /// Returned when we already have given peer in our list, but it is upgraded from /// `Common` to `Reserved`. Upgraded(PeerId), /// Returned when we should removed a common peer to make space for a reserved peer. Swaped { - /// Peer we should disconnect from. + /// Peer was removed from the list. removed: PeerId, - /// Peer we should connect to. + /// Peer was added to the list. added: PeerId, }, - /// Error returned when we are already connected to this peer. - AlreadyConnected(PeerId), - /// Error returned when max number of connections has been already established. - MaxConnections(PeerId), + /// Error returned when we are already know about given peer. + AlreadyExists(PeerId), + /// Error returned when list is full and no more peers can be added. + MaxCapacity(PeerId), } /// Contains all information about group of slots. -#[derive(Debug)] pub struct Slots { + /// Maximum number of slots. Total number of reserved and common slots must be always + /// smaller or equal to `max_slots`. max_slots: usize, - /// Nodes and their type. We use `LinkedHashMap` to make this data structure more predictable - slots: LinkedHashMap, + /// Reserved slots. + reserved: LinkedHashMap, + /// Common slots. + common: LinkedHashMap, +} + +impl fmt::Debug for Slots { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + struct ListFormatter<'a>(&'a LinkedHashMap); + + impl<'a> fmt::Debug for ListFormatter<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_list().entries(self.0.keys()).finish() + } + } + + f.debug_struct("Slots") + .field("max_slots", &self.max_slots) + .field("reserved", &ListFormatter(&self.reserved)) + .field("common", &ListFormatter(&self.common)) + .finish() + } } impl Slots { @@ -62,87 +83,141 @@ impl Slots { let max_slots = max_slots as usize; Slots { max_slots, - slots: LinkedHashMap::with_capacity(max_slots), + reserved: LinkedHashMap::new(), + common: LinkedHashMap::new(), } } /// Returns true if one of the slots contains given peer. pub fn contains(&self, peer_id: &PeerId) -> bool { - self.slots.contains_key(peer_id) + self.common.contains_key(peer_id) || self.reserved.contains_key(peer_id) } /// Tries to find a slot for a given peer and returns `SlotState`. + /// + /// - If a peer is already inserted into reserved list or inserted or + /// inserted into common list and readded with the same `SlotType`, + /// the function returns `SlotState::AlreadyExists` + /// - If a peer is already inserted common list returns `SlotState::Upgraded` + /// - If there is no slot for a reserved peer, we try to drop one common peer + /// and it a new reserved one in it's place, function returns `SlotState::Swaped` + /// - If there is no place for a peer, function returns `SlotState::MaxCapacity` + /// - If the peer was simply added, `SlotState::Added` is returned pub fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) -> SlotState { - if let Some(st) = self.slots.get_mut(&peer_id) { - if *st == SlotType::Common && slot_type == SlotType::Reserved { - *st = SlotType::Reserved; + if self.reserved.contains_key(&peer_id) { + return SlotState::AlreadyExists(peer_id); + } + + if self.common.contains_key(&peer_id) { + if slot_type == SlotType::Reserved { + self.common.remove(&peer_id); + self.reserved.insert(peer_id.clone(), ()); return SlotState::Upgraded(peer_id); } else { - return SlotState::AlreadyConnected(peer_id); + return SlotState::AlreadyExists(peer_id); } } - if self.slots.len() == self.max_slots { + if self.max_slots == (self.common.len() + self.reserved.len()) { if let SlotType::Reserved = slot_type { - // if we are trying to insert a reserved peer, but we all of our slots are full, - // we need to remove one of the existing common connections - let to_remove = self.slots.iter() - .find(|(_, &slot_type)| slot_type == SlotType::Common) - .map(|(to_remove, _)| to_remove) - .cloned(); - - if let Some(to_remove) = to_remove { - self.slots.remove(&to_remove); - self.slots.insert(peer_id.clone(), slot_type); - + if let Some((to_remove, _)) = self.common.pop_front() { + self.reserved.insert(peer_id.clone(), ()); return SlotState::Swaped { removed: to_remove, added: peer_id, }; } } - return SlotState::MaxConnections(peer_id); + return SlotState::MaxCapacity(peer_id); } - self.slots.insert(peer_id.clone(), slot_type); + match slot_type { + SlotType::Common => self.common.insert(peer_id.clone(), ()), + SlotType::Reserved => self.reserved.insert(peer_id.clone(), ()), + }; + SlotState::Added(peer_id) } - pub fn clear_common_slots(&mut self) -> Vec { - let slots = mem::replace(&mut self.slots, LinkedHashMap::with_capacity(self.max_slots)); - let mut common_peers = Vec::new(); - for (peer_id, slot_type) in slots { - match slot_type { - SlotType::Common => { - common_peers.push(peer_id); - }, - SlotType::Reserved => { - self.slots.insert(peer_id, slot_type); - }, - } + /// Pops the oldest reserved peer. If none exists and `reserved_only = false` pops a common peer. + pub fn pop_most_important_peer(&mut self, reserved_only: bool) -> Option<(PeerId, SlotType)> { + if let Some((peer_id, _)) = self.reserved.pop_front() { + return Some((peer_id, SlotType::Reserved)); } - common_peers + + if reserved_only { + return None; + } + + self.common.pop_front() + .map(|(peer_id, _)| (peer_id, SlotType::Common)) } + /// Removes all common peers from the list and returns an iterator over them. + pub fn clear_common_slots(&mut self) -> impl Iterator { + let slots = mem::replace(&mut self.common, LinkedHashMap::new()); + slots.into_iter().map(|(peer_id, _)| peer_id) + } + + /// Marks given peer as a reserved one. pub fn mark_reserved(&mut self, peer_id: &PeerId) { - if let Some(slot_type) = self.slots.get_mut(peer_id) { - *slot_type = SlotType::Reserved; + if let Some(_) = self.common.remove(peer_id) { + self.reserved.insert(peer_id.clone(), ()); } } + /// Marks given peer as not reserved one. pub fn mark_not_reserved(&mut self, peer_id: &PeerId) { - if let Some(slot_type) = self.slots.get_mut(peer_id) { - *slot_type = SlotType::Common; + if let Some(_) = self.reserved.remove(peer_id) { + self.common.insert(peer_id.clone(), ()); } } - pub fn clear_slot(&mut self, peer_id: &PeerId) -> bool { - self.slots.remove(peer_id).is_some() + /// Removes a peer from a list and returns true if it existed. + pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool { + self.common.remove(peer_id).is_some() || self.reserved.remove(peer_id).is_some() } - pub fn is_connected_and_reserved(&self, peer_id: &PeerId) -> bool { - self.slots.get(peer_id) - .map(|slot_type| *slot_type == SlotType::Reserved) - .unwrap_or_else(|| false) + /// Returns true if given peer is reserved. + pub fn is_reserved(&self, peer_id: &PeerId) -> bool { + self.reserved.contains_key(peer_id) + } +} + +#[cfg(test)] +mod tests { + use libp2p::PeerId; + use super::{Slots, SlotType}; + + #[test] + fn test_slots_debug() { + let reserved_peer = PeerId::random(); + let reserved_peer2 = PeerId::random(); + let common_peer = PeerId::random(); + let mut slots = Slots::new(10); + + slots.add_peer(reserved_peer.clone(), SlotType::Reserved); + slots.add_peer(reserved_peer2.clone(), SlotType::Reserved); + slots.add_peer(common_peer.clone(), SlotType::Common); + + let expected = format!("Slots {{ + max_slots: 10, + reserved: [ + PeerId( + {:?} + ), + PeerId( + {:?} + ) + ], + common: [ + PeerId( + {:?} + ) + ] +}}", reserved_peer.to_base58(), reserved_peer2.to_base58(), common_peer.to_base58()); + + let s = format!("{:#?}", slots); + assert_eq!(expected, s); } }