Remove support for multiple network protocols (#2005)

* Remove support multiple network protocols

* Address concerns

* Add back debug_asserts
This commit is contained in:
Pierre Krieger
2019-03-19 11:56:56 +01:00
committed by GitHub
parent 57387ef585
commit 002143d0a2
11 changed files with 452 additions and 733 deletions
+23 -31
View File
@@ -14,8 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::custom_proto::{CustomProtos, CustomProtosOut, RegisteredProtocols};
use crate::{NetworkConfiguration, ProtocolId};
use crate::custom_proto::{CustomProto, CustomProtoOut, RegisteredProtocol};
use crate::NetworkConfiguration;
use futures::prelude::*;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
@@ -37,7 +37,7 @@ pub struct Behaviour<TMessage, TSubstream> {
/// Periodically ping nodes, and close the connection if it's unresponsive.
ping: Ping<TSubstream>,
/// Custom protocols (dot, bbq, sub, etc.).
custom_protocols: CustomProtos<TMessage, TSubstream>,
custom_protocols: CustomProto<TMessage, TSubstream>,
/// Discovers nodes of the network. Defined below.
discovery: DiscoveryBehaviour<TSubstream>,
/// Periodically identifies the remote and responds to incoming requests.
@@ -51,7 +51,7 @@ pub struct Behaviour<TMessage, TSubstream> {
impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
/// Builds a new `Behaviour`.
// TODO: redundancy between config and local_public_key (https://github.com/libp2p/rust-libp2p/issues/745)
pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocols<TMessage>) -> Self {
pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocol<TMessage>) -> Self {
let identify = {
let proto_version = "/substrate/1.0".to_string();
let user_agent = format!("{} ({})", config.client_version, config.node_name);
@@ -59,7 +59,7 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
};
let local_peer_id = local_public_key.into_peer_id();
let custom_protocols = CustomProtos::new(config, &local_peer_id, protocols);
let custom_protocols = CustomProto::new(config, &local_peer_id, protocols);
Behaviour {
ping: Ping::new(),
@@ -70,15 +70,15 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
}
}
/// Sends a message to a peer using the given custom protocol.
/// Sends a message to a peer.
///
/// Has no effect if the custom protocol is not open with the given peer.
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
#[inline]
pub fn send_custom_message(&mut self, target: &PeerId, protocol_id: ProtocolId, data: TMessage) {
self.custom_protocols.send_packet(target, protocol_id, data)
pub fn send_custom_message(&mut self, target: &PeerId, data: TMessage) {
self.custom_protocols.send_packet(target, data)
}
/// Returns the number of peers in the topology.
@@ -149,9 +149,9 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
self.custom_protocols.is_enabled(peer_id)
}
/// Returns the list of protocols we have open with the given peer.
pub fn open_protocols<'a>(&'a self, peer_id: &'a PeerId) -> impl Iterator<Item = ProtocolId> + 'a {
self.custom_protocols.open_protocols(peer_id)
/// Returns true if we have an open protocol with the given peer.
pub fn is_open(&self, peer_id: &PeerId) -> bool {
self.custom_protocols.is_open(peer_id)
}
/// Disconnects the custom protocols from a peer.
@@ -184,8 +184,6 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
pub enum BehaviourOut<TMessage> {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Version of the protocol that has been opened.
version: u8,
/// Id of the node we have opened a connection with.
@@ -198,8 +196,6 @@ pub enum BehaviourOut<TMessage> {
CustomProtocolClosed {
/// Id of the peer we were connected to.
peer_id: PeerId,
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF).
result: io::Result<()>,
},
@@ -208,8 +204,6 @@ pub enum BehaviourOut<TMessage> {
CustomMessage {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Message that has been received.
message: TMessage,
},
@@ -218,8 +212,6 @@ pub enum BehaviourOut<TMessage> {
Clogged {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<TMessage>,
},
@@ -241,20 +233,20 @@ pub enum BehaviourOut<TMessage> {
},
}
impl<TMessage> From<CustomProtosOut<TMessage>> for BehaviourOut<TMessage> {
fn from(other: CustomProtosOut<TMessage>) -> BehaviourOut<TMessage> {
impl<TMessage> From<CustomProtoOut<TMessage>> for BehaviourOut<TMessage> {
fn from(other: CustomProtoOut<TMessage>) -> BehaviourOut<TMessage> {
match other {
CustomProtosOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint } => {
BehaviourOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint }
CustomProtoOut::CustomProtocolOpen { version, peer_id, endpoint } => {
BehaviourOut::CustomProtocolOpen { version, peer_id, endpoint }
}
CustomProtosOut::CustomProtocolClosed { protocol_id, peer_id, result } => {
BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }
CustomProtoOut::CustomProtocolClosed { peer_id, result } => {
BehaviourOut::CustomProtocolClosed { peer_id, result }
}
CustomProtosOut::CustomMessage { protocol_id, peer_id, message } => {
BehaviourOut::CustomMessage { protocol_id, peer_id, message }
CustomProtoOut::CustomMessage { peer_id, message } => {
BehaviourOut::CustomMessage { peer_id, message }
}
CustomProtosOut::Clogged { protocol_id, peer_id, messages } => {
BehaviourOut::Clogged { protocol_id, peer_id, messages }
CustomProtoOut::Clogged { peer_id, messages } => {
BehaviourOut::Clogged { peer_id, messages }
}
}
}
@@ -266,8 +258,8 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviou
}
}
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<CustomProtosOut<TMessage>> for Behaviour<TMessage, TSubstream> {
fn inject_event(&mut self, event: CustomProtosOut<TMessage>) {
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<CustomProtoOut<TMessage>> for Behaviour<TMessage, TSubstream> {
fn inject_event(&mut self, event: CustomProtoOut<TMessage>) {
self.events.push(event.into());
}
}
@@ -14,10 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::custom_proto::handler::{CustomProtosHandler, CustomProtosHandlerOut, CustomProtosHandlerIn};
use crate::custom_proto::handler::{CustomProtoHandler, CustomProtoHandlerOut, CustomProtoHandlerIn};
use crate::custom_proto::topology::NetTopology;
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocols};
use crate::{NetworkConfiguration, NonReservedPeerMode, ProtocolId};
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
use crate::{NetworkConfiguration, NonReservedPeerMode};
use crate::parse_str_addr;
use fnv::{FnvHashMap, FnvHashSet};
use futures::prelude::*;
@@ -35,22 +35,22 @@ const NODES_FILE: &str = "nodes.json";
const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60);
/// Network behaviour that handles opening substreams for custom protocols with other nodes.
pub struct CustomProtos<TMessage, TSubstream> {
pub struct CustomProto<TMessage, TSubstream> {
/// List of protocols to open with peers. Never modified.
registered_protocols: RegisteredProtocols<TMessage>,
protocol: RegisteredProtocol<TMessage>,
/// Topology of the network.
topology: NetTopology,
/// List of custom protocols that we have open with remotes.
open_protocols: Vec<(PeerId, ProtocolId)>,
/// List of peers for which the custom protocol is open.
opened_peers: FnvHashSet<PeerId>,
/// List of peer handlers that were enabled.
///
/// Note that it is possible for a peer to be in the shutdown process, in which case it will
/// not be in this list but will be present in `open_protocols`.
/// not be in this list but will be present in `opened_peers`.
/// It is also possible that we have *just* enabled a peer, in which case it will be in this
/// list but not in `open_protocols`.
/// list but not in `opened_peers`.
enabled_peers: FnvHashSet<PeerId>,
/// Maximum number of incoming non-reserved connections, taken from the config. Never modified.
@@ -76,19 +76,17 @@ pub struct CustomProtos<TMessage, TSubstream> {
next_connect_to_nodes: Delay,
/// Events to produce from `poll()`.
events: SmallVec<[NetworkBehaviourAction<CustomProtosHandlerIn<TMessage>, CustomProtosOut<TMessage>>; 4]>,
events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn<TMessage>, CustomProtoOut<TMessage>>; 4]>,
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
}
/// Event that can be emitted by the `CustomProtos`.
/// Event that can be emitted by the `CustomProto`.
#[derive(Debug)]
pub enum CustomProtosOut<TMessage> {
pub enum CustomProtoOut<TMessage> {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Version of the protocol that has been opened.
version: u8,
/// Id of the node we have opened a connection with.
@@ -101,8 +99,6 @@ pub enum CustomProtosOut<TMessage> {
CustomProtocolClosed {
/// Id of the peer we were connected to.
peer_id: PeerId,
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF).
result: io::Result<()>,
},
@@ -111,8 +107,6 @@ pub enum CustomProtosOut<TMessage> {
CustomMessage {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Message that has been received.
message: TMessage,
},
@@ -122,16 +116,14 @@ pub enum CustomProtosOut<TMessage> {
Clogged {
/// Id of the peer which is clogged.
peer_id: PeerId,
/// Protocol which has a problem.
protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<TMessage>,
},
}
impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
/// Creates a `CustomProtos`.
pub fn new(config: &NetworkConfiguration, local_peer_id: &PeerId, registered_protocols: RegisteredProtocols<TMessage>) -> Self {
impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
/// Creates a `CustomProto`.
pub fn new(config: &NetworkConfiguration, local_peer_id: &PeerId, protocol: RegisteredProtocol<TMessage>) -> Self {
// Initialize the topology of the network.
let mut topology = if let Some(ref path) = config.net_config_path {
let path = Path::new(path).join(NODES_FILE);
@@ -157,11 +149,8 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
.saturating_add(max_outgoing_connections)
.saturating_add(4); // We add an arbitrary number for reserved peers slots
// Expected maximum number of substreams.
let open_protos_cap = connec_cap.saturating_mul(registered_protocols.len());
CustomProtos {
registered_protocols,
CustomProto {
protocol,
topology,
max_incoming_connections,
max_outgoing_connections,
@@ -169,7 +158,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
connected_peers: Default::default(),
reserved_peers: Default::default(),
banned_peers: Vec::new(),
open_protocols: Vec::with_capacity(open_protos_cap),
opened_peers: FnvHashSet::with_capacity_and_hasher(connec_cap, Default::default()),
enabled_peers: FnvHashSet::with_capacity_and_hasher(connec_cap, Default::default()),
next_connect_to_nodes: Delay::new(Instant::now()),
events: SmallVec::new(),
@@ -232,7 +221,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
}
events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Disable,
event: CustomProtoHandlerIn::Disable,
});
false
})
@@ -248,7 +237,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
if self.enabled_peers.remove(peer) {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer.clone(),
event: CustomProtosHandlerIn::Disable,
event: CustomProtoHandlerIn::Disable,
});
}
}
@@ -273,7 +262,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
if self.enabled_peers.remove(&peer_id) {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id,
event: CustomProtosHandlerIn::Disable,
event: CustomProtoHandlerIn::Disable,
});
}
}
@@ -288,25 +277,21 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
self.enabled_peers.contains(peer_id)
}
/// Returns the list of protocols we have open with the given peer.
pub fn open_protocols<'a>(&'a self, peer_id: &'a PeerId) -> impl Iterator<Item = ProtocolId> + 'a {
self.open_protocols
.iter()
.filter(move |(p, _)| p == peer_id)
.map(|(_, proto)| *proto)
/// Returns true if we have opened a protocol with the given peer.
pub fn is_open(&self, peer_id: &PeerId) -> bool {
self.opened_peers.contains(peer_id)
}
/// Sends a message to a peer using the given custom protocol.
/// Sends a message to a peer.
///
/// Has no effect if the custom protocol is not open with the given peer.
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
pub fn send_packet(&mut self, target: &PeerId, protocol_id: ProtocolId, message: TMessage) {
pub fn send_packet(&mut self, target: &PeerId, message: TMessage) {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: target.clone(),
event: CustomProtosHandlerIn::SendCustomMessage {
protocol: protocol_id,
event: CustomProtoHandlerIn::SendCustomMessage {
message,
}
});
@@ -408,7 +393,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
num_to_open -= 1;
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Enable(Endpoint::Dialer),
event: CustomProtoHandlerIn::Enable(Endpoint::Dialer),
});
}
@@ -443,16 +428,16 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
}
}
impl<TMessage, TSubstream> NetworkBehaviour for CustomProtos<TMessage, TSubstream>
impl<TMessage, TSubstream> NetworkBehaviour for CustomProto<TMessage, TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TMessage: CustomMessage,
{
type ProtocolsHandler = CustomProtosHandler<TMessage, TSubstream>;
type OutEvent = CustomProtosOut<TMessage>;
type ProtocolsHandler = CustomProtoHandler<TMessage, TSubstream>;
type OutEvent = CustomProtoOut<TMessage>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
CustomProtosHandler::new(self.registered_protocols.clone())
CustomProtoHandler::new(self.protocol.clone())
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
@@ -470,7 +455,7 @@ where
debug!(target: "sub-libp2p", "Ignoring {:?} because we're in reserved mode", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Disable,
event: CustomProtoHandlerIn::Disable,
});
return
}
@@ -482,7 +467,7 @@ where
debug!(target: "sub-libp2p", "Ignoring banned peer {:?}", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Disable,
event: CustomProtoHandlerIn::Disable,
});
return
}
@@ -501,7 +486,7 @@ where
if num_outgoing == self.max_outgoing_connections {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Disable,
event: CustomProtoHandlerIn::Disable,
});
return
}
@@ -518,7 +503,7 @@ where
we're full", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Disable,
event: CustomProtoHandlerIn::Disable,
});
return
}
@@ -533,13 +518,13 @@ where
trace!(target: "sub-libp2p", "Enabling custom protocols with {:?} (active)", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Enable(Endpoint::Dialer),
event: CustomProtoHandlerIn::Enable(Endpoint::Dialer),
});
} else {
trace!(target: "sub-libp2p", "Enabling custom protocols with {:?} (passive)", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Enable(Endpoint::Listener),
event: CustomProtoHandlerIn::Enable(Endpoint::Listener),
});
}
@@ -553,11 +538,8 @@ where
self.topology.set_disconnected(peer_id, &endpoint);
while let Some(pos) = self.open_protocols.iter().position(|(p, _)| p == peer_id) {
let (_, protocol_id) = self.open_protocols.remove(pos);
let event = CustomProtosOut::CustomProtocolClosed {
protocol_id,
if self.opened_peers.remove(&peer_id) {
let event = CustomProtoOut::CustomProtocolClosed {
peer_id: peer_id.clone(),
result: Ok(()),
};
@@ -596,35 +578,23 @@ where
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
match event {
CustomProtosHandlerOut::CustomProtocolClosed { protocol_id, result } => {
let pos = self.open_protocols.iter().position(|(s, p)|
s == &source && p == &protocol_id
);
CustomProtoHandlerOut::CustomProtocolClosed { result } => {
self.opened_peers.remove(&source);
if let Some(pos) = pos {
self.open_protocols.remove(pos);
} else {
debug_assert!(false, "Couldn't find protocol in open_protocols");
}
let event = CustomProtosOut::CustomProtocolClosed {
protocol_id,
let event = CustomProtoOut::CustomProtocolClosed {
result,
peer_id: source,
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
CustomProtosHandlerOut::CustomProtocolOpen { protocol_id, version } => {
debug_assert!(!self.open_protocols.iter().any(|(s, p)|
s == &source && p == &protocol_id
));
self.open_protocols.push((source.clone(), protocol_id));
CustomProtoHandlerOut::CustomProtocolOpen { version } => {
debug_assert!(!self.is_open(&source));
self.opened_peers.insert(source.clone());
let endpoint = self.connected_peers.get(&source)
.expect("We only receive events from connected nodes; QED").clone();
let event = CustomProtosOut::CustomProtocolOpen {
protocol_id,
let event = CustomProtoOut::CustomProtocolOpen {
version,
peer_id: source,
endpoint,
@@ -632,38 +602,32 @@ where
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
CustomProtosHandlerOut::CustomMessage { protocol_id, message } => {
debug_assert!(self.open_protocols.iter().any(|(s, p)|
s == &source && p == &protocol_id
));
let event = CustomProtosOut::CustomMessage {
CustomProtoHandlerOut::CustomMessage { message } => {
debug_assert!(self.is_open(&source));
let event = CustomProtoOut::CustomMessage {
peer_id: source,
protocol_id,
message,
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
CustomProtosHandlerOut::Clogged { protocol_id, messages } => {
debug_assert!(self.open_protocols.iter().any(|(s, p)|
s == &source && p == &protocol_id
));
warn!(target: "sub-libp2p", "Queue of packets to send to {:?} (protocol: {:?}) is \
pretty large", source, protocol_id);
self.events.push(NetworkBehaviourAction::GenerateEvent(CustomProtosOut::Clogged {
CustomProtoHandlerOut::Clogged { messages } => {
debug_assert!(self.is_open(&source));
warn!(target: "sub-libp2p", "Queue of packets to send to {:?} is \
pretty large", source);
self.events.push(NetworkBehaviourAction::GenerateEvent(CustomProtoOut::Clogged {
peer_id: source,
protocol_id,
messages,
}));
}
CustomProtosHandlerOut::ProtocolError { protocol_id, error, is_severe } => {
CustomProtoHandlerOut::ProtocolError { error, is_severe } => {
if is_severe {
warn!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \
{:?}: {:?}", source, protocol_id, error);
warn!(target: "sub-libp2p", "Network misbehaviour from {:?}: {:?}",
source, error);
self.ban_peer(source);
} else {
debug!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \
{:?}: {:?}", source, protocol_id, error);
debug!(target: "sub-libp2p", "Network misbehaviour from {:?}: {:?}",
source, error);
self.disconnect_peer(&source);
}
}
File diff suppressed because it is too large Load Diff
@@ -14,8 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
pub use self::behaviour::{CustomProtos, CustomProtosOut};
pub use self::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol, RegisteredProtocols};
pub use self::behaviour::{CustomProto, CustomProtoOut};
pub use self::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol};
mod behaviour;
mod handler;
@@ -437,99 +437,3 @@ where TSubstream: AsyncRead + AsyncWrite,
})
}
}
// Connection upgrade for all the protocols contained in it.
pub struct RegisteredProtocols<TMessage>(pub Vec<RegisteredProtocol<TMessage>>);
impl<TMessage> RegisteredProtocols<TMessage> {
/// Returns the number of protocols.
#[inline]
pub fn len(&self) -> usize {
self.0.len()
}
}
impl<TMessage> Default for RegisteredProtocols<TMessage> {
fn default() -> Self {
RegisteredProtocols(Vec::new())
}
}
impl<TMessage> UpgradeInfo for RegisteredProtocols<TMessage> {
type Info = RegisteredProtocolsName;
type InfoIter = VecIntoIter<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
// We concat the lists of `RegisteredProtocol::protocol_names` for
// each protocol.
self.0.iter().enumerate().flat_map(|(n, proto)|
UpgradeInfo::protocol_info(proto)
.map(move |inner| {
RegisteredProtocolsName {
inner,
index: n,
}
})
).collect::<Vec<_>>().into_iter()
}
}
impl<TMessage> Clone for RegisteredProtocols<TMessage> {
fn clone(&self) -> Self {
RegisteredProtocols(self.0.clone())
}
}
/// Implementation of `ProtocolName` for several custom protocols.
#[derive(Debug, Clone)]
pub struct RegisteredProtocolsName {
/// Inner registered protocol.
inner: RegisteredProtocolName,
/// Index of the protocol in the list of registered custom protocols.
index: usize,
}
impl ProtocolName for RegisteredProtocolsName {
fn protocol_name(&self) -> &[u8] {
self.inner.protocol_name()
}
}
impl<TMessage, TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocols<TMessage>
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = <RegisteredProtocol<TMessage> as InboundUpgrade<TSubstream>>::Output;
type Future = <RegisteredProtocol<TMessage> as InboundUpgrade<TSubstream>>::Future;
type Error = io::Error;
#[inline]
fn upgrade_inbound(
self,
socket: TSubstream,
info: Self::Info,
) -> Self::Future {
self.0.into_iter()
.nth(info.index)
.expect("invalid protocol index ; programmer logic error")
.upgrade_inbound(socket, info.inner)
}
}
impl<TMessage, TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocols<TMessage>
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = <Self as InboundUpgrade<TSubstream>>::Output;
type Future = <Self as InboundUpgrade<TSubstream>>::Future;
type Error = <Self as InboundUpgrade<TSubstream>>::Error;
#[inline]
fn upgrade_outbound(
self,
socket: TSubstream,
info: Self::Info,
) -> Self::Future {
// Upgrades are symmetrical.
self.upgrade_inbound(socket, info)
}
}
+3 -2
View File
@@ -134,8 +134,9 @@ pub struct NetworkStatePeer {
/// If true, the peer is "enabled", which means that we try to open Substrate-related protocols
/// with this peer. If false, we stick to Kademlia and/or other network-only protocols.
pub enabled: bool,
/// List of protocols that we have open with the given peer.
pub open_protocols: HashSet<ProtocolId>,
/// If true, the peer is "open", which means that we have a Substrate-related protocol
/// with this peer.
pub open: bool,
/// List of addresses known for this node, with its reputation score.
pub known_addresses: HashMap<Multiaddr, u32>,
}
@@ -18,8 +18,8 @@ use crate::{
behaviour::Behaviour, behaviour::BehaviourOut,
transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer
};
use crate::custom_proto::{CustomMessage, RegisteredProtocol, RegisteredProtocols};
use crate::{NetworkConfiguration, NodeIndex, ProtocolId, parse_str_addr};
use crate::custom_proto::{CustomMessage, RegisteredProtocol};
use crate::{NetworkConfiguration, NodeIndex, parse_str_addr};
use fnv::FnvHashMap;
use futures::{prelude::*, Stream};
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
@@ -37,12 +37,11 @@ use tokio_timer::Interval;
/// Starts the substrate libp2p service.
///
/// Returns a stream that must be polled regularly in order for the networking to function.
pub fn start_service<TProtos, TMessage>(
pub fn start_service<TMessage>(
config: NetworkConfiguration,
registered_custom: TProtos,
registered_custom: RegisteredProtocol<TMessage>,
) -> Result<Service<TMessage>, IoError>
where TProtos: IntoIterator<Item = RegisteredProtocol<TMessage>>,
TMessage: CustomMessage + Send + 'static {
where TMessage: CustomMessage + Send + 'static {
if let Some(ref path) = config.net_config_path {
fs::create_dir_all(Path::new(path))?;
@@ -55,7 +54,6 @@ where TProtos: IntoIterator<Item = RegisteredProtocol<TMessage>>,
// Build the swarm.
let (mut swarm, bandwidth) = {
let registered_custom = RegisteredProtocols(registered_custom.into_iter().collect());
let behaviour = Behaviour::new(&config, local_public, registered_custom);
let (transport, bandwidth) = transport::build_transport(local_identity);
(Swarm::new(transport, behaviour, local_peer_id.clone()), bandwidth)
@@ -118,8 +116,6 @@ pub enum ServiceEvent<TMessage> {
peer_id: PeerId,
/// Index of the node.
node_index: NodeIndex,
/// Protocol that has been opened.
protocol: ProtocolId,
/// Version of the protocol that was opened.
version: u8,
/// Node debug info
@@ -130,20 +126,6 @@ pub enum ServiceEvent<TMessage> {
ClosedCustomProtocol {
/// Index of the node.
node_index: NodeIndex,
/// Protocol that has been closed.
protocol: ProtocolId,
/// Node debug info
debug_info: String,
},
/// Sustom protocol substreams has been closed.
///
/// Same as `ClosedCustomProtocol` but with multiple protocols.
ClosedCustomProtocols {
/// Index of the node.
node_index: NodeIndex,
/// Protocols that have been closed.
protocols: Vec<ProtocolId>,
/// Node debug info
debug_info: String,
},
@@ -152,8 +134,6 @@ pub enum ServiceEvent<TMessage> {
CustomMessage {
/// Index of the node.
node_index: NodeIndex,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Message that has been received.
message: TMessage,
},
@@ -162,8 +142,6 @@ pub enum ServiceEvent<TMessage> {
Clogged {
/// Index of the node.
node_index: NodeIndex,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<TMessage>,
},
@@ -224,7 +202,7 @@ where TMessage: CustomMessage + Send + 'static {
version_string: info.client_version.clone(),
latest_ping_time: info.latest_ping,
enabled: swarm.is_enabled(&info.peer_id),
open_protocols: swarm.open_protocols(&info.peer_id).collect(),
open: swarm.is_open(&info.peer_id),
known_addresses,
})
}).collect()
@@ -340,11 +318,10 @@ where TMessage: CustomMessage + Send + 'static {
pub fn send_custom_message(
&mut self,
node_index: NodeIndex,
protocol: ProtocolId,
message: TMessage
) {
if let Some(peer_id) = self.nodes_info.get(&node_index).map(|info| &info.peer_id) {
self.swarm.send_custom_message(peer_id, protocol, message);
self.swarm.send_custom_message(peer_id, message);
} else {
warn!(target: "sub-libp2p", "Tried to send message to unknown node: {:}", node_index);
}
@@ -416,39 +393,35 @@ where TMessage: CustomMessage + Send + 'static {
fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> {
loop {
match self.swarm.poll() {
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { protocol_id, peer_id, version, endpoint }))) => {
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { peer_id, version, endpoint }))) => {
debug!(target: "sub-libp2p", "Opened custom protocol with {:?}", peer_id);
let node_index = self.index_of_peer_or_assign(peer_id.clone(), endpoint);
break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol {
peer_id,
node_index,
protocol: protocol_id,
version,
debug_info: self.peer_debug_info(node_index),
})))
}
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }))) => {
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { peer_id, result }))) => {
debug!(target: "sub-libp2p", "Custom protocol with {:?} closed: {:?}", peer_id, result);
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol {
node_index,
protocol: protocol_id,
debug_info: self.peer_debug_info(node_index),
})))
}
Ok(Async::Ready(Some(BehaviourOut::CustomMessage { protocol_id, peer_id, message }))) => {
Ok(Async::Ready(Some(BehaviourOut::CustomMessage { peer_id, message }))) => {
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
break Ok(Async::Ready(Some(ServiceEvent::CustomMessage {
node_index,
protocol_id,
message,
})))
}
Ok(Async::Ready(Some(BehaviourOut::Clogged { protocol_id, peer_id, messages }))) => {
Ok(Async::Ready(Some(BehaviourOut::Clogged { peer_id, messages }))) => {
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
break Ok(Async::Ready(Some(ServiceEvent::Clogged {
node_index,
protocol_id,
messages,
})))
}