Replace NodeIndex with PeerId everywhere (#2077)

* Replace NodeIndex with PeerId

* Fix tests

* More test fixing

* Whitespace
This commit is contained in:
Pierre Krieger
2019-03-23 10:34:28 +01:00
committed by Arkadiy Paronyan
parent 6fa40ec199
commit 6e394464b8
20 changed files with 424 additions and 480 deletions
-6
View File
@@ -37,12 +37,6 @@ use std::{collections::{HashMap, HashSet}, error, fmt, time::Duration};
/// Protocol / handler id
pub type ProtocolId = [u8; 3];
/// Node public key
pub type NodeId = PeerId;
/// Local (temporary) peer session ID.
pub type NodeIndex = usize;
/// Parses a string address and returns the component, if valid.
pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> {
let mut addr: Multiaddr = addr_str.parse()?;
+44 -108
View File
@@ -19,14 +19,13 @@ use crate::{
transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer
};
use crate::custom_proto::{CustomMessage, RegisteredProtocol};
use crate::{NetworkConfiguration, NonReservedPeerMode, NodeIndex, parse_str_addr};
use crate::{NetworkConfiguration, NonReservedPeerMode, parse_str_addr};
use fnv::FnvHashMap;
use futures::{prelude::*, Stream};
use libp2p::{multiaddr::Protocol, Multiaddr, core::swarm::NetworkBehaviour, PeerId};
use libp2p::core::{Swarm, nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
use libp2p::core::nodes::ConnectedPoint;
use log::{debug, error, info, warn};
use std::collections::hash_map::Entry;
use log::{debug, info, warn};
use std::fs;
use std::io::Error as IoError;
use std::path::Path;
@@ -114,8 +113,6 @@ where TMessage: CustomMessage + Send + 'static {
swarm,
bandwidth,
nodes_info: Default::default(),
index_by_id: Default::default(),
next_node_id: 1,
injected_events: Vec::new(),
};
@@ -127,10 +124,8 @@ where TMessage: CustomMessage + Send + 'static {
pub enum ServiceEvent<TMessage> {
/// A custom protocol substream has been opened with a node.
OpenedCustomProtocol {
/// The Id of the node.
/// Identity of the node.
peer_id: PeerId,
/// Index of the node.
node_index: NodeIndex,
/// Version of the protocol that was opened.
version: u8,
/// Node debug info
@@ -139,16 +134,16 @@ pub enum ServiceEvent<TMessage> {
/// A custom protocol substream has been closed.
ClosedCustomProtocol {
/// Index of the node.
node_index: NodeIndex,
/// Identity of the node.
peer_id: PeerId,
/// Node debug info
debug_info: String,
},
/// Receives a message on a custom protocol stream.
CustomMessage {
/// Index of the node.
node_index: NodeIndex,
/// Identity of the node.
peer_id: PeerId,
/// Message that has been received.
message: TMessage,
},
@@ -156,7 +151,7 @@ pub enum ServiceEvent<TMessage> {
/// The substream with a node is clogged. We should avoid sending data to it if possible.
Clogged {
/// Index of the node.
node_index: NodeIndex,
peer_id: PeerId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<TMessage>,
},
@@ -171,13 +166,7 @@ pub struct Service<TMessage> where TMessage: CustomMessage {
bandwidth: Arc<transport::BandwidthSinks>,
/// Information about all the nodes we're connected to.
nodes_info: FnvHashMap<NodeIndex, NodeInfo>,
/// Opposite of `nodes_info`.
index_by_id: FnvHashMap<PeerId, NodeIndex>,
/// Next index to assign to a node.
next_node_id: NodeIndex,
nodes_info: FnvHashMap<PeerId, NodeInfo>,
/// Events to produce on the Stream.
injected_events: Vec<ServiceEvent<TMessage>>,
@@ -186,8 +175,6 @@ pub struct Service<TMessage> where TMessage: CustomMessage {
/// Information about a node we're connected to.
#[derive(Debug)]
struct NodeInfo {
/// Hash of the public key of the node.
peer_id: PeerId,
/// How we're connected to the node.
endpoint: ConnectedPoint,
/// Version reported by the remote, or `None` if unknown.
@@ -202,16 +189,16 @@ where TMessage: CustomMessage + Send + 'static {
pub fn state(&mut self) -> NetworkState {
let connected_peers = {
let swarm = &mut self.swarm;
self.nodes_info.values().map(move |info| {
let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, &info.peer_id)
self.nodes_info.iter().map(move |(peer_id, info)| {
let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id)
.into_iter().collect();
(info.peer_id.to_base58(), NetworkStatePeer {
(peer_id.to_base58(), NetworkStatePeer {
endpoint: info.endpoint.clone().into(),
version_string: info.client_version.clone(),
latest_ping_time: info.latest_ping,
enabled: swarm.is_enabled(&info.peer_id),
open: swarm.is_open(&info.peer_id),
enabled: swarm.is_enabled(&peer_id),
open: swarm.is_open(&peer_id),
known_addresses,
})
}).collect()
@@ -219,8 +206,8 @@ where TMessage: CustomMessage + Send + 'static {
let not_connected_peers = {
let swarm = &mut self.swarm;
let index_by_id = &self.index_by_id;
let list = swarm.known_peers().filter(|p| !index_by_id.contains_key(p))
let nodes_info = &self.nodes_info;
let list = swarm.known_peers().filter(|p| !nodes_info.contains_key(p))
.cloned().collect::<Vec<_>>();
list.into_iter().map(move |peer_id| {
(peer_id.to_base58(), NetworkStateNotConnectedPeer {
@@ -266,25 +253,19 @@ where TMessage: CustomMessage + Send + 'static {
/// Returns the list of all the peers we are connected to.
#[inline]
pub fn connected_peers<'a>(&'a self) -> impl Iterator<Item = NodeIndex> + 'a {
self.nodes_info.keys().cloned()
}
/// Returns the `PeerId` of a node.
#[inline]
pub fn peer_id_of_node(&self, node_index: NodeIndex) -> Option<&PeerId> {
self.nodes_info.get(&node_index).map(|info| &info.peer_id)
pub fn connected_peers<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
self.nodes_info.keys()
}
/// Returns the way we are connected to a node.
#[inline]
pub fn node_endpoint(&self, node_index: NodeIndex) -> Option<&ConnectedPoint> {
self.nodes_info.get(&node_index).map(|info| &info.endpoint)
pub fn node_endpoint(&self, peer_id: &PeerId) -> Option<&ConnectedPoint> {
self.nodes_info.get(peer_id).map(|info| &info.endpoint)
}
/// Returns the client version reported by a node.
pub fn node_client_version(&self, node_index: NodeIndex) -> Option<&str> {
self.nodes_info.get(&node_index)
pub fn node_client_version(&self, peer_id: &PeerId) -> Option<&str> {
self.nodes_info.get(peer_id)
.and_then(|info| info.client_version.as_ref().map(|s| &s[..]))
}
@@ -294,25 +275,21 @@ where TMessage: CustomMessage + Send + 'static {
/// invalid.
pub fn send_custom_message(
&mut self,
node_index: NodeIndex,
peer_id: &PeerId,
message: TMessage
) {
if let Some(peer_id) = self.nodes_info.get(&node_index).map(|info| &info.peer_id) {
self.swarm.send_custom_message(peer_id, message);
} else {
warn!(target: "sub-libp2p", "Tried to send message to unknown node: {:}", node_index);
}
self.swarm.send_custom_message(peer_id, message);
}
/// Disconnects a peer.
///
/// This is asynchronous and will not immediately close the peer.
/// Corresponding closing events will be generated once the closing actually happens.
pub fn drop_node(&mut self, node_index: NodeIndex) {
if let Some(info) = self.nodes_info.get(&node_index) {
debug!(target: "sub-libp2p", "Dropping {:?} on purpose (#{:?}, {:?}, {:?})",
info.peer_id, node_index, info.endpoint, info.client_version);
self.swarm.drop_node(&info.peer_id);
pub fn drop_node(&mut self, peer_id: &PeerId) {
if let Some(info) = self.nodes_info.get(peer_id) {
debug!(target: "sub-libp2p", "Dropping {:?} on purpose ({:?}, {:?})",
peer_id, info.endpoint, info.client_version);
self.swarm.drop_node(peer_id);
}
}
@@ -322,73 +299,42 @@ where TMessage: CustomMessage + Send + 'static {
}
/// Get debug info for a given peer.
pub fn peer_debug_info(&self, who: NodeIndex) -> String {
if let Some(info) = self.nodes_info.get(&who) {
format!("{:?} (version: {:?}) through {:?}", info.peer_id, info.client_version, info.endpoint)
pub fn peer_debug_info(&self, who: &PeerId) -> String {
if let Some(info) = self.nodes_info.get(who) {
format!("{:?} (version: {:?}) through {:?}", who, info.client_version, info.endpoint)
} else {
"unknown".to_string()
}
}
/// Returns the `NodeIndex` of a peer, or assigns one if none exists.
fn index_of_peer_or_assign(&mut self, peer: PeerId, endpoint: ConnectedPoint) -> NodeIndex {
match self.index_by_id.entry(peer) {
Entry::Occupied(entry) => {
let id = *entry.get();
self.nodes_info.insert(id, NodeInfo {
peer_id: entry.key().clone(),
endpoint,
client_version: None,
latest_ping: None,
});
id
},
Entry::Vacant(entry) => {
let id = self.next_node_id;
self.next_node_id += 1;
self.nodes_info.insert(id, NodeInfo {
peer_id: entry.key().clone(),
endpoint,
client_version: None,
latest_ping: None,
});
entry.insert(id);
id
},
}
}
/// Polls for what happened on the network.
fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> {
loop {
match self.swarm.poll() {
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { peer_id, version, endpoint }))) => {
let node_index = self.index_of_peer_or_assign(peer_id.clone(), endpoint);
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { peer_id, version, .. }))) => {
let debug_info = self.peer_debug_info(&peer_id);
break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol {
peer_id,
node_index,
version,
debug_info: self.peer_debug_info(node_index),
debug_info,
})))
}
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { peer_id, .. }))) => {
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
let debug_info = self.peer_debug_info(&peer_id);
break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol {
node_index,
debug_info: self.peer_debug_info(node_index),
peer_id,
debug_info,
})))
}
Ok(Async::Ready(Some(BehaviourOut::CustomMessage { peer_id, message }))) => {
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
break Ok(Async::Ready(Some(ServiceEvent::CustomMessage {
node_index,
peer_id,
message,
})))
}
Ok(Async::Ready(Some(BehaviourOut::Clogged { peer_id, messages }))) => {
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
break Ok(Async::Ready(Some(ServiceEvent::Clogged {
node_index,
peer_id,
messages,
})))
}
@@ -396,26 +342,16 @@ where TMessage: CustomMessage + Send + 'static {
// Contrary to the other events, this one can happen even on nodes which don't
// have any open custom protocol slot. Therefore it is not necessarily in the
// list.
if let Some(id) = self.index_by_id.get(&peer_id) {
if let Some(n) = self.nodes_info.get_mut(id) {
n.client_version = Some(info.agent_version);
} else {
error!(target: "sub-libp2p",
"State inconsistency between index_by_id and nodes_info");
}
if let Some(n) = self.nodes_info.get_mut(&peer_id) {
n.client_version = Some(info.agent_version);
}
}
Ok(Async::Ready(Some(BehaviourOut::PingSuccess { peer_id, ping_time }))) => {
// Contrary to the other events, this one can happen even on nodes which don't
// have any open custom protocol slot. Therefore it is not necessarily in the
// list.
if let Some(id) = self.index_by_id.get(&peer_id) {
if let Some(n) = self.nodes_info.get_mut(id) {
n.latest_ping = Some(ping_time);
} else {
error!(target: "sub-libp2p",
"State inconsistency between index_by_id and nodes_info");
}
if let Some(n) = self.nodes_info.get_mut(&peer_id) {
n.latest_ping = Some(ping_time);
}
}
Ok(Async::NotReady) => break Ok(Async::NotReady),