Add a reputation system (#645)

* Add a reputation system

* Fix tests

* Don't try to dial peers to which we are already connected

* Use the master branch of libp2p
This commit is contained in:
Pierre Krieger
2018-09-04 08:52:20 +02:00
committed by Gav Wood
parent 146ebceab4
commit 360ffa2dbb
7 changed files with 1383 additions and 913 deletions
@@ -11,8 +11,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
# libp2p = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "6aa139a12dbea3d75d898ce0b2af7fcec129e294", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev = "02576eecf140a06134519ed9438d061d99bb2e69", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ethkey = { git = "https://github.com/paritytech/parity.git" }
ethereum-types = "0.3"
@@ -20,10 +19,13 @@ parking_lot = "0.5"
libc = "0.2"
log = "0.3"
rand = "0.5.0"
serde = "1.0.70"
serde_derive = "1.0.70"
serde_json = "1.0.24"
tokio = "0.1"
tokio-io = "0.1"
tokio-timer = "0.2"
unsigned-varint = { version = "0.1", features = ["codec"] }
unsigned-varint = { version = "0.2", features = ["codec"] }
[dev-dependencies]
assert_matches = "1.2"
@@ -27,6 +27,10 @@ extern crate ethkey;
extern crate libc;
extern crate libp2p;
extern crate rand;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate bytes;
extern crate unsigned_varint;
@@ -52,6 +56,7 @@ mod error;
mod network_state;
mod service;
mod timeouts;
mod topology;
mod traits;
mod transport;
@@ -20,20 +20,17 @@ use futures::sync::mpsc;
use libp2p::core::{multiaddr::ToMultiaddr, Multiaddr, AddrComponent, Endpoint, UniqueConnec};
use libp2p::core::{UniqueConnecState, PeerId, PublicKey};
use libp2p::kad::KadConnecController;
use libp2p::peerstore::{Peerstore, PeerAccess};
use libp2p::peerstore::json_peerstore::JsonPeerstore;
use libp2p::peerstore::memory_peerstore::MemoryPeerstore;
use libp2p::ping::Pinger;
use libp2p::secio;
use {Error, ErrorKind, NetworkConfiguration, NonReservedPeerMode};
use {NodeIndex, ProtocolId, SessionInfo};
use parking_lot::{Mutex, RwLock};
use rand::{self, Rng};
use topology::{DisconnectReason, NetTopology};
use std::fs;
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write};
use std::path::Path;
use std::sync::atomic;
use std::{thread, time};
use std::time::{Duration, Instant};
// File where the peers are stored.
@@ -46,7 +43,7 @@ const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60);
// Common struct shared throughout all the components of the service.
pub struct NetworkState {
/// Contains the information about the network.
node_store: NodeStore,
topology: RwLock<NetTopology>,
/// Active connections.
connections: RwLock<Connections>,
@@ -74,13 +71,6 @@ pub struct NetworkState {
local_public_key: PublicKey,
}
enum NodeStore {
/// Peers are stored in memory. Nothing is stored on disk.
Memory(MemoryPeerstore),
/// Peers are stored in a JSON file on the disk.
Json(JsonPeerstore),
}
struct Connections {
/// For each libp2p peer ID, the ID of the peer in the API we expose.
/// Also corresponds to the index in `info_by_peer`.
@@ -106,23 +96,24 @@ struct PeerConnectionInfo {
/// Id of the peer.
id: PeerId,
/// True if this connection was initiated by us.
/// True if this connection was initiated by us. `None` if we're not connected.
/// Note that it is theoretically possible that we dial the remote at the
/// same time they dial us, in which case the protocols may be dispatched
/// between both connections, and in which case the value here will be racy.
originated: bool,
originated: Option<bool>,
/// Latest known ping duration.
ping: Mutex<Option<Duration>>,
ping: Option<Duration>,
/// The client version of the remote, or `None` if not known.
client_version: Option<String>,
/// The multiaddress of the remote, or `None` if not known.
remote_address: Option<Multiaddr>,
/// The multiaddresses of the remote, or `None` if not known.
remote_addresses: Vec<Multiaddr>,
/// The local multiaddress used to communicate with the remote, or `None`
/// if not known.
// TODO: never filled ; also shouldn't be an `Option`
local_address: Option<Multiaddr>,
}
@@ -144,7 +135,7 @@ pub struct PeerInfo {
/// The client version of the remote, or `None` if not known.
pub client_version: Option<String>,
/// The multiaddress of the remote, or `None` if not known.
/// The multiaddress of the remote.
pub remote_address: Option<Multiaddr>,
/// The local multiaddress used to communicate with the remote, or `None`
@@ -156,10 +147,10 @@ impl<'a> From<&'a PeerConnectionInfo> for PeerInfo {
fn from(i: &'a PeerConnectionInfo) -> PeerInfo {
PeerInfo {
id: i.id.clone(),
originated: i.originated,
ping: i.ping.lock().clone(),
originated: i.originated.unwrap_or(true),
ping: i.ping,
client_version: i.client_version.clone(),
remote_address: i.remote_address.clone(),
remote_address: i.remote_addresses.get(0).map(|a| a.clone()),
local_address: i.local_address.clone(),
}
}
@@ -172,39 +163,13 @@ impl NetworkState {
let local_public_key = local_private_key.to_public_key();
// Build the storage for peers, including the bootstrap nodes.
let node_store = if let Some(ref path) = config.net_config_path {
let mut topology = if let Some(ref path) = config.net_config_path {
let path = Path::new(path).join(NODES_FILE);
if let Ok(node_store) = JsonPeerstore::new(path.clone()) {
debug!(target: "sub-libp2p", "Initialized peer store for JSON file {:?}", path);
NodeStore::Json(node_store)
} else {
warn!(target: "sub-libp2p", "Failed to open peer storage {:?}; peers file will be reset", path);
fs::remove_file(&path).expect("Failed deleting peers.json");
// we check for about 1s if the file was really deleted and move on
for _x in 0..200 {
if !Path::new(&path).exists() {
break;
} else {
debug!("Waiting for effective deletion of invalid/outdate peers.json");
thread::sleep(time::Duration::from_millis(5));
}
}
if let Ok(peerstore) = JsonPeerstore::new(path.clone()) {
debug!("peers.json reset");
NodeStore::Json(peerstore)
} else {
warn!(target: "sub-libp2p",
"Failed to reset peer storage {:?}; peers change will not be saved",
path
);
NodeStore::Memory(MemoryPeerstore::empty())
}
}
debug!(target: "sub-libp2p", "Initializing peer store for JSON file {:?}", path);
NetTopology::from_file(path)
} else {
debug!(target: "sub-libp2p", "No peers file configured ; peers won't be saved");
NodeStore::Memory(MemoryPeerstore::empty())
NetTopology::memory()
};
let reserved_peers = {
@@ -213,7 +178,7 @@ impl NetworkState {
Default::default()
);
for peer in config.reserved_nodes.iter() {
let id = parse_and_add_to_node_store(peer, &node_store)?;
let (id, _) = parse_and_add_to_topology(peer, &mut topology)?;
reserved_peers.insert(id);
}
RwLock::new(reserved_peers)
@@ -222,7 +187,7 @@ impl NetworkState {
let expected_max_peers = config.max_peers as usize + config.reserved_nodes.len();
Ok(NetworkState {
node_store,
topology: RwLock::new(topology),
max_outgoing_peers: config.min_peers,
max_incoming_peers: config.max_peers.saturating_sub(config.min_peers),
connections: RwLock::new(Connections {
@@ -248,37 +213,55 @@ impl NetworkState {
&self.local_public_key
}
/// Returns the ID of a random peer of the network.
/// Returns a list of peers and addresses which we should try connect to.
///
/// Returns `None` if we don't know any peer.
pub fn random_peer(&self) -> Option<PeerId> {
// TODO: optimize by putting the operation directly in the node_store
// https://github.com/libp2p/rust-libp2p/issues/316
let peers = match self.node_store {
NodeStore::Memory(ref mem) =>
mem.peers().collect::<Vec<_>>(),
NodeStore::Json(ref json) =>
json.peers().collect::<Vec<_>>(),
/// Because of expiration and back-off mechanisms, this list can change
/// by itself over time. The `Instant` that is returned corresponds to
/// the earlier known time when a new entry will be added automatically to
/// the list.
pub fn outgoing_connections_to_attempt(&self) -> (Vec<(PeerId, Multiaddr)>, Instant) {
// TODO: handle better
let connections = self.connections.read();
let num_to_attempt = if self.reserved_only.load(atomic::Ordering::Relaxed) {
0
} else {
let num_open_custom_connections = num_open_custom_connections(&connections, &self.reserved_peers.read());
self.max_outgoing_peers.saturating_sub(num_open_custom_connections.unreserved_outgoing)
};
if peers.is_empty() {
return None
}
let topology = self.topology.read();
let (list, change) = topology.addrs_to_attempt();
let list = list
.filter(|&(peer, _)| {
// Filter out peers which we are already connected to.
let cur = match connections.peer_by_nodeid.get(peer) {
Some(e) => e,
None => return true
};
let nth = rand::random::<usize>() % peers.len();
Some(peers[nth].clone())
let infos = match connections.info_by_peer.get(&cur) {
Some(i) => i,
None => return true
};
!infos.protocols.iter().any(|(_, conn)| conn.is_alive())
})
.take(num_to_attempt as usize)
.map(|(addr, peer)| (addr.clone(), peer.clone()))
.collect();
(list, change)
}
/// Returns all the IDs of the peers on the network we have knowledge of.
///
/// This includes peers we are not connected to.
pub fn known_peers(&self) -> impl Iterator<Item = PeerId> {
match self.node_store {
NodeStore::Memory(ref mem) =>
mem.peers().collect::<Vec<_>>().into_iter(),
NodeStore::Json(ref json) =>
json.peers().collect::<Vec<_>>().into_iter(),
}
pub fn known_peers(&self) -> Vec<PeerId> {
let topology = self.topology.read();
// Note: I have no idea why, but fusing the two lines below fails the
// borrow check
let out: Vec<_> = topology.peers().cloned().collect();
out
}
/// Returns true if we are connected to any peer at all.
@@ -307,7 +290,7 @@ impl NetworkState {
Some(info) => info,
None => return,
};
*info.ping.lock() = Some(ping);
info.ping = Some(ping);
}
/// If we're connected to a peer with the given protocol, returns
@@ -329,18 +312,15 @@ impl NetworkState {
None => return None,
};
let ping = info.ping.lock().clone();
Some(SessionInfo {
id: None, // TODO: ???? what to do??? wrong format!
client_version: info.client_version.clone().take().unwrap_or(String::new()),
protocol_version,
capabilities: Vec::new(), // TODO: list of supported protocols ; hard
peer_capabilities: Vec::new(), // TODO: difference with `peer_capabilities`?
ping,
originated: info.originated,
remote_address: info.remote_address.as_ref().map(|a| a.to_string())
.unwrap_or(String::new()),
ping: info.ping,
originated: info.originated.unwrap_or(true),
remote_address: info.remote_addresses.get(0).map(|a| a.to_string()).unwrap_or_default(),
local_address: info.local_address.as_ref().map(|a| a.to_string())
.unwrap_or(String::new()),
})
@@ -371,83 +351,45 @@ impl NetworkState {
/// Adds an address discovered by Kademlia.
/// Note that we don't have to be connected to a peer to add an address.
pub fn add_kad_discovered_addr(&self, node_id: &PeerId, addr: Multiaddr) {
trace!(target: "sub-libp2p", "Peer store: adding address {} for {:?}",
addr, node_id);
match self.node_store {
NodeStore::Memory(ref mem) =>
mem.peer_or_create(node_id)
.add_addr(addr, Duration::from_secs(3600)),
NodeStore::Json(ref json) =>
json.peer_or_create(node_id)
.add_addr(addr, Duration::from_secs(3600)),
}
}
/// Signals that an address doesn't match the corresponding node ID.
/// This removes the address from the peer store, so that it is not
/// returned by `addrs_of_peer` again in the future.
pub fn set_invalid_kad_address(&self, node_id: &PeerId, addr: &Multiaddr) {
// TODO: blacklist the address?
match self.node_store {
NodeStore::Memory(ref mem) =>
if let Some(mut peer) = mem.peer(node_id) {
peer.rm_addr(addr.clone()) // TODO: cloning necessary?
},
NodeStore::Json(ref json) =>
if let Some(mut peer) = json.peer(node_id) {
peer.rm_addr(addr.clone()) // TODO: cloning necessary?
},
}
self.topology.write().add_kademlia_discovered_addr(node_id, addr)
}
/// Returns the known multiaddresses of a peer.
pub fn addrs_of_peer(&self, node_id: &PeerId) -> Vec<Multiaddr> {
match self.node_store {
NodeStore::Memory(ref mem) =>
mem.peer(node_id)
.into_iter()
.flat_map(|p| p.addrs())
.collect::<Vec<_>>(),
NodeStore::Json(ref json) =>
json.peer(node_id)
.into_iter()
.flat_map(|p| p.addrs())
.collect::<Vec<_>>(),
}
let topology = self.topology.read();
// Note: I have no idea why, but fusing the two lines below fails the
// borrow check
let out: Vec<Multiaddr> = topology.addrs_of_peer(node_id).cloned().collect();
out
}
/// Sets information about a peer.
pub fn set_peer_info(
///
/// No-op if the node index is invalid.
pub fn set_node_info(
&self,
node_id: PeerId,
endpoint: Endpoint,
client_version: String,
local_addr: Multiaddr,
remote_addr: Multiaddr
) -> Result<NodeIndex, IoError> {
node_index: NodeIndex,
client_version: String
) {
let mut connections = self.connections.write();
let who = accept_connection(&mut connections, &self.next_node_index,
node_id.clone(), endpoint)?;
let infos = connections.info_by_peer.get_mut(&who)
.expect("Newly-created peer id is always valid");
let infos = match connections.info_by_peer.get_mut(&node_index) {
Some(i) => i,
None => return
};
infos.client_version = Some(client_version);
infos.remote_address = Some(remote_addr);
infos.local_address = Some(local_addr);
Ok(who)
}
/// Adds a peer to the internal peer store.
/// Returns an error if the peer address is invalid.
pub fn add_peer(&self, peer: &str) -> Result<PeerId, Error> {
parse_and_add_to_node_store(peer, &self.node_store)
pub fn add_bootstrap_peer(&self, peer: &str) -> Result<(PeerId, Multiaddr), Error> {
parse_and_add_to_topology(peer, &mut self.topology.write())
}
/// Adds a reserved peer to the list of reserved peers.
/// Returns an error if the peer address is invalid.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let id = parse_and_add_to_node_store(peer, &self.node_store)?;
let (id, _) = parse_and_add_to_topology(peer, &mut self.topology.write())?;
self.reserved_peers.write().insert(id);
Ok(())
}
@@ -456,7 +398,7 @@ impl NetworkState {
/// active connection to this peer.
/// Returns an error if the peer address is invalid.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let id = parse_and_add_to_node_store(peer, &self.node_store)?;
let (id, _) = parse_and_add_to_topology(peer, &mut self.topology.write())?;
self.reserved_peers.write().remove(&id);
// Dropping the peer if we're in reserved mode.
@@ -464,6 +406,7 @@ impl NetworkState {
let mut connections = self.connections.write();
if let Some(who) = connections.peer_by_nodeid.remove(&id) {
connections.info_by_peer.remove(&who);
// TODO: use drop_peer instead
}
}
@@ -481,58 +424,168 @@ impl NetworkState {
}
}
/// Returns the number of new outgoing custom connections to peers to
/// open. This takes into account the number of active peers.
pub fn should_open_outgoing_custom_connections(&self) -> u32 {
if self.reserved_only.load(atomic::Ordering::Relaxed) {
0
} else {
let num_open_custom_connections = num_open_custom_connections(&self.connections.read(), &self.reserved_peers.read());
self.max_outgoing_peers.saturating_sub(num_open_custom_connections.unreserved_outgoing)
}
}
/// Returns true if we are connected to the given node.
pub fn has_connection(&self, node_id: &PeerId) -> bool {
let connections = self.connections.read();
connections.peer_by_nodeid.contains_key(node_id)
}
/// Reports that we tried to connect to the given address but failed.
///
/// This decreases the chance this address will be tried again in the future.
#[inline]
pub fn report_failed_to_connect(&self, addr: &Multiaddr) {
trace!(target: "sub-libp2p", "Failed to connect to {:?}", addr);
self.topology.write().report_failed_to_connect(addr);
}
/// Returns the `NodeIndex` corresponding to a node id, or assigns a `NodeIndex` if none
/// exists.
///
/// Returns an error if this node is on the list of disabled/banned nodes..
pub fn assign_node_index(
&self,
node_id: &PeerId
) -> Result<NodeIndex, IoError> {
// Check whether node is disabled.
// TODO: figure out the locking strategy here to avoid possible deadlocks
// TODO: put disabled_nodes in connections?
let mut disabled_nodes = self.disabled_nodes.lock();
if let Some(timeout) = disabled_nodes.get(node_id).cloned() {
if timeout > Instant::now() {
debug!(target: "sub-libp2p", "Refusing peer {:?} because it is disabled", node_id);
return Err(IoError::new(IoErrorKind::ConnectionRefused, "peer is disabled"));
} else {
disabled_nodes.remove(node_id);
}
}
drop(disabled_nodes);
let mut connections = self.connections.write();
let connections = &mut *connections;
let peer_by_nodeid = &mut connections.peer_by_nodeid;
let info_by_peer = &mut connections.info_by_peer;
let who = *peer_by_nodeid.entry(node_id.clone()).or_insert_with(|| {
let new_id = self.next_node_index.fetch_add(1, atomic::Ordering::Relaxed);
trace!(target: "sub-libp2p", "Creating new peer #{:?} for {:?}", new_id, node_id);
info_by_peer.insert(new_id, PeerConnectionInfo {
protocols: Vec::new(), // TODO: Vec::with_capacity(num_registered_protocols),
kad_connec: UniqueConnec::empty(),
ping_connec: UniqueConnec::empty(),
id: node_id.clone(),
originated: None,
ping: None,
client_version: None,
local_address: None,
remote_addresses: Vec::with_capacity(1),
});
new_id
});
Ok(who)
}
/// Notifies that we're connected to a node through an address.
///
/// Returns an error if we refuse the connection.
///
/// Note that is it legal to connection multiple times to the same node id through different
/// addresses and endpoints.
pub fn report_connected(
&self,
node_index: NodeIndex,
addr: &Multiaddr,
endpoint: Endpoint
) -> Result<(), IoError> {
let mut connections = self.connections.write();
// TODO: double locking in this function ; although this has been reviewed to not deadlock
// as of the writing of this code, it is possible that a later change that isn't carefully
// reviewed triggers one
if endpoint == Endpoint::Listener {
let stats = num_open_custom_connections(&connections, &self.reserved_peers.read());
if stats.unreserved_incoming >= self.max_incoming_peers {
debug!(target: "sub-libp2p", "Refusing incoming connection from {} because we \
reached max incoming peers", addr);
return Err(IoError::new(IoErrorKind::ConnectionRefused,
"maximum incoming peers reached"));
}
}
let infos = match connections.info_by_peer.get_mut(&node_index) {
Some(i) => i,
None => return Ok(())
};
if !infos.remote_addresses.iter().any(|a| a == addr) {
infos.remote_addresses.push(addr.clone());
}
if infos.originated.is_none() {
infos.originated = Some(endpoint == Endpoint::Dialer);
}
self.topology.write().report_connected(addr, &infos.id);
Ok(())
}
/// Returns the node id from a node index.
///
/// Returns `None` if the node index is invalid.
pub fn node_id_from_index(
&self,
node_index: NodeIndex
) -> Option<PeerId> {
let mut connections = self.connections.write();
let infos = match connections.info_by_peer.get_mut(&node_index) {
Some(i) => i,
None => return None
};
Some(infos.id.clone())
}
/// Obtains the `UniqueConnec` corresponding to the Kademlia connection to a peer.
///
/// Returns `None` if the node index is invalid.
pub fn kad_connection(
&self,
node_id: PeerId
) -> Result<(NodeIndex, UniqueConnec<KadConnecController>), IoError> {
// TODO: check that the peer is disabled? should disabling a peer also prevent
// kad from working?
node_index: NodeIndex
) -> Option<UniqueConnec<KadConnecController>> {
let mut connections = self.connections.write();
let who = accept_connection(&mut connections, &self.next_node_index,
node_id, Endpoint::Listener)?;
let infos = connections.info_by_peer.get_mut(&who)
.expect("Newly-created peer id is always valid");
let connec = infos.kad_connec.clone();
Ok((who, connec))
let infos = match connections.info_by_peer.get_mut(&node_index) {
Some(i) => i,
None => return None
};
Some(infos.kad_connec.clone())
}
/// Obtains the `UniqueConnec` corresponding to the Ping connection to a peer.
///
/// Returns `None` if the node index is invalid.
pub fn ping_connection(
&self,
node_id: PeerId
) -> Result<(NodeIndex, UniqueConnec<Pinger>), IoError> {
node_index: NodeIndex
) -> Option<UniqueConnec<Pinger>> {
let mut connections = self.connections.write();
let who = accept_connection(&mut connections, &self.next_node_index,
node_id, Endpoint::Listener)?;
let infos = connections.info_by_peer.get_mut(&who)
.expect("Newly-created peer id is always valid");
let connec = infos.ping_connec.clone();
Ok((who, connec))
let infos = match connections.info_by_peer.get_mut(&node_index) {
Some(i) => i,
None => return None
};
Some(infos.ping_connec.clone())
}
/// Cleans up inactive connections and returns a list of
/// connections to ping.
pub fn cleanup_and_prepare_ping(
/// connections to ping and identify.
pub fn cleanup_and_prepare_updates(
&self
) -> Vec<(NodeIndex, PeerId, UniqueConnec<Pinger>)> {
) -> Vec<PeriodicUpdate> {
self.topology.write().cleanup();
let mut connections = self.connections.write();
let connections = &mut *connections;
let peer_by_nodeid = &mut connections.peer_by_nodeid;
@@ -550,70 +603,46 @@ impl NetworkState {
return false;
}
ret.push((who, infos.id.clone(), infos.ping_connec.clone()));
if let Some(addr) = infos.remote_addresses.get(0) {
ret.push(PeriodicUpdate {
node_index: who,
peer_id: infos.id.clone(),
address: addr.clone(),
pinger: infos.ping_connec.clone(),
identify: infos.client_version.is_none(),
});
}
true
});
ret
}
/// Try to add a new connection to a node in the list.
/// Obtains the `UniqueConnec` corresponding to a custom protocol connection to a peer.
///
/// Returns a `NodeIndex` to allow further interfacing with this connection.
/// Note that all `NodeIndex`s are unique and never reused.
///
/// Can return an error if we are refusing the connection to the remote.
///
/// You must pass an `UnboundedSender` which will be used by the `send`
/// method. Actually sending the data is not covered by this code.
///
/// The various methods of the `NetworkState` that close a connection do
/// so by dropping this sender.
/// Returns `None` if the node index is invalid.
pub fn custom_proto(
&self,
node_id: PeerId,
node_index: NodeIndex,
protocol_id: ProtocolId,
endpoint: Endpoint,
) -> Result<(NodeIndex, UniqueConnec<(mpsc::UnboundedSender<Bytes>, u8)>), IoError> {
) -> Option<UniqueConnec<(mpsc::UnboundedSender<Bytes>, u8)>> {
let mut connections = self.connections.write();
if is_peer_disabled(&self.disabled_nodes, &node_id) {
debug!(target: "sub-libp2p", "Refusing node {:?} because it was disabled", node_id);
return Err(IoError::new(IoErrorKind::PermissionDenied, "disabled peer"))
}
let who = accept_connection(&mut connections, &self.next_node_index,
node_id.clone(), endpoint)?;
let num_open_connections = num_open_custom_connections(&connections, &self.reserved_peers.read());
let infos = connections.info_by_peer.get_mut(&who)
.expect("Newly-created peer id is always valid");
let node_is_reserved = self.reserved_peers.read().contains(&infos.id);
if !node_is_reserved {
if self.reserved_only.load(atomic::Ordering::Relaxed) ||
(endpoint == Endpoint::Listener &&
num_open_connections.unreserved_incoming >= self.max_incoming_peers) ||
(endpoint == Endpoint::Dialer &&
num_open_connections.unreserved_outgoing >= self.max_outgoing_peers)
{
debug!(target: "sub-libp2p", "Refusing node {:?} because we reached the max number of peers", node_id);
return Err(IoError::new(IoErrorKind::PermissionDenied, "maximum number of peers reached"))
}
}
let infos = match connections.info_by_peer.get_mut(&node_index) {
Some(i) => i,
None => return None
};
if let Some((_, ref uconn)) = infos.protocols.iter().find(|&(prot, _)| prot == &protocol_id) {
return Ok((who, uconn.clone()))
return Some(uconn.clone())
}
let unique_connec = UniqueConnec::empty();
infos.protocols.push((protocol_id.clone(), unique_connec.clone()));
Ok((who, unique_connec))
Some(unique_connec)
}
/// Sends some data to the given peer, using the sender that was passed
/// to the `UniqueConnec` of `custom_proto`.
pub fn send(&self, protocol: ProtocolId, who: NodeIndex, message: Bytes) -> Result<(), Error> {
pub fn send(&self, who: NodeIndex, protocol: ProtocolId, message: Bytes) -> Result<(), Error> {
if let Some(peer) = self.connections.read().info_by_peer.get(&who) {
let sender = peer.protocols.iter().find(|elem| elem.0 == protocol)
.and_then(|e| e.1.poll())
@@ -660,6 +689,10 @@ impl NetworkState {
peer_info.protocols.iter().filter(|c| c.1.is_alive()).count());
let old = connections.peer_by_nodeid.remove(&peer_info.id);
debug_assert_eq!(old, Some(who));
for addr in &peer_info.remote_addresses {
self.topology.write().report_disconnected(addr,
DisconnectReason::ClosedGracefully); // TODO: wrong reason
}
}
}
@@ -682,13 +715,13 @@ impl NetworkState {
/// of `custom_proto`).
pub fn ban_peer(&self, who: NodeIndex, reason: &str) {
// TODO: what do we do if the peer is reserved?
// TODO: same logging as in disconnect_peer
// TODO: same logging as in drop_peer
let mut connections = self.connections.write();
let peer_info = if let Some(peer_info) = connections.info_by_peer.remove(&who) {
if let (&Some(ref client_version), &Some(ref remote_address)) = (&peer_info.client_version, &peer_info.remote_address) {
info!(target: "network", "Peer {} (version: {}, address: {}) disabled. {}", who, client_version, remote_address, reason);
if let &Some(ref client_version) = &peer_info.client_version {
info!(target: "network", "Peer {} (version: {}, addresses: {:?}) disabled. {}", who, client_version, peer_info.remote_addresses, reason);
} else {
info!(target: "network", "Peer {} disabled. {}", who, reason);
info!(target: "network", "Peer {} (addresses: {:?}) disabled. {}", who, peer_info.remote_addresses, reason);
}
let old = connections.peer_by_nodeid.remove(&peer_info.id);
debug_assert_eq!(old, Some(who));
@@ -707,19 +740,15 @@ impl NetworkState {
/// This is done in an atomical way, so that an error doesn't corrupt
/// anything.
pub fn flush_caches_to_disk(&self) -> Result<(), IoError> {
match self.node_store {
NodeStore::Memory(_) => Ok(()),
NodeStore::Json(ref json) =>
match json.flush() {
Ok(()) => {
debug!(target: "sub-libp2p", "Flushed JSON peer store to disk");
Ok(())
}
Err(err) => {
warn!(target: "sub-libp2p", "Failed to flush changes to JSON peer store: {}", err);
Err(err)
}
}
match self.topology.read().flush_to_disk() {
Ok(()) => {
debug!(target: "sub-libp2p", "Flushed JSON peer store to disk");
Ok(())
}
Err(err) => {
warn!(target: "sub-libp2p", "Failed to flush changes to JSON peer store: {}", err);
Err(err)
}
}
}
}
@@ -730,56 +759,18 @@ impl Drop for NetworkState {
}
}
/// Assigns a `NodeIndex` to a node, or returns an existing ID if any exists.
///
/// The function only accepts already-locked structs, so that we don't risk
/// any deadlock.
fn accept_connection(
connections: &mut Connections,
next_node_index: &atomic::AtomicUsize,
node_id: PeerId,
endpoint: Endpoint
) -> Result<NodeIndex, IoError> {
let peer_by_nodeid = &mut connections.peer_by_nodeid;
let info_by_peer = &mut connections.info_by_peer;
let who = *peer_by_nodeid.entry(node_id.clone()).or_insert_with(|| {
let new_id = next_node_index.fetch_add(1, atomic::Ordering::Relaxed);
trace!(target: "sub-libp2p", "Creating new peer #{:?} for {:?}", new_id, node_id);
info_by_peer.insert(new_id, PeerConnectionInfo {
protocols: Vec::new(), // TODO: Vec::with_capacity(num_registered_protocols),
kad_connec: UniqueConnec::empty(),
ping_connec: UniqueConnec::empty(),
id: node_id.clone(),
originated: endpoint == Endpoint::Dialer,
ping: Mutex::new(None),
client_version: None,
local_address: None,
remote_address: None,
});
new_id
});
Ok(who)
}
/// Returns true if a peer is disabled.
fn is_peer_disabled(
list: &Mutex<FnvHashMap<PeerId, Instant>>,
peer: &PeerId
) -> bool {
let mut list = list.lock();
if let Some(timeout) = list.get(peer).cloned() {
if timeout > Instant::now() {
true
} else {
list.remove(peer);
false
}
} else {
false
}
/// Periodic update that should be performed by the user of the network state.
pub struct PeriodicUpdate {
/// Index of the node in the network state.
pub node_index: NodeIndex,
/// Id of the peer.
pub peer_id: PeerId,
/// Address of the node to ping.
pub address: Multiaddr,
/// Object that allows pinging the node.
pub pinger: UniqueConnec<Pinger>,
/// The node should be identified as well.
pub identify: bool,
}
struct OpenCustomConnectionsNumbers {
@@ -814,7 +805,7 @@ fn num_open_custom_connections(connections: &Connections, reserved_peers: &FnvHa
total += 1;
let node_is_reserved = reserved_peers.contains(&info.id);
if !node_is_reserved {
if !info.originated {
if !info.originated.unwrap_or(true) {
unreserved_incoming += 1;
} else {
unreserved_outgoing += 1;
@@ -830,32 +821,21 @@ fn num_open_custom_connections(connections: &Connections, reserved_peers: &FnvHa
}
/// Parses an address of the form `/ip4/x.x.x.x/tcp/x/p2p/xxxxxx`, and adds it
/// to the given node_store. Returns the corresponding peer ID.
fn parse_and_add_to_node_store(
/// to the given topology. Returns the corresponding peer ID and multiaddr.
fn parse_and_add_to_topology(
addr_str: &str,
node_store: &NodeStore
) -> Result<PeerId, Error> {
topology: &mut NetTopology
) -> Result<(PeerId, Multiaddr), Error> {
let mut addr = addr_str.to_multiaddr().map_err(|_| ErrorKind::AddressParse)?;
let who = match addr.pop() {
Some(AddrComponent::P2P(key)) =>
PeerId::from_bytes(key).map_err(|_| ErrorKind::AddressParse)?,
PeerId::from_multihash(key).map_err(|_| ErrorKind::AddressParse)?,
_ => return Err(ErrorKind::AddressParse.into()),
};
// Registering the bootstrap node with a TTL of 100000 years TODO: wrong
match node_store {
NodeStore::Memory(ref node_store) =>
node_store
.peer_or_create(&who)
.set_addr_ttl(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
NodeStore::Json(ref node_store) =>
node_store
.peer_or_create(&who)
.set_addr_ttl(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
}
Ok(who)
topology.add_bootstrap_addr(&who, addr.clone());
Ok((who, addr))
}
/// Obtains or generates the local private key using the configuration.
@@ -969,7 +949,7 @@ fn open_priv_key_file<P>(path: P) -> Result<fs::File, IoError>
#[cfg(test)]
mod tests {
use libp2p::core::{Endpoint, PublicKey};
use libp2p::core::PublicKey;
use network_state::NetworkState;
#[test]
@@ -977,18 +957,9 @@ mod tests {
let state = NetworkState::new(&Default::default()).unwrap();
let example_peer = PublicKey::Rsa(vec![1, 2, 3, 4]).into_peer_id();
let (who, _) = state.custom_proto(
example_peer.clone(),
[1, 2, 3],
Endpoint::Dialer
).unwrap();
let who = state.assign_node_index(&example_peer).unwrap();
state.ban_peer(who, "Just a test");
assert!(state.custom_proto(
example_peer.clone(),
[1, 2, 3],
Endpoint::Dialer
).is_err());
assert!(state.assign_node_index(&example_peer).is_err());
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,623 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
use fnv::FnvHashMap;
use parking_lot::Mutex;
use libp2p::{Multiaddr, PeerId};
use serde_json;
use std::{cmp, fs};
use std::io::{Read, Cursor, Error as IoError, ErrorKind as IoErrorKind, Write};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant, SystemTime};
/// For each address we're connected to, a period of this duration increases the score by 1.
const CONNEC_DURATION_PER_SCORE: Duration = Duration::from_secs(10);
/// Maximum value for the score.
const MAX_SCORE: u32 = 100;
/// Initial score that a node discovered through Kademlia receives.
const KADEMLIA_DISCOVERY_INITIAL_SCORE: u32 = 10;
/// Score adjustement when we fail to connect to an address.
const SCORE_DIFF_ON_FAILED_TO_CONNECT: i32 = -1;
/// Default time-to-live for addresses discovered through Kademlia.
/// After this time has elapsed and no connection has succeeded, the address will be removed.
const KADEMLIA_DISCOVERY_EXPIRATION: Duration = Duration::from_secs(2 * 3600);
/// After a successful connection, the TTL is set to a minimum at this amount.
const EXPIRATION_PUSH_BACK_CONNEC: Duration = Duration::from_secs(2 * 3600);
/// Initial score that a bootstrap node receives when registered.
const BOOTSTRAP_NODE_SCORE: u32 = 100;
/// Time to live of a boostrap node. This only applies if you start the node later *without*
/// that bootstrap node configured anymore.
const BOOTSTRAP_NODE_EXPIRATION: Duration = Duration::from_secs(24 * 3600);
/// The first time we fail to connect to an address, wait this duration before trying again.
const FIRST_CONNECT_FAIL_BACKOFF: Duration = Duration::from_secs(2);
/// Every time we fail to connect to an address, multiply the backoff by this constant.
const FAIL_BACKOFF_MULTIPLIER: u32 = 2;
/// We need a maximum value for the backoff, overwise we risk an overflow.
const MAX_BACKOFF: Duration = Duration::from_secs(30 * 60);
// TODO: should be merged with the Kademlia k-buckets
/// Stores information about the topology of the network.
#[derive(Debug)]
pub struct NetTopology {
store: FnvHashMap<PeerId, PeerInfo>,
cache_path: Option<PathBuf>,
}
impl Default for NetTopology {
#[inline]
fn default() -> NetTopology {
NetTopology::memory()
}
}
impl NetTopology {
/// Initializes a new `NetTopology` that isn't tied to any file.
///
/// `flush_to_disk()` will be a no-op.
#[inline]
pub fn memory() -> NetTopology {
NetTopology {
store: Default::default(),
cache_path: None,
}
}
/// Builds a `NetTopology` that will use `path` as a cache.
///
/// This function tries to load a known topology from the file. If the file doesn't exist
/// or contains garbage data, the execution still continues.
///
/// Calling `flush_to_disk()` in the future writes to the given path.
pub fn from_file<P: AsRef<Path>>(path: P) -> NetTopology {
let path = path.as_ref();
debug!(target: "sub-libp2p", "Initializing peer store for JSON file {:?}", path);
NetTopology {
store: try_load(path),
cache_path: Some(path.to_owned()),
}
}
/// Writes the topology into the path passed to `from_file`.
///
/// No-op if the object was created with `memory()`.
pub fn flush_to_disk(&self) -> Result<(), IoError> {
let path = match self.cache_path {
Some(ref p) => p,
None => return Ok(())
};
let file = fs::File::create(path)?;
serialize(file, &self.store)
}
/// Perform a cleanup pass, removing all obsolete addresses and peers.
///
/// This should be done from time to time.
pub fn cleanup(&mut self) {
let now_systime = SystemTime::now();
self.store.retain(|_, peer| {
peer.addrs.retain(|a| {
a.expires > now_systime
});
!peer.addrs.is_empty()
});
}
/// Returns a list of all the known peers.
pub fn peers(&self) -> impl Iterator<Item = &PeerId> {
self.store.keys()
}
/// Returns the known potential addresses of a peer, ordered by score.
///
/// If we're already connected to that peer, the address(es) we're connected with will be at
/// the top of the list.
// TODO: filter out backed off ones?
pub fn addrs_of_peer(&self, peer: &PeerId) -> impl Iterator<Item = &Multiaddr> {
let peer = if let Some(peer) = self.store.get(peer) {
peer
} else {
// TODO: use an EitherIterator or something
return Vec::new().into_iter();
};
let now = SystemTime::now();
let mut list = peer.addrs.iter().filter_map(move |addr| {
let (score, connected) = addr.score_and_is_connected();
if (addr.expires >= now && score > 0) || connected {
Some((score, &addr.addr))
} else {
None
}
}).collect::<Vec<_>>();
list.sort_by(|a, b| a.0.cmp(&b.0));
// TODO: meh, optimize
let l = list.into_iter().map(|(_, addr)| addr).collect::<Vec<_>>();
l.into_iter()
}
/// Returns a list of all the known addresses of peers, ordered by the
/// order in which we should attempt to connect to them.
///
/// Because of expiration and back-off mechanisms, this list can grow
/// by itself over time. The `Instant` that is returned corresponds to
/// the earlier known time when a new entry will be added automatically to
/// the list.
pub fn addrs_to_attempt(&self) -> (impl Iterator<Item = (&PeerId, &Multiaddr)>, Instant) {
// TODO: optimize
let now = Instant::now();
let now_systime = SystemTime::now();
let mut instant = now + Duration::from_secs(3600);
let mut addrs_out = Vec::new();
for (peer, info) in &self.store {
for addr in &info.addrs {
let (score, is_connected) = addr.score_and_is_connected();
if score == 0 || addr.expires < now_systime {
continue;
}
if !is_connected && addr.back_off_until > now {
instant = cmp::min(instant, addr.back_off_until);
continue;
}
addrs_out.push(((peer, &addr.addr), score));
}
}
addrs_out.sort_by(|a, b| b.1.cmp(&a.1));
(addrs_out.into_iter().map(|a| a.0), instant)
}
/// Adds an address corresponding to a boostrap node.
///
/// We assume that the address is valid, so its score starts very high.
pub fn add_bootstrap_addr(&mut self, peer: &PeerId, addr: Multiaddr) {
let now_systime = SystemTime::now();
let now = Instant::now();
let peer = peer_access(&mut self.store, peer);
let mut found = false;
peer.addrs.retain(|a| {
if a.expires < now_systime {
return false;
}
if a.addr == addr {
found = true;
}
true
});
if !found {
peer.addrs.push(Addr {
addr,
expires: now_systime + BOOTSTRAP_NODE_EXPIRATION,
back_off_until: now,
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
score: Mutex::new(AddrScore {
connected_since: None,
score: BOOTSTRAP_NODE_SCORE,
latest_score_update: now,
}),
});
}
}
/// Adds an address discovered through the Kademlia DHT.
///
/// This address is not necessarily valid and should expire after a TTL.
pub fn add_kademlia_discovered_addr(&mut self, peer_id: &PeerId, addr: Multiaddr) {
let now_systime = SystemTime::now();
let now = Instant::now();
let peer = peer_access(&mut self.store, peer_id);
let mut found = false;
peer.addrs.retain(|a| {
if a.expires < now_systime {
return false;
}
if a.addr == addr {
found = true;
}
true
});
if !found {
trace!(target: "sub-libp2p", "Peer store: adding address {} for {:?}", addr, peer_id);
peer.addrs.push(Addr {
addr,
expires: now_systime + KADEMLIA_DISCOVERY_EXPIRATION,
back_off_until: now,
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
score: Mutex::new(AddrScore {
connected_since: None,
score: KADEMLIA_DISCOVERY_INITIAL_SCORE,
latest_score_update: now,
}),
});
}
}
/// Indicates the peer store that we're connected to this given address.
///
/// This increases the score of the address that we connected to. Since we assume that only
/// one peer can be reached with any specific address, we also remove all addresses from other
/// peers that match the one we connected to.
pub fn report_connected(&mut self, addr: &Multiaddr, peer: &PeerId) {
let now = Instant::now();
// Just making sure that we have an entry for this peer in `store`, but don't use it.
let _ = peer_access(&mut self.store, peer);
for (peer_in_store, info_in_store) in self.store.iter_mut() {
if peer == peer_in_store {
if let Some(addr) = info_in_store.addrs.iter_mut().find(|a| &a.addr == addr) {
addr.connected_now();
addr.back_off_until = now;
addr.next_back_off = FIRST_CONNECT_FAIL_BACKOFF;
continue;
}
// TODO: a else block would be better, but we get borrowck errors
info_in_store.addrs.push(Addr {
addr: addr.clone(),
expires: SystemTime::now() + EXPIRATION_PUSH_BACK_CONNEC,
back_off_until: now,
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
score: Mutex::new(AddrScore {
connected_since: Some(now),
latest_score_update: now,
score: KADEMLIA_DISCOVERY_INITIAL_SCORE
}),
});
} else {
// Set the score to 0 for any address that matches the one we connected to.
for addr_in_store in &mut info_in_store.addrs {
if &addr_in_store.addr == addr {
addr_in_store.adjust_score(-(MAX_SCORE as i32));
}
}
}
}
}
/// Indicates the peer store that we're disconnected from an address.
///
/// There's no need to indicate a peer ID, as each address can only have one peer ID.
/// If we were indeed connected to this addr, then we can find out which peer ID it is.
pub fn report_disconnected(&mut self, addr: &Multiaddr, reason: DisconnectReason) {
let score_diff = match reason {
DisconnectReason::ClosedGracefully => -1,
};
for info in self.store.values_mut() {
for a in info.addrs.iter_mut() {
if &a.addr == addr {
a.disconnected_now(score_diff);
a.back_off_until = Instant::now() + a.next_back_off;
a.next_back_off = cmp::min(a.next_back_off * FAIL_BACKOFF_MULTIPLIER, MAX_BACKOFF);
let expires_push_back = SystemTime::now() + EXPIRATION_PUSH_BACK_CONNEC;
if a.expires < expires_push_back {
a.expires = expires_push_back;
}
return;
}
}
}
}
/// Indicates the peer store that we failed to connect to an address.
///
/// We don't care about which peer is supposed to be behind that address. If we failed to dial
/// it for a specific peer, we would also fail to dial it for all peers that have this
/// address.
pub fn report_failed_to_connect(&mut self, addr: &Multiaddr) {
for info in self.store.values_mut() {
for a in info.addrs.iter_mut() {
if &a.addr == addr {
a.adjust_score(SCORE_DIFF_ON_FAILED_TO_CONNECT);
a.back_off_until = Instant::now() + a.next_back_off;
a.next_back_off = cmp::min(a.next_back_off * FAIL_BACKOFF_MULTIPLIER, MAX_BACKOFF);
}
}
}
}
}
/// Reason why we disconnected from a peer.
pub enum DisconnectReason {
/// The disconnection was graceful.
ClosedGracefully,
}
fn peer_access<'a>(store: &'a mut FnvHashMap<PeerId, PeerInfo>, peer: &PeerId) -> &'a mut PeerInfo {
// TODO: should be optimizable if HashMap gets a better API
store.entry(peer.clone()).or_insert_with(Default::default)
}
#[derive(Debug, Clone, Default)]
struct PeerInfo {
/// Addresses of that peer.
addrs: Vec<Addr>,
}
#[derive(Debug)]
struct Addr {
/// The multiaddress.
addr: Multiaddr,
/// When the address expires.
expires: SystemTime,
next_back_off: Duration,
/// Don't try to connect to this node until `Instant`.
back_off_until: Instant,
score: Mutex<AddrScore>,
}
impl Clone for Addr {
fn clone(&self) -> Addr {
Addr {
addr: self.addr.clone(),
expires: self.expires.clone(),
next_back_off: self.next_back_off.clone(),
back_off_until: self.back_off_until.clone(),
score: Mutex::new(self.score.lock().clone()),
}
}
}
#[derive(Debug, Clone)]
struct AddrScore {
/// If connected, contains the moment when we connected. `None` if we're not connected.
connected_since: Option<Instant>,
/// Score of this address. Potentially needs to be updated based on `latest_score_update`.
score: u32,
/// When we last updated the score.
latest_score_update: Instant,
}
impl Addr {
/// Sets the addr to connected.
fn connected_now(&self) {
let mut score = self.score.lock();
let now = Instant::now();
Addr::flush(&mut score, now);
score.connected_since = Some(now);
}
/// Applies a modification to the score.
fn adjust_score(&self, score_diff: i32) {
let mut score = self.score.lock();
Addr::flush(&mut score, Instant::now());
if score_diff >= 0 {
score.score = cmp::min(MAX_SCORE, score.score + score_diff as u32);
} else {
score.score = score.score.saturating_sub(-score_diff as u32);
}
}
/// Sets the addr to disconnected and applies a modification to the score.
fn disconnected_now(&self, score_diff: i32) {
let mut score = self.score.lock();
Addr::flush(&mut score, Instant::now());
score.connected_since = None;
if score_diff >= 0 {
score.score = cmp::min(MAX_SCORE, score.score + score_diff as u32);
} else {
score.score = score.score.saturating_sub(-score_diff as u32);
}
}
/// Returns the score, and true if we are connected to this addr.
fn score_and_is_connected(&self) -> (u32, bool) {
let mut score = self.score.lock();
Addr::flush(&mut score, Instant::now());
let is_connected = score.connected_since.is_some();
(score.score, is_connected)
}
/// Updates `score` and `latest_score_update`, and returns the score.
fn score(&self) -> u32 {
let mut score = self.score.lock();
Addr::flush(&mut score, Instant::now());
score.score
}
fn flush(score: &mut AddrScore, now: Instant) {
if let Some(connected_since) = score.connected_since {
let potential_score: u32 = div_dur_with_dur(now - connected_since, CONNEC_DURATION_PER_SCORE);
// We flush when we connect to an address.
debug_assert!(score.latest_score_update >= connected_since);
let effective_score: u32 =
div_dur_with_dur(score.latest_score_update - connected_since, CONNEC_DURATION_PER_SCORE);
let to_add = potential_score.saturating_sub(effective_score);
score.score = cmp::min(MAX_SCORE, score.score + to_add);
}
score.latest_score_update = now;
}
}
/// Divides a `Duration` with a `Duration`. This exists in the stdlib but isn't stable yet.
// TODO: remove this function once stable
fn div_dur_with_dur(a: Duration, b: Duration) -> u32 {
let a_ms = a.as_secs() * 1_000_000 + (a.subsec_nanos() / 1_000) as u64;
let b_ms = b.as_secs() * 1_000_000 + (b.subsec_nanos() / 1_000) as u64;
(a_ms / b_ms) as u32
}
/// Serialized version of a `PeerInfo`. Suitable for storage in the cache file.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SerializedPeerInfo {
addrs: Vec<SerializedAddr>,
}
/// Serialized version of an `Addr`. Suitable for storage in the cache file.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SerializedAddr {
addr: String,
expires: SystemTime,
score: u32,
}
impl<'a> From<&'a Addr> for SerializedAddr {
fn from(addr: &'a Addr) -> SerializedAddr {
SerializedAddr {
addr: addr.addr.to_string(),
expires: addr.expires,
score: addr.score(),
}
}
}
/// Attempts to load storage from a file.
/// Deletes the file and returns an empty map if the file doesn't exist, cannot be opened
/// or is corrupted.
fn try_load(path: impl AsRef<Path>) -> FnvHashMap<PeerId, PeerInfo> {
let path = path.as_ref();
if !path.exists() {
debug!(target: "sub-libp2p", "Peer storage file {:?} doesn't exist", path);
return Default::default()
}
let mut file = match fs::File::open(path) {
Ok(f) => f,
Err(err) => {
warn!(target: "sub-libp2p", "Failed to open peer storage file: {:?}", err);
info!(target: "sub-libp2p", "Deleting peer storage file {:?}", path);
let _ = fs::remove_file(path);
return Default::default()
}
};
// We want to support empty files (and treat them as an empty recordset). Unfortunately
// `serde_json` will always produce an error if we do this ("unexpected EOF at line 0
// column 0"). Therefore we start by reading one byte from the file in order to check
// for EOF.
let mut first_byte = [0];
let num_read = match file.read(&mut first_byte) {
Ok(f) => f,
Err(err) => {
// TODO: DRY
warn!(target: "sub-libp2p", "Failed to read peer storage file: {:?}", err);
info!(target: "sub-libp2p", "Deleting peer storage file {:?}", path);
let _ = fs::remove_file(path);
return Default::default()
}
};
if num_read == 0 {
// File is empty.
debug!(target: "sub-libp2p", "Peer storage file {:?} is empty", path);
Default::default()
} else {
let data = Cursor::new(first_byte).chain(file);
match serde_json::from_reader::<_, serde_json::Value>(data) {
Ok(serde_json::Value::Null) => Default::default(),
Ok(serde_json::Value::Object(map)) => deserialize_tolerant(map.into_iter()),
Ok(_) | Err(_) => {
// The `Ok(_)` case means that the file doesn't contain a map.
let _ = fs::remove_file(path);
Default::default()
},
}
}
}
/// Attempts to turn a deserialized version of the storage into the final version.
///
/// Skips entries that are invalid.
fn deserialize_tolerant(
iter: impl Iterator<Item = (String, serde_json::Value)>
) -> FnvHashMap<PeerId, PeerInfo> {
let now = Instant::now();
let now_systime = SystemTime::now();
let mut out = FnvHashMap::default();
for (peer, info) in iter {
let peer: PeerId = match peer.parse() {
Ok(p) => p,
Err(_) => continue,
};
let info: SerializedPeerInfo = match serde_json::from_value(info) {
Ok(i) => i,
Err(_) => continue,
};
let mut addrs = Vec::with_capacity(info.addrs.len());
for addr in info.addrs {
let multiaddr = match addr.addr.parse() {
Ok(a) => a,
Err(_) => continue,
};
if addr.expires < now_systime {
continue
}
addrs.push(Addr {
addr: multiaddr,
expires: addr.expires,
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
back_off_until: now,
score: Mutex::new(AddrScore {
connected_since: None,
score: addr.score,
latest_score_update: now,
}),
});
}
if addrs.is_empty() {
continue;
}
out.insert(peer, PeerInfo { addrs });
}
out
}
/// Attempts to turn a deserialized version of the storage into the final version.
///
/// Skips entries that are invalid or expired.
fn serialize<W: Write>(out: W, map: &FnvHashMap<PeerId, PeerInfo>) -> Result<(), IoError> {
let now = SystemTime::now();
let array: FnvHashMap<_, _> = map.iter().filter_map(|(peer, info)| {
if info.addrs.is_empty() {
return None
}
let peer = peer.to_base58();
let info = SerializedPeerInfo {
addrs: info.addrs.iter()
.filter(|a| a.expires > now)
.map(Into::into)
.collect(),
};
Some((peer, info))
}).collect();
serde_json::to_writer_pretty(out, &array)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
}
@@ -14,58 +14,36 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use libp2p::{self, Transport, mplex, secio, yamux};
use libp2p::{self, PeerId, Transport, mplex, secio, yamux};
use libp2p::core::{MuxedTransport, either, upgrade};
use libp2p::transport_timeout::TransportTimeout;
use std::time::Duration;
use std::usize;
use tokio_io::{AsyncRead, AsyncWrite};
/// Builds the transport that serves as a common ground for all connections.
pub fn build_transport(
unencrypted_allowed: UnencryptedAllowed,
local_private_key: secio::SecioKeyPair
) -> impl MuxedTransport<Output = impl AsyncRead + AsyncWrite> + Clone {
) -> impl MuxedTransport<Output = (PeerId, impl AsyncRead + AsyncWrite)> + Clone {
let mut mplex_config = mplex::MplexConfig::new();
mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block);
mplex_config.max_buffer_len(usize::MAX);
let base = libp2p::CommonTransport::new()
.with_upgrade({
let secio = secio::SecioConfig {
key: local_private_key,
};
let mut plaintext = upgrade::toggleable(upgrade::PlainTextConfig);
match unencrypted_allowed {
UnencryptedAllowed::Allowed => plaintext.disable(),
UnencryptedAllowed::Denied => (),
};
// TODO: this `EitherOutput` thing shows that libp2p's API could be improved
upgrade::or(
upgrade::map(plaintext, |out|
(either::EitherOutput::First(out), None)
),
upgrade::map(secio, |out: secio::SecioOutput<_>|
(either::EitherOutput::Second(out.stream),
Some(out.remote_key))
),
)
.with_upgrade(secio::SecioConfig {
key: local_private_key,
})
.and_then(move |out, endpoint, client_addr| {
let upgrade = upgrade::or(
upgrade::map(mplex_config, either::EitherOutput::First),
upgrade::map(yamux::Config::default(), either::EitherOutput::Second),
);
let key = out.remote_key;
let upgrade = upgrade::map(upgrade, move |muxer| (key, muxer));
upgrade::apply(out.stream, upgrade, endpoint, client_addr)
})
// TODO: check that the public key matches what is reported by identify
.map(|(socket, _key), _| socket)
// TODO: this `EitherOutput` thing shows that libp2p's API could be improved
.with_upgrade(upgrade::or(
upgrade::map(mplex::MplexConfig::new(), either::EitherOutput::First),
upgrade::map(yamux::Config::default(), either::EitherOutput::Second),
))
.map(|out, _| ((), out))
.into_connection_reuse()
.map(|((), out), _| out);
.map(|(key, substream), _| (key.into_peer_id(), substream));
TransportTimeout::new(base, Duration::from_secs(20))
}
/// Specifies whether unencrypted communications are allowed or denied.
#[derive(Debug, Copy, Clone)]
pub enum UnencryptedAllowed {
#[allow(dead_code)]
Allowed,
Denied,
}