Store identification info about the remote (#1500)

* Store identification info about the remote

* Add node name on the wire

* Fix tests
This commit is contained in:
Pierre Krieger
2019-01-21 14:33:25 +01:00
committed by Gav Wood
parent 010e63116f
commit f67c2cc181
5 changed files with 142 additions and 34 deletions
+2 -1
View File
@@ -135,7 +135,7 @@ fn load_spec<F, G>(matches: &clap::ArgMatches, factory: F) -> Result<ChainSpec<G
fn base_path(matches: &clap::ArgMatches, version: &VersionInfo) -> PathBuf {
matches.value_of("base_path")
.map(|x| Path::new(x).to_owned())
.unwrap_or_else(||
.unwrap_or_else(||
app_dirs::get_app_root(
AppDataType::UserData,
&AppInfo {
@@ -305,6 +305,7 @@ where
config.network.public_addresses = Vec::new();
config.network.client_version = config.client_id();
config.network.node_name = config.name.clone();
config.network.use_secret = match matches.value_of("node_key").map(H256::from_str) {
Some(Ok(secret)) => Some(secret.into()),
Some(Err(err)) => bail!(create_input_err(format!("Error parsing node key: {}", err))),
+77 -11
View File
@@ -22,18 +22,18 @@ use libp2p::NetworkBehaviour;
use libp2p::core::{PeerId, ProtocolsHandler};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
use libp2p::identify::{Identify, IdentifyEvent};
use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo};
use libp2p::kad::{Kademlia, KademliaOut, KademliaTopology};
use libp2p::ping::{Ping, PingEvent};
use log::{debug, trace, warn};
use std::{cmp, time::Duration, time::Instant};
use std::{cmp, io, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use void;
/// General behaviour of the network.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "CustomProtosOut", poll_method = "poll")]
#[behaviour(out_event = "BehaviourOut", poll_method = "poll")]
pub struct Behaviour<TSubstream> {
/// Periodically ping nodes, and close the connection if it's unresponsive.
ping: Ping<TSubstream>,
@@ -46,22 +46,24 @@ pub struct Behaviour<TSubstream> {
/// Queue of events to produce for the outside.
#[behaviour(ignore)]
events: Vec<CustomProtosOut>,
events: Vec<BehaviourOut>,
}
impl<TSubstream> Behaviour<TSubstream> {
/// Builds a new `Behaviour`.
// TODO: redundancy between config and local_peer_id (https://github.com/libp2p/rust-libp2p/issues/745)
pub fn new(config: &NetworkConfiguration, local_peer_id: PeerId, protocols: RegisteredProtocols) -> Self {
let identify = {
let proto_version = "/substrate/1.0".to_string();
let user_agent = format!("{} ({})", config.client_version, config.node_name);
Identify::new(proto_version, user_agent)
};
Behaviour {
ping: Ping::new(),
custom_protocols: CustomProtos::new(config, protocols),
discovery: DiscoveryBehaviour::new(local_peer_id),
identify: Identify::new(
// The agent and protocol versions; maybe we should use something better?
concat!("substrate/", env!("CARGO_PKG_VERSION")).to_owned(),
concat!("substrate/", env!("CARGO_PKG_VERSION")).to_owned()
),
identify,
events: Vec::new(),
}
}
@@ -123,6 +125,66 @@ impl<TSubstream> Behaviour<TSubstream> {
}
}
/// Event that can be emitted by the behaviour.
#[derive(Debug)]
pub enum BehaviourOut {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Version of the protocol that has been opened.
version: u8,
/// Id of the node we have opened a connection with.
peer_id: PeerId,
/// Endpoint used for this custom protocol.
endpoint: ConnectedPoint,
},
/// Closed a custom protocol with the remote.
CustomProtocolClosed {
/// Id of the peer we were connected to.
peer_id: PeerId,
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF).
result: io::Result<()>,
},
/// Receives a message on a custom protocol substream.
CustomMessage {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Data that has been received.
data: Bytes,
},
/// We have obtained debug information from a peer.
Identified {
/// Id of the peer that has been identified.
peer_id: PeerId,
/// Information about the peer.
info: IdentifyInfo,
},
}
impl From<CustomProtosOut> for BehaviourOut {
fn from(other: CustomProtosOut) -> BehaviourOut {
match other {
CustomProtosOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint } => {
BehaviourOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint }
},
CustomProtosOut::CustomProtocolClosed { protocol_id, peer_id, result } => {
BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }
},
CustomProtosOut::CustomMessage { protocol_id, peer_id, data } => {
BehaviourOut::CustomMessage { protocol_id, peer_id, data }
},
}
}
}
impl<TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TSubstream> {
fn inject_event(&mut self, event: void::Void) {
void::unreachable(event)
@@ -131,7 +193,7 @@ impl<TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TSubstre
impl<TSubstream> NetworkBehaviourEventProcess<CustomProtosOut> for Behaviour<TSubstream> {
fn inject_event(&mut self, event: CustomProtosOut) {
self.events.push(event);
self.events.push(event.into());
}
}
@@ -140,6 +202,10 @@ impl<TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSubs
match event {
IdentifyEvent::Identified { peer_id, info, .. } => {
trace!(target: "sub-libp2p", "Identified {:?} => {:?}", peer_id, info);
// TODO: ideally we would delay the first identification to when we open the custom
// protocol, so that we only report id info to the service about the nodes we
// care about (https://github.com/libp2p/rust-libp2p/issues/876)
self.events.push(BehaviourOut::Identified { peer_id, info });
}
IdentifyEvent::Error { .. } => {}
}
@@ -176,7 +242,7 @@ impl<TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TSubstrea
}
impl<TSubstream> Behaviour<TSubstream> {
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, CustomProtosOut>> {
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
}
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{behaviour::Behaviour, custom_proto::CustomProtosOut, secret::obtain_private_key, transport};
use crate::{behaviour::Behaviour, behaviour::BehaviourOut, secret::obtain_private_key, transport};
use crate::custom_proto::{RegisteredProtocol, RegisteredProtocols};
use crate::topology::NetTopology;
use crate::{Error, NetworkConfiguration, NodeIndex, ProtocolId, parse_str_addr};
@@ -127,7 +127,7 @@ where TProtos: IntoIterator<Item = RegisteredProtocol> {
Ok(Service {
swarm,
nodes_addresses: Default::default(),
nodes_info: Default::default(),
index_by_id: Default::default(),
next_node_id: 1,
cleanup: Interval::new_interval(Duration::from_secs(60)),
@@ -182,10 +182,10 @@ pub struct Service {
/// Stream of events of the swarm.
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<Substream<StreamMuxerBox>>, NetTopology>,
/// For each node we're connected to, how we're connected to it.
nodes_addresses: FnvHashMap<NodeIndex, (PeerId, ConnectedPoint)>,
/// Information about all the nodes we're connected to.
nodes_info: FnvHashMap<NodeIndex, NodeInfo>,
/// Opposite of `nodes_addresses`.
/// Opposite of `nodes_info`.
index_by_id: FnvHashMap<PeerId, NodeIndex>,
/// Next index to assign to a node.
@@ -199,6 +199,17 @@ pub struct Service {
injected_events: Vec<ServiceEvent>,
}
/// Information about a node we're connected to.
#[derive(Debug)]
struct NodeInfo {
/// Hash of the public key of the node.
peer_id: PeerId,
/// How we're connected to the node.
endpoint: ConnectedPoint,
/// Version reported by the remote, or `None` if unknown.
client_version: Option<String>,
}
impl Service {
/// Returns an iterator that produces the list of addresses we're listening on.
#[inline]
@@ -215,7 +226,7 @@ impl Service {
/// Returns the list of all the peers we are connected to.
#[inline]
pub fn connected_peers<'a>(&'a self) -> impl Iterator<Item = NodeIndex> + 'a {
self.nodes_addresses.keys().cloned()
self.nodes_info.keys().cloned()
}
/// Try to add a reserved peer.
@@ -247,13 +258,19 @@ impl Service {
/// Returns the `PeerId` of a node.
#[inline]
pub fn peer_id_of_node(&self, node_index: NodeIndex) -> Option<&PeerId> {
self.nodes_addresses.get(&node_index).map(|(id, _)| id)
self.nodes_info.get(&node_index).map(|info| &info.peer_id)
}
/// Returns the way we are connected to a node.
#[inline]
pub fn node_endpoint(&self, node_index: NodeIndex) -> Option<&ConnectedPoint> {
self.nodes_addresses.get(&node_index).map(|(_, cp)| cp)
self.nodes_info.get(&node_index).map(|info| &info.endpoint)
}
/// Returns the client version reported by a node.
pub fn node_client_version(&self, node_index: NodeIndex) -> Option<&str> {
self.nodes_info.get(&node_index)
.and_then(|info| info.client_version.as_ref().map(|s| &s[..]))
}
/// Sends a message to a peer using the custom protocol.
@@ -266,7 +283,7 @@ impl Service {
protocol: ProtocolId,
data: Vec<u8>
) {
if let Some(peer_id) = self.nodes_addresses.get(&node_index).map(|(id, _)| id) {
if let Some(peer_id) = self.nodes_info.get(&node_index).map(|info| &info.peer_id) {
self.swarm.send_custom_message(peer_id, protocol, data);
} else {
warn!(target: "sub-libp2p", "Tried to send message to unknown node: {:}", node_index);
@@ -278,9 +295,10 @@ impl Service {
/// Same as `drop_node`, except that the same peer will not be able to reconnect later.
#[inline]
pub fn ban_node(&mut self, node_index: NodeIndex) {
if let Some(peer_id) = self.nodes_addresses.get(&node_index).map(|(id, _)| id) {
info!(target: "sub-libp2p", "Banned {:?} (#{:?})", peer_id, node_index);
self.swarm.ban_node(peer_id.clone());
if let Some(info) = self.nodes_info.get(&node_index) {
info!(target: "sub-libp2p", "Banned {:?} (#{:?}, {:?}, {:?})", info.peer_id,
node_index, info.endpoint, info.client_version);
self.swarm.ban_node(info.peer_id.clone());
}
}
@@ -290,9 +308,10 @@ impl Service {
/// Corresponding closing events will be generated once the closing actually happens.
#[inline]
pub fn drop_node(&mut self, node_index: NodeIndex) {
if let Some(peer_id) = self.nodes_addresses.get(&node_index).map(|(id, _)| id) {
debug!(target: "sub-libp2p", "Dropping {:?} on purpose (#{:?})", peer_id, node_index);
self.swarm.drop_node(peer_id);
if let Some(info) = self.nodes_info.get(&node_index) {
debug!(target: "sub-libp2p", "Dropping {:?} on purpose (#{:?}, {:?}, {:?})",
info.peer_id, node_index, info.endpoint, info.client_version);
self.swarm.drop_node(&info.peer_id);
}
}
@@ -301,13 +320,21 @@ impl Service {
match self.index_by_id.entry(peer) {
Entry::Occupied(entry) => {
let id = *entry.get();
self.nodes_addresses.insert(id, (entry.key().clone(), endpoint));
self.nodes_info.insert(id, NodeInfo {
peer_id: entry.key().clone(),
endpoint,
client_version: None,
});
id
},
Entry::Vacant(entry) => {
let id = self.next_node_id;
self.next_node_id += 1;
self.nodes_addresses.insert(id, (entry.key().clone(), endpoint));
self.nodes_info.insert(id, NodeInfo {
peer_id: entry.key().clone(),
endpoint,
client_version: None,
});
entry.insert(id);
id
},
@@ -318,7 +345,7 @@ impl Service {
fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent>, IoError> {
loop {
match self.swarm.poll() {
Ok(Async::Ready(Some(CustomProtosOut::CustomProtocolOpen { protocol_id, peer_id, version, endpoint }))) => {
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { protocol_id, peer_id, version, endpoint }))) => {
debug!(target: "sub-libp2p", "Opened custom protocol with {:?}", peer_id);
let node_index = self.index_of_peer_or_assign(peer_id, endpoint);
break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol {
@@ -327,7 +354,7 @@ impl Service {
version,
})))
}
Ok(Async::Ready(Some(CustomProtosOut::CustomProtocolClosed { protocol_id, peer_id, result }))) => {
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }))) => {
debug!(target: "sub-libp2p", "Custom protocol with {:?} closed: {:?}", peer_id, result);
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol {
@@ -335,7 +362,7 @@ impl Service {
protocol: protocol_id,
})))
}
Ok(Async::Ready(Some(CustomProtosOut::CustomMessage { protocol_id, peer_id, data }))) => {
Ok(Async::Ready(Some(BehaviourOut::CustomMessage { protocol_id, peer_id, data }))) => {
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
break Ok(Async::Ready(Some(ServiceEvent::CustomMessage {
node_index,
@@ -343,6 +370,16 @@ impl Service {
data,
})))
}
Ok(Async::Ready(Some(BehaviourOut::Identified { peer_id, info }))) => {
// Contrary to the other events, this one can happen even on nodes which don't
// have any open custom protocol slot. Therefore it is not necessarily in the
// list.
if let Some(id) = self.index_by_id.get(&peer_id) {
self.nodes_info.get_mut(id)
.expect("index_by_id and nodes_info are always kept in sync; QED")
.client_version = Some(info.agent_version);
}
}
Ok(Async::NotReady) => break Ok(Async::NotReady),
Ok(Async::Ready(None)) => unreachable!("The Swarm stream never ends"),
Err(_) => unreachable!("The Swarm never errors"),
+5 -2
View File
@@ -52,8 +52,10 @@ pub struct NetworkConfiguration {
pub reserved_nodes: Vec<String>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
/// Client identifier
/// Client identifier. Sent over the wire for debugging purposes.
pub client_version: String,
/// Name of the node. Sent over the wire for debugging purposes.
pub node_name: String,
}
impl Default for NetworkConfiguration {
@@ -80,7 +82,8 @@ impl NetworkConfiguration {
out_peers: 75,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
client_version: "Parity-network".into(), // TODO: meh
client_version: "unknown".into(),
node_name: "unknown".into(),
}
}
+1
View File
@@ -112,6 +112,7 @@ fn node_config<F: ServiceFactory> (
reserved_nodes: vec![],
non_reserved_mode: NonReservedPeerMode::Accept,
client_version: "network/test/0.1".to_owned(),
node_name: "unknown".to_owned(),
};
Configuration {