From f67c2cc18109b61b57418c05a9fafd07abae1a64 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 21 Jan 2019 14:33:25 +0100 Subject: [PATCH] Store identification info about the remote (#1500) * Store identification info about the remote * Add node name on the wire * Fix tests --- substrate/core/cli/src/lib.rs | 3 +- .../core/network-libp2p/src/behaviour.rs | 88 ++++++++++++++++--- .../core/network-libp2p/src/service_task.rs | 77 +++++++++++----- substrate/core/network-libp2p/src/traits.rs | 7 +- substrate/core/service/test/src/lib.rs | 1 + 5 files changed, 142 insertions(+), 34 deletions(-) diff --git a/substrate/core/cli/src/lib.rs b/substrate/core/cli/src/lib.rs index 9f5c525b58..3205269faf 100644 --- a/substrate/core/cli/src/lib.rs +++ b/substrate/core/cli/src/lib.rs @@ -135,7 +135,7 @@ fn load_spec(matches: &clap::ArgMatches, factory: F) -> Result PathBuf { matches.value_of("base_path") .map(|x| Path::new(x).to_owned()) - .unwrap_or_else(|| + .unwrap_or_else(|| app_dirs::get_app_root( AppDataType::UserData, &AppInfo { @@ -305,6 +305,7 @@ where config.network.public_addresses = Vec::new(); config.network.client_version = config.client_id(); + config.network.node_name = config.name.clone(); config.network.use_secret = match matches.value_of("node_key").map(H256::from_str) { Some(Ok(secret)) => Some(secret.into()), Some(Err(err)) => bail!(create_input_err(format!("Error parsing node key: {}", err))), diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network-libp2p/src/behaviour.rs index f4318dfcaa..74d6204010 100644 --- a/substrate/core/network-libp2p/src/behaviour.rs +++ b/substrate/core/network-libp2p/src/behaviour.rs @@ -22,18 +22,18 @@ use libp2p::NetworkBehaviour; use libp2p::core::{PeerId, ProtocolsHandler}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters}; -use libp2p::identify::{Identify, IdentifyEvent}; +use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo}; use libp2p::kad::{Kademlia, KademliaOut, KademliaTopology}; use libp2p::ping::{Ping, PingEvent}; use log::{debug, trace, warn}; -use std::{cmp, time::Duration, time::Instant}; +use std::{cmp, io, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Delay; use void; /// General behaviour of the network. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "CustomProtosOut", poll_method = "poll")] +#[behaviour(out_event = "BehaviourOut", poll_method = "poll")] pub struct Behaviour { /// Periodically ping nodes, and close the connection if it's unresponsive. ping: Ping, @@ -46,22 +46,24 @@ pub struct Behaviour { /// Queue of events to produce for the outside. #[behaviour(ignore)] - events: Vec, + events: Vec, } impl Behaviour { /// Builds a new `Behaviour`. // TODO: redundancy between config and local_peer_id (https://github.com/libp2p/rust-libp2p/issues/745) pub fn new(config: &NetworkConfiguration, local_peer_id: PeerId, protocols: RegisteredProtocols) -> Self { + let identify = { + let proto_version = "/substrate/1.0".to_string(); + let user_agent = format!("{} ({})", config.client_version, config.node_name); + Identify::new(proto_version, user_agent) + }; + Behaviour { ping: Ping::new(), custom_protocols: CustomProtos::new(config, protocols), discovery: DiscoveryBehaviour::new(local_peer_id), - identify: Identify::new( - // The agent and protocol versions; maybe we should use something better? - concat!("substrate/", env!("CARGO_PKG_VERSION")).to_owned(), - concat!("substrate/", env!("CARGO_PKG_VERSION")).to_owned() - ), + identify, events: Vec::new(), } } @@ -123,6 +125,66 @@ impl Behaviour { } } +/// Event that can be emitted by the behaviour. +#[derive(Debug)] +pub enum BehaviourOut { + /// Opened a custom protocol with the remote. + CustomProtocolOpen { + /// Identifier of the protocol. + protocol_id: ProtocolId, + /// Version of the protocol that has been opened. + version: u8, + /// Id of the node we have opened a connection with. + peer_id: PeerId, + /// Endpoint used for this custom protocol. + endpoint: ConnectedPoint, + }, + + /// Closed a custom protocol with the remote. + CustomProtocolClosed { + /// Id of the peer we were connected to. + peer_id: PeerId, + /// Identifier of the protocol. + protocol_id: ProtocolId, + /// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF). + result: io::Result<()>, + }, + + /// Receives a message on a custom protocol substream. + CustomMessage { + /// Id of the peer the message came from. + peer_id: PeerId, + /// Protocol which generated the message. + protocol_id: ProtocolId, + /// Data that has been received. + data: Bytes, + }, + + /// We have obtained debug information from a peer. + Identified { + /// Id of the peer that has been identified. + peer_id: PeerId, + /// Information about the peer. + info: IdentifyInfo, + }, +} + +impl From for BehaviourOut { + fn from(other: CustomProtosOut) -> BehaviourOut { + match other { + CustomProtosOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint } => { + BehaviourOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint } + }, + CustomProtosOut::CustomProtocolClosed { protocol_id, peer_id, result } => { + BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result } + }, + CustomProtosOut::CustomMessage { protocol_id, peer_id, data } => { + BehaviourOut::CustomMessage { protocol_id, peer_id, data } + }, + } + } +} + impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: void::Void) { void::unreachable(event) @@ -131,7 +193,7 @@ impl NetworkBehaviourEventProcess for Behaviour NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: CustomProtosOut) { - self.events.push(event); + self.events.push(event.into()); } } @@ -140,6 +202,10 @@ impl NetworkBehaviourEventProcess for Behaviour { trace!(target: "sub-libp2p", "Identified {:?} => {:?}", peer_id, info); + // TODO: ideally we would delay the first identification to when we open the custom + // protocol, so that we only report id info to the service about the nodes we + // care about (https://github.com/libp2p/rust-libp2p/issues/876) + self.events.push(BehaviourOut::Identified { peer_id, info }); } IdentifyEvent::Error { .. } => {} } @@ -176,7 +242,7 @@ impl NetworkBehaviourEventProcess for Behaviour Behaviour { - fn poll(&mut self) -> Async> { + fn poll(&mut self) -> Async> { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))) } diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index 98206333f9..64f2ead99b 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::{behaviour::Behaviour, custom_proto::CustomProtosOut, secret::obtain_private_key, transport}; +use crate::{behaviour::Behaviour, behaviour::BehaviourOut, secret::obtain_private_key, transport}; use crate::custom_proto::{RegisteredProtocol, RegisteredProtocols}; use crate::topology::NetTopology; use crate::{Error, NetworkConfiguration, NodeIndex, ProtocolId, parse_str_addr}; @@ -127,7 +127,7 @@ where TProtos: IntoIterator { Ok(Service { swarm, - nodes_addresses: Default::default(), + nodes_info: Default::default(), index_by_id: Default::default(), next_node_id: 1, cleanup: Interval::new_interval(Duration::from_secs(60)), @@ -182,10 +182,10 @@ pub struct Service { /// Stream of events of the swarm. swarm: Swarm, Behaviour>, NetTopology>, - /// For each node we're connected to, how we're connected to it. - nodes_addresses: FnvHashMap, + /// Information about all the nodes we're connected to. + nodes_info: FnvHashMap, - /// Opposite of `nodes_addresses`. + /// Opposite of `nodes_info`. index_by_id: FnvHashMap, /// Next index to assign to a node. @@ -199,6 +199,17 @@ pub struct Service { injected_events: Vec, } +/// Information about a node we're connected to. +#[derive(Debug)] +struct NodeInfo { + /// Hash of the public key of the node. + peer_id: PeerId, + /// How we're connected to the node. + endpoint: ConnectedPoint, + /// Version reported by the remote, or `None` if unknown. + client_version: Option, +} + impl Service { /// Returns an iterator that produces the list of addresses we're listening on. #[inline] @@ -215,7 +226,7 @@ impl Service { /// Returns the list of all the peers we are connected to. #[inline] pub fn connected_peers<'a>(&'a self) -> impl Iterator + 'a { - self.nodes_addresses.keys().cloned() + self.nodes_info.keys().cloned() } /// Try to add a reserved peer. @@ -247,13 +258,19 @@ impl Service { /// Returns the `PeerId` of a node. #[inline] pub fn peer_id_of_node(&self, node_index: NodeIndex) -> Option<&PeerId> { - self.nodes_addresses.get(&node_index).map(|(id, _)| id) + self.nodes_info.get(&node_index).map(|info| &info.peer_id) } /// Returns the way we are connected to a node. #[inline] pub fn node_endpoint(&self, node_index: NodeIndex) -> Option<&ConnectedPoint> { - self.nodes_addresses.get(&node_index).map(|(_, cp)| cp) + self.nodes_info.get(&node_index).map(|info| &info.endpoint) + } + + /// Returns the client version reported by a node. + pub fn node_client_version(&self, node_index: NodeIndex) -> Option<&str> { + self.nodes_info.get(&node_index) + .and_then(|info| info.client_version.as_ref().map(|s| &s[..])) } /// Sends a message to a peer using the custom protocol. @@ -266,7 +283,7 @@ impl Service { protocol: ProtocolId, data: Vec ) { - if let Some(peer_id) = self.nodes_addresses.get(&node_index).map(|(id, _)| id) { + if let Some(peer_id) = self.nodes_info.get(&node_index).map(|info| &info.peer_id) { self.swarm.send_custom_message(peer_id, protocol, data); } else { warn!(target: "sub-libp2p", "Tried to send message to unknown node: {:}", node_index); @@ -278,9 +295,10 @@ impl Service { /// Same as `drop_node`, except that the same peer will not be able to reconnect later. #[inline] pub fn ban_node(&mut self, node_index: NodeIndex) { - if let Some(peer_id) = self.nodes_addresses.get(&node_index).map(|(id, _)| id) { - info!(target: "sub-libp2p", "Banned {:?} (#{:?})", peer_id, node_index); - self.swarm.ban_node(peer_id.clone()); + if let Some(info) = self.nodes_info.get(&node_index) { + info!(target: "sub-libp2p", "Banned {:?} (#{:?}, {:?}, {:?})", info.peer_id, + node_index, info.endpoint, info.client_version); + self.swarm.ban_node(info.peer_id.clone()); } } @@ -290,9 +308,10 @@ impl Service { /// Corresponding closing events will be generated once the closing actually happens. #[inline] pub fn drop_node(&mut self, node_index: NodeIndex) { - if let Some(peer_id) = self.nodes_addresses.get(&node_index).map(|(id, _)| id) { - debug!(target: "sub-libp2p", "Dropping {:?} on purpose (#{:?})", peer_id, node_index); - self.swarm.drop_node(peer_id); + if let Some(info) = self.nodes_info.get(&node_index) { + debug!(target: "sub-libp2p", "Dropping {:?} on purpose (#{:?}, {:?}, {:?})", + info.peer_id, node_index, info.endpoint, info.client_version); + self.swarm.drop_node(&info.peer_id); } } @@ -301,13 +320,21 @@ impl Service { match self.index_by_id.entry(peer) { Entry::Occupied(entry) => { let id = *entry.get(); - self.nodes_addresses.insert(id, (entry.key().clone(), endpoint)); + self.nodes_info.insert(id, NodeInfo { + peer_id: entry.key().clone(), + endpoint, + client_version: None, + }); id }, Entry::Vacant(entry) => { let id = self.next_node_id; self.next_node_id += 1; - self.nodes_addresses.insert(id, (entry.key().clone(), endpoint)); + self.nodes_info.insert(id, NodeInfo { + peer_id: entry.key().clone(), + endpoint, + client_version: None, + }); entry.insert(id); id }, @@ -318,7 +345,7 @@ impl Service { fn poll_swarm(&mut self) -> Poll, IoError> { loop { match self.swarm.poll() { - Ok(Async::Ready(Some(CustomProtosOut::CustomProtocolOpen { protocol_id, peer_id, version, endpoint }))) => { + Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { protocol_id, peer_id, version, endpoint }))) => { debug!(target: "sub-libp2p", "Opened custom protocol with {:?}", peer_id); let node_index = self.index_of_peer_or_assign(peer_id, endpoint); break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol { @@ -327,7 +354,7 @@ impl Service { version, }))) } - Ok(Async::Ready(Some(CustomProtosOut::CustomProtocolClosed { protocol_id, peer_id, result }))) => { + Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }))) => { debug!(target: "sub-libp2p", "Custom protocol with {:?} closed: {:?}", peer_id, result); let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour"); break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol { @@ -335,7 +362,7 @@ impl Service { protocol: protocol_id, }))) } - Ok(Async::Ready(Some(CustomProtosOut::CustomMessage { protocol_id, peer_id, data }))) => { + Ok(Async::Ready(Some(BehaviourOut::CustomMessage { protocol_id, peer_id, data }))) => { let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour"); break Ok(Async::Ready(Some(ServiceEvent::CustomMessage { node_index, @@ -343,6 +370,16 @@ impl Service { data, }))) } + Ok(Async::Ready(Some(BehaviourOut::Identified { peer_id, info }))) => { + // Contrary to the other events, this one can happen even on nodes which don't + // have any open custom protocol slot. Therefore it is not necessarily in the + // list. + if let Some(id) = self.index_by_id.get(&peer_id) { + self.nodes_info.get_mut(id) + .expect("index_by_id and nodes_info are always kept in sync; QED") + .client_version = Some(info.agent_version); + } + } Ok(Async::NotReady) => break Ok(Async::NotReady), Ok(Async::Ready(None)) => unreachable!("The Swarm stream never ends"), Err(_) => unreachable!("The Swarm never errors"), diff --git a/substrate/core/network-libp2p/src/traits.rs b/substrate/core/network-libp2p/src/traits.rs index 7e87ec1301..75bee4a87c 100644 --- a/substrate/core/network-libp2p/src/traits.rs +++ b/substrate/core/network-libp2p/src/traits.rs @@ -52,8 +52,10 @@ pub struct NetworkConfiguration { pub reserved_nodes: Vec, /// The non-reserved peer mode. pub non_reserved_mode: NonReservedPeerMode, - /// Client identifier + /// Client identifier. Sent over the wire for debugging purposes. pub client_version: String, + /// Name of the node. Sent over the wire for debugging purposes. + pub node_name: String, } impl Default for NetworkConfiguration { @@ -80,7 +82,8 @@ impl NetworkConfiguration { out_peers: 75, reserved_nodes: Vec::new(), non_reserved_mode: NonReservedPeerMode::Accept, - client_version: "Parity-network".into(), // TODO: meh + client_version: "unknown".into(), + node_name: "unknown".into(), } } diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index 2130ae4a83..cc0947372c 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -112,6 +112,7 @@ fn node_config ( reserved_nodes: vec![], non_reserved_mode: NonReservedPeerMode::Accept, client_version: "network/test/0.1".to_owned(), + node_name: "unknown".to_owned(), }; Configuration {