mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-07-03 22:47:25 +00:00
87776e63bb
* Send high-level consensus telemetry by default * Notify telemetry on finalized * Send used authority set to telemetry * Do not send commit message telemetry by default * Fix typo * Allow for notifications on telemetry connect ...and send the current authority set on each connect. * Send authority set to telemetry on change * Revert "Send used authority set to telemetry" This reverts commit 1deceead52bb7443a02879ac8138afad9a6ca5ff. * Merge branch 'master' into 'cmichi-send-high-level-consensus-telemetry-by-default' Squashed commit of the following: commit19d77cbc23Author: Xiliang Chen <xlchen1291@gmail.com> Date: Wed Apr 10 20:26:29 2019 +1200 update authers for rest of the node-template cargo.toml files (#2242) commit0afc357a97Author: Bastian Köcher <bkchr@users.noreply.github.com> Date: Tue Apr 9 10:31:18 2019 +0200 Throw a compile error for `on_finalise` and `on_initialise` (#2236) commite57e54ab9cAuthor: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Tue Apr 9 05:30:43 2019 -0300 Add warning when using default protocol ID (#2234) * Add warning when using default protocol ID * Update core/service/src/lib.rs commitcb766e5f5dAuthor: Xiliang Chen <xlchen1291@gmail.com> Date: Tue Apr 9 17:22:20 2019 +1200 update name and authors to placeholder text for node-template (#2222) * update name and authors to placeholder text * revert package name change commita1e15ae55aAuthor: André Silva <andre.beat@gmail.com> Date: Mon Apr 8 12:50:34 2019 +0100 grandpa: Voter persistence and upgrade to finality-grandpa v0.7 (#2139) * core: grandpa: migrate to grandpa 0.7 * core: grandpa: store current round votes and load them on startup * core: grandpa: resend old persisted votes for the current round * core: grandpa: store base and votes for last completed round * core: grandpa: fix latest grandpa 0.7 changes * core: grandpa: update to grandpa 0.7.1 * core: grandpa: persist votes for last two completed rounds * core: grandpa: simplify VoterSetState usage * core: grandpa: use 
Environment::update_voter_set_state * core: grandpa: fix aux_schema test * core: grandpa: add docs * core: grandpa: add note about environment assumption * core: grandpa: don't update voter set state on ignored votes * core: grandpa: add test for v1 -> v2 aux_schema migration * core: grandpa: add test for voter vote persistence * core: grandpa: use grandpa 0.7.1 from crates.io * core: grandpa: use try_init in test * core: grandpa: add comment about block_import in test * core: grandpa: avoid cloning HasVoted * core: grandpa: add missing docs * core: grandpa: cleanup up can_propose/prevote/precommit commited3ae4ac39Author: Gregory Terzian <2792687+gterzian@users.noreply.github.com> Date: Mon Apr 8 13:17:00 2019 +0200 remove clone bound on specialization in testnet factory (#2157) commit03f3fb1442Author: Andrew Jones <ascjones@gmail.com> Date: Sat Apr 6 12:23:56 2019 +0100 Contract import/export validation (#2203) * Reject validation of contract with unknown exports * Validate imports eagerly * Increment spec version commitdecddaab0fAuthor: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Fri Apr 5 14:07:09 2019 -0300 Fix state inconsistency between handler and behaviour (#2220) * Fix state inconsistency between handler and behaviour * Fix the error! being in the wrong place commitdce0b4ea49Author: Bastian Köcher <bkchr@users.noreply.github.com> Date: Fri Apr 5 18:50:38 2019 +0200 Use `storage_root` of newly calculated header (#2216) Instead of calculating the `storage_root` a second time, we just can take the `storage_root` from the new header. 
commitb01136c90dAuthor: Marek Kotewicz <marek.kotewicz@gmail.com> Date: Fri Apr 5 14:44:46 2019 +0200 Peerset::discovered accepts many peer ids (#2213) * Peerset::discovered accepts many peer ids * Improve tracing in peerset commit1142bcde97Author: Marek Kotewicz <marek.kotewicz@gmail.com> Date: Thu Apr 4 19:40:40 2019 +0200 simplification of peerset api (#2123) * Introduction of PeersetHandle * integrate PeersetHandle with the rest of the codebase * fix compilation errors * more tests for peerset, fixed overwriting bug in add_reserved_peer * Slots data structure and bugfixes for peerset * bend to pressure * updated lru-cache to 0.1.2 and updated linked-hash-map to 0.5.2 * peerset discovered list is now a LinkedHashMap * fix review suggestions * split back Peerset and PeersetHandle * test for Peerset::discovered * applied review suggestions * fixes to peerset::incoming * peerset disconnects are all instantaneous * instantaneous drop in peerset finished * Peerset::set_reserved_only can also reconnect nodes * Peerset scores cache uses lru-cache * remove redundant function call and comment from Peerset::on_set_reserved_only * add_peer returns SlotState enum * apply review suggestions * is_reserved -> is_connected_and_reserved commit301844dd56Author: Arkadiy Paronyan <arkady.paronyan@gmail.com> Date: Thu Apr 4 18:01:28 2019 +0200 Disconnect on protocol timeout (#2212) commitcb3c912b1aAuthor: André Silva <andre.beat@gmail.com> Date: Thu Apr 4 15:56:49 2019 +0100 core: grandpa: verify commit target in justification (#2201) commit6920b169cdAuthor: Bastian Köcher <bkchr@users.noreply.github.com> Date: Thu Apr 4 16:56:16 2019 +0200 Introduce `original_storage` and `original_storage_hash` (#2211) Both functions will ignore any overlayed changes and access the backend directly. 
commitcb7a8161f5Author: Xiliang Chen <xlchen1291@gmail.com> Date: Fri Apr 5 03:55:55 2019 +1300 code cleanup (#2206) commitacaf1fe625Author: Arkadiy Paronyan <arkady.paronyan@gmail.com> Date: Wed Apr 3 15:52:46 2019 +0200 Emberic elm testnet (#2197) * Make telemetry onconnect hoook optional * Merge branch 'master' into 'cmichi-send-high-level-consensus-telemetry-by-default' * Introduce GrandpaParams struct to condense parameters * Remove debug statement * Fix tests * Rename parameter * Fix tests * Rename struct * Do not send verbosity level * Combine imports * Implement comments * Run cargo build --all * Remove noisy telemetry * Add docs for public items * Unbox and support Clone trait * Fix merge * Fix merge * Update core/finality-grandpa/src/lib.rs Co-Authored-By: cmichi <mich@elmueller.net>
702 lines
22 KiB
Rust
702 lines
22 KiB
Rust
// Copyright 2018-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be
//! connected to.
|
|
|
|
mod slots;
|
|
|
|
use std::collections::VecDeque;
|
|
use futures::{prelude::*, sync::mpsc, try_ready};
|
|
use libp2p::PeerId;
|
|
use linked_hash_map::LinkedHashMap;
|
|
use log::trace;
|
|
use lru_cache::LruCache;
|
|
use slots::{SlotType, SlotState, Slots};
|
|
use serde_json::json;
|
|
|
|
/// Capacity of the LRU cache that stores the reputation score of each peer.
const PEERSET_SCORES_CACHE_SIZE: usize = 1_000;
/// Capacity given to the `Slots` collection holding discovered-but-unconnected peers.
const DISCOVERED_NODES_LIMIT: u32 = 1_000;
|
|
|
|
#[derive(Debug)]
|
|
struct PeersetData {
|
|
/// List of nodes that we know exist, but we are not connected to.
|
|
/// Elements in this list must never be in `out_slots` or `in_slots`.
|
|
discovered: Slots,
|
|
/// If true, we only accept reserved nodes.
|
|
reserved_only: bool,
|
|
/// Node slots for outgoing connections.
|
|
out_slots: Slots,
|
|
/// Node slots for incoming connections.
|
|
in_slots: Slots,
|
|
/// List of node scores.
|
|
scores: LruCache<PeerId, i32>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
enum Action {
|
|
AddReservedPeer(PeerId),
|
|
RemoveReservedPeer(PeerId),
|
|
SetReservedOnly(bool),
|
|
ReportPeer(PeerId, i32),
|
|
}
|
|
|
|
/// Shared handle to the peer set manager (PSM). Distributed around the code.
|
|
#[derive(Debug, Clone)]
|
|
pub struct PeersetHandle {
|
|
tx: mpsc::UnboundedSender<Action>,
|
|
}
|
|
|
|
impl PeersetHandle {
|
|
/// Adds a new reserved peer. The peerset will make an effort to always remain connected to
|
|
/// this peer.
|
|
///
|
|
/// Has no effect if the node was already a reserved peer.
|
|
///
|
|
/// > **Note**: Keep in mind that the networking has to know an address for this node,
|
|
/// > otherwise it will not be able to connect to it.
|
|
pub fn add_reserved_peer(&self, peer_id: PeerId) {
|
|
let _ = self.tx.unbounded_send(Action::AddReservedPeer(peer_id));
|
|
}
|
|
|
|
/// Remove a previously-added reserved peer.
|
|
///
|
|
/// Has no effect if the node was not a reserved peer.
|
|
pub fn remove_reserved_peer(&self, peer_id: PeerId) {
|
|
let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(peer_id));
|
|
}
|
|
|
|
/// Sets whether or not the peerset only has connections .
|
|
pub fn set_reserved_only(&self, reserved: bool) {
|
|
let _ = self.tx.unbounded_send(Action::SetReservedOnly(reserved));
|
|
}
|
|
|
|
/// Reports an adjustment to the reputation of the given peer.
|
|
pub fn report_peer(&self, peer_id: PeerId, score_diff: i32) {
|
|
let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
|
|
}
|
|
}
|
|
|
|
/// Message that can be sent by the peer set manager (PSM).
|
|
#[derive(Debug, PartialEq)]
|
|
pub enum Message {
|
|
/// Request to open a connection to the given peer. From the point of view of the PSM, we are
|
|
/// immediately connected.
|
|
Connect(PeerId),
|
|
|
|
/// Drop the connection to the given peer, or cancel the connection attempt after a `Connect`.
|
|
Drop(PeerId),
|
|
|
|
/// Equivalent to `Connect` for the peer corresponding to this incoming index.
|
|
Accept(IncomingIndex),
|
|
|
|
/// Equivalent to `Drop` for the peer corresponding to this incoming index.
|
|
Reject(IncomingIndex),
|
|
}
|
|
|
|
/// Opaque identifier for an incoming connection. Allocated by the network.
///
/// Implements `Hash`, `PartialOrd` and `Ord` in addition to equality, so that it can be used
/// as a key in hash-based and ordered collections.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IncomingIndex(pub u64);

impl From<u64> for IncomingIndex {
    /// Wraps a raw `u64` into an `IncomingIndex`.
    fn from(val: u64) -> IncomingIndex {
        IncomingIndex(val)
    }
}
|
|
|
|
/// Configuration to pass when creating the peer set manager.
|
|
#[derive(Debug)]
|
|
pub struct PeersetConfig {
|
|
/// Maximum number of ingoing links to peers.
|
|
pub in_peers: u32,
|
|
|
|
/// Maximum number of outgoing links to peers.
|
|
pub out_peers: u32,
|
|
|
|
/// List of bootstrap nodes to initialize the peer with.
|
|
///
|
|
/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
|
|
/// > otherwise it will not be able to connect to them.
|
|
pub bootnodes: Vec<PeerId>,
|
|
|
|
/// If true, we only accept reserved nodes.
|
|
pub reserved_only: bool,
|
|
|
|
/// List of nodes that we should always be connected to.
|
|
///
|
|
/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
|
|
/// > otherwise it will not be able to connect to them.
|
|
pub reserved_nodes: Vec<PeerId>,
|
|
}
|
|
|
|
/// Side of the peer set manager owned by the network. In other words, the "receiving" side.
|
|
///
|
|
/// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never
|
|
/// errors.
|
|
#[derive(Debug)]
|
|
pub struct Peerset {
|
|
data: PeersetData,
|
|
rx: mpsc::UnboundedReceiver<Action>,
|
|
message_queue: VecDeque<Message>,
|
|
}
|
|
|
|
impl Peerset {
|
|
/// Builds a new peerset from the given configuration.
|
|
pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) {
|
|
let (tx, rx) = mpsc::unbounded();
|
|
|
|
let data = PeersetData {
|
|
discovered: Slots::new(DISCOVERED_NODES_LIMIT),
|
|
reserved_only: config.reserved_only,
|
|
out_slots: Slots::new(config.out_peers),
|
|
in_slots: Slots::new(config.in_peers),
|
|
scores: LruCache::new(PEERSET_SCORES_CACHE_SIZE),
|
|
};
|
|
|
|
let handle = PeersetHandle {
|
|
tx,
|
|
};
|
|
|
|
let mut peerset = Peerset {
|
|
data,
|
|
rx,
|
|
message_queue: VecDeque::new(),
|
|
};
|
|
|
|
for peer_id in config.reserved_nodes {
|
|
peerset.data.discovered.add_peer(peer_id, SlotType::Reserved);
|
|
}
|
|
|
|
for peer_id in config.bootnodes {
|
|
peerset.data.discovered.add_peer(peer_id, SlotType::Common);
|
|
}
|
|
|
|
peerset.alloc_slots();
|
|
(peerset, handle)
|
|
}
|
|
|
|
fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
|
|
// Nothing more to do if we're already connected.
|
|
if self.data.in_slots.contains(&peer_id) {
|
|
self.data.in_slots.mark_reserved(&peer_id);
|
|
return;
|
|
}
|
|
|
|
match self.data.out_slots.add_peer(peer_id, SlotType::Reserved) {
|
|
SlotState::Added(peer_id) => {
|
|
// reserved node may have been previously stored as normal node in discovered list
|
|
self.data.discovered.remove_peer(&peer_id);
|
|
|
|
// notify that connection has been made
|
|
trace!(target: "peerset", "Connecting to new reserved peer {}", peer_id);
|
|
self.message_queue.push_back(Message::Connect(peer_id));
|
|
return;
|
|
},
|
|
SlotState::Swaped { removed, added } => {
|
|
// reserved node may have been previously stored as normal node in discovered list
|
|
self.data.discovered.remove_peer(&added);
|
|
// let's add the peer we disconnected from to the discovered list again
|
|
self.data.discovered.add_peer(removed.clone(), SlotType::Common);
|
|
// swap connections
|
|
trace!(target: "peerset", "Connecting to new reserved peer {}, dropping {}", added, removed);
|
|
self.message_queue.push_back(Message::Drop(removed));
|
|
self.message_queue.push_back(Message::Connect(added));
|
|
}
|
|
SlotState::AlreadyExists(_) | SlotState::Upgraded(_) => {
|
|
return;
|
|
}
|
|
SlotState::MaxCapacity(peer_id) => {
|
|
self.data.discovered.add_peer(peer_id, SlotType::Reserved);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
|
|
self.data.in_slots.mark_not_reserved(&peer_id);
|
|
self.data.out_slots.mark_not_reserved(&peer_id);
|
|
self.data.discovered.mark_not_reserved(&peer_id);
|
|
if self.data.reserved_only {
|
|
if self.data.in_slots.remove_peer(&peer_id) || self.data.out_slots.remove_peer(&peer_id) {
|
|
// insert peer back into discovered list
|
|
self.data.discovered.add_peer(peer_id.clone(), SlotType::Common);
|
|
self.message_queue.push_back(Message::Drop(peer_id));
|
|
// call alloc_slots again, cause we may have some reserved peers in discovered list
|
|
// waiting for the slot that was just cleared
|
|
self.alloc_slots();
|
|
}
|
|
}
|
|
}
|
|
|
|
fn on_set_reserved_only(&mut self, reserved_only: bool) {
|
|
// Disconnect non-reserved nodes.
|
|
self.data.reserved_only = reserved_only;
|
|
if self.data.reserved_only {
|
|
for peer_id in self.data.in_slots.clear_common_slots().into_iter().chain(self.data.out_slots.clear_common_slots().into_iter()) {
|
|
// insert peer back into discovered list
|
|
self.data.discovered.add_peer(peer_id.clone(), SlotType::Common);
|
|
self.message_queue.push_back(Message::Drop(peer_id));
|
|
}
|
|
} else {
|
|
self.alloc_slots();
|
|
}
|
|
}
|
|
|
|
fn on_report_peer(&mut self, peer_id: PeerId, score_diff: i32) {
|
|
let score = match self.data.scores.get_mut(&peer_id) {
|
|
Some(score) => {
|
|
*score = score.saturating_add(score_diff);
|
|
*score
|
|
},
|
|
None => {
|
|
self.data.scores.insert(peer_id.clone(), score_diff);
|
|
score_diff
|
|
}
|
|
};
|
|
|
|
if score < 0 {
|
|
// peer will be removed from `in_slots` or `out_slots` in `on_dropped` method
|
|
if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) {
|
|
self.data.in_slots.remove_peer(&peer_id);
|
|
self.data.out_slots.remove_peer(&peer_id);
|
|
self.message_queue.push_back(Message::Drop(peer_id));
|
|
}
|
|
}
|
|
}
|
|
|
|
fn alloc_slots(&mut self) {
|
|
while let Some((peer_id, slot_type)) = self.data.discovered.pop_most_important_peer(self.data.reserved_only) {
|
|
match self.data.out_slots.add_peer(peer_id, slot_type) {
|
|
SlotState::Added(peer_id) => {
|
|
trace!(target: "peerset", "Connecting to new peer {}", peer_id);
|
|
self.message_queue.push_back(Message::Connect(peer_id));
|
|
},
|
|
SlotState::Swaped { removed, added } => {
|
|
// insert peer back into discovered list
|
|
trace!(target: "peerset", "Connecting to new peer {}, dropping {}", added, removed);
|
|
self.data.discovered.add_peer(removed.clone(), SlotType::Common);
|
|
self.message_queue.push_back(Message::Drop(removed));
|
|
self.message_queue.push_back(Message::Connect(added));
|
|
}
|
|
SlotState::Upgraded(_) | SlotState::AlreadyExists(_) => {
|
|
// TODO: we should never reach this point
|
|
},
|
|
SlotState::MaxCapacity(peer_id) => {
|
|
self.data.discovered.add_peer(peer_id, slot_type);
|
|
break;
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Indicate that we received an incoming connection. Must be answered either with
|
|
/// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
|
|
///
|
|
/// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming
|
|
/// connection implicitely means `Accept`, but incoming connections aren't cancelled by
|
|
/// `dropped`.
|
|
///
|
|
/// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the
|
|
/// peerset is already connected to, in which case it must not answer.
|
|
pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) {
|
|
trace!(
|
|
target: "peerset",
|
|
"Incoming {:?}\nin_slots={:?}\nout_slots={:?}",
|
|
peer_id, self.data.in_slots, self.data.out_slots
|
|
);
|
|
// if `reserved_only` is set, but this peer is not a part of our discovered list,
|
|
// a) it is not reserved, so we reject the connection
|
|
// b) we are already connected to it, so we reject the connection
|
|
if self.data.reserved_only && !self.data.discovered.is_reserved(&peer_id) {
|
|
self.message_queue.push_back(Message::Reject(index));
|
|
return;
|
|
}
|
|
|
|
// check if we are already connected to this peer
|
|
if self.data.out_slots.contains(&peer_id) {
|
|
// we are already connected. in this case we do not answer
|
|
return;
|
|
}
|
|
|
|
let slot_type = if self.data.reserved_only {
|
|
SlotType::Reserved
|
|
} else {
|
|
SlotType::Common
|
|
};
|
|
|
|
match self.data.in_slots.add_peer(peer_id, slot_type) {
|
|
SlotState::Added(peer_id) => {
|
|
// reserved node may have been previously stored as normal node in discovered list
|
|
self.data.discovered.remove_peer(&peer_id);
|
|
self.message_queue.push_back(Message::Accept(index));
|
|
return;
|
|
},
|
|
SlotState::Swaped { removed, added } => {
|
|
// reserved node may have been previously stored as normal node in discovered list
|
|
self.data.discovered.remove_peer(&added);
|
|
// swap connections.
|
|
self.message_queue.push_back(Message::Drop(removed));
|
|
self.message_queue.push_back(Message::Accept(index));
|
|
},
|
|
SlotState::AlreadyExists(_) | SlotState::Upgraded(_) => {
|
|
// we are already connected. in this case we do not answer
|
|
return;
|
|
},
|
|
SlotState::MaxCapacity(peer_id) => {
|
|
self.data.discovered.add_peer(peer_id, slot_type);
|
|
self.message_queue.push_back(Message::Reject(index));
|
|
return;
|
|
},
|
|
}
|
|
}
|
|
|
|
/// Indicate that we dropped an active connection with a peer, or that we failed to connect.
|
|
///
|
|
/// Must only be called after the PSM has either generated a `Connect` message with this
|
|
/// `PeerId`, or accepted an incoming connection with this `PeerId`.
|
|
pub fn dropped(&mut self, peer_id: PeerId) {
|
|
trace!(
|
|
target: "peerset",
|
|
"Dropping {:?}\nin_slots={:?}\nout_slots={:?}",
|
|
peer_id, self.data.in_slots, self.data.out_slots
|
|
);
|
|
// Automatically connect back if reserved.
|
|
if self.data.in_slots.is_reserved(&peer_id) || self.data.out_slots.is_reserved(&peer_id) {
|
|
self.message_queue.push_back(Message::Connect(peer_id));
|
|
return;
|
|
}
|
|
|
|
// Otherwise, free the slot.
|
|
self.data.in_slots.remove_peer(&peer_id);
|
|
self.data.out_slots.remove_peer(&peer_id);
|
|
|
|
// Note: in this dummy implementation we consider that peers never expire. As soon as we
|
|
// are disconnected from a peer, we try again.
|
|
self.data.discovered.add_peer(peer_id, SlotType::Common);
|
|
self.alloc_slots();
|
|
}
|
|
|
|
/// Adds discovered peer ids to the PSM.
|
|
///
|
|
/// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility
|
|
/// > of the PSM to remove `PeerId`s that fail to dial too often.
|
|
pub fn discovered<I: IntoIterator<Item = PeerId>>(&mut self, peer_ids: I) {
|
|
for peer_id in peer_ids {
|
|
if !self.data.in_slots.contains(&peer_id) && !self.data.out_slots.contains(&peer_id) && !self.data.discovered.contains(&peer_id) {
|
|
trace!(target: "peerset", "Discovered new peer: {:?}", peer_id);
|
|
self.data.discovered.add_peer(peer_id, SlotType::Common);
|
|
} else {
|
|
trace!(target: "peerset", "Discovered known peer: {:?}", peer_id);
|
|
}
|
|
}
|
|
|
|
self.alloc_slots();
|
|
}
|
|
|
|
/// Produces a JSON object containing the state of the peerset manager, for debugging purposes.
|
|
pub fn debug_info(&self) -> serde_json::Value {
|
|
json!({
|
|
"data": {
|
|
// add scores
|
|
"discovered": self.data.discovered.debug_info(),
|
|
"reserved_only": self.data.reserved_only,
|
|
"out_slots": self.data.out_slots.debug_info(),
|
|
"in_slots": self.data.in_slots.debug_info()
|
|
},
|
|
"message_queue": self.message_queue.len(),
|
|
})
|
|
}
|
|
}
|
|
|
|
impl Stream for Peerset {
|
|
type Item = Message;
|
|
type Error = ();
|
|
|
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
|
loop {
|
|
if let Some(message) = self.message_queue.pop_front() {
|
|
return Ok(Async::Ready(Some(message)));
|
|
}
|
|
match try_ready!(self.rx.poll()) {
|
|
None => return Ok(Async::Ready(None)),
|
|
Some(action) => match action {
|
|
Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id),
|
|
Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
|
|
Action::SetReservedOnly(reserved) => self.on_set_reserved_only(reserved),
|
|
Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use libp2p::PeerId;
    use futures::prelude::*;
    use super::{PeersetConfig, Peerset, Message, IncomingIndex};

    /// Checks that the peerset yields exactly `messages`, in order, and that nothing is left
    /// in its queue afterwards. Returns the peerset so that the test can keep using it.
    fn assert_messages(mut peerset: Peerset, messages: Vec<Message>) -> Peerset {
        for expected_message in messages {
            let (message, p) = next_message(peerset).expect("expected message");
            assert_eq!(message, expected_message);
            peerset = p;
        }
        assert!(peerset.message_queue.is_empty());
        peerset
    }

    /// Blocks until the peerset yields its next message. Fails if the stream errored or ended.
    fn next_message(peerset: Peerset) -> Result<(Message, Peerset), ()> {
        let (next, peerset) = peerset.into_future()
            .wait()
            .map_err(|_| ())?;
        let message = next.ok_or(())?;
        Ok((message, peerset))
    }

    #[test]
    fn test_peerset_from_config_with_bootnodes() {
        let bootnode = PeerId::random();
        let bootnode2 = PeerId::random();
        let config = PeersetConfig {
            in_peers: 0,
            out_peers: 2,
            bootnodes: vec![bootnode.clone(), bootnode2.clone()],
            reserved_only: false,
            reserved_nodes: Vec::new(),
        };

        let (peerset, _handle) = Peerset::from_config(config);

        assert_messages(peerset, vec![
            Message::Connect(bootnode),
            Message::Connect(bootnode2),
        ]);
    }

    #[test]
    fn test_peerset_from_config_with_reserved_nodes() {
        let bootnode = PeerId::random();
        let bootnode2 = PeerId::random();
        let reserved_peer = PeerId::random();
        let reserved_peer2 = PeerId::random();
        let config = PeersetConfig {
            in_peers: 0,
            out_peers: 3,
            bootnodes: vec![bootnode.clone(), bootnode2.clone()],
            reserved_only: false,
            reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()],
        };

        let (peerset, _handle) = Peerset::from_config(config);

        assert_messages(peerset, vec![
            Message::Connect(reserved_peer),
            Message::Connect(reserved_peer2),
            Message::Connect(bootnode)
        ]);
    }

    #[test]
    fn test_peerset_add_reserved_peer() {
        let bootnode = PeerId::random();
        let reserved_peer = PeerId::random();
        let reserved_peer2 = PeerId::random();
        let config = PeersetConfig {
            in_peers: 0,
            out_peers: 2,
            bootnodes: vec![bootnode],
            reserved_only: true,
            reserved_nodes: Vec::new(),
        };

        let (peerset, handle) = Peerset::from_config(config);
        handle.add_reserved_peer(reserved_peer.clone());
        handle.add_reserved_peer(reserved_peer2.clone());

        assert_messages(peerset, vec![
            Message::Connect(reserved_peer),
            Message::Connect(reserved_peer2)
        ]);
    }

    #[test]
    fn test_peerset_remove_reserved_peer() {
        let reserved_peer = PeerId::random();
        let reserved_peer2 = PeerId::random();
        let config = PeersetConfig {
            in_peers: 0,
            out_peers: 2,
            bootnodes: vec![],
            reserved_only: false,
            reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()],
        };

        let (peerset, handle) = Peerset::from_config(config);
        handle.remove_reserved_peer(reserved_peer.clone());

        let peerset = assert_messages(peerset, vec![
            Message::Connect(reserved_peer.clone()),
            Message::Connect(reserved_peer2.clone()),
        ]);

        handle.set_reserved_only(true);
        handle.remove_reserved_peer(reserved_peer2.clone());

        assert_messages(peerset, vec![
            Message::Drop(reserved_peer),
            Message::Drop(reserved_peer2),
        ]);
    }

    #[test]
    fn test_peerset_set_reserved_only() {
        let bootnode = PeerId::random();
        let bootnode2 = PeerId::random();
        let reserved_peer = PeerId::random();
        let reserved_peer2 = PeerId::random();
        let config = PeersetConfig {
            in_peers: 0,
            out_peers: 4,
            bootnodes: vec![bootnode.clone(), bootnode2.clone()],
            reserved_only: false,
            reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()],
        };

        let (peerset, handle) = Peerset::from_config(config);
        handle.set_reserved_only(true);
        handle.set_reserved_only(false);

        assert_messages(peerset, vec![
            Message::Connect(reserved_peer),
            Message::Connect(reserved_peer2),
            Message::Connect(bootnode.clone()),
            Message::Connect(bootnode2.clone()),
            Message::Drop(bootnode.clone()),
            Message::Drop(bootnode2.clone()),
            Message::Connect(bootnode),
            Message::Connect(bootnode2),
        ]);
    }

    #[test]
    fn test_peerset_report_peer() {
        let bootnode = PeerId::random();
        let bootnode2 = PeerId::random();
        let config = PeersetConfig {
            in_peers: 0,
            out_peers: 1,
            bootnodes: vec![bootnode.clone(), bootnode2.clone()],
            reserved_only: false,
            reserved_nodes: Vec::new(),
        };

        let (peerset, handle) = Peerset::from_config(config);
        handle.report_peer(bootnode2, -1);
        handle.report_peer(bootnode.clone(), -1);

        assert_messages(peerset, vec![
            Message::Connect(bootnode.clone()),
            Message::Drop(bootnode)
        ]);
    }

    #[test]
    fn test_peerset_incoming() {
        let bootnode = PeerId::random();
        let incoming = PeerId::random();
        let incoming2 = PeerId::random();
        let incoming3 = PeerId::random();
        let ii = IncomingIndex(1);
        let ii2 = IncomingIndex(2);
        let ii3 = IncomingIndex(3);
        // Distinct from `ii3`, so that an unexpected answer to the duplicate connection
        // below could not be mistaken for the expected `Reject(ii3)`.
        let ii4 = IncomingIndex(4);
        let config = PeersetConfig {
            in_peers: 2,
            out_peers: 1,
            bootnodes: vec![bootnode.clone()],
            reserved_only: false,
            reserved_nodes: Vec::new(),
        };

        let (mut peerset, _handle) = Peerset::from_config(config);
        peerset.incoming(incoming.clone(), ii);
        // Second connection from an already connected peer: must not be answered.
        peerset.incoming(incoming.clone(), ii4);
        peerset.incoming(incoming2.clone(), ii2);
        peerset.incoming(incoming3.clone(), ii3);

        assert_messages(peerset, vec![
            Message::Connect(bootnode.clone()),
            Message::Accept(ii),
            Message::Accept(ii2),
            Message::Reject(ii3),
        ]);
    }

    #[test]
    fn test_peerset_dropped() {
        let bootnode = PeerId::random();
        let bootnode2 = PeerId::random();
        let reserved_peer = PeerId::random();
        let config = PeersetConfig {
            in_peers: 0,
            out_peers: 2,
            bootnodes: vec![bootnode.clone(), bootnode2.clone()],
            reserved_only: false,
            reserved_nodes: vec![reserved_peer.clone()],
        };

        let (peerset, _handle) = Peerset::from_config(config);

        let mut peerset = assert_messages(peerset, vec![
            Message::Connect(reserved_peer.clone()),
            Message::Connect(bootnode.clone()),
        ]);

        peerset.dropped(reserved_peer.clone());
        peerset.dropped(bootnode);

        let _peerset = assert_messages(peerset, vec![
            Message::Connect(reserved_peer),
            Message::Connect(bootnode2),
        ]);
    }

    #[test]
    fn test_peerset_discovered() {
        let bootnode = PeerId::random();
        let discovered = PeerId::random();
        let discovered2 = PeerId::random();
        let config = PeersetConfig {
            in_peers: 0,
            out_peers: 2,
            bootnodes: vec![bootnode.clone()],
            reserved_only: false,
            reserved_nodes: vec![],
        };

        let (mut peerset, _handle) = Peerset::from_config(config);
        peerset.discovered(Some(discovered.clone()));
        peerset.discovered(Some(discovered.clone()));
        peerset.discovered(Some(discovered2));

        assert_messages(peerset, vec![
            Message::Connect(bootnode),
            Message::Connect(discovered),
        ]);
    }
}
|