Split the Roles in three types (#5520)

* Split the Roles bitfield in three

* Forgot to include some changes

* Fix cli test

* More test fixes

* Oh God, merging master broke other tests

* Didn't run the doctests

* Address review

* I'm trying to fix the build blindly because it's taking a good hour to compile on my machine

* Address some review

* Also update the peerset's API to make sense

* Fix peerset tests

* Fix browser node

* client: distinguish between local and network authority

Co-authored-by: André Silva <andre.beat@gmail.com>
This commit is contained in:
Pierre Krieger
2020-04-03 19:08:14 +02:00
committed by GitHub
parent 9dbcb11f66
commit 8c03a4fcef
44 changed files with 591 additions and 432 deletions
+60 -8
View File
@@ -15,18 +15,19 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{
config::Role,
debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut,
Event, protocol::event::DhtEvent, ExHashT,
Event, ObservedRole, DhtEvent, ExHashT,
};
use crate::protocol::{self, light_client_handler, CustomMessageOutcome, Protocol};
use crate::protocol::{self, light_client_handler, message::Roles, CustomMessageOutcome, Protocol};
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::kad::record;
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use log::debug;
use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, Justification};
use std::{iter, task::Context, task::Poll};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification};
use std::{borrow::Cow, iter, task::Context, task::Poll};
use void;
/// General behaviour of the network. Combines all protocols together.
@@ -44,9 +45,14 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
block_requests: protocol::BlockRequests<B>,
/// Light client request handling.
light_client_handler: protocol::LightClientHandler<B>,
/// Queue of events to produce for the outside.
#[behaviour(ignore)]
events: Vec<BehaviourOut<B>>,
/// Role of our local node, as originally passed from the configuration.
#[behaviour(ignore)]
role: Role,
}
/// Event generated by `Behaviour`.
@@ -63,6 +69,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
/// Builds a new `Behaviour`.
pub async fn new(
substrate: Protocol<B, H>,
role: Role,
user_agent: String,
local_public_key: PublicKey,
known_addresses: Vec<(PeerId, Multiaddr)>,
@@ -84,7 +91,8 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
).await,
block_requests,
light_client_handler,
events: Vec::new()
events: Vec::new(),
role,
}
}
@@ -112,6 +120,32 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
self.debug_info.node(peer_id)
}
/// Registers a new notifications protocol.
///
/// After that, you can call `write_notifications`.
///
/// Please call `event_stream` before registering a protocol, otherwise you may miss events
/// about the protocol that you have registered.
///
/// You are very strongly encouraged to call this method very early on. Any connection open
/// will retain the protocols that were registered then, and not any new one.
pub fn register_notifications_protocol(
&mut self,
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, [u8]>>,
) {
let list = self.substrate.register_notifications_protocol(engine_id, protocol_name);
for (remote, roles) in list {
let role = reported_roles_to_observed_role(&self.role, remote, roles);
let ev = Event::NotificationStreamOpened {
remote: remote.clone(),
engine_id,
role,
};
self.events.push(BehaviourOut::Event(ev));
}
}
/// Returns a shared reference to the user protocol.
pub fn user_protocol(&self) -> &Protocol<B, H> {
&self.substrate
@@ -138,6 +172,22 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
}
}
fn reported_roles_to_observed_role(local_role: &Role, remote: &PeerId, roles: Roles) -> ObservedRole {
if roles.is_authority() {
match local_role {
Role::Authority { sentry_nodes }
if sentry_nodes.iter().any(|s| s.peer_id == *remote) => ObservedRole::OurSentry,
Role::Sentry { validators }
if validators.iter().any(|s| s.peer_id == *remote) => ObservedRole::OurGuardedAuthority,
_ => ObservedRole::Authority
}
} else if roles.is_full() {
ObservedRole::Full
} else {
ObservedRole::Light
}
}
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<void::Void> for
Behaviour<B, H> {
fn inject_event(&mut self, event: void::Void) {
@@ -155,14 +205,16 @@ Behaviour<B, H> {
self.events.push(BehaviourOut::JustificationImport(origin, hash, nb, justification)),
CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) =>
self.events.push(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)),
CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } =>
CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } => {
let role = reported_roles_to_observed_role(&self.role, &remote, roles);
for engine_id in protocols {
self.events.push(BehaviourOut::Event(Event::NotificationStreamOpened {
remote: remote.clone(),
engine_id,
roles,
role: role.clone(),
}));
},
}
},
CustomMessageOutcome::NotificationStreamClosed { remote, protocols } =>
for engine_id in protocols {
self.events.push(BehaviourOut::Event(Event::NotificationStreamClosed {
+98 -48
View File
@@ -31,23 +31,21 @@ pub use crate::protocol::ProtocolConfig;
use crate::service::ExHashT;
use bitflags::bitflags;
use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
use sp_runtime::traits::{Block as BlockT};
use libp2p::identity::{Keypair, ed25519};
use libp2p::wasm_ext;
use libp2p::{PeerId, Multiaddr, multiaddr};
use core::{fmt, iter};
use std::{future::Future, pin::Pin};
use std::{convert::TryFrom, future::Future, pin::Pin, str::FromStr};
use std::{error::Error, fs, io::{self, Write}, net::Ipv4Addr, path::{Path, PathBuf}, sync::Arc};
use zeroize::Zeroize;
use prometheus_endpoint::Registry;
/// Network initialization parameters.
pub struct Params<B: BlockT, H: ExHashT> {
/// Assigned roles for our node (full, light, ...).
pub roles: Roles,
/// Assigned role for our node (full, light, ...).
pub role: Role,
/// How to spawn background tasks. If you pass `None`, then a threads pool will be used by
/// default.
@@ -97,54 +95,48 @@ pub struct Params<B: BlockT, H: ExHashT> {
pub metrics_registry: Option<Registry>,
}
bitflags! {
/// Bitmask of the roles that a node fulfills.
pub struct Roles: u8 {
/// No network.
const NONE = 0b00000000;
/// Full node, does not participate in consensus.
const FULL = 0b00000001;
/// Light client node.
const LIGHT = 0b00000010;
/// Act as an authority
const AUTHORITY = 0b00000100;
/// Role of the local node.
#[derive(Debug, Clone)]
pub enum Role {
/// Regular full node.
Full,
/// Regular light node.
Light,
/// Sentry node that guards an authority. Will be reported as "authority" on the wire protocol.
Sentry {
/// Address and identity of the validator nodes that we're guarding.
///
/// The nodes will be granted some priviledged status.
validators: Vec<MultiaddrWithPeerId>,
},
/// Actual authority.
Authority {
/// List of public addresses and identities of our sentry nodes.
sentry_nodes: Vec<MultiaddrWithPeerId>,
}
}
impl Roles {
/// Does this role represents a client that holds full chain data locally?
pub fn is_full(&self) -> bool {
self.intersects(Roles::FULL | Roles::AUTHORITY)
}
/// Does this role represents a client that does not participates in the consensus?
impl Role {
/// True for `Role::Authority`
pub fn is_authority(&self) -> bool {
*self == Roles::AUTHORITY
matches!(self, Role::Authority { .. })
}
/// Does this role represents a client that does not hold full chain data locally?
pub fn is_light(&self) -> bool {
!self.is_full()
/// True for `Role::Authority` and `Role::Sentry` since they're both
/// announced as having the authority role to the network.
pub fn is_network_authority(&self) -> bool {
matches!(self, Role::Authority { .. } | Role::Sentry { .. })
}
}
impl fmt::Display for Roles {
impl fmt::Display for Role {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl codec::Encode for Roles {
fn encode_to<T: codec::Output>(&self, dest: &mut T) {
dest.push_byte(self.bits())
}
}
impl codec::EncodeLike for Roles {}
impl codec::Decode for Roles {
fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> {
Self::from_bits(input.read_byte()?).ok_or_else(|| codec::Error::from("Invalid bytes"))
match self {
Role::Full => write!(f, "FULL"),
Role::Light => write!(f, "LIGHT"),
Role::Sentry { .. } => write!(f, "SENTRY"),
Role::Authority { .. } => write!(f, "AUTHORITY"),
}
}
}
@@ -214,6 +206,67 @@ pub fn parse_addr(mut addr: Multiaddr)-> Result<(PeerId, Multiaddr), ParseErr> {
Ok((who, addr))
}
/// Address of a node, including its identity.
///
/// This struct represents a decoded version of a multiaddress that ends with `/p2p/<peerid>`.
///
/// # Example
///
/// ```
/// # use sc_network::{Multiaddr, PeerId, config::MultiaddrWithPeerId};
/// let addr: MultiaddrWithPeerId =
/// "/ip4/198.51.100.19/tcp/30333/p2p/QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV".parse().unwrap();
/// assert_eq!(addr.peer_id.to_base58(), "QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV");
/// assert_eq!(addr.multiaddr.to_string(), "/ip4/198.51.100.19/tcp/30333");
/// ```
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct MultiaddrWithPeerId {
/// Address of the node.
pub multiaddr: Multiaddr,
/// Its identity.
pub peer_id: PeerId,
}
impl MultiaddrWithPeerId {
/// Concatenates the multiaddress and peer ID into one multiaddress containing both.
pub fn concat(&self) -> Multiaddr {
let proto = multiaddr::Protocol::P2p(From::from(self.peer_id.clone()));
self.multiaddr.clone().with(proto)
}
}
impl fmt::Display for MultiaddrWithPeerId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.concat(), f)
}
}
impl FromStr for MultiaddrWithPeerId {
type Err = ParseErr;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (peer_id, multiaddr) = parse_str_addr(s)?;
Ok(MultiaddrWithPeerId {
peer_id,
multiaddr,
})
}
}
impl From<MultiaddrWithPeerId> for String {
fn from(ma: MultiaddrWithPeerId) -> String {
format!("{}", ma)
}
}
impl TryFrom<String> for MultiaddrWithPeerId {
type Error = ParseErr;
fn try_from(string: String) -> Result<Self, Self::Error> {
string.parse()
}
}
/// Error that can be generated by `parse_str_addr`.
#[derive(Debug)]
pub enum ParseErr {
@@ -263,7 +316,7 @@ pub struct NetworkConfiguration {
/// Multiaddresses to advertise. Detected automatically if empty.
pub public_addresses: Vec<Multiaddr>,
/// List of initial node addresses
pub boot_nodes: Vec<String>,
pub boot_nodes: Vec<MultiaddrWithPeerId>,
/// The node key configuration, which determines the node's network identity keypair.
pub node_key: NodeKeyConfig,
/// Maximum allowed number of incoming connections.
@@ -271,11 +324,9 @@ pub struct NetworkConfiguration {
/// Number of outgoing connections we're trying to maintain.
pub out_peers: u32,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
pub reserved_nodes: Vec<MultiaddrWithPeerId>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
/// List of sentry node public addresses.
pub sentry_nodes: Vec<String>,
/// Client identifier. Sent over the wire for debugging purposes.
pub client_version: String,
/// Name of the node. Sent over the wire for debugging purposes.
@@ -299,7 +350,6 @@ impl Default for NetworkConfiguration {
out_peers: 75,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
sentry_nodes: Vec::new(),
client_version: "unknown".into(),
node_name: "unknown".into(),
transport: TransportConfig::Normal {
+1 -1
View File
@@ -248,7 +248,7 @@ pub mod network_state;
pub use service::{NetworkService, NetworkStateInfo, NetworkWorker, ExHashT, ReportHandle};
pub use protocol::PeerInfo;
pub use protocol::event::{Event, DhtEvent};
pub use protocol::event::{Event, DhtEvent, ObservedRole};
pub use protocol::sync::SyncState;
pub use libp2p::{Multiaddr, PeerId};
#[doc(inline)]
+13 -23
View File
@@ -39,11 +39,11 @@ use sp_runtime::traits::{
};
use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, Message};
use message::generic::{Message as GenericMessage, ConsensusMessage};
use message::generic::{Message as GenericMessage, ConsensusMessage, Roles};
use prometheus_endpoint::{Registry, Gauge, GaugeVec, HistogramVec, PrometheusError, Opts, register, U64};
use sync::{ChainSync, SyncState};
use crate::service::{TransactionPool, ExHashT};
use crate::config::{BoxFinalityProofRequestBuilder, Roles};
use crate::config::BoxFinalityProofRequestBuilder;
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::sync::Arc;
@@ -338,7 +338,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
let important_peers = {
let mut imp_p = HashSet::new();
for reserved in &peerset_config.reserved_nodes {
for reserved in peerset_config.priority_groups.iter().flat_map(|(_, l)| l.iter()) {
imp_p.insert(reserved.clone());
}
imp_p.shrink_to_fit();
@@ -1033,13 +1033,14 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
/// Registers a new notifications protocol.
///
/// You are very strongly encouraged to call this method very early on. Any connection open
/// will retain the protocols that were registered then, and not any new one.
pub fn register_notifications_protocol(
&mut self,
/// While registering a protocol while we already have open connections is discouraged, we
/// nonetheless handle it by notifying that we opened channels with everyone. This function
/// returns a list of substreams to open as a result.
pub fn register_notifications_protocol<'a>(
&'a mut self,
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, [u8]>>,
) -> Vec<event::Event> {
) -> impl ExactSizeIterator<Item = (&'a PeerId, Roles)> + 'a {
let protocol_name = protocol_name.into();
if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() {
error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name);
@@ -1048,16 +1049,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
self.legacy_equiv_by_name.insert(protocol_name, Fallback::Consensus(engine_id));
}
// Registering a protocol while we already have open connections isn't great, but for now
// we handle it by notifying that we opened channels with everyone.
self.context_data.peers.iter()
.map(|(peer_id, peer)|
event::Event::NotificationStreamOpened {
remote: peer_id.clone(),
engine_id,
roles: peer.info.roles,
})
.collect()
.map(|(peer_id, peer)| (peer_id, peer.info.roles))
}
/// Called when peer sends us new extrinsics
@@ -2021,7 +2014,7 @@ impl<B: BlockT, H: ExHashT> Drop for Protocol<B, H> {
#[cfg(test)]
mod tests {
use crate::PeerId;
use crate::config::{EmptyTransactionPool, Roles};
use crate::config::EmptyTransactionPool;
use super::{CustomMessageOutcome, Protocol, ProtocolConfig};
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
@@ -2034,10 +2027,7 @@ mod tests {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
let (mut protocol, _) = Protocol::<Block, Hash>::new(
ProtocolConfig {
roles: Roles::FULL,
max_parallel_downloads: 10,
},
ProtocolConfig::default(),
client.clone(),
Arc::new(EmptyTransactionPool),
None,
@@ -2048,7 +2038,7 @@ mod tests {
out_peers: 10,
bootnodes: Vec::new(),
reserved_only: false,
reserved_nodes: Vec::new(),
priority_groups: Vec::new(),
},
Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
None,
+25 -3
View File
@@ -17,7 +17,6 @@
//! Network event types. These are are not the part of the protocol, but rather
//! events that happen on the network like DHT get/put results received.
use crate::config::Roles;
use bytes::Bytes;
use libp2p::core::PeerId;
use libp2p::kad::record::Key;
@@ -55,8 +54,8 @@ pub enum Event {
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
/// Roles that the remote .
roles: Roles,
/// Role of the remote.
role: ObservedRole,
},
/// Closed a substream with the given node. Always matches a corresponding previous
@@ -76,3 +75,26 @@ pub enum Event {
messages: Vec<(ConsensusEngineId, Bytes)>,
},
}
/// Role that the peer sent to us during the handshake, with the addition of what our local node
/// knows about that peer.
#[derive(Debug, Clone)]
pub enum ObservedRole {
/// Full node.
Full,
/// Light node.
Light,
/// When we are a validator node, this is a sentry that protects us.
OurSentry,
/// When we are a sentry node, this is the authority we are protecting.
OurGuardedAuthority,
/// Third-party authority.
Authority,
}
impl ObservedRole {
/// Returns `true` for `ObservedRole::Light`.
pub fn is_light(&self) -> bool {
matches!(self, ObservedRole::Light)
}
}
@@ -78,7 +78,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
vec![]
},
reserved_only: false,
reserved_nodes: Vec::new(),
priority_groups: Vec::new(),
});
let behaviour = CustomProtoWithAddr {
@@ -1423,7 +1423,7 @@ mod tests {
out_peers: 128,
bootnodes: Vec::new(),
reserved_only: false,
reserved_nodes: Vec::new(),
priority_groups: Vec::new(),
};
sc_peerset::Peerset::from_config(cfg)
}
@@ -24,7 +24,7 @@ pub use self::generic::{
RemoteHeaderRequest, RemoteHeaderResponse,
RemoteChangesRequest, RemoteChangesResponse,
FinalityProofRequest, FinalityProofResponse,
FromBlock, RemoteReadChildRequest,
FromBlock, RemoteReadChildRequest, Roles,
};
use sc_client_api::StorageProof;
@@ -137,14 +137,71 @@ pub struct RemoteReadResponse {
/// Generic types.
pub mod generic {
use bitflags::bitflags;
use codec::{Encode, Decode, Input, Output};
use sp_runtime::Justification;
use crate::config::Roles;
use super::{
RemoteReadResponse, Transactions, Direction,
RequestId, BlockAttributes, RemoteCallResponse, ConsensusEngineId,
BlockState, StorageProof,
};
bitflags! {
/// Bitmask of the roles that a node fulfills.
pub struct Roles: u8 {
/// No network.
const NONE = 0b00000000;
/// Full node, does not participate in consensus.
const FULL = 0b00000001;
/// Light client node.
const LIGHT = 0b00000010;
/// Act as an authority
const AUTHORITY = 0b00000100;
}
}
impl Roles {
/// Does this role represents a client that holds full chain data locally?
pub fn is_full(&self) -> bool {
self.intersects(Roles::FULL | Roles::AUTHORITY)
}
/// Does this role represents a client that does not participates in the consensus?
pub fn is_authority(&self) -> bool {
*self == Roles::AUTHORITY
}
/// Does this role represents a client that does not hold full chain data locally?
pub fn is_light(&self) -> bool {
!self.is_full()
}
}
impl<'a> From<&'a crate::config::Role> for Roles {
fn from(roles: &'a crate::config::Role) -> Self {
match roles {
crate::config::Role::Full => Roles::FULL,
crate::config::Role::Light => Roles::LIGHT,
crate::config::Role::Sentry { .. } => Roles::AUTHORITY,
crate::config::Role::Authority { .. } => Roles::AUTHORITY,
}
}
}
impl codec::Encode for Roles {
fn encode_to<T: codec::Output>(&self, dest: &mut T) {
dest.push_byte(self.bits())
}
}
impl codec::EncodeLike for Roles {}
impl codec::Decode for Roles {
fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> {
Self::from_bits(input.read_byte()?).ok_or_else(|| codec::Error::from("Invalid bytes"))
}
}
/// Consensus is mostly opaque to us
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct ConsensusMessage {
@@ -34,9 +34,9 @@ use sp_consensus::{BlockOrigin, BlockStatus,
import_queue::{IncomingBlock, BlockImportResult, BlockImportError}
};
use crate::{
config::{Roles, BoxFinalityProofRequestBuilder},
config::BoxFinalityProofRequestBuilder,
protocol::message::{self, generic::FinalityProofRequest, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse,
FinalityProofResponse},
FinalityProofResponse, Roles},
};
use either::Either;
use extra_requests::ExtraRequests;
+37 -24
View File
@@ -27,7 +27,7 @@
use crate::{
behaviour::{Behaviour, BehaviourOut},
config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, TransportConfig},
config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
error::Error,
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
@@ -181,19 +181,13 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
// List of multiaddresses that we know in the network.
let mut known_addresses = Vec::new();
let mut bootnodes = Vec::new();
let mut reserved_nodes = Vec::new();
let mut boot_node_ids = HashSet::new();
// Process the bootnodes.
for bootnode in params.network_config.boot_nodes.iter() {
match parse_str_addr(bootnode) {
Ok((peer_id, addr)) => {
bootnodes.push(peer_id.clone());
boot_node_ids.insert(peer_id.clone());
known_addresses.push((peer_id, addr));
},
Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode),
}
bootnodes.push(bootnode.peer_id.clone());
boot_node_ids.insert(bootnode.peer_id.clone());
known_addresses.push((bootnode.peer_id.clone(), bootnode.multiaddr.clone()));
}
let boot_node_ids = Arc::new(boot_node_ids);
@@ -215,22 +209,43 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
}
)?;
// Initialize the reserved peers.
for reserved in params.network_config.reserved_nodes.iter() {
if let Ok((peer_id, addr)) = parse_str_addr(reserved) {
reserved_nodes.push(peer_id.clone());
known_addresses.push((peer_id, addr));
} else {
warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved);
// Initialize the peers we should always be connected to.
let priority_groups = {
let mut reserved_nodes = HashSet::new();
for reserved in params.network_config.reserved_nodes.iter() {
reserved_nodes.insert(reserved.peer_id.clone());
known_addresses.push((reserved.peer_id.clone(), reserved.multiaddr.clone()));
}
}
let mut sentries_and_validators = HashSet::new();
match &params.role {
Role::Sentry { validators } => {
for validator in validators {
sentries_and_validators.insert(validator.peer_id.clone());
known_addresses.push((validator.peer_id.clone(), validator.multiaddr.clone()));
}
}
Role::Authority { sentry_nodes } => {
for sentry_node in sentry_nodes {
sentries_and_validators.insert(sentry_node.peer_id.clone());
known_addresses.push((sentry_node.peer_id.clone(), sentry_node.multiaddr.clone()));
}
}
_ => {}
}
vec![
("reserved".to_owned(), reserved_nodes),
("sentries_and_validators".to_owned(), sentries_and_validators),
]
};
let peerset_config = sc_peerset::PeersetConfig {
in_peers: params.network_config.in_peers,
out_peers: params.network_config.out_peers,
bootnodes,
reserved_only: params.network_config.non_reserved_mode == NonReservedPeerMode::Deny,
reserved_nodes,
priority_groups,
};
// Private and public keys configuration.
@@ -253,7 +268,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
let is_major_syncing = Arc::new(AtomicBool::new(false));
let (protocol, peerset_handle) = Protocol::new(
protocol::ProtocolConfig {
roles: params.roles,
roles: From::from(&params.role),
max_parallel_downloads: params.network_config.max_parallel_downloads,
},
params.chain.clone(),
@@ -285,6 +300,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
};
let behaviour = futures::executor::block_on(Behaviour::new(
protocol,
params.role,
user_agent,
local_public,
known_addresses,
@@ -971,11 +987,8 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.network_service.user_protocol_mut().write_notification(target, engine_id, message)
},
ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => {
let events = this.network_service.user_protocol_mut()
this.network_service
.register_notifications_protocol(engine_id, protocol_name);
for event in events {
this.event_streams.retain(|sender| sender.unbounded_send(event.clone()).is_ok());
}
},
ServiceToWorkerMsg::DisconnectPeer(who) =>
this.network_service.user_protocol_mut().disconnect_peer(&who),