Incorporate sc-peerset into sc-network (#14236)

This commit is contained in:
Dmitry Markin
2023-05-29 20:56:57 +03:00
committed by GitHub
parent b8bca85e9d
commit d4b2bf7394
40 changed files with 306 additions and 337 deletions
+2 -26
View File
@@ -9441,6 +9441,7 @@ dependencies = [
"multistream-select", "multistream-select",
"parity-scale-codec", "parity-scale-codec",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"partial_sort",
"pin-project", "pin-project",
"rand 0.8.5", "rand 0.8.5",
"sc-block-builder", "sc-block-builder",
@@ -9449,7 +9450,6 @@ dependencies = [
"sc-network-common", "sc-network-common",
"sc-network-light", "sc-network-light",
"sc-network-sync", "sc-network-sync",
"sc-peerset",
"sc-utils", "sc-utils",
"serde", "serde",
"serde_json", "serde_json",
@@ -9471,6 +9471,7 @@ dependencies = [
"tokio-test", "tokio-test",
"tokio-util", "tokio-util",
"unsigned-varint", "unsigned-varint",
"wasm-timer",
"zeroize", "zeroize",
] ]
@@ -9515,7 +9516,6 @@ dependencies = [
"parity-scale-codec", "parity-scale-codec",
"prost-build", "prost-build",
"sc-consensus", "sc-consensus",
"sc-peerset",
"sc-utils", "sc-utils",
"serde", "serde",
"smallvec", "smallvec",
@@ -9542,7 +9542,6 @@ dependencies = [
"quickcheck", "quickcheck",
"sc-network", "sc-network",
"sc-network-common", "sc-network-common",
"sc-peerset",
"sp-runtime", "sp-runtime",
"substrate-prometheus-endpoint", "substrate-prometheus-endpoint",
"substrate-test-runtime-client", "substrate-test-runtime-client",
@@ -9565,7 +9564,6 @@ dependencies = [
"sc-client-api", "sc-client-api",
"sc-network", "sc-network",
"sc-network-common", "sc-network-common",
"sc-peerset",
"sp-blockchain", "sp-blockchain",
"sp-core", "sp-core",
"sp-runtime", "sp-runtime",
@@ -9585,7 +9583,6 @@ dependencies = [
"pin-project", "pin-project",
"sc-network", "sc-network",
"sc-network-common", "sc-network-common",
"sc-peerset",
"sp-consensus", "sp-consensus",
"sp-runtime", "sp-runtime",
"sp-statement-store", "sp-statement-store",
@@ -9615,7 +9612,6 @@ dependencies = [
"sc-consensus", "sc-consensus",
"sc-network", "sc-network",
"sc-network-common", "sc-network-common",
"sc-peerset",
"sc-utils", "sc-utils",
"smallvec", "smallvec",
"sp-arithmetic", "sp-arithmetic",
@@ -9674,7 +9670,6 @@ dependencies = [
"parity-scale-codec", "parity-scale-codec",
"sc-network", "sc-network",
"sc-network-common", "sc-network-common",
"sc-peerset",
"sc-utils", "sc-utils",
"sp-consensus", "sp-consensus",
"sp-runtime", "sp-runtime",
@@ -9704,7 +9699,6 @@ dependencies = [
"sc-client-db", "sc-client-db",
"sc-network", "sc-network",
"sc-network-common", "sc-network-common",
"sc-peerset",
"sc-transaction-pool", "sc-transaction-pool",
"sc-transaction-pool-api", "sc-transaction-pool-api",
"sc-utils", "sc-utils",
@@ -9720,24 +9714,6 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "sc-peerset"
version = "4.0.0-dev"
dependencies = [
"futures",
"libp2p-identity",
"log",
"mockall",
"parking_lot 0.12.1",
"partial_sort",
"rand 0.8.5",
"sc-utils",
"serde_json",
"sp-arithmetic",
"sp-tracing",
"wasm-timer",
]
[[package]] [[package]]
name = "sc-proposer-metrics" name = "sc-proposer-metrics"
version = "0.10.0-dev" version = "0.10.0-dev"
-1
View File
@@ -53,7 +53,6 @@ members = [
"client/network/sync", "client/network/sync",
"client/network/test", "client/network/test",
"client/offchain", "client/offchain",
"client/peerset",
"client/allocator", "client/allocator",
"client/proposer-metrics", "client/proposer-metrics",
"client/rpc", "client/rpc",
@@ -24,7 +24,6 @@ tracing = "0.1.29"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" }
sc-network = { version = "0.10.0-dev", path = "../network/" } sc-network = { version = "0.10.0-dev", path = "../network/" }
sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-network-common = { version = "0.10.0-dev", path = "../network/common" }
sc-peerset = { version = "4.0.0-dev", path = "../peerset" }
sp-runtime = { version = "8.0.0", path = "../../primitives/runtime" } sp-runtime = { version = "8.0.0", path = "../../primitives/runtime" }
[dev-dependencies] [dev-dependencies]
@@ -21,9 +21,8 @@ use crate::{
Network, Syncing, Validator, Network, Syncing, Validator,
}; };
use sc_network::{event::Event, types::ProtocolName}; use sc_network::{event::Event, types::ProtocolName, ReputationChange};
use sc_network_common::sync::SyncEvent; use sc_network_common::sync::SyncEvent;
use sc_peerset::ReputationChange;
use futures::{ use futures::{
channel::mpsc::{channel, Receiver, Sender}, channel::mpsc::{channel, Receiver, Sender},
@@ -43,7 +43,7 @@ const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_millis(750);
pub(crate) const PERIODIC_MAINTENANCE_INTERVAL: time::Duration = time::Duration::from_millis(1100); pub(crate) const PERIODIC_MAINTENANCE_INTERVAL: time::Duration = time::Duration::from_millis(1100);
mod rep { mod rep {
use sc_peerset::ReputationChange as Rep; use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sends us a gossip message that we didn't know about. /// Reputation change when a peer sends us a gossip message that we didn't know about.
pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successful gossip"); pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successful gossip");
/// Reputation change when a peer sends us a gossip message that we already knew about. /// Reputation change when a peer sends us a gossip message that we already knew about.
@@ -529,9 +529,8 @@ mod tests {
use sc_network::{ use sc_network::{
config::MultiaddrWithPeerId, event::Event, NetworkBlock, NetworkEventStream, config::MultiaddrWithPeerId, event::Event, NetworkBlock, NetworkEventStream,
NetworkNotification, NetworkPeers, NotificationSenderError, NetworkNotification, NetworkPeers, NotificationSenderError,
NotificationSenderT as NotificationSender, NotificationSenderT as NotificationSender, ReputationChange,
}; };
use sc_peerset::ReputationChange;
use sp_runtime::{ use sp_runtime::{
testing::{Block as RawBlock, ExtrinsicWrapper, H256}, testing::{Block as RawBlock, ExtrinsicWrapper, H256},
traits::NumberFor, traits::NumberFor,
+3 -1
View File
@@ -31,6 +31,7 @@ log = "0.4.17"
lru = "0.10.0" lru = "0.10.0"
mockall = "0.11.3" mockall = "0.11.3"
parking_lot = "0.12.1" parking_lot = "0.12.1"
partial_sort = "0.2.0"
pin-project = "1.0.12" pin-project = "1.0.12"
rand = "0.8.5" rand = "0.8.5"
serde = { version = "1.0.136", features = ["derive"] } serde = { version = "1.0.136", features = ["derive"] }
@@ -44,7 +45,6 @@ sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
sc-client-api = { version = "4.0.0-dev", path = "../api" } sc-client-api = { version = "4.0.0-dev", path = "../api" }
sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" }
sc-network-common = { version = "0.10.0-dev", path = "./common" } sc-network-common = { version = "0.10.0-dev", path = "./common" }
sc-peerset = { version = "4.0.0-dev", path = "../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../utils" } sc-utils = { version = "4.0.0-dev", path = "../utils" }
sp-arithmetic = { version = "7.0.0", path = "../../primitives/arithmetic" } sp-arithmetic = { version = "7.0.0", path = "../../primitives/arithmetic" }
sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" }
@@ -55,9 +55,11 @@ sp-runtime = { version = "8.0.0", path = "../../primitives/runtime" }
# #
# When libp2p also enforces this version, we can get rid off this extra dep here. # When libp2p also enforces this version, we can get rid off this extra dep here.
snow = "0.9.2" snow = "0.9.2"
wasm-timer = "0.2"
[dev-dependencies] [dev-dependencies]
assert_matches = "1.3" assert_matches = "1.3"
mockall = "0.11.3"
multistream-select = "0.12.1" multistream-select = "0.12.1"
rand = "0.8.5" rand = "0.8.5"
tempfile = "3.1.0" tempfile = "3.1.0"
@@ -29,7 +29,6 @@ libp2p-identity = { version = "0.1.2", features = ["peerid"] }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" }
smallvec = "1.8.0" smallvec = "1.8.0"
sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" }
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../../utils" } sc-utils = { version = "4.0.0-dev", path = "../../utils" }
serde = { version = "1.0.136", features = ["derive"] } serde = { version = "1.0.136", features = ["derive"] }
sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" }
@@ -21,6 +21,7 @@
pub mod message; pub mod message;
pub mod role; pub mod role;
pub mod sync; pub mod sync;
pub mod types;
/// Minimum Requirements for a Hash within Networking /// Minimum Requirements for a Hash within Networking
pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {} pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {}
+2 -2
View File
@@ -22,7 +22,7 @@ pub mod message;
pub mod metrics; pub mod metrics;
pub mod warp; pub mod warp;
use crate::role::Roles; use crate::{role::Roles, types::ReputationChange};
use futures::Stream; use futures::Stream;
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
@@ -106,7 +106,7 @@ pub struct SyncStatus<Block: BlockT> {
/// A peer did not behave as expected and should be reported. /// A peer did not behave as expected and should be reported.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct BadPeer(pub PeerId, pub sc_peerset::ReputationChange); pub struct BadPeer(pub PeerId, pub ReputationChange);
impl fmt::Display for BadPeer { impl fmt::Display for BadPeer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -0,0 +1,38 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
/// Description of a reputation adjustment for a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReputationChange {
/// Reputation delta.
pub value: i32,
/// Reason for reputation change.
pub reason: &'static str,
}
impl ReputationChange {
/// New reputation change with given delta and reason.
pub const fn new(value: i32, reason: &'static str) -> ReputationChange {
Self { value, reason }
}
/// New reputation change that forces minimum possible reputation.
pub const fn new_fatal(reason: &'static str) -> ReputationChange {
Self { value: i32::MIN, reason }
}
}
@@ -29,7 +29,6 @@ sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain"
sc-client-api = { version = "4.0.0-dev", path = "../../api" } sc-client-api = { version = "4.0.0-dev", path = "../../api" }
sc-network = { version = "0.10.0-dev", path = "../" } sc-network = { version = "0.10.0-dev", path = "../" }
sc-network-common = { version = "0.10.0-dev", path = "../common" } sc-network-common = { version = "0.10.0-dev", path = "../common" }
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
sp-core = { version = "8.0.0", path = "../../../primitives/core" } sp-core = { version = "8.0.0", path = "../../../primitives/core" }
sp-runtime = { version = "8.0.0", path = "../../../primitives/runtime" } sp-runtime = { version = "8.0.0", path = "../../../primitives/runtime" }
thiserror = "1.0" thiserror = "1.0"
@@ -32,8 +32,8 @@ use sc_client_api::{BlockBackend, ProofProvider};
use sc_network::{ use sc_network::{
config::ProtocolId, config::ProtocolId,
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
ReputationChange,
}; };
use sc_peerset::ReputationChange;
use sp_core::{ use sp_core::{
hexdisplay::HexDisplay, hexdisplay::HexDisplay,
storage::{ChildInfo, ChildType, PrefixedStorageKey}, storage::{ChildInfo, ChildType, PrefixedStorageKey},
+2 -1
View File
@@ -20,9 +20,11 @@ use crate::{
discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
event::DhtEvent, event::DhtEvent,
peer_info, peer_info,
peerset::PeersetHandle,
protocol::{CustomMessageOutcome, NotificationsSink, Protocol}, protocol::{CustomMessageOutcome, NotificationsSink, Protocol},
request_responses::{self, IfDisconnected, ProtocolConfig, RequestFailure}, request_responses::{self, IfDisconnected, ProtocolConfig, RequestFailure},
types::ProtocolName, types::ProtocolName,
ReputationChange,
}; };
use bytes::Bytes; use bytes::Bytes;
@@ -33,7 +35,6 @@ use libp2p::{
}; };
use sc_network_common::role::{ObservedRole, Roles}; use sc_network_common::role::{ObservedRole, Roles};
use sc_peerset::{PeersetHandle, ReputationChange};
use sp_runtime::traits::Block as BlockT; use sp_runtime::traits::Block as BlockT;
use std::{collections::HashSet, time::Duration}; use std::{collections::HashSet, time::Duration};
+4 -2
View File
@@ -243,7 +243,10 @@
//! More precise usage details are still being worked on and will likely change in the future. //! More precise usage details are still being worked on and will likely change in the future.
mod behaviour; mod behaviour;
mod peer_store;
mod peerset;
mod protocol; mod protocol;
mod protocol_controller;
mod service; mod service;
pub mod config; pub mod config;
@@ -267,6 +270,7 @@ pub use sc_network_common::{
warp::{WarpSyncPhase, WarpSyncProgress}, warp::{WarpSyncPhase, WarpSyncProgress},
ExtendedPeerInfo, StateDownloadProgress, SyncEventStream, SyncState, SyncStatusProvider, ExtendedPeerInfo, StateDownloadProgress, SyncEventStream, SyncState, SyncStatusProvider,
}, },
types::ReputationChange,
}; };
pub use service::{ pub use service::{
signature::Signature, signature::Signature,
@@ -281,8 +285,6 @@ pub use service::{
}; };
pub use types::ProtocolName; pub use types::ProtocolName;
pub use sc_peerset::ReputationChange;
/// The maximum allowed number of established connections per peer. /// The maximum allowed number of established connections per peer.
/// ///
/// Typically, and by design of the network behaviours in this crate, /// Typically, and by design of the network behaviours in this crate,
@@ -16,10 +16,11 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use libp2p_identity::PeerId; use libp2p::PeerId;
use log::trace; use log::trace;
use parking_lot::Mutex; use parking_lot::Mutex;
use partial_sort::PartialSort; use partial_sort::PartialSort;
use sc_network_common::types::ReputationChange;
use std::{ use std::{
cmp::{Ord, Ordering, PartialOrd}, cmp::{Ord, Ordering, PartialOrd},
collections::{hash_map::Entry, HashMap, HashSet}, collections::{hash_map::Entry, HashMap, HashSet},
@@ -29,7 +30,10 @@ use std::{
}; };
use wasm_timer::Delay; use wasm_timer::Delay;
use crate::{protocol_controller::ProtocolHandle, ReputationChange, LOG_TARGET}; use crate::protocol_controller::ProtocolHandle;
/// Log target for this file.
pub const LOG_TARGET: &str = "peerset";
/// We don't accept nodes whose reputation is under this value. /// We don't accept nodes whose reputation is under this value.
pub const BANNED_THRESHOLD: i32 = 82 * (i32::MIN / 100); pub const BANNED_THRESHOLD: i32 = 82 * (i32::MIN / 100);
@@ -32,11 +32,10 @@
//! In addition, for each, set, the peerset also holds a list of reserved nodes towards which it //! In addition, for each, set, the peerset also holds a list of reserved nodes towards which it
//! will at all time try to maintain a connection with. //! will at all time try to maintain a connection with.
mod peer_store; use crate::{
mod protocol_controller; peer_store::{PeerStore, PeerStoreHandle, PeerStoreProvider},
protocol_controller::{ProtocolController, ProtocolHandle},
use peer_store::{PeerStore, PeerStoreHandle, PeerStoreProvider}; };
use protocol_controller::{ProtocolController, ProtocolHandle};
use futures::{ use futures::{
channel::oneshot, channel::oneshot,
@@ -45,6 +44,7 @@ use futures::{
stream::Stream, stream::Stream,
}; };
use log::debug; use log::debug;
use sc_network_common::types::ReputationChange;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use serde_json::json; use serde_json::json;
use std::{ use std::{
@@ -53,9 +53,7 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
pub use libp2p_identity::PeerId; use libp2p::PeerId;
pub use peer_store::BANNED_THRESHOLD;
pub const LOG_TARGET: &str = "peerset"; pub const LOG_TARGET: &str = "peerset";
@@ -97,27 +95,6 @@ impl From<SetId> for usize {
} }
} }
/// Description of a reputation adjustment for a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReputationChange {
/// Reputation delta.
pub value: i32,
/// Reason for reputation change.
pub reason: &'static str,
}
impl ReputationChange {
/// New reputation change with given delta and reason.
pub const fn new(value: i32, reason: &'static str) -> ReputationChange {
Self { value, reason }
}
/// New reputation change that forces minimum possible reputation.
pub const fn new_fatal(reason: &'static str) -> ReputationChange {
Self { value: i32::MIN, reason }
}
}
/// Shared handle to the peer set manager (PSM). Distributed around the code. /// Shared handle to the peer set manager (PSM). Distributed around the code.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct PeersetHandle { pub struct PeersetHandle {
@@ -262,8 +239,6 @@ pub struct Peerset {
from_controllers: TracingUnboundedReceiver<Message>, from_controllers: TracingUnboundedReceiver<Message>,
/// Receiver for messages from the `PeersetHandle` and from `to_self`. /// Receiver for messages from the `PeersetHandle` and from `to_self`.
from_handle: TracingUnboundedReceiver<Action>, from_handle: TracingUnboundedReceiver<Action>,
/// Sending side of `from_handle`.
to_self: TracingUnboundedSender<Action>,
} }
impl Peerset { impl Peerset {
@@ -292,9 +267,9 @@ impl Peerset {
let (protocol_handles, protocol_controllers): (Vec<ProtocolHandle>, Vec<_>) = let (protocol_handles, protocol_controllers): (Vec<ProtocolHandle>, Vec<_>) =
controllers.into_iter().unzip(); controllers.into_iter().unzip();
let (to_self, from_handle) = tracing_unbounded("mpsc_peerset_messages", 10_000); let (tx, from_handle) = tracing_unbounded("mpsc_peerset_messages", 10_000);
let handle = PeersetHandle { tx: to_self.clone() }; let handle = PeersetHandle { tx };
let protocol_controller_futures = let protocol_controller_futures =
join_all(protocol_controllers.into_iter().map(|c| c.run().boxed())); join_all(protocol_controllers.into_iter().map(|c| c.run().boxed()));
@@ -306,7 +281,6 @@ impl Peerset {
protocol_controller_futures, protocol_controller_futures,
from_controllers, from_controllers,
from_handle, from_handle,
to_self,
}; };
(peerset, handle) (peerset, handle)
@@ -338,14 +312,6 @@ impl Peerset {
self.protocol_handles[set_id.0].dropped(peer_id); self.protocol_handles[set_id.0].dropped(peer_id);
} }
/// Reports an adjustment to the reputation of the given peer.
pub fn report_peer(&mut self, peer_id: PeerId, score_diff: ReputationChange) {
// We don't immediately perform the adjustments in order to have state consistency. We
// don't want the reporting here to take priority over messages sent using the
// `PeersetHandle`.
let _ = self.to_self.unbounded_send(Action::ReportPeer(peer_id, score_diff));
}
/// Produces a JSON object containing the state of the peerset manager, for debugging purposes. /// Produces a JSON object containing the state of the peerset manager, for debugging purposes.
pub fn debug_info(&mut self) -> serde_json::Value { pub fn debug_info(&mut self) -> serde_json::Value {
// TODO: Check what info we can include here. // TODO: Check what info we can include here.
+17 -14
View File
@@ -20,6 +20,7 @@ use crate::{
config::{self, NonReservedPeerMode}, config::{self, NonReservedPeerMode},
error, error,
types::ProtocolName, types::ProtocolName,
ReputationChange,
}; };
use bytes::Bytes; use bytes::Bytes;
@@ -61,13 +62,13 @@ pub mod message;
pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = 16 * 1024 * 1024; pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = 16 * 1024 * 1024;
/// Identifier of the peerset for the block announces protocol. /// Identifier of the peerset for the block announces protocol.
const HARDCODED_PEERSETS_SYNC: sc_peerset::SetId = sc_peerset::SetId::from(0); const HARDCODED_PEERSETS_SYNC: crate::peerset::SetId = crate::peerset::SetId::from(0);
/// Number of hardcoded peersets (the constants right above). Any set whose identifier is equal or /// Number of hardcoded peersets (the constants right above). Any set whose identifier is equal or
/// superior to this value corresponds to a user-defined protocol. /// superior to this value corresponds to a user-defined protocol.
const NUM_HARDCODED_PEERSETS: usize = 1; const NUM_HARDCODED_PEERSETS: usize = 1;
mod rep { mod rep {
use sc_peerset::ReputationChange as Rep; use crate::ReputationChange as Rep;
/// We received a message that failed to decode. /// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
} }
@@ -78,7 +79,7 @@ type PendingSyncSubstreamValidation =
// Lock must always be taken in order declared here. // Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT> { pub struct Protocol<B: BlockT> {
/// Used to report reputation changes. /// Used to report reputation changes.
peerset_handle: sc_peerset::PeersetHandle, peerset_handle: crate::peerset::PeersetHandle,
/// Handles opening the unique substream and sending and receiving raw messages. /// Handles opening the unique substream and sending and receiving raw messages.
behaviour: Notifications, behaviour: Notifications,
/// List of notifications protocols that have been registered. /// List of notifications protocols that have been registered.
@@ -89,7 +90,7 @@ pub struct Protocol<B: BlockT> {
/// event to the outer layers, we also shouldn't propagate this "substream closed" event. To /// event to the outer layers, we also shouldn't propagate this "substream closed" event. To
/// solve this, an entry is added to this map whenever an invalid handshake is received. /// solve this, an entry is added to this map whenever an invalid handshake is received.
/// Entries are removed when the corresponding "substream closed" is later received. /// Entries are removed when the corresponding "substream closed" is later received.
bad_handshake_substreams: HashSet<(PeerId, sc_peerset::SetId)>, bad_handshake_substreams: HashSet<(PeerId, crate::peerset::SetId)>,
/// Connected peers. /// Connected peers.
peers: HashMap<PeerId, Roles>, peers: HashMap<PeerId, Roles>,
sync_substream_validations: FuturesUnordered<PendingSyncSubstreamValidation>, sync_substream_validations: FuturesUnordered<PendingSyncSubstreamValidation>,
@@ -105,7 +106,7 @@ impl<B: BlockT> Protocol<B> {
notification_protocols: Vec<config::NonDefaultSetConfig>, notification_protocols: Vec<config::NonDefaultSetConfig>,
block_announces_protocol: config::NonDefaultSetConfig, block_announces_protocol: config::NonDefaultSetConfig,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>, tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { ) -> error::Result<(Self, crate::peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let mut known_addresses = Vec::new(); let mut known_addresses = Vec::new();
let (peerset, peerset_handle) = { let (peerset, peerset_handle) = {
@@ -127,7 +128,7 @@ impl<B: BlockT> Protocol<B> {
} }
// Set number 0 is used for block announces. // Set number 0 is used for block announces.
sets.push(sc_peerset::SetConfig { sets.push(crate::peerset::SetConfig {
in_peers: network_config.default_peers_set.in_peers, in_peers: network_config.default_peers_set.in_peers,
out_peers: network_config.default_peers_set.out_peers, out_peers: network_config.default_peers_set.out_peers,
bootnodes, bootnodes,
@@ -146,7 +147,7 @@ impl<B: BlockT> Protocol<B> {
let reserved_only = let reserved_only =
set_cfg.set_config.non_reserved_mode == NonReservedPeerMode::Deny; set_cfg.set_config.non_reserved_mode == NonReservedPeerMode::Deny;
sets.push(sc_peerset::SetConfig { sets.push(crate::peerset::SetConfig {
in_peers: set_cfg.set_config.in_peers, in_peers: set_cfg.set_config.in_peers,
out_peers: set_cfg.set_config.out_peers, out_peers: set_cfg.set_config.out_peers,
bootnodes: Vec::new(), bootnodes: Vec::new(),
@@ -155,7 +156,7 @@ impl<B: BlockT> Protocol<B> {
}); });
} }
sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets }) crate::peerset::Peerset::from_config(crate::peerset::PeersetConfig { sets })
}; };
let behaviour = { let behaviour = {
@@ -210,7 +211,7 @@ impl<B: BlockT> Protocol<B> {
pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) { pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) {
if let Some(position) = self.notification_protocols.iter().position(|p| *p == protocol_name) if let Some(position) = self.notification_protocols.iter().position(|p| *p == protocol_name)
{ {
self.behaviour.disconnect_peer(peer_id, sc_peerset::SetId::from(position)); self.behaviour.disconnect_peer(peer_id, crate::peerset::SetId::from(position));
self.peers.remove(peer_id); self.peers.remove(peer_id);
} else { } else {
warn!(target: "sub-libp2p", "disconnect_peer() with invalid protocol name") warn!(target: "sub-libp2p", "disconnect_peer() with invalid protocol name")
@@ -228,7 +229,7 @@ impl<B: BlockT> Protocol<B> {
} }
/// Adjusts the reputation of a node. /// Adjusts the reputation of a node.
pub fn report_peer(&self, who: PeerId, reputation: sc_peerset::ReputationChange) { pub fn report_peer(&self, who: PeerId, reputation: ReputationChange) {
self.peerset_handle.report_peer(who, reputation) self.peerset_handle.report_peer(who, reputation)
} }
@@ -236,7 +237,7 @@ impl<B: BlockT> Protocol<B> {
pub fn set_notification_handshake(&mut self, protocol: ProtocolName, handshake: Vec<u8>) { pub fn set_notification_handshake(&mut self, protocol: ProtocolName, handshake: Vec<u8>) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.behaviour self.behaviour
.set_notif_protocol_handshake(sc_peerset::SetId::from(index), handshake); .set_notif_protocol_handshake(crate::peerset::SetId::from(index), handshake);
} else { } else {
error!( error!(
target: "sub-libp2p", target: "sub-libp2p",
@@ -274,7 +275,8 @@ impl<B: BlockT> Protocol<B> {
/// Sets the list of reserved peers for the given protocol/peerset. /// Sets the list of reserved peers for the given protocol/peerset.
pub fn set_reserved_peerset_peers(&self, protocol: ProtocolName, peers: HashSet<PeerId>) { pub fn set_reserved_peerset_peers(&self, protocol: ProtocolName, peers: HashSet<PeerId>) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.peerset_handle.set_reserved_peers(sc_peerset::SetId::from(index), peers); self.peerset_handle
.set_reserved_peers(crate::peerset::SetId::from(index), peers);
} else { } else {
error!( error!(
target: "sub-libp2p", target: "sub-libp2p",
@@ -287,7 +289,8 @@ impl<B: BlockT> Protocol<B> {
/// Removes a `PeerId` from the list of reserved peers. /// Removes a `PeerId` from the list of reserved peers.
pub fn remove_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) { pub fn remove_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.peerset_handle.remove_reserved_peer(sc_peerset::SetId::from(index), peer); self.peerset_handle
.remove_reserved_peer(crate::peerset::SetId::from(index), peer);
} else { } else {
error!( error!(
target: "sub-libp2p", target: "sub-libp2p",
@@ -300,7 +303,7 @@ impl<B: BlockT> Protocol<B> {
/// Adds a `PeerId` to the list of reserved peers. /// Adds a `PeerId` to the list of reserved peers.
pub fn add_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) { pub fn add_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.peerset_handle.add_reserved_peer(sc_peerset::SetId::from(index), peer); self.peerset_handle.add_reserved_peer(crate::peerset::SetId::from(index), peer);
} else { } else {
error!( error!(
target: "sub-libp2p", target: "sub-libp2p",
@@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{ use crate::{
peerset::DropReason,
protocol::notifications::handler::{ protocol::notifications::handler::{
self, NotificationsSink, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut, self, NotificationsSink, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut,
}, },
@@ -38,7 +39,6 @@ use libp2p::{
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use parking_lot::RwLock; use parking_lot::RwLock;
use rand::distributions::{Distribution as _, Uniform}; use rand::distributions::{Distribution as _, Uniform};
use sc_peerset::DropReason;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::{ use std::{
cmp, cmp,
@@ -108,10 +108,10 @@ pub struct Notifications {
notif_protocols: Vec<handler::ProtocolConfig>, notif_protocols: Vec<handler::ProtocolConfig>,
/// Receiver for instructions about who to connect to or disconnect from. /// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset, peerset: crate::peerset::Peerset,
/// List of peers in our state. /// List of peers in our state.
peers: FnvHashMap<(PeerId, sc_peerset::SetId), PeerState>, peers: FnvHashMap<(PeerId, crate::peerset::SetId), PeerState>,
/// The elements in `peers` occasionally contain `Delay` objects that we would normally have /// The elements in `peers` occasionally contain `Delay` objects that we would normally have
/// to be polled one by one. In order to avoid doing so, as an optimization, every `Delay` is /// to be polled one by one. In order to avoid doing so, as an optimization, every `Delay` is
@@ -121,7 +121,7 @@ pub struct Notifications {
/// By design, we never remove elements from this list. Elements are removed only when the /// By design, we never remove elements from this list. Elements are removed only when the
/// `Delay` triggers. As such, this stream may produce obsolete elements. /// `Delay` triggers. As such, this stream may produce obsolete elements.
delays: stream::FuturesUnordered< delays: stream::FuturesUnordered<
Pin<Box<dyn Future<Output = (DelayId, PeerId, sc_peerset::SetId)> + Send>>, Pin<Box<dyn Future<Output = (DelayId, PeerId, crate::peerset::SetId)> + Send>>,
>, >,
/// [`DelayId`] to assign to the next delay. /// [`DelayId`] to assign to the next delay.
@@ -133,7 +133,7 @@ pub struct Notifications {
/// We generate indices to identify incoming connections. This is the next value for the index /// We generate indices to identify incoming connections. This is the next value for the index
/// to use when a connection is incoming. /// to use when a connection is incoming.
next_incoming_index: sc_peerset::IncomingIndex, next_incoming_index: crate::peerset::IncomingIndex,
/// Events to produce from `poll()`. /// Events to produce from `poll()`.
events: VecDeque<ToSwarm<NotificationsOut, NotifsHandlerIn>>, events: VecDeque<ToSwarm<NotificationsOut, NotifsHandlerIn>>,
@@ -232,7 +232,7 @@ enum PeerState {
backoff_until: Option<Instant>, backoff_until: Option<Instant>,
/// Incoming index tracking this connection. /// Incoming index tracking this connection.
incoming_index: sc_peerset::IncomingIndex, incoming_index: crate::peerset::IncomingIndex,
/// List of connections with this peer, and their state. /// List of connections with this peer, and their state.
connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>, connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
@@ -296,12 +296,12 @@ struct IncomingPeer {
/// Id of the remote peer of the incoming substream. /// Id of the remote peer of the incoming substream.
peer_id: PeerId, peer_id: PeerId,
/// Id of the set the incoming substream would belong to. /// Id of the set the incoming substream would belong to.
set_id: sc_peerset::SetId, set_id: crate::peerset::SetId,
/// If true, this "incoming" still corresponds to an actual connection. If false, then the /// If true, this "incoming" still corresponds to an actual connection. If false, then the
/// connection corresponding to it has been closed or replaced already. /// connection corresponding to it has been closed or replaced already.
alive: bool, alive: bool,
/// Id that the we sent to the peerset. /// Id that the we sent to the peerset.
incoming_id: sc_peerset::IncomingIndex, incoming_id: crate::peerset::IncomingIndex,
} }
/// Event that can be emitted by the `Notifications`. /// Event that can be emitted by the `Notifications`.
@@ -312,7 +312,7 @@ pub enum NotificationsOut {
/// Id of the peer we are connected to. /// Id of the peer we are connected to.
peer_id: PeerId, peer_id: PeerId,
/// Peerset set ID the substream is tied to. /// Peerset set ID the substream is tied to.
set_id: sc_peerset::SetId, set_id: crate::peerset::SetId,
/// If `Some`, a fallback protocol name has been used rather the main protocol name. /// If `Some`, a fallback protocol name has been used rather the main protocol name.
/// Always matches one of the fallback names passed at initialization. /// Always matches one of the fallback names passed at initialization.
negotiated_fallback: Option<ProtocolName>, negotiated_fallback: Option<ProtocolName>,
@@ -332,7 +332,7 @@ pub enum NotificationsOut {
/// Id of the peer we are connected to. /// Id of the peer we are connected to.
peer_id: PeerId, peer_id: PeerId,
/// Peerset set ID the substream is tied to. /// Peerset set ID the substream is tied to.
set_id: sc_peerset::SetId, set_id: crate::peerset::SetId,
/// Replacement for the previous [`NotificationsSink`]. /// Replacement for the previous [`NotificationsSink`].
notifications_sink: NotificationsSink, notifications_sink: NotificationsSink,
}, },
@@ -343,7 +343,7 @@ pub enum NotificationsOut {
/// Id of the peer we were connected to. /// Id of the peer we were connected to.
peer_id: PeerId, peer_id: PeerId,
/// Peerset set ID the substream was tied to. /// Peerset set ID the substream was tied to.
set_id: sc_peerset::SetId, set_id: crate::peerset::SetId,
}, },
/// Receives a message on a custom protocol substream. /// Receives a message on a custom protocol substream.
@@ -353,7 +353,7 @@ pub enum NotificationsOut {
/// Id of the peer the message came from. /// Id of the peer the message came from.
peer_id: PeerId, peer_id: PeerId,
/// Peerset set ID the substream is tied to. /// Peerset set ID the substream is tied to.
set_id: sc_peerset::SetId, set_id: crate::peerset::SetId,
/// Message that has been received. /// Message that has been received.
message: BytesMut, message: BytesMut,
}, },
@@ -362,7 +362,7 @@ pub enum NotificationsOut {
impl Notifications { impl Notifications {
/// Creates a `CustomProtos`. /// Creates a `CustomProtos`.
pub fn new( pub fn new(
peerset: sc_peerset::Peerset, peerset: crate::peerset::Peerset,
notif_protocols: impl Iterator<Item = ProtocolConfig>, notif_protocols: impl Iterator<Item = ProtocolConfig>,
) -> Self { ) -> Self {
let notif_protocols = notif_protocols let notif_protocols = notif_protocols
@@ -383,7 +383,7 @@ impl Notifications {
delays: Default::default(), delays: Default::default(),
next_delay_id: DelayId(0), next_delay_id: DelayId(0),
incoming: SmallVec::new(), incoming: SmallVec::new(),
next_incoming_index: sc_peerset::IncomingIndex(0), next_incoming_index: crate::peerset::IncomingIndex(0),
events: VecDeque::new(), events: VecDeque::new(),
} }
} }
@@ -391,7 +391,7 @@ impl Notifications {
/// Modifies the handshake of the given notifications protocol. /// Modifies the handshake of the given notifications protocol.
pub fn set_notif_protocol_handshake( pub fn set_notif_protocol_handshake(
&mut self, &mut self,
set_id: sc_peerset::SetId, set_id: crate::peerset::SetId,
handshake_message: impl Into<Vec<u8>>, handshake_message: impl Into<Vec<u8>>,
) { ) {
if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) { if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) {
@@ -413,18 +413,18 @@ impl Notifications {
} }
/// Returns true if we have an open substream to the given peer. /// Returns true if we have an open substream to the given peer.
pub fn is_open(&self, peer_id: &PeerId, set_id: sc_peerset::SetId) -> bool { pub fn is_open(&self, peer_id: &PeerId, set_id: crate::peerset::SetId) -> bool {
self.peers.get(&(*peer_id, set_id)).map(|p| p.is_open()).unwrap_or(false) self.peers.get(&(*peer_id, set_id)).map(|p| p.is_open()).unwrap_or(false)
} }
/// Disconnects the given peer if we are connected to it. /// Disconnects the given peer if we are connected to it.
pub fn disconnect_peer(&mut self, peer_id: &PeerId, set_id: sc_peerset::SetId) { pub fn disconnect_peer(&mut self, peer_id: &PeerId, set_id: crate::peerset::SetId) {
trace!(target: "sub-libp2p", "External API => Disconnect({}, {:?})", peer_id, set_id); trace!(target: "sub-libp2p", "External API => Disconnect({}, {:?})", peer_id, set_id);
self.disconnect_peer_inner(peer_id, set_id); self.disconnect_peer_inner(peer_id, set_id);
} }
/// Inner implementation of `disconnect_peer`. /// Inner implementation of `disconnect_peer`.
fn disconnect_peer_inner(&mut self, peer_id: &PeerId, set_id: sc_peerset::SetId) { fn disconnect_peer_inner(&mut self, peer_id: &PeerId, set_id: crate::peerset::SetId) {
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) { let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) {
entry entry
} else { } else {
@@ -541,7 +541,7 @@ impl Notifications {
/// Returns the list of reserved peers. /// Returns the list of reserved peers.
pub fn reserved_peers( pub fn reserved_peers(
&self, &self,
set_id: sc_peerset::SetId, set_id: crate::peerset::SetId,
pending_response: oneshot::Sender<Vec<PeerId>>, pending_response: oneshot::Sender<Vec<PeerId>>,
) { ) {
self.peerset.reserved_peers(set_id, pending_response); self.peerset.reserved_peers(set_id, pending_response);
@@ -553,7 +553,7 @@ impl Notifications {
} }
/// Function that is called when the peerset wants us to connect to a peer. /// Function that is called when the peerset wants us to connect to a peer.
fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: sc_peerset::SetId) { fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: crate::peerset::SetId) {
// If `PeerId` is unknown to us, insert an entry, start dialing, and return early. // If `PeerId` is unknown to us, insert an entry, start dialing, and return early.
let mut occ_entry = match self.peers.entry((peer_id, set_id)) { let mut occ_entry = match self.peers.entry((peer_id, set_id)) {
Entry::Occupied(entry) => entry, Entry::Occupied(entry) => entry,
@@ -731,7 +731,7 @@ impl Notifications {
} }
/// Function that is called when the peerset wants us to disconnect from a peer. /// Function that is called when the peerset wants us to disconnect from a peer.
fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: sc_peerset::SetId) { fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: crate::peerset::SetId) {
let mut entry = match self.peers.entry((peer_id, set_id)) { let mut entry = match self.peers.entry((peer_id, set_id)) {
Entry::Occupied(entry) => entry, Entry::Occupied(entry) => entry,
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
@@ -839,7 +839,7 @@ impl Notifications {
/// Function that is called when the peerset wants us to accept a connection /// Function that is called when the peerset wants us to accept a connection
/// request from a peer. /// request from a peer.
fn peerset_report_accept(&mut self, index: sc_peerset::IncomingIndex) { fn peerset_report_accept(&mut self, index: crate::peerset::IncomingIndex) {
let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index)
{ {
self.incoming.remove(pos) self.incoming.remove(pos)
@@ -925,7 +925,7 @@ impl Notifications {
} }
/// Function that is called when the peerset wants us to reject an incoming peer. /// Function that is called when the peerset wants us to reject an incoming peer.
fn peerset_report_reject(&mut self, index: sc_peerset::IncomingIndex) { fn peerset_report_reject(&mut self, index: crate::peerset::IncomingIndex) {
let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index)
{ {
self.incoming.remove(pos) self.incoming.remove(pos)
@@ -1059,7 +1059,7 @@ impl NetworkBehaviour for Notifications {
connection_id, connection_id,
.. ..
}) => { }) => {
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) { for set_id in (0..self.notif_protocols.len()).map(crate::peerset::SetId::from) {
match self.peers.entry((peer_id, set_id)).or_insert(PeerState::Poisoned) { match self.peers.entry((peer_id, set_id)).or_insert(PeerState::Poisoned) {
// Requested | PendingRequest => Enabled // Requested | PendingRequest => Enabled
st @ &mut PeerState::Requested | st @ &mut PeerState::Requested |
@@ -1113,7 +1113,7 @@ impl NetworkBehaviour for Notifications {
} }
}, },
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => { FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => {
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) { for set_id in (0..self.notif_protocols.len()).map(crate::peerset::SetId::from) {
let mut entry = if let Entry::Occupied(entry) = let mut entry = if let Entry::Occupied(entry) =
self.peers.entry((peer_id, set_id)) self.peers.entry((peer_id, set_id))
{ {
@@ -1406,7 +1406,7 @@ impl NetworkBehaviour for Notifications {
if let Some(peer_id) = peer_id { if let Some(peer_id) = peer_id {
trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id); trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) { for set_id in (0..self.notif_protocols.len()).map(crate::peerset::SetId::from) {
if let Entry::Occupied(mut entry) = self.peers.entry((peer_id, set_id)) { if let Entry::Occupied(mut entry) = self.peers.entry((peer_id, set_id)) {
match mem::replace(entry.get_mut(), PeerState::Poisoned) { match mem::replace(entry.get_mut(), PeerState::Poisoned) {
// The peer is not in our list. // The peer is not in our list.
@@ -1485,7 +1485,7 @@ impl NetworkBehaviour for Notifications {
) { ) {
match event { match event {
NotifsHandlerOut::OpenDesiredByRemote { protocol_index } => { NotifsHandlerOut::OpenDesiredByRemote { protocol_index } => {
let set_id = sc_peerset::SetId::from(protocol_index); let set_id = crate::peerset::SetId::from(protocol_index);
trace!(target: "sub-libp2p", trace!(target: "sub-libp2p",
"Handler({:?}, {:?}]) => OpenDesiredByRemote({:?})", "Handler({:?}, {:?}]) => OpenDesiredByRemote({:?})",
@@ -1675,7 +1675,7 @@ impl NetworkBehaviour for Notifications {
}, },
NotifsHandlerOut::CloseDesired { protocol_index } => { NotifsHandlerOut::CloseDesired { protocol_index } => {
let set_id = sc_peerset::SetId::from(protocol_index); let set_id = crate::peerset::SetId::from(protocol_index);
trace!(target: "sub-libp2p", trace!(target: "sub-libp2p",
"Handler({}, {:?}) => CloseDesired({:?})", "Handler({}, {:?}) => CloseDesired({:?})",
@@ -1775,7 +1775,7 @@ impl NetworkBehaviour for Notifications {
}, },
NotifsHandlerOut::CloseResult { protocol_index } => { NotifsHandlerOut::CloseResult { protocol_index } => {
let set_id = sc_peerset::SetId::from(protocol_index); let set_id = crate::peerset::SetId::from(protocol_index);
trace!(target: "sub-libp2p", trace!(target: "sub-libp2p",
"Handler({}, {:?}) => CloseResult({:?})", "Handler({}, {:?}) => CloseResult({:?})",
@@ -1814,7 +1814,7 @@ impl NetworkBehaviour for Notifications {
notifications_sink, notifications_sink,
.. ..
} => { } => {
let set_id = sc_peerset::SetId::from(protocol_index); let set_id = crate::peerset::SetId::from(protocol_index);
trace!(target: "sub-libp2p", trace!(target: "sub-libp2p",
"Handler({}, {:?}) => OpenResultOk({:?})", "Handler({}, {:?}) => OpenResultOk({:?})",
peer_id, connection_id, set_id); peer_id, connection_id, set_id);
@@ -1880,7 +1880,7 @@ impl NetworkBehaviour for Notifications {
}, },
NotifsHandlerOut::OpenResultErr { protocol_index } => { NotifsHandlerOut::OpenResultErr { protocol_index } => {
let set_id = sc_peerset::SetId::from(protocol_index); let set_id = crate::peerset::SetId::from(protocol_index);
trace!(target: "sub-libp2p", trace!(target: "sub-libp2p",
"Handler({:?}, {:?}) => OpenResultErr({:?})", "Handler({:?}, {:?}) => OpenResultErr({:?})",
peer_id, connection_id, set_id); peer_id, connection_id, set_id);
@@ -1969,7 +1969,7 @@ impl NetworkBehaviour for Notifications {
}, },
NotifsHandlerOut::Notification { protocol_index, message } => { NotifsHandlerOut::Notification { protocol_index, message } => {
let set_id = sc_peerset::SetId::from(protocol_index); let set_id = crate::peerset::SetId::from(protocol_index);
if self.is_open(&peer_id, set_id) { if self.is_open(&peer_id, set_id) {
trace!( trace!(
target: "sub-libp2p", target: "sub-libp2p",
@@ -2015,16 +2015,16 @@ impl NetworkBehaviour for Notifications {
// Note that the peerset is a *best effort* crate, and we have to use defensive programming. // Note that the peerset is a *best effort* crate, and we have to use defensive programming.
loop { loop {
match futures::Stream::poll_next(Pin::new(&mut self.peerset), cx) { match futures::Stream::poll_next(Pin::new(&mut self.peerset), cx) {
Poll::Ready(Some(sc_peerset::Message::Accept(index))) => { Poll::Ready(Some(crate::peerset::Message::Accept(index))) => {
self.peerset_report_accept(index); self.peerset_report_accept(index);
}, },
Poll::Ready(Some(sc_peerset::Message::Reject(index))) => { Poll::Ready(Some(crate::peerset::Message::Reject(index))) => {
self.peerset_report_reject(index); self.peerset_report_reject(index);
}, },
Poll::Ready(Some(sc_peerset::Message::Connect { peer_id, set_id, .. })) => { Poll::Ready(Some(crate::peerset::Message::Connect { peer_id, set_id, .. })) => {
self.peerset_report_connect(peer_id, set_id); self.peerset_report_connect(peer_id, set_id);
}, },
Poll::Ready(Some(sc_peerset::Message::Drop { peer_id, set_id, .. })) => { Poll::Ready(Some(crate::peerset::Message::Drop { peer_id, set_id, .. })) => {
self.peerset_report_disconnect(peer_id, set_id); self.peerset_report_disconnect(peer_id, set_id);
}, },
Poll::Ready(None) => { Poll::Ready(None) => {
@@ -2105,9 +2105,8 @@ impl NetworkBehaviour for Notifications {
#[allow(deprecated)] #[allow(deprecated)]
mod tests { mod tests {
use super::*; use super::*;
use crate::protocol::notifications::handler::tests::*; use crate::{peerset::IncomingIndex, protocol::notifications::handler::tests::*};
use libp2p::swarm::AddressRecord; use libp2p::swarm::AddressRecord;
use sc_peerset::IncomingIndex;
use std::{collections::HashSet, iter}; use std::{collections::HashSet, iter};
impl PartialEq for ConnectionState { impl PartialEq for ConnectionState {
@@ -2153,11 +2152,11 @@ mod tests {
} }
} }
fn development_notifs() -> (Notifications, sc_peerset::PeersetHandle) { fn development_notifs() -> (Notifications, crate::peerset::PeersetHandle) {
let (peerset, peerset_handle) = { let (peerset, peerset_handle) = {
let mut sets = Vec::with_capacity(1); let mut sets = Vec::with_capacity(1);
sets.push(sc_peerset::SetConfig { sets.push(crate::peerset::SetConfig {
in_peers: 25, in_peers: 25,
out_peers: 25, out_peers: 25,
bootnodes: Vec::new(), bootnodes: Vec::new(),
@@ -2165,7 +2164,7 @@ mod tests {
reserved_only: false, reserved_only: false,
}); });
sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets }) crate::peerset::Peerset::from_config(crate::peerset::PeersetConfig { sets })
}; };
( (
@@ -2311,7 +2310,7 @@ mod tests {
assert!(std::matches!( assert!(std::matches!(
notif.incoming.pop(), notif.incoming.pop(),
Some(IncomingPeer { alive: true, incoming_id: sc_peerset::IncomingIndex(0), .. }), Some(IncomingPeer { alive: true, incoming_id: crate::peerset::IncomingIndex(0), .. }),
)); ));
} }
@@ -2354,7 +2353,7 @@ mod tests {
#[test] #[test]
fn peerset_report_connect_backoff() { fn peerset_report_connect_backoff() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -2421,7 +2420,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -2455,7 +2454,7 @@ mod tests {
#[test] #[test]
fn peerset_disconnect_disable_pending_enable() { fn peerset_disconnect_disable_pending_enable() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -2504,7 +2503,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -2539,7 +2538,7 @@ mod tests {
fn peerset_disconnect_requested() { fn peerset_disconnect_requested() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
// Set peer into `Requested` state. // Set peer into `Requested` state.
notif.peerset_report_connect(peer, set_id); notif.peerset_report_connect(peer, set_id);
@@ -2553,7 +2552,7 @@ mod tests {
#[test] #[test]
fn peerset_disconnect_pending_request() { fn peerset_disconnect_pending_request() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -2608,7 +2607,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -2635,17 +2634,17 @@ mod tests {
assert!(std::matches!( assert!(std::matches!(
notif.incoming[0], notif.incoming[0],
IncomingPeer { alive: true, incoming_id: sc_peerset::IncomingIndex(0), .. }, IncomingPeer { alive: true, incoming_id: crate::peerset::IncomingIndex(0), .. },
)); ));
notif.disconnect_peer(&peer, set_id); notif.disconnect_peer(&peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
assert!(std::matches!( assert!(std::matches!(
notif.incoming[0], notif.incoming[0],
IncomingPeer { alive: false, incoming_id: sc_peerset::IncomingIndex(0), .. }, IncomingPeer { alive: false, incoming_id: crate::peerset::IncomingIndex(0), .. },
)); ));
notif.peerset_report_accept(sc_peerset::IncomingIndex(0)); notif.peerset_report_accept(crate::peerset::IncomingIndex(0));
assert_eq!(notif.incoming.len(), 0); assert_eq!(notif.incoming.len(), 0);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Disabled { .. }))); assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Disabled { .. })));
} }
@@ -2656,7 +2655,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1); let conn2 = ConnectionId::new_unchecked(1);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -2710,7 +2709,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -2744,7 +2743,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -2780,7 +2779,7 @@ mod tests {
assert!(notif.peers.get(&(peer, set_id)).is_none()); assert!(notif.peers.get(&(peer, set_id)).is_none());
assert!(std::matches!( assert!(std::matches!(
notif.incoming[0], notif.incoming[0],
IncomingPeer { alive: false, incoming_id: sc_peerset::IncomingIndex(0), .. }, IncomingPeer { alive: false, incoming_id: crate::peerset::IncomingIndex(0), .. },
)); ));
} }
@@ -2790,7 +2789,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let conn1 = ConnectionId::new_unchecked(1); let conn1 = ConnectionId::new_unchecked(1);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -2858,7 +2857,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -2886,7 +2885,7 @@ mod tests {
// We rely on the implementation detail that incoming indices are counted // We rely on the implementation detail that incoming indices are counted
// from 0 to not mock the `Peerset`. // from 0 to not mock the `Peerset`.
notif.peerset_report_accept(sc_peerset::IncomingIndex(0)); notif.peerset_report_accept(crate::peerset::IncomingIndex(0));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
// open new substream // open new substream
@@ -2913,7 +2912,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let conn1 = ConnectionId::new_unchecked(0); let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1); let conn2 = ConnectionId::new_unchecked(1);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -3006,7 +3005,7 @@ mod tests {
fn dial_failure_for_requested_peer() { fn dial_failure_for_requested_peer() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
// Set peer into `Requested` state. // Set peer into `Requested` state.
notif.peerset_report_connect(peer, set_id); notif.peerset_report_connect(peer, set_id);
@@ -3030,7 +3029,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -3077,7 +3076,7 @@ mod tests {
#[test] #[test]
fn peerset_report_connect_backoff_expired() { fn peerset_report_connect_backoff_expired() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -3126,7 +3125,7 @@ mod tests {
fn peerset_report_disconnect_disabled() { fn peerset_report_disconnect_disabled() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
@@ -3151,7 +3150,7 @@ mod tests {
#[test] #[test]
fn peerset_report_disconnect_backoff() { fn peerset_report_disconnect_backoff() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -3197,7 +3196,7 @@ mod tests {
#[test] #[test]
fn peer_is_backed_off_if_both_connections_get_closed_while_peer_is_disabled_with_back_off() { fn peer_is_backed_off_if_both_connections_get_closed_while_peer_is_disabled_with_back_off() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn1 = ConnectionId::new_unchecked(0); let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1); let conn2 = ConnectionId::new_unchecked(1);
@@ -3271,7 +3270,7 @@ mod tests {
fn inject_connection_closed_incoming_with_backoff() { fn inject_connection_closed_incoming_with_backoff() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
@@ -3324,7 +3323,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let conn1 = ConnectionId::new_unchecked(0); let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1); let conn2 = ConnectionId::new_unchecked(1);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -3379,7 +3378,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let conn1 = ConnectionId::new_unchecked(0); let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1); let conn2 = ConnectionId::new_unchecked(1);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -3437,7 +3436,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let conn1 = ConnectionId::new_unchecked(0); let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1); let conn2 = ConnectionId::new_unchecked(1);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -3502,7 +3501,7 @@ mod tests {
#[test] #[test]
fn inject_dial_failure_for_pending_request() { fn inject_dial_failure_for_pending_request() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -3566,7 +3565,7 @@ mod tests {
fn peerstate_incoming_open_desired_by_remote() { fn peerstate_incoming_open_desired_by_remote() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let conn1 = ConnectionId::new_unchecked(0); let conn1 = ConnectionId::new_unchecked(0);
let conn2 = ConnectionId::new_unchecked(1); let conn2 = ConnectionId::new_unchecked(1);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -3620,7 +3619,7 @@ mod tests {
async fn remove_backoff_peer_after_timeout() { async fn remove_backoff_peer_after_timeout() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
@@ -3699,7 +3698,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -3817,7 +3816,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -3865,7 +3864,7 @@ mod tests {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
fn peerset_report_connect_with_disabled_pending_enable_peer() { fn peerset_report_connect_with_disabled_pending_enable_peer() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -3912,7 +3911,7 @@ mod tests {
fn peerset_report_connect_with_requested_peer() { fn peerset_report_connect_with_requested_peer() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
// Set peer into `Requested` state. // Set peer into `Requested` state.
notif.peerset_report_connect(peer, set_id); notif.peerset_report_connect(peer, set_id);
@@ -3927,7 +3926,7 @@ mod tests {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
fn peerset_report_connect_with_pending_requested() { fn peerset_report_connect_with_pending_requested() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -3985,7 +3984,7 @@ mod tests {
fn peerset_report_connect_with_incoming_peer() { fn peerset_report_connect_with_incoming_peer() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
@@ -4020,7 +4019,7 @@ mod tests {
fn peerset_report_disconnect_with_incoming_peer() { fn peerset_report_disconnect_with_incoming_peer() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
@@ -4057,7 +4056,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -4084,11 +4083,11 @@ mod tests {
assert!(std::matches!( assert!(std::matches!(
notif.incoming[0], notif.incoming[0],
IncomingPeer { alive: true, incoming_id: sc_peerset::IncomingIndex(0), .. }, IncomingPeer { alive: true, incoming_id: crate::peerset::IncomingIndex(0), .. },
)); ));
notif.peers.remove(&(peer, set_id)); notif.peers.remove(&(peer, set_id));
notif.peerset_report_accept(sc_peerset::IncomingIndex(0)); notif.peerset_report_accept(crate::peerset::IncomingIndex(0));
} }
#[test] #[test]
@@ -4098,7 +4097,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -4126,7 +4125,7 @@ mod tests {
assert!(std::matches!( assert!(std::matches!(
notif.incoming[0], notif.incoming[0],
IncomingPeer { alive: true, incoming_id: sc_peerset::IncomingIndex(0), .. }, IncomingPeer { alive: true, incoming_id: crate::peerset::IncomingIndex(0), .. },
)); ));
notif.peerset_report_connect(peer, set_id); notif.peerset_report_connect(peer, set_id);
@@ -4137,7 +4136,7 @@ mod tests {
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
notif.incoming[0].alive = true; notif.incoming[0].alive = true;
notif.peerset_report_accept(sc_peerset::IncomingIndex(0)); notif.peerset_report_accept(crate::peerset::IncomingIndex(0));
} }
#[test] #[test]
@@ -4166,7 +4165,7 @@ mod tests {
fn disconnect_non_existent_peer() { fn disconnect_non_existent_peer() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
notif.peerset_report_disconnect(peer, set_id); notif.peerset_report_disconnect(peer, set_id);
@@ -4199,7 +4198,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -4237,7 +4236,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -4263,7 +4262,7 @@ mod tests {
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
assert!(std::matches!( assert!(std::matches!(
notif.incoming[0], notif.incoming[0],
IncomingPeer { alive: true, incoming_id: sc_peerset::IncomingIndex(0), .. }, IncomingPeer { alive: true, incoming_id: crate::peerset::IncomingIndex(0), .. },
)); ));
notif.peers.remove(&(peer, set_id)); notif.peers.remove(&(peer, set_id));
@@ -4277,7 +4276,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -4318,7 +4317,7 @@ mod tests {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
fn inject_non_existent_connection_closed_for_disabled_peer() { fn inject_non_existent_connection_closed_for_disabled_peer() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -4353,7 +4352,7 @@ mod tests {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
fn inject_non_existent_connection_closed_for_disabled_pending_enable() { fn inject_non_existent_connection_closed_for_disabled_pending_enable() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -4406,7 +4405,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -4450,7 +4449,7 @@ mod tests {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(),
@@ -4495,7 +4494,7 @@ mod tests {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
fn inject_connection_closed_for_backoff_peer() { fn inject_connection_closed_for_backoff_peer() {
let (mut notif, _peerset) = development_notifs(); let (mut notif, _peerset) = development_notifs();
let set_id = sc_peerset::SetId::from(0); let set_id = crate::peerset::SetId::from(0);
let peer = PeerId::random(); let peer = PeerId::random();
let conn = ConnectionId::new_unchecked(0); let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener { let connected = ConnectedPoint::Listener {
@@ -65,19 +65,24 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
.timeout(Duration::from_secs(20)) .timeout(Duration::from_secs(20))
.boxed(); .boxed();
let (peerset, _) = sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { let (peerset, handle) =
sets: vec![sc_peerset::SetConfig { crate::peerset::Peerset::from_config(crate::peerset::PeersetConfig {
in_peers: 25, sets: vec![crate::peerset::SetConfig {
out_peers: 25, in_peers: 25,
bootnodes: if index == 0 { out_peers: 25,
keypairs.iter().skip(1).map(|keypair| keypair.public().to_peer_id()).collect() bootnodes: if index == 0 {
} else { keypairs
vec![] .iter()
}, .skip(1)
reserved_nodes: Default::default(), .map(|keypair| keypair.public().to_peer_id())
reserved_only: false, .collect()
}], } else {
}); vec![]
},
reserved_nodes: Default::default(),
reserved_only: false,
}],
});
let behaviour = CustomProtoWithAddr { let behaviour = CustomProtoWithAddr {
inner: Notifications::new( inner: Notifications::new(
@@ -89,6 +94,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
max_notification_size: 1024 * 1024, max_notification_size: 1024 * 1024,
}), }),
), ),
_peerset_handle: handle,
addrs: addrs addrs: addrs
.iter() .iter()
.enumerate() .enumerate()
@@ -124,6 +130,8 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
/// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it. /// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it.
struct CustomProtoWithAddr { struct CustomProtoWithAddr {
inner: Notifications, inner: Notifications,
// We need to keep `PeersetHandle` for `Peerset` not to shut down.
_peerset_handle: crate::peerset::PeersetHandle,
addrs: Vec<(PeerId, Multiaddr)>, addrs: Vec<(PeerId, Multiaddr)>,
} }
@@ -266,7 +274,7 @@ fn reconnect_after_disconnect() {
if service2_state == ServiceState::FirstConnec { if service2_state == ServiceState::FirstConnec {
service1.behaviour_mut().disconnect_peer( service1.behaviour_mut().disconnect_peer(
Swarm::local_peer_id(&service2), Swarm::local_peer_id(&service2),
sc_peerset::SetId::from(0), crate::peerset::SetId::from(0),
); );
} }
}, },
@@ -289,7 +297,7 @@ fn reconnect_after_disconnect() {
if service1_state == ServiceState::FirstConnec { if service1_state == ServiceState::FirstConnec {
service1.behaviour_mut().disconnect_peer( service1.behaviour_mut().disconnect_peer(
Swarm::local_peer_id(&service2), Swarm::local_peer_id(&service2),
sc_peerset::SetId::from(0), crate::peerset::SetId::from(0),
); );
} }
}, },
@@ -42,7 +42,7 @@
//! view of the peers' states at any given moment, the eventual consistency is maintained. //! view of the peers' states at any given moment, the eventual consistency is maintained.
use futures::{channel::oneshot, future::Either, FutureExt, StreamExt}; use futures::{channel::oneshot, future::Either, FutureExt, StreamExt};
use libp2p_identity::PeerId; use libp2p::PeerId;
use log::{error, trace, warn}; use log::{error, trace, warn};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_arithmetic::traits::SaturatedConversion; use sp_arithmetic::traits::SaturatedConversion;
@@ -52,7 +52,13 @@ use std::{
}; };
use wasm_timer::Delay; use wasm_timer::Delay;
use crate::{peer_store::PeerStoreProvider, IncomingIndex, Message, SetConfig, SetId, LOG_TARGET}; use crate::{
peer_store::PeerStoreProvider,
peerset::{IncomingIndex, Message, SetConfig, SetId},
};
/// Log target for this file.
pub const LOG_TARGET: &str = "peerset";
/// External API actions. /// External API actions.
#[derive(Debug)] #[derive(Debug)]
@@ -738,9 +744,11 @@ impl ProtocolController {
mod tests { mod tests {
use super::{Direction, PeerState, ProtocolController, ProtocolHandle}; use super::{Direction, PeerState, ProtocolController, ProtocolHandle};
use crate::{ use crate::{
peer_store::PeerStoreProvider, IncomingIndex, Message, ReputationChange, SetConfig, SetId, peer_store::PeerStoreProvider,
peerset::{IncomingIndex, Message, SetConfig, SetId},
ReputationChange,
}; };
use libp2p_identity::PeerId; use libp2p::PeerId;
use sc_utils::mpsc::{tracing_unbounded, TryRecvError}; use sc_utils::mpsc::{tracing_unbounded, TryRecvError};
use std::collections::HashSet; use std::collections::HashSet;
@@ -779,7 +787,7 @@ mod tests {
peer_store.expect_report_disconnect().times(2).return_const(()); peer_store.expect_report_disconnect().times(2).return_const(());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Add second reserved node at runtime (this currently calls `alloc_slots` internally). // Add second reserved node at runtime (this currently calls `alloc_slots` internally).
controller.on_add_reserved_peer(reserved2); controller.on_add_reserved_peer(reserved2);
@@ -793,8 +801,8 @@ mod tests {
messages.push(message); messages.push(message);
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
// Reserved peers do not occupy slots. // Reserved peers do not occupy slots.
assert_eq!(controller.num_out, 0); assert_eq!(controller.num_out, 0);
@@ -841,7 +849,7 @@ mod tests {
peer_store.expect_is_banned().times(6).return_const(true); peer_store.expect_is_banned().times(6).return_const(true);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Add second reserved node at runtime (this currently calls `alloc_slots` internally). // Add second reserved node at runtime (this currently calls `alloc_slots` internally).
controller.on_add_reserved_peer(reserved2); controller.on_add_reserved_peer(reserved2);
@@ -894,7 +902,7 @@ mod tests {
peer_store.expect_report_disconnect().times(2).return_const(()); peer_store.expect_report_disconnect().times(2).return_const(());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Add second reserved node at runtime (this calls `alloc_slots` internally). // Add second reserved node at runtime (this calls `alloc_slots` internally).
controller.on_add_reserved_peer(reserved2); controller.on_add_reserved_peer(reserved2);
@@ -908,8 +916,8 @@ mod tests {
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
// Drop both reserved nodes. // Drop both reserved nodes.
controller.on_peer_dropped(reserved1); controller.on_peer_dropped(reserved1);
@@ -924,8 +932,8 @@ mod tests {
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
// No slots occupied. // No slots occupied.
assert_eq!(controller.num_out, 0); assert_eq!(controller.num_out, 0);
@@ -953,7 +961,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(candidates); peer_store.expect_outgoing_candidates().once().return_const(candidates);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Initiate connections. // Initiate connections.
controller.alloc_slots(); controller.alloc_slots();
@@ -965,8 +973,8 @@ mod tests {
// Only first two peers are connected (we only have 2 slots). // Only first two peers are connected (we only have 2 slots).
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
// Outgoing slots occupied. // Outgoing slots occupied.
assert_eq!(controller.num_out, 2); assert_eq!(controller.num_out, 2);
@@ -1005,7 +1013,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates); peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Initiate connections. // Initiate connections.
controller.alloc_slots(); controller.alloc_slots();
@@ -1015,10 +1023,10 @@ mod tests {
messages.push(message); messages.push(message);
} }
assert_eq!(messages.len(), 4); assert_eq!(messages.len(), 4);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: regular1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: regular2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular2 }));
assert_eq!(controller.num_out, 2); assert_eq!(controller.num_out, 2);
assert_eq!(controller.num_in, 0); assert_eq!(controller.num_in, 0);
} }
@@ -1048,7 +1056,7 @@ mod tests {
peer_store.expect_report_disconnect().times(2).return_const(()); peer_store.expect_report_disconnect().times(2).return_const(());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Initiate connections. // Initiate connections.
controller.alloc_slots(); controller.alloc_slots();
@@ -1060,8 +1068,8 @@ mod tests {
// Only first two peers are connected (we only have 2 slots). // Only first two peers are connected (we only have 2 slots).
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
// Outgoing slots occupied. // Outgoing slots occupied.
assert_eq!(controller.num_out, 2); assert_eq!(controller.num_out, 2);
@@ -1093,7 +1101,7 @@ mod tests {
// Peers are connected. // Peers are connected.
assert_eq!(messages.len(), 1); assert_eq!(messages.len(), 1);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer3 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer3 }));
// Outgoing slots occupied. // Outgoing slots occupied.
assert_eq!(controller.num_out, 1); assert_eq!(controller.num_out, 1);
@@ -1116,7 +1124,7 @@ mod tests {
peer_store.expect_register_protocol().once().return_const(()); peer_store.expect_register_protocol().once().return_const(());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Initiate connections. // Initiate connections.
controller.alloc_slots(); controller.alloc_slots();
@@ -1143,7 +1151,7 @@ mod tests {
peer_store.expect_register_protocol().once().return_const(()); peer_store.expect_register_protocol().once().return_const(());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
let peer = PeerId::random(); let peer = PeerId::random();
let incoming_index = IncomingIndex(1); let incoming_index = IncomingIndex(1);
@@ -1182,7 +1190,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(candidates); peer_store.expect_outgoing_candidates().once().return_const(candidates);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Initiate connections. // Initiate connections.
controller.alloc_slots(); controller.alloc_slots();
@@ -1201,8 +1209,8 @@ mod tests {
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
assert_eq!(controller.num_out, 2); assert_eq!(controller.num_out, 2);
assert_eq!(controller.num_in, 0); assert_eq!(controller.num_in, 0);
} }
@@ -1230,7 +1238,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates); peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
assert_eq!(controller.num_out, 0); assert_eq!(controller.num_out, 0);
assert_eq!(controller.num_in, 0); assert_eq!(controller.num_in, 0);
@@ -1242,9 +1250,9 @@ mod tests {
messages.push(message); messages.push(message);
} }
assert_eq!(messages.len(), 3); assert_eq!(messages.len(), 3);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: regular1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
assert_eq!(controller.num_out, 1); assert_eq!(controller.num_out, 1);
assert_eq!(controller.num_in, 0); assert_eq!(controller.num_in, 0);
@@ -1264,8 +1272,8 @@ mod tests {
messages.push(message); messages.push(message);
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Drop { set_id: SetId(0), peer_id: regular1 })); assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular1 }));
assert!(messages.contains(&Message::Drop { set_id: SetId(0), peer_id: regular2 })); assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular2 }));
assert_eq!(controller.nodes.len(), 0); assert_eq!(controller.nodes.len(), 0);
assert_eq!(controller.num_out, 0); assert_eq!(controller.num_out, 0);
assert_eq!(controller.num_in, 0); assert_eq!(controller.num_in, 0);
@@ -1289,7 +1297,7 @@ mod tests {
peer_store.expect_register_protocol().once().return_const(()); peer_store.expect_register_protocol().once().return_const(());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
assert_eq!(controller.reserved_nodes.len(), 2); assert_eq!(controller.reserved_nodes.len(), 2);
assert_eq!(controller.nodes.len(), 0); assert_eq!(controller.nodes.len(), 0);
assert_eq!(controller.num_out, 0); assert_eq!(controller.num_out, 0);
@@ -1323,7 +1331,7 @@ mod tests {
peer_store.expect_is_banned().times(2).return_const(false); peer_store.expect_is_banned().times(2).return_const(false);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Initiate connections. // Initiate connections.
controller.alloc_slots(); controller.alloc_slots();
@@ -1332,8 +1340,8 @@ mod tests {
messages.push(message); messages.push(message);
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
assert_eq!(controller.reserved_nodes.len(), 2); assert_eq!(controller.reserved_nodes.len(), 2);
assert!(controller.reserved_nodes.contains_key(&reserved1)); assert!(controller.reserved_nodes.contains_key(&reserved1));
assert!(controller.reserved_nodes.contains_key(&reserved2)); assert!(controller.reserved_nodes.contains_key(&reserved2));
@@ -1341,7 +1349,10 @@ mod tests {
// Remove reserved node // Remove reserved node
controller.on_remove_reserved_peer(reserved1); controller.on_remove_reserved_peer(reserved1);
assert_eq!(rx.try_recv().unwrap(), Message::Drop { set_id: SetId(0), peer_id: reserved1 }); assert_eq!(
rx.try_recv().unwrap(),
Message::Drop { set_id: SetId::from(0), peer_id: reserved1 }
);
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
assert_eq!(controller.reserved_nodes.len(), 1); assert_eq!(controller.reserved_nodes.len(), 1);
assert!(controller.reserved_nodes.contains_key(&reserved2)); assert!(controller.reserved_nodes.contains_key(&reserved2));
@@ -1368,7 +1379,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(Vec::new()); peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Connect `peer1` as inbound, `peer2` as outbound. // Connect `peer1` as inbound, `peer2` as outbound.
controller.on_incoming_connection(peer1, IncomingIndex(1)); controller.on_incoming_connection(peer1, IncomingIndex(1));
@@ -1379,7 +1390,7 @@ mod tests {
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Accept(IncomingIndex(1)))); assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
assert_eq!(controller.num_out, 0); assert_eq!(controller.num_out, 0);
assert_eq!(controller.num_in, 0); assert_eq!(controller.num_in, 0);
@@ -1415,7 +1426,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates); peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Connect `peer1` as outbound & `peer2` as inbound. // Connect `peer1` as outbound & `peer2` as inbound.
controller.alloc_slots(); controller.alloc_slots();
@@ -1425,7 +1436,7 @@ mod tests {
messages.push(message); messages.push(message);
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
assert!(messages.contains(&Message::Accept(IncomingIndex(1)))); assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
assert_eq!(controller.num_in, 1); assert_eq!(controller.num_in, 1);
assert_eq!(controller.num_out, 1); assert_eq!(controller.num_out, 1);
@@ -1458,7 +1469,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates); peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Connect `peer1` as outbound & `peer2` as inbound. // Connect `peer1` as outbound & `peer2` as inbound.
controller.alloc_slots(); controller.alloc_slots();
@@ -1468,7 +1479,7 @@ mod tests {
messages.push(message); messages.push(message);
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
assert!(messages.contains(&Message::Accept(IncomingIndex(1)))); assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
assert_eq!(controller.nodes.len(), 2); assert_eq!(controller.nodes.len(), 2);
assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound))); assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
@@ -1477,7 +1488,10 @@ mod tests {
assert_eq!(controller.num_out, 1); assert_eq!(controller.num_out, 1);
controller.on_disconnect_peer(peer1); controller.on_disconnect_peer(peer1);
assert_eq!(rx.try_recv().unwrap(), Message::Drop { set_id: SetId(0), peer_id: peer1 }); assert_eq!(
rx.try_recv().unwrap(),
Message::Drop { set_id: SetId::from(0), peer_id: peer1 }
);
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
assert_eq!(controller.nodes.len(), 1); assert_eq!(controller.nodes.len(), 1);
assert!(!controller.nodes.contains_key(&peer1)); assert!(!controller.nodes.contains_key(&peer1));
@@ -1485,7 +1499,10 @@ mod tests {
assert_eq!(controller.num_out, 0); assert_eq!(controller.num_out, 0);
controller.on_disconnect_peer(peer2); controller.on_disconnect_peer(peer2);
assert_eq!(rx.try_recv().unwrap(), Message::Drop { set_id: SetId(0), peer_id: peer2 }); assert_eq!(
rx.try_recv().unwrap(),
Message::Drop { set_id: SetId::from(0), peer_id: peer2 }
);
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
assert_eq!(controller.nodes.len(), 0); assert_eq!(controller.nodes.len(), 0);
assert_eq!(controller.num_in, 0); assert_eq!(controller.num_in, 0);
@@ -1512,7 +1529,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(Vec::new()); peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Connect `reserved1` as inbound & `reserved2` as outbound. // Connect `reserved1` as inbound & `reserved2` as outbound.
controller.on_incoming_connection(reserved1, IncomingIndex(1)); controller.on_incoming_connection(reserved1, IncomingIndex(1));
@@ -1523,7 +1540,7 @@ mod tests {
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Accept(IncomingIndex(1)))); assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
assert!(matches!( assert!(matches!(
controller.reserved_nodes.get(&reserved1), controller.reserved_nodes.get(&reserved1),
Some(PeerState::Connected(Direction::Inbound)) Some(PeerState::Connected(Direction::Inbound))
@@ -1570,7 +1587,7 @@ mod tests {
peer_store.expect_report_disconnect().times(2).return_const(()); peer_store.expect_report_disconnect().times(2).return_const(());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Connect `peer1` as outbound & `peer2` as inbound. // Connect `peer1` as outbound & `peer2` as inbound.
controller.alloc_slots(); controller.alloc_slots();
@@ -1580,7 +1597,7 @@ mod tests {
messages.push(message); messages.push(message);
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: peer1 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
assert!(messages.contains(&Message::Accept(IncomingIndex(1)))); assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
assert_eq!(controller.nodes.len(), 2); assert_eq!(controller.nodes.len(), 2);
assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound))); assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
@@ -1622,7 +1639,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(Vec::new()); peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Connect `reserved1` as inbound & `reserved2` as outbound. // Connect `reserved1` as inbound & `reserved2` as outbound.
controller.on_incoming_connection(reserved1, IncomingIndex(1)); controller.on_incoming_connection(reserved1, IncomingIndex(1));
@@ -1633,7 +1650,7 @@ mod tests {
} }
assert_eq!(messages.len(), 2); assert_eq!(messages.len(), 2);
assert!(messages.contains(&Message::Accept(IncomingIndex(1)))); assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved2 })); assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
assert!(matches!( assert!(matches!(
controller.reserved_nodes.get(&reserved1), controller.reserved_nodes.get(&reserved1),
Some(PeerState::Connected(Direction::Inbound)) Some(PeerState::Connected(Direction::Inbound))
@@ -1683,7 +1700,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates); peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
assert_eq!(controller.num_out, 0); assert_eq!(controller.num_out, 0);
assert_eq!(controller.num_in, 0); assert_eq!(controller.num_in, 0);
@@ -1691,7 +1708,7 @@ mod tests {
controller.alloc_slots(); controller.alloc_slots();
assert_eq!( assert_eq!(
rx.try_recv().ok().unwrap(), rx.try_recv().ok().unwrap(),
Message::Connect { set_id: SetId(0), peer_id: regular1 } Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
); );
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,)); assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
@@ -1737,7 +1754,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates); peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
assert_eq!(controller.num_out, 0); assert_eq!(controller.num_out, 0);
assert_eq!(controller.num_in, 0); assert_eq!(controller.num_in, 0);
@@ -1745,7 +1762,7 @@ mod tests {
controller.alloc_slots(); controller.alloc_slots();
assert_eq!( assert_eq!(
rx.try_recv().ok().unwrap(), rx.try_recv().ok().unwrap(),
Message::Connect { set_id: SetId(0), peer_id: regular1 } Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
); );
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,)); assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
@@ -1790,7 +1807,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates); peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
assert_eq!(controller.num_out, 0); assert_eq!(controller.num_out, 0);
assert_eq!(controller.num_in, 0); assert_eq!(controller.num_in, 0);
@@ -1798,7 +1815,7 @@ mod tests {
controller.alloc_slots(); controller.alloc_slots();
assert_eq!( assert_eq!(
rx.try_recv().ok().unwrap(), rx.try_recv().ok().unwrap(),
Message::Connect { set_id: SetId(0), peer_id: regular1 } Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
); );
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,)); assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
@@ -1843,7 +1860,7 @@ mod tests {
peer_store.expect_is_banned().once().return_const(false); peer_store.expect_is_banned().once().return_const(false);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Connect `peer1` as inbound. // Connect `peer1` as inbound.
controller.on_incoming_connection(peer1, IncomingIndex(1)); controller.on_incoming_connection(peer1, IncomingIndex(1));
@@ -1874,7 +1891,7 @@ mod tests {
peer_store.expect_is_banned().once().return_const(true); peer_store.expect_is_banned().once().return_const(true);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
// Incoming request. // Incoming request.
controller.on_incoming_connection(peer1, IncomingIndex(1)); controller.on_incoming_connection(peer1, IncomingIndex(1));
@@ -1900,7 +1917,7 @@ mod tests {
peer_store.expect_is_banned().once().return_const(true); peer_store.expect_is_banned().once().return_const(true);
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
assert!(controller.reserved_nodes.contains_key(&reserved1)); assert!(controller.reserved_nodes.contains_key(&reserved1));
// Incoming request. // Incoming request.
@@ -1928,7 +1945,7 @@ mod tests {
peer_store.expect_outgoing_candidates().once().return_const(Vec::new()); peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
let (_handle, mut controller) = let (_handle, mut controller) =
ProtocolController::new(SetId(0), config, tx, Box::new(peer_store)); ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected))); assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
// Initiate connectios // Initiate connectios
@@ -34,7 +34,9 @@
//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel //! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
//! is used to handle incoming requests. //! is used to handle incoming requests.
use crate::{types::ProtocolName, ReputationChange}; use crate::{
peer_store::BANNED_THRESHOLD, peerset::PeersetHandle, types::ProtocolName, ReputationChange,
};
use futures::{channel::oneshot, prelude::*}; use futures::{channel::oneshot, prelude::*};
use libp2p::{ use libp2p::{
@@ -49,8 +51,6 @@ use libp2p::{
PeerId, PeerId,
}; };
use sc_peerset::{PeersetHandle, BANNED_THRESHOLD};
use std::{ use std::{
collections::{hash_map::Entry, HashMap}, collections::{hash_map::Entry, HashMap},
io, iter, io, iter,
@@ -1040,6 +1040,7 @@ impl Codec for GenericCodec {
mod tests { mod tests {
use super::*; use super::*;
use crate::peerset::{Peerset, PeersetConfig, SetConfig};
use futures::{channel::oneshot, executor::LocalPool, task::Spawn}; use futures::{channel::oneshot, executor::LocalPool, task::Spawn};
use libp2p::{ use libp2p::{
core::{ core::{
@@ -1051,7 +1052,6 @@ mod tests {
swarm::{Executor, Swarm, SwarmBuilder, SwarmEvent}, swarm::{Executor, Swarm, SwarmBuilder, SwarmEvent},
Multiaddr, Multiaddr,
}; };
use sc_peerset::{Peerset, PeersetConfig, SetConfig};
use std::{iter, time::Duration}; use std::{iter, time::Duration};
struct TokioExecutor(tokio::runtime::Runtime); struct TokioExecutor(tokio::runtime::Runtime);
+1 -1
View File
@@ -36,6 +36,7 @@ use crate::{
network_state::{ network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
}, },
peerset::PeersetHandle,
protocol::{self, NotifsHandlerError, Protocol, Ready}, protocol::{self, NotifsHandlerError, Protocol, Ready},
request_responses::{IfDisconnected, RequestFailure}, request_responses::{IfDisconnected, RequestFailure},
service::{ service::{
@@ -73,7 +74,6 @@ use metrics::{Histogram, HistogramVec, MetricSources, Metrics};
use parking_lot::Mutex; use parking_lot::Mutex;
use sc_network_common::ExHashT; use sc_network_common::ExHashT;
use sc_peerset::PeersetHandle;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::Block as BlockT; use sp_runtime::traits::Block as BlockT;
@@ -24,13 +24,12 @@ use crate::{
request_responses::{IfDisconnected, RequestFailure}, request_responses::{IfDisconnected, RequestFailure},
service::signature::Signature, service::signature::Signature,
types::ProtocolName, types::ProtocolName,
ReputationChange,
}; };
use futures::{channel::oneshot, Stream}; use futures::{channel::oneshot, Stream};
use libp2p::{Multiaddr, PeerId}; use libp2p::{Multiaddr, PeerId};
use sc_peerset::ReputationChange;
use std::{collections::HashSet, future::Future, pin::Pin, sync::Arc}; use std::{collections::HashSet, future::Future, pin::Pin, sync::Arc};
pub use libp2p::{identity::SigningError, kad::record::Key as KademliaKey}; pub use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
@@ -23,7 +23,6 @@ pin-project = "1.0.12"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" }
sc-network-common = { version = "0.10.0-dev", path = "../common" } sc-network-common = { version = "0.10.0-dev", path = "../common" }
sc-network = { version = "0.10.0-dev", path = "../" } sc-network = { version = "0.10.0-dev", path = "../" }
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
sp-runtime = { version = "8.0.0", path = "../../../primitives/runtime" } sp-runtime = { version = "8.0.0", path = "../../../primitives/runtime" }
sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" }
sp-statement-store = { version = "4.0.0-dev", path = "../../../primitives/statement-store" } sp-statement-store = { version = "4.0.0-dev", path = "../../../primitives/statement-store" }
@@ -62,7 +62,7 @@ pub type Statements = Vec<Statement>;
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>; pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
mod rep { mod rep {
use sc_peerset::ReputationChange as Rep; use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sends us any statement. /// Reputation change when a peer sends us any statement.
/// ///
/// This forces node to verify it, thus the negative value here. Once statement is verified, /// This forces node to verify it, thus the negative value here. Once statement is verified,
-1
View File
@@ -35,7 +35,6 @@ sc-client-api = { version = "4.0.0-dev", path = "../../api" }
sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" }
sc-network = { version = "0.10.0-dev", path = "../" } sc-network = { version = "0.10.0-dev", path = "../" }
sc-network-common = { version = "0.10.0-dev", path = "../common" } sc-network-common = { version = "0.10.0-dev", path = "../common" }
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../../utils" } sc-utils = { version = "4.0.0-dev", path = "../../utils" }
sp-arithmetic = { version = "7.0.0", path = "../../../primitives/arithmetic" } sp-arithmetic = { version = "7.0.0", path = "../../../primitives/arithmetic" }
sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
@@ -54,7 +54,7 @@ const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2; const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
mod rep { mod rep {
use sc_peerset::ReputationChange as Rep; use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sent us the same request multiple times. /// Reputation change when a peer sent us the same request multiple times.
pub const SAME_REQUEST: Rep = Rep::new_fatal("Same block request multiple times"); pub const SAME_REQUEST: Rep = Rep::new_fatal("Same block request multiple times");
+3 -3
View File
@@ -40,7 +40,7 @@ use sc_network::{
FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode, FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode,
}, },
utils::LruHashSet, utils::LruHashSet,
NotificationsSink, ProtocolName, NotificationsSink, ProtocolName, ReputationChange,
}; };
use sc_network_common::{ use sc_network_common::{
role::Roles, role::Roles,
@@ -94,7 +94,7 @@ const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(30);
const INITIAL_EVICTION_WAIT_PERIOD: Duration = Duration::from_secs(2 * 60); const INITIAL_EVICTION_WAIT_PERIOD: Duration = Duration::from_secs(2 * 60);
mod rep { mod rep {
use sc_peerset::ReputationChange as Rep; use sc_network::ReputationChange as Rep;
/// Peer has different genesis. /// Peer has different genesis.
pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch"); pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
/// Peer send us a block announcement that failed at validation. /// Peer send us a block announcement that failed at validation.
@@ -719,7 +719,7 @@ where
.disconnect_peer(peer, self.block_announce_protocol_name.clone()); .disconnect_peer(peer, self.block_announce_protocol_name.clone());
self.network_service.report_peer( self.network_service.report_peer(
peer, peer,
sc_peerset::ReputationChange::new_fatal("Invalid justification"), ReputationChange::new_fatal("Invalid justification"),
); );
} }
}, },
+1 -1
View File
@@ -148,7 +148,7 @@ const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
pub(crate) const MAX_BLOCKS_IN_RESPONSE: usize = 128; pub(crate) const MAX_BLOCKS_IN_RESPONSE: usize = 128;
mod rep { mod rep {
use sc_peerset::ReputationChange as Rep; use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sent us a message that led to a /// Reputation change when a peer sent us a message that led to a
/// database read error. /// database read error.
pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error"); pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");
@@ -25,9 +25,8 @@ use sc_network::{
request_responses::{IfDisconnected, RequestFailure}, request_responses::{IfDisconnected, RequestFailure},
types::ProtocolName, types::ProtocolName,
NetworkNotification, NetworkPeers, NetworkRequest, NetworkSyncForkRequest, NetworkNotification, NetworkPeers, NetworkRequest, NetworkSyncForkRequest,
NotificationSenderError, NotificationSenderT, NotificationSenderError, NotificationSenderT, ReputationChange,
}; };
use sc_peerset::ReputationChange;
use sp_runtime::traits::{Block as BlockT, NumberFor}; use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::collections::HashSet; use std::collections::HashSet;
@@ -22,9 +22,8 @@ use libp2p::PeerId;
use sc_network::{ use sc_network::{
request_responses::{IfDisconnected, RequestFailure}, request_responses::{IfDisconnected, RequestFailure},
types::ProtocolName, types::ProtocolName,
NetworkNotification, NetworkPeers, NetworkRequest, NetworkNotification, NetworkPeers, NetworkRequest, ReputationChange,
}; };
use sc_peerset::ReputationChange;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::sync::Arc; use std::sync::Arc;
@@ -159,7 +158,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let proto = ProtocolName::from("test-protocol"); let proto = ProtocolName::from("test-protocol");
let proto_clone = proto.clone(); let proto_clone = proto.clone();
let change = sc_peerset::ReputationChange::new_fatal("test-change"); let change = sc_network::ReputationChange::new_fatal("test-change");
let mut mock_network = MockNetwork::new(); let mut mock_network = MockNetwork::new();
mock_network mock_network
@@ -45,7 +45,7 @@ const MAX_RESPONSE_BYTES: usize = 2 * 1024 * 1024; // Actual reponse may be bigg
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2; const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
mod rep { mod rep {
use sc_peerset::ReputationChange as Rep; use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sent us the same request multiple times. /// Reputation change when a peer sent us the same request multiple times.
pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times"); pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times");
@@ -21,7 +21,6 @@ log = "0.4.17"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" }
sc-network = { version = "0.10.0-dev", path = "../" } sc-network = { version = "0.10.0-dev", path = "../" }
sc-network-common = { version = "0.10.0-dev", path = "../common" } sc-network-common = { version = "0.10.0-dev", path = "../common" }
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../../utils" } sc-utils = { version = "4.0.0-dev", path = "../../utils" }
sp-runtime = { version = "8.0.0", path = "../../../primitives/runtime" } sp-runtime = { version = "8.0.0", path = "../../../primitives/runtime" }
sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" }
@@ -65,7 +65,7 @@ pub mod config;
pub type Transactions<E> = Vec<E>; pub type Transactions<E> = Vec<E>;
mod rep { mod rep {
use sc_peerset::ReputationChange as Rep; use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sends us any transaction. /// Reputation change when a peer sends us any transaction.
/// ///
/// This forces node to verify it, thus the negative value here. Once transaction is verified, /// This forces node to verify it, thus the negative value here. Once transaction is verified,
-1
View File
@@ -31,7 +31,6 @@ tracing = "0.1.29"
sc-client-api = { version = "4.0.0-dev", path = "../api" } sc-client-api = { version = "4.0.0-dev", path = "../api" }
sc-network = { version = "0.10.0-dev", path = "../network" } sc-network = { version = "0.10.0-dev", path = "../network" }
sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-network-common = { version = "0.10.0-dev", path = "../network/common" }
sc-peerset = { version = "4.0.0-dev", path = "../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../utils" } sc-utils = { version = "4.0.0-dev", path = "../utils" }
sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } sp-api = { version = "4.0.0-dev", path = "../../primitives/api" }
sp-core = { version = "8.0.0", path = "../../primitives/core" } sp-core = { version = "8.0.0", path = "../../primitives/core" }
+1 -1
View File
@@ -328,8 +328,8 @@ mod tests {
use sc_client_db::offchain::LocalStorage; use sc_client_db::offchain::LocalStorage;
use sc_network::{ use sc_network::{
config::MultiaddrWithPeerId, types::ProtocolName, NetworkPeers, NetworkStateInfo, config::MultiaddrWithPeerId, types::ProtocolName, NetworkPeers, NetworkStateInfo,
ReputationChange,
}; };
use sc_peerset::ReputationChange;
use sp_core::offchain::{DbExternalities, Externalities}; use sp_core::offchain::{DbExternalities, Externalities};
use std::time::SystemTime; use std::time::SystemTime;
+1 -2
View File
@@ -246,8 +246,7 @@ mod tests {
use libp2p::{Multiaddr, PeerId}; use libp2p::{Multiaddr, PeerId};
use sc_block_builder::BlockBuilderProvider as _; use sc_block_builder::BlockBuilderProvider as _;
use sc_client_api::Backend as _; use sc_client_api::Backend as _;
use sc_network::{config::MultiaddrWithPeerId, types::ProtocolName}; use sc_network::{config::MultiaddrWithPeerId, types::ProtocolName, ReputationChange};
use sc_peerset::ReputationChange;
use sc_transaction_pool::{BasicPool, FullChainApi}; use sc_transaction_pool::{BasicPool, FullChainApi};
use sc_transaction_pool_api::{InPoolTransaction, TransactionPool}; use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
use sp_consensus::BlockOrigin; use sp_consensus::BlockOrigin;
-30
View File
@@ -1,30 +0,0 @@
[package]
description = "Connectivity manager based on reputation"
homepage = "https://substrate.io"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
name = "sc-peerset"
version = "4.0.0-dev"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2021"
repository = "https://github.com/paritytech/substrate/"
documentation = "https://docs.rs/sc-peerset"
readme = "README.md"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
futures = "0.3.21"
libp2p-identity = { version = "0.1.2", features = ["peerid", "ed25519"] }
log = "0.4.17"
parking_lot = "0.12.1"
partial_sort = "0.2.0"
serde_json = "1.0.85"
wasm-timer = "0.2"
sc-utils = { version = "4.0.0-dev", path = "../utils" }
sp-arithmetic = { version = "7.0.0", path = "../../primitives/arithmetic" }
[dev-dependencies]
sp-tracing = { version = "7.0.0", path = "../../primitives/tracing" }
mockall = "0.11.3"
rand = "0.8.5"
-4
View File
@@ -1,4 +0,0 @@
Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be
connected to.
License: GPL-3.0-or-later WITH Classpath-exception-2.0