Switch from devp2p to libp2p (#268)

* Switch from devp2p to libp2p

* Move the keys in the network state

* Properly load, store or generate private key

* Some robustness

* Update for latest libp2p

* Allow secio

* Don't open a new Kademlia connec all the time

* Handle Kademlia disconnection

* Set correct permissions on key file

* Improvements to secret key storage

* Flush the peer store at Kademlia requests

* Use RAII guards for disconnection

* Some misc work

* Set informations about peers

* Fix tests and external URL

* Fix some style

* Split obtain_private_key into multiple function

* More style fixes

* More style fixes

* Fix some concerns

* Turn // into ///

* More style fixes

* More style fixes

* Add annotations to unreachable!

* Fix style again

* Remove commented out code

* Fix test year

* More concerns
This commit is contained in:
Pierre Krieger
2018-07-15 11:45:37 +02:00
committed by Gav Wood
parent ae5298f8b7
commit 5924b361a3
13 changed files with 3341 additions and 110 deletions
+713 -104
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -56,7 +56,7 @@ polkadot --chain=local --validator --key Alice -d /tmp/alice
and in the other, run:
```
polkadot --chain=local --validator --key Bob -d /tmp/bob --port 30334 --bootnodes 'enode://ALICE_BOOTNODE_ID_HERE@127.0.0.1:30333'
polkadot --chain=local --validator --key Bob -d /tmp/bob --port 30334 --bootnodes '/ip4/127.0.0.1/tcp/30333/p2p/ALICE_BOOTNODE_ID_HERE'
```
Ensure you replace `ALICE_BOOTNODE_ID_HERE` with the node ID from the output of
@@ -0,0 +1,27 @@
[package]
description = "libp2p implementation of the ethcore network library"
homepage = "http://parity.io"
license = "GPL-3.0"
name = "substrate-network-libp2p"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
bytes = "0.4"
fnv = "1.0"
futures = "0.1"
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "77b1c445807e53b8c5e4e5e2da751222da15b8cc", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ethcore-network = { git = "https://github.com/paritytech/parity.git" }
ethkey = { git = "https://github.com/paritytech/parity.git" }
parking_lot = "0.5"
log = "0.3"
rand = "0.5.0"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-timer = "0.2"
varint = { git = "https://github.com/libp2p/rust-libp2p" }
[dev-dependencies]
ethcore-bytes = { git = "https://github.com/paritytech/parity.git" }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ethcore-logger = { git = "https://github.com/paritytech/parity.git" }
@@ -0,0 +1,287 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use bytes::{Bytes, BytesMut};
use network::ProtocolId;
use libp2p::core::{Multiaddr, ConnectionUpgrade, Endpoint};
use network::PacketId;
use std::io::Error as IoError;
use std::vec::IntoIter as VecIntoIter;
use futures::{future, Future, stream, Stream, Sink};
use futures::sync::mpsc;
use tokio_io::{AsyncRead, AsyncWrite};
use varint::VarintCodec;
/// Connection upgrade for a single protocol.
///
/// Note that "a single protocol" here refers to `par` for example. However
/// each protocol can have multiple different versions for networking purposes.
#[derive(Clone)]
pub struct RegisteredProtocol<T> {
/// Id of the protocol for API purposes.
id: ProtocolId,
/// Base name of the protocol as advertised on the network.
/// Ends with `/` so that we can append a version number behind.
base_name: Bytes,
/// List of protocol versions that we support, plus their packet count.
/// Ordered in descending order so that the best comes first.
/// The packet count is used to filter out invalid messages.
supported_versions: Vec<(u8, u8)>,
/// Custom data.
custom_data: T,
}
/// Output of a `RegisteredProtocol` upgrade.
pub struct RegisteredProtocolOutput<T> {
/// Data passed to `RegisteredProtocol::new`.
pub custom_data: T,
/// Id of the protocol.
pub protocol_id: ProtocolId,
/// Version of the protocol that was negotiated.
pub protocol_version: u8,
/// Channel to sender outgoing messages to. Closing this channel closes the
/// connection.
// TODO: consider assembling packet_id here
pub outgoing: mpsc::UnboundedSender<Bytes>,
/// Stream where incoming messages are received. The stream ends whenever
/// either side is closed.
pub incoming: Box<Stream<Item = (PacketId, Bytes), Error = IoError>>,
}
impl<T> RegisteredProtocol<T> {
/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be
/// passed inside the `RegisteredProtocolOutput`.
pub fn new(custom_data: T, protocol: ProtocolId, versions: &[(u8, u8)])
-> Self {
let mut proto_name = Bytes::from_static(b"/substrate/");
proto_name.extend_from_slice(&protocol);
proto_name.extend_from_slice(b"/");
RegisteredProtocol {
base_name: proto_name,
id: protocol,
supported_versions: {
let mut tmp: Vec<_> = versions.iter().rev().cloned().collect();
tmp.sort_unstable_by(|a, b| b.1.cmp(&a.1));
tmp
},
custom_data: custom_data,
}
}
/// Returns the ID of the protocol.
pub fn id(&self) -> ProtocolId {
self.id
}
/// Returns the custom data that was passed to `new`.
pub fn custom_data(&self) -> &T {
&self.custom_data
}
}
// `Maf` is short for `MultiaddressFuture`
impl<T, C, Maf> ConnectionUpgrade<C, Maf> for RegisteredProtocol<T>
where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
Maf: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :(
{
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = u8; // Protocol version
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
// Report each version as an individual protocol.
self.supported_versions.iter().map(|&(ver, _)| {
let num = ver.to_string();
let mut name = self.base_name.clone();
name.extend_from_slice(num.as_bytes());
(name, ver)
}).collect::<Vec<_>>().into_iter()
}
type Output = RegisteredProtocolOutput<T>;
type MultiaddrFuture = Maf;
type Future = future::FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
fn upgrade(
self,
socket: C,
protocol_version: Self::UpgradeIdentifier,
endpoint: Endpoint,
remote_addr: Maf
) -> Self::Future {
let packet_count = self.supported_versions
.iter()
.find(|&(v, _)| *v == protocol_version)
.expect("negotiated protocol version that wasn't advertised ; \
programmer error")
.1;
// This function is called whenever we successfully negotiated a
// protocol with a remote (both if initiated by us or by the remote)
// This channel is used to send outgoing packets to the custom_data
// for this open substream.
let (msg_tx, msg_rx) = mpsc::unbounded();
// Build the sink for outgoing network bytes, and the stream for
// incoming instructions. `stream` implements `Stream<Item = Message>`.
enum Message {
/// Received data from the network.
RecvSocket(BytesMut),
/// Data to send to the network.
/// The packet_id must already be inside the `Bytes`.
SendReq(Bytes),
/// The socket has been closed.
Finished,
}
let (sink, stream) = {
let framed = AsyncRead::framed(socket, VarintCodec::default());
let msg_rx = msg_rx.map(Message::SendReq)
.chain(stream::once(Ok(Message::Finished)))
.map_err(|()| unreachable!("mpsc::UnboundedReceiver never errors"));
let (sink, stream) = framed.split();
let stream = stream.map(Message::RecvSocket)
.chain(stream::once(Ok(Message::Finished)));
(sink, msg_rx.select(stream))
};
let incoming = stream::unfold((sink, stream, false), move |(sink, stream, finished)| {
if finished {
return None
}
Some(stream
.into_future()
.map_err(|(err, _)| err)
.and_then(move |(message, stream)|
match message {
Some(Message::RecvSocket(mut data)) => {
// The `data` should be prefixed by the packet ID,
// therefore an empty packet is invalid.
if data.is_empty() {
debug!(target: "sub-libp2p", "ignoring incoming \
packet because it was empty");
let f = future::ok((None, (sink, stream, false)));
return future::Either::A(f)
}
let packet_id = data[0];
let data = data.split_off(1);
if packet_id >= packet_count {
debug!(target: "sub-libp2p", "ignoring incoming packet \
because packet_id {} is too large", packet_id);
let f = future::ok((None, (sink, stream, false)));
future::Either::A(f)
} else {
let out = Some((packet_id, data.freeze()));
let f = future::ok((out, (sink, stream, false)));
future::Either::A(f)
}
},
Some(Message::SendReq(data)) => {
let fut = sink.send(data)
.map(move |sink| (None, (sink, stream, false)));
future::Either::B(fut)
},
Some(Message::Finished) | None => {
let f = future::ok((None, (sink, stream, true)));
future::Either::A(f)
},
}
))
}).filter_map(|v| v);
let out = RegisteredProtocolOutput {
custom_data: self.custom_data,
protocol_id: self.id,
protocol_version: protocol_version,
outgoing: msg_tx,
incoming: Box::new(incoming),
};
future::ok((out, remote_addr))
}
}
// Connection upgrade for all the protocols contained in it.
#[derive(Clone)]
pub struct RegisteredProtocols<T>(pub Vec<RegisteredProtocol<T>>);
impl<T> RegisteredProtocols<T> {
/// Finds a protocol in the list by its id.
pub fn find_protocol(&self, protocol: ProtocolId)
-> Option<&RegisteredProtocol<T>> {
self.0.iter().find(|p| p.id == protocol)
}
/// Returns true if the given protocol is in the list.
pub fn has_protocol(&self, protocol: ProtocolId) -> bool {
self.0.iter().any(|p| p.id == protocol)
}
}
impl<T> Default for RegisteredProtocols<T> {
fn default() -> Self {
RegisteredProtocols(Vec::new())
}
}
impl<T, C, Maf> ConnectionUpgrade<C, Maf> for RegisteredProtocols<T>
where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
Maf: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :(
{
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = (usize,
<RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::UpgradeIdentifier);
fn protocol_names(&self) -> Self::NamesIter {
// We concat the lists of `RegisteredProtocol::protocol_names` for
// each protocol.
self.0.iter().enumerate().flat_map(|(n, proto)|
ConnectionUpgrade::<C, Maf>::protocol_names(proto)
.map(move |(name, id)| (name, (n, id)))
).collect::<Vec<_>>().into_iter()
}
type Output = <RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::Output;
type MultiaddrFuture = <RegisteredProtocol<T> as
ConnectionUpgrade<C, Maf>>::MultiaddrFuture;
type Future = <RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::Future;
#[inline]
fn upgrade(
self,
socket: C,
upgrade_identifier: Self::UpgradeIdentifier,
endpoint: Endpoint,
remote_addr: Maf
) -> Self::Future {
let (protocol_index, inner_proto_id) = upgrade_identifier;
self.0.into_iter()
.nth(protocol_index)
.expect("invalid protocol index ; programmer logic error")
.upgrade(socket, inner_proto_id, endpoint, remote_addr)
}
}
@@ -0,0 +1,49 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
#![type_length_limit = "268435456"]
extern crate parking_lot;
extern crate fnv;
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_timer;
extern crate ethkey;
extern crate ethcore_network as network;
extern crate libp2p;
extern crate rand;
extern crate bytes;
extern crate varint;
#[macro_use]
extern crate log;
mod custom_proto;
mod network_state;
mod service;
mod timeouts;
mod transport;
pub use service::NetworkService;
/// Check if node url is valid
pub fn validate_node_url(url: &str) -> Result<(), network::Error> {
match url.parse::<libp2p::multiaddr::Multiaddr>() {
Ok(_) => Ok(()),
Err(_) => Err(network::ErrorKind::InvalidNodeId.into()),
}
}
@@ -0,0 +1,861 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use bytes::Bytes;
use fnv::{FnvHashMap, FnvHashSet};
use futures::{future, Future, Stream, sync::mpsc};
use libp2p::core::{Multiaddr, AddrComponent, Endpoint};
use libp2p::core::{PeerId as PeerstorePeerId, PublicKey};
use libp2p::kad::KadConnecController;
use libp2p::peerstore::{Peerstore, PeerAccess};
use libp2p::peerstore::json_peerstore::JsonPeerstore;
use libp2p::peerstore::memory_peerstore::MemoryPeerstore;
use libp2p::secio;
use network::{Error, ErrorKind, NetworkConfiguration, NonReservedPeerMode};
use network::{PeerId, ProtocolId, SessionInfo};
use parking_lot::{Mutex, RwLock};
use rand::{self, Rng};
use std::cmp;
use std::fs;
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write};
use std::path::Path;
use std::sync::atomic;
use std::time::Duration;
// File where the peers are stored.
const NODES_FILE: &str = "nodes.json";
// File where the private key is stored.
const SECRET_FILE: &str = "secret";
// Common struct shared throughout all the components of the service.
pub struct NetworkState {
/// Contains the information about the network.
peerstore: PeersStorage,
/// Active connections.
connections: RwLock<Connections>,
/// `min_peers` taken from the configuration.
min_peers: u32,
/// `max_peers` taken from the configuration.
max_peers: u32,
/// If true, only reserved peers can connect.
reserved_only: atomic::AtomicBool,
/// List of the IDs of the reserved peers.
reserved_peers: RwLock<FnvHashSet<PeerstorePeerId>>,
/// Each peer gets assigned a new unique ID. This ID increases linearly.
next_peer_id: atomic::AtomicUsize,
/// List of the IDs of the disabled peers. These peers will see their
/// connections refused.
disabled_peers: RwLock<FnvHashSet<PeerstorePeerId>>,
/// Local private key.
local_private_key: secio::SecioKeyPair,
/// Local public key.
local_public_key: PublicKey,
}
enum PeersStorage {
/// Peers are stored in memory. Nothing is stored on disk.
Memory(MemoryPeerstore),
/// Peers are stored in a JSON file on the disk.
Json(JsonPeerstore),
}
struct Connections {
/// For each libp2p peer ID, the ID of the peer in the API we expose.
/// Also corresponds to the index in `info_by_peer`.
peer_by_nodeid: FnvHashMap<PeerstorePeerId, usize>,
/// For each peer ID, information about our connection to this peer.
info_by_peer: FnvHashMap<PeerId, PeerConnectionInfo>,
}
struct PeerConnectionInfo {
/// A list of message senders per protocol, and the protocol version.
/// The sender can be used to transmit data for the remote. Note that the
/// packet_id has to be inside the `Bytes`.
/// Closing the sender will drop the substream of this protocol.
senders: Vec<(ProtocolId, mpsc::UnboundedSender<Bytes>, u8)>,
/// True if we dialed a Kad connection towards this peer.
/// This indicates that `kad_connec` should eventually resolve, even
/// without doing anything.
opened_kad: bool,
/// When a Kad connection is received, we send it on this channel so that
/// it will be received by `kad_connec`.
incoming_kad_channel: mpsc::UnboundedSender<KadConnecController>,
/// The Kademlia connection to this node.
/// Contains the receiving end of `incoming_kad_channel`. If `opened_kad`
/// is true, we are guaranteed to finish.
/// TODO: proper error handling if a kad connection is closed
kad_connec: future::Shared<Box<Future<Item = KadConnecController, Error = IoError>>>,
/// Id of the peer.
id: PeerstorePeerId,
/// True if this connection was initiated by us.
/// Note that it is theoretically possible that we dial the remote at the
/// same time they dial us, in which case the protocols may be dispatched
/// between both connections, and in which case the value here will be racy.
originated: bool,
/// Latest known ping duration.
ping: Mutex<Option<Duration>>,
/// The client version of the remote, or `None` if not known.
client_version: Option<String>,
/// The multiaddress of the remote, or `None` if not known.
remote_address: Option<Multiaddr>,
/// The local multiaddress used to communicate with the remote, or `None`
/// if not known.
local_address: Option<Multiaddr>,
}
impl NetworkState {
pub fn new(config: &NetworkConfiguration) -> Result<NetworkState, Error> {
// Private and public keys configuration.
let local_private_key = obtain_private_key(&config)?;
let local_public_key = local_private_key.to_public_key();
// Build the storage for peers, including the bootstrap nodes.
let peerstore = if let Some(ref path) = config.net_config_path {
let path = Path::new(path).join(NODES_FILE);
if let Ok(peerstore) = JsonPeerstore::new(path.clone()) {
debug!(target: "sub-libp2p", "Initialized peer store for JSON \
file {:?}", path);
PeersStorage::Json(peerstore)
} else {
warn!(target: "sub-libp2p", "Failed to open peer storage {:?} \
; peers won't be saved", path);
PeersStorage::Memory(MemoryPeerstore::empty())
}
} else {
debug!(target: "sub-libp2p", "No peers file configured ; peers \
won't be saved");
PeersStorage::Memory(MemoryPeerstore::empty())
};
for bootnode in config.boot_nodes.iter() {
parse_and_add_to_peerstore(bootnode, &peerstore)?;
}
let reserved_peers = {
let mut reserved_peers = FnvHashSet::with_capacity_and_hasher(
config.reserved_nodes.len(),
Default::default()
);
for peer in config.reserved_nodes.iter() {
let id = parse_and_add_to_peerstore(peer, &peerstore)?;
reserved_peers.insert(id);
}
RwLock::new(reserved_peers)
};
let expected_max_peers = cmp::max(config.max_peers as usize,
config.reserved_nodes.len());
Ok(NetworkState {
peerstore,
min_peers: config.min_peers,
max_peers: config.max_peers,
connections: RwLock::new(Connections {
peer_by_nodeid: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()),
info_by_peer: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()),
}),
reserved_only: atomic::AtomicBool::new(false),
reserved_peers,
next_peer_id: atomic::AtomicUsize::new(0),
disabled_peers: RwLock::new(Default::default()),
local_private_key,
local_public_key,
})
}
/// Returns the private key of the local node.
pub fn local_private_key(&self) -> &secio::SecioKeyPair {
&self.local_private_key
}
/// Returns the public key of the local node.
pub fn local_public_key(&self) -> &PublicKey {
&self.local_public_key
}
/// Returns all the IDs of the peer we have knowledge of.
///
/// This includes peers we are not connected to.
pub fn known_peers(&self) -> impl Iterator<Item = PeerstorePeerId> {
match self.peerstore {
PeersStorage::Memory(ref mem) =>
mem.peers().collect::<Vec<_>>().into_iter(),
PeersStorage::Json(ref json) =>
json.peers().collect::<Vec<_>>().into_iter(),
}
}
/// Returns true if we are connected to any peer at all.
pub fn has_connected_peer(&self) -> bool {
!self.connections.read().peer_by_nodeid.is_empty()
}
/// Get a list of all connected peers by id.
pub fn connected_peers(&self) -> Vec<PeerId> {
self.connections.read().peer_by_nodeid.values().cloned().collect()
}
/// Returns true if the given `PeerId` is valid.
///
/// `PeerId`s are never reused, so once this function returns `false` it
/// will never return `true` again for the same `PeerId`.
pub fn is_peer_connected(&self, peer: PeerId) -> bool {
self.connections.read().info_by_peer.contains_key(&peer)
}
/// Reports the ping of the peer. Returned later by `session_info()`.
/// No-op if the `peer_id` is not valid/expired.
pub fn report_ping(&self, peer_id: PeerId, ping: Duration) {
let connections = self.connections.read();
let info = match connections.info_by_peer.get(&peer_id) {
Some(info) => info,
None => return,
};
*info.ping.lock() = Some(ping);
}
/// If we're connected to a peer with the given protocol, returns
/// information about the connection. Otherwise, returns `None`.
pub fn session_info(&self, peer: PeerId, protocol: ProtocolId)
-> Option<SessionInfo> {
let connections = self.connections.read();
let info = match connections.info_by_peer.get(&peer) {
Some(info) => info,
None => return None,
};
let protocol_version = match info.senders.iter().find(|&(ref p, _, _)| p == &protocol) {
Some(&(_, _, version)) => version as u32,
None => return None,
};
let ping = info.ping.lock().clone();
Some(SessionInfo {
id: None, // TODO: ???? what to do??? wrong format!
client_version: info.client_version.clone().take().unwrap_or(String::new()),
protocol_version,
capabilities: Vec::new(), // TODO: list of supported protocols ; hard
peer_capabilities: Vec::new(), // TODO: difference with `peer_capabilities`?
ping,
originated: info.originated,
remote_address: info.remote_address.as_ref().map(|a| a.to_string())
.unwrap_or(String::new()),
local_address: info.local_address.as_ref().map(|a| a.to_string())
.unwrap_or(String::new()),
})
}
/// If we're connected to a peer with the given protocol, returns the
/// protocol version. Otherwise, returns `None`.
pub fn protocol_version(&self, peer: PeerId, protocol: ProtocolId)
-> Option<u8> {
let connections = self.connections.read();
let peer = match connections.info_by_peer.get(&peer) {
Some(peer) => peer,
None => return None,
};
peer.senders.iter().find(|p| p.0 == protocol).map(|p| p.2)
}
/// Equivalent to `session_info(peer).map(|info| info.client_version)`.
pub fn peer_client_version(&self, peer: PeerId, protocol: ProtocolId)
-> Option<String> {
// TODO: implement more directly, without going through `session_info`
self.session_info(peer, protocol)
.map(|info| info.client_version)
}
/// Adds an address discovered by Kademlia.
/// Note that we don't have to be connected to a peer to add an address.
pub fn add_kad_discovered_addr(&self, node_id: &PeerstorePeerId, addr: Multiaddr) {
match self.peerstore {
PeersStorage::Memory(ref mem) =>
mem.peer_or_create(node_id)
.add_addr(addr, Duration::from_secs(3600)),
PeersStorage::Json(ref json) =>
json.peer_or_create(node_id)
.add_addr(addr, Duration::from_secs(3600)),
}
}
/// Signals that an address doesn't match the corresponding node ID.
/// This removes the address from the peer store, so that it is not
/// returned by `addrs_of_peer` again in the future.
pub fn set_invalid_kad_address(&self, node_id: &PeerstorePeerId,
addr: &Multiaddr) {
// TODO: blacklist the address?
match self.peerstore {
PeersStorage::Memory(ref mem) =>
if let Some(mut peer) = mem.peer(node_id) {
peer.rm_addr(addr.clone()) // TODO: cloning necessary?
},
PeersStorage::Json(ref json) =>
if let Some(mut peer) = json.peer(node_id) {
peer.rm_addr(addr.clone()) // TODO: cloning necessary?
},
}
}
/// Returns the known multiaddresses of a peer.
pub fn addrs_of_peer(&self, node_id: &PeerstorePeerId) -> Vec<Multiaddr> {
match self.peerstore {
PeersStorage::Memory(ref mem) =>
mem.peer(node_id)
.into_iter()
.flat_map(|p| p.addrs())
.collect::<Vec<_>>(),
PeersStorage::Json(ref json) =>
json.peer(node_id)
.into_iter()
.flat_map(|p| p.addrs())
.collect::<Vec<_>>(),
}
}
/// Sets information about a peer.
pub fn set_peer_info(
&self,
node_id: PeerstorePeerId,
endpoint: Endpoint,
client_version: String,
local_addr: Multiaddr,
remote_addr: Multiaddr
) -> Result<PeerId, IoError> {
let mut connections = self.connections.write();
let peer_id = accept_connection(&mut connections, &self.next_peer_id,
node_id.clone(), endpoint)?;
let infos = connections.info_by_peer.get_mut(&peer_id)
.expect("Newly-created peer id is always valid");
infos.client_version = Some(client_version);
infos.remote_address = Some(remote_addr);
infos.local_address = Some(local_addr);
Ok(peer_id)
}
/// Adds a reserved peer to the list of reserved peers.
/// Returns an error if the peer address is invalid.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let id = parse_and_add_to_peerstore(peer, &self.peerstore)?;
self.reserved_peers.write().insert(id);
Ok(())
}
/// Removes the peer from the list of reserved peers. If we're in reserved mode, drops any
/// active connection to this peer.
/// Returns an error if the peer address is invalid.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let id = parse_and_add_to_peerstore(peer, &self.peerstore)?;
self.reserved_peers.write().remove(&id);
// Dropping the peer if we're in reserved mode.
if self.reserved_only.load(atomic::Ordering::SeqCst) {
let mut connections = self.connections.write();
if let Some(peer_id) = connections.peer_by_nodeid.remove(&id) {
connections.info_by_peer.remove(&peer_id);
}
}
Ok(())
}
/// Set the non-reserved peer mode.
pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode) {
match mode {
NonReservedPeerMode::Accept =>
self.reserved_only.store(false, atomic::Ordering::SeqCst),
NonReservedPeerMode::Deny =>
// TODO: drop existing peers?
self.reserved_only.store(true, atomic::Ordering::SeqCst),
}
}
/// Returns true if we should open a new outgoing connection to a peer.
/// This takes into account the number of active peers.
pub fn should_open_outgoing_connections(&self) -> bool {
!self.reserved_only.load(atomic::Ordering::Relaxed) &&
self.connections.read().peer_by_nodeid.len() < self.min_peers as usize
}
/// Returns true if we are connected to the given node.
pub fn has_connection(&self, node_id: &PeerstorePeerId) -> bool {
let connections = self.connections.read();
connections.peer_by_nodeid.contains_key(node_id)
}
/// Returns true if we are connected to the given node with the given protocol.
pub fn has_protocol_connection(&self, node_id: &PeerstorePeerId,
protocol_id: ProtocolId) -> bool {
let connections = self.connections.read();
if let Some(peer) = connections.peer_by_nodeid.get(node_id) {
let info = match connections.info_by_peer.get(&peer) {
Some(peer) => peer,
None => return false,
};
info.senders.iter().any(|p| p.0 == protocol_id)
} else {
false
}
}
/// Call this when a Kademlia connection has been opened from a remote.
pub fn incoming_kad_connection(&self, node_id: PeerstorePeerId,
ctrl: KadConnecController) -> Result<PeerId, IoError> {
// TODO: check that the peer is disabled? should disabling a peer also prevent
// kad from working?
let mut connections = self.connections.write();
let peer_id = accept_connection(&mut connections, &self.next_peer_id,
node_id, Endpoint::Listener)?;
let infos = connections.info_by_peer.get_mut(&peer_id)
.expect("Newly-created peer id is always valid");
let _ = infos.incoming_kad_channel.unbounded_send(ctrl);
Ok(peer_id)
}
/// Obtain a Kademlia connection to the given peer.
pub fn obtain_kad_connection<F, Fut>(&self, node_id: PeerstorePeerId, opener: F)
-> Result<(PeerId, impl Future<Item = KadConnecController, Error = IoError>), IoError>
where F: FnOnce() -> Fut, Fut: Future<Item = KadConnecController, Error = IoError>
{
let mut connections = self.connections.write();
let peer_id = accept_connection(&mut connections, &self.next_peer_id,
node_id, Endpoint::Dialer)?;
let infos = connections.info_by_peer.get_mut(&peer_id)
.expect("Newly-created peer id is always valid");
let future_to_process = if !infos.opened_kad {
let tx = infos.incoming_kad_channel.clone();
let new_kad = opener().and_then(move |ctrl| {
tx.unbounded_send(ctrl.clone())
.map_err(|err| IoError::new(IoErrorKind::ConnectionAborted, err))?;
Ok(ctrl)
});
infos.opened_kad = true;
future::Either::A(new_kad)
} else {
future::Either::B(future::empty())
};
let fut = infos.kad_connec
.clone()
.map(|ctrl| (*ctrl).clone())
.map_err(|err| IoError::new(IoErrorKind::ConnectionAborted, err))
.select(future_to_process)
.map(|(item, _)| item)
.map_err(|(err, _)| err);
Ok((peer_id, fut))
}
/// Disconnect the Kademlia controller with the peer.
pub fn disconnect_kademlia(&self, peer_id: PeerId) {
let mut connections = self.connections.write();
if let Some(peer) = connections.info_by_peer.get_mut(&peer_id) {
// TODO: that's code duplication
let (tx, rx) = mpsc::unbounded();
let rx = rx.into_future()
.map_err(|_| -> IoError { unreachable!("an `UnboundedReceiver` can never produce an error") })
.and_then(|i| i.0.ok_or(IoError::new(
IoErrorKind::ConnectionAborted, "kad aborted")));
let kad_connec = Box::new(rx) as Box<Future<Item = _, Error = _>>;
peer.incoming_kad_channel = tx;
peer.kad_connec = kad_connec.shared();
peer.opened_kad = false;
}
}
/// Try to add a new connection to a node in the list.
///
/// Returns a `PeerId` to allow further interfacing with this connection.
/// Note that all `PeerId`s are unique and never reused.
///
/// Can return an error if we are refusing the connection to the remote.
///
/// You must pass an `UnboundedSender` which will be used by the `send`
/// method. Actually sending the data is not covered by this code.
///
/// The various methods of the `NetworkState` that close a connection do
/// so by dropping this sender.
pub fn accept_custom_proto(
&self,
node_id: PeerstorePeerId,
protocol_id: ProtocolId,
protocol_version: u8,
endpoint: Endpoint,
msg_tx: mpsc::UnboundedSender<Bytes>
) -> Result<PeerId, IoError> {
let mut connections = self.connections.write();
if self.disabled_peers.read().contains(&node_id) {
debug!(target: "sub-libp2p", "Refusing node {:?} because it was \
disabled", node_id);
return Err(IoError::new(IoErrorKind::PermissionDenied,
"disabled peer"))
}
let peer_id = accept_connection(&mut connections, &self.next_peer_id,
node_id.clone(), endpoint)?;
let connections = &mut *connections;
let info_by_peer = &mut connections.info_by_peer;
let peer_by_nodeid = &mut connections.peer_by_nodeid;
let infos = info_by_peer.get_mut(&peer_id)
.expect("Newly-created peer id is always valid");
let node_is_reserved = self.reserved_peers.read().contains(&infos.id);
if !node_is_reserved {
if self.reserved_only.load(atomic::Ordering::Relaxed) ||
peer_by_nodeid.len() >= self.max_peers as usize
{
debug!(target: "sub-libp2p", "Refusing node {:?} because we \
reached the max number of peers", node_id);
return Err(IoError::new(IoErrorKind::PermissionDenied,
"maximum number of peers reached"))
}
}
if !infos.senders.iter().any(|&(prot, _, _)| prot == protocol_id) {
infos.senders.push((protocol_id.clone(), msg_tx, protocol_version));
}
Ok(peer_id)
}
/// Sends some data to the given peer, using the sender that was passed
/// to `accept_custom_proto`.
pub fn send(&self, protocol: ProtocolId, peer_id: PeerId, message: Bytes)
-> Result<(), Error> {
if let Some(peer) = self.connections.read().info_by_peer.get(&peer_id) {
let sender = peer.senders.iter().find(|elem| elem.0 == protocol)
.map(|e| &e.1);
if let Some(sender) = sender {
sender.unbounded_send(message)
.map_err(|err| ErrorKind::Io(IoError::new(IoErrorKind::Other, err)))?;
Ok(())
} else {
// We are connected to this peer, but not with the current
// protocol.
debug!(target: "sub-libp2p", "Tried to send message to peer {} \
for which we aren't connected with the requested protocol",
peer_id);
return Err(ErrorKind::PeerNotFound.into())
}
} else {
debug!(target: "sub-libp2p", "Tried to send message to invalid \
peer ID {}", peer_id);
return Err(ErrorKind::PeerNotFound.into())
}
}
/// Disconnects a peer, if a connection exists (ie. drops the Kademlia
/// controller, and the senders that were passed to `accept_custom_proto`).
pub fn disconnect_peer(&self, peer_id: PeerId) {
let mut connections = self.connections.write();
if let Some(peer_info) = connections.info_by_peer.remove(&peer_id) {
let old = connections.peer_by_nodeid.remove(&peer_info.id);
debug_assert_eq!(old, Some(peer_id));
}
}
/// Disconnects all the peers.
/// This destroys all the Kademlia controllers and the senders that were
/// passed to `accept_custom_proto`.
pub fn disconnect_all(&self) {
let mut connec = self.connections.write();
*connec = Connections {
info_by_peer: FnvHashMap::with_capacity_and_hasher(
connec.peer_by_nodeid.capacity(), Default::default()),
peer_by_nodeid: FnvHashMap::with_capacity_and_hasher(
connec.peer_by_nodeid.capacity(), Default::default()),
};
}
/// Disables a peer. This adds the peer to the list of disabled peers, and
/// drops any existing connections if necessary (ie. drops the sender that
/// was passed to `accept_custom_proto`).
pub fn disable_peer(&self, peer_id: PeerId) {
// TODO: what do we do if the peer is reserved?
let mut connections = self.connections.write();
let peer_info = if let Some(peer_info) = connections.info_by_peer.remove(&peer_id) {
let old = connections.peer_by_nodeid.remove(&peer_info.id);
debug_assert_eq!(old, Some(peer_id));
peer_info
} else {
return
};
drop(connections);
self.disabled_peers.write().insert(peer_info.id.clone());
}
/// Returns true if a peer is disabled.
pub fn is_peer_disabled(&self, node_id: &PeerstorePeerId) -> bool {
self.disabled_peers.read().contains(&node_id)
}
/// Flushes the caches to the disk.
///
/// This is done in an atomical way, so that an error doesn't corrupt
/// anything.
pub fn flush_caches_to_disk(&self) -> Result<(), IoError> {
match self.peerstore {
PeersStorage::Memory(_) => Ok(()),
PeersStorage::Json(ref json) =>
match json.flush() {
Ok(()) => {
debug!(target: "sub-libp2p", "Flushed JSON peer store \
to disk");
Ok(())
}
Err(err) => {
warn!(target: "sub-libp2p", "Failed to flush changes \
to JSON peer store: {}", err);
Err(err)
}
}
}
}
}
impl Drop for NetworkState {
fn drop(&mut self) {
let _ = self.flush_caches_to_disk();
}
}
/// Assigns a `PeerId` to a node, or returns an existing ID if any exists.
///
/// The function only accepts already-locked structs, so that we don't risk
/// any deadlock.
fn accept_connection(
connections: &mut Connections,
next_peer_id: &atomic::AtomicUsize,
node_id: PeerstorePeerId,
endpoint: Endpoint
) -> Result<PeerId, IoError> {
let peer_by_nodeid = &mut connections.peer_by_nodeid;
let info_by_peer = &mut connections.info_by_peer;
let peer_id = *peer_by_nodeid.entry(node_id.clone()).or_insert_with(|| {
let new_id = next_peer_id.fetch_add(1, atomic::Ordering::Relaxed);
let (tx, rx) = mpsc::unbounded();
let rx = rx
.into_future()
.map_err(|_| -> IoError { unreachable!("an `UnboundedReceiver` can never produce an error") })
.and_then(|i| i.0.ok_or(IoError::new(IoErrorKind::ConnectionAborted, "kad aborted")));
let kad_connec = Box::new(rx) as Box<Future<Item = _, Error = _>>;
info_by_peer.insert(new_id, PeerConnectionInfo {
senders: Vec::new(), // TODO: Vec::with_capacity(num_registered_protocols),
opened_kad: false,
incoming_kad_channel: tx,
kad_connec: kad_connec.shared(),
id: node_id.clone(),
originated: endpoint == Endpoint::Dialer,
ping: Mutex::new(None),
client_version: None,
local_address: None,
remote_address: None,
});
new_id
});
Ok(peer_id)
}
/// Parses an address of the form `/ip4/x.x.x.x/tcp/x/p2p/xxxxxx`, and adds it
/// to the given peerstore. Returns the corresponding peer ID.
fn parse_and_add_to_peerstore(addr_str: &str, peerstore: &PeersStorage)
-> Result<PeerstorePeerId, Error> {
let mut addr: Multiaddr = addr_str.parse()
.map_err(|_| ErrorKind::AddressParse)?;
let p2p_component = addr.pop().ok_or(ErrorKind::AddressParse)?;
let peer_id = match p2p_component {
AddrComponent::P2P(key) | AddrComponent::IPFS(key) =>
PeerstorePeerId::from_bytes(key).map_err(|_| ErrorKind::AddressParse)?,
_ => return Err(ErrorKind::BadProtocol.into()),
};
// Registering the bootstrap node with a TTL of 100000 years TODO: wrong
match peerstore {
PeersStorage::Memory(ref peerstore) =>
peerstore
.peer_or_create(&peer_id)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
PeersStorage::Json(ref peerstore) =>
peerstore
.peer_or_create(&peer_id)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
}
Ok(peer_id)
}
/// Obtains or generates the local private key using the configuration.
fn obtain_private_key(config: &NetworkConfiguration)
-> Result<secio::SecioKeyPair, IoError> {
if let Some(ref secret) = config.use_secret {
// Key was specified in the configuration.
secio::SecioKeyPair::secp256k1_raw_key(&secret[..])
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
} else {
if let Some(ref path) = config.net_config_path {
fs::create_dir_all(Path::new(path))?;
// Try fetch the key from a the file containing th esecret.
let secret_path = Path::new(path).join(SECRET_FILE);
match load_private_key_from_file(&secret_path) {
Ok(s) => Ok(s),
Err(err) => {
// Failed to fetch existing file ; generate a new key
trace!(target: "sub-libp2p", "Failed to load existing \
secret key file {:?}, generating new key ; err = {:?}",
secret_path, err);
Ok(gen_key_and_try_write_to_file(&secret_path))
}
}
} else {
// No path in the configuration, nothing we can do except generate
// a new key.
let mut key: [u8; 32] = [0; 32];
rand::rngs::EntropyRng::new().fill(&mut key);
Ok(secio::SecioKeyPair::secp256k1_raw_key(&key)
.expect("randomly-generated key with correct len should \
always be valid"))
}
}
}
/// Tries to load a private key from a file located at the given path.
fn load_private_key_from_file<P>(path: P)
-> Result<secio::SecioKeyPair, IoError>
where P: AsRef<Path>
{
fs::File::open(path)
.and_then(|mut file| {
// We are in 2018 and there is still no method on `std::io::Read`
// that directly returns a `Vec`.
let mut buf = Vec::new();
file.read_to_end(&mut buf).map(|_| buf)
})
.and_then(|content|
secio::SecioKeyPair::secp256k1_raw_key(&content)
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
)
}
/// Generates a new secret key and tries to write it to the given file.
/// Doesn't error if we couldn't open or write to the file.
fn gen_key_and_try_write_to_file<P>(path: P) -> secio::SecioKeyPair
where P: AsRef<Path> {
let raw_key: [u8; 32] = rand::rngs::EntropyRng::new().gen();
let secio_key = secio::SecioKeyPair::secp256k1_raw_key(&raw_key)
.expect("randomly-generated key with correct len should always be valid");
// And store the newly-generated key in the file if possible.
// Errors that happen while doing so are ignored.
match open_priv_key_file(&path) {
Ok(mut file) =>
match file.write_all(&raw_key) {
Ok(()) => (),
Err(err) => warn!(target: "sub-libp2p", "Failed to write \
secret key in file {:?} ; err = {:?}", path.as_ref(), err),
},
Err(err) =>
warn!(target: "sub-libp2p", "Failed to store secret key in file \
{:?} ; err = {:?}", path.as_ref(), err),
}
secio_key
}
/// Opens a file containing a private key in write mode.
#[cfg(unix)]
fn open_priv_key_file<P>(path: P) -> Result<fs::File, IoError>
where P: AsRef<Path>
{
use std::os::unix::fs::OpenOptionsExt;
fs::OpenOptions::new()
.write(true)
.create_new(true)
.mode(256 | 128) // 0o600 in decimal
.open(path)
}
/// Opens a file containing a private key in write mode.
#[cfg(not(unix))]
fn open_priv_key_file<P>(path: P) -> Result<fs::File, IoError>
where P: AsRef<Path>
{
fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(path)
}
#[cfg(test)]
mod tests {
use futures::sync::mpsc;
use libp2p::core::{Endpoint, PublicKey};
use network_state::NetworkState;
#[test]
fn refuse_disabled_peer() {
let state = NetworkState::new(&Default::default()).unwrap();
let example_peer = PublicKey::Rsa(vec![1, 2, 3, 4]).into_peer_id();
let peer_id = state.accept_custom_proto(
example_peer.clone(),
[1, 2, 3],
1,
Endpoint::Dialer,
mpsc::unbounded().0
).unwrap();
state.disable_peer(peer_id);
assert!(state.accept_custom_proto(
example_peer.clone(),
[1, 2, 3],
1,
Endpoint::Dialer,
mpsc::unbounded().0
).is_err());
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,87 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use futures::{future, Future, stream, Stream, sync::mpsc};
use std::io::Error as IoError;
use std::time::Instant;
use tokio_core::reactor::{Handle, Timeout};
/// Builds the timeouts system.
///
/// The `timeouts_rx` should be a stream receiving newly-created timeout
/// requests. Returns a stream that produces items as their timeout elapses.
/// `T` can be anything you want, as it is transparently passed from the input
/// to the output.
pub fn build_timeouts_stream<T>(
core: Handle,
timeouts_rx: mpsc::UnboundedReceiver<(Instant, T)>
) -> impl Stream<Item = T, Error = IoError> {
let next_timeout = next_in_timeouts_stream(timeouts_rx);
// The `unfold` function is essentially a loop turned into a stream. The
// first parameter is the initial state, and the closure returns the new
// state and an item.
stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
// `timeouts` is a `Vec` of futures that produce an `Out`.
let core = core.clone();
// `select_ok` panics if `timeouts` is empty anyway.
if timeouts.is_empty() {
return None
}
Some(future::select_ok(timeouts.into_iter())
.and_then(move |(item, mut timeouts)|
match item {
Out::NewTimeout((Some((at, item)), next_timeouts)) => {
// Received a new timeout request on the channel.
let next_timeout = next_in_timeouts_stream(next_timeouts);
let timeout = Timeout::new_at(at, &core)?
.map(move |()| Out::Timeout(item));
timeouts.push(future::Either::B(timeout));
timeouts.push(future::Either::A(next_timeout));
Ok((None, timeouts))
},
Out::NewTimeout((None, _)) =>
// The channel has been closed.
Ok((None, timeouts)),
Out::Timeout(item) =>
// A timeout has happened.
Ok((Some(item), timeouts)),
}
)
)
}).filter_map(|item| item)
}
/// Local enum representing the output of the selection.
enum Out<A, B> {
NewTimeout(A),
Timeout(B),
}
/// Convenience function that calls `.into_future()` on the timeouts stream,
/// and applies some modifiers.
/// This function is necessary. Otherwise if we copy-paste its content we run
/// into errors because the type of the copy-pasted closures differs.
fn next_in_timeouts_stream<T, B>(stream: mpsc::UnboundedReceiver<T>)
-> impl Future<Item = Out<(Option<T>, mpsc::UnboundedReceiver<T>), B>, Error = IoError> {
stream
.into_future()
.map(Out::NewTimeout)
.map_err(|_| unreachable!("an UnboundedReceiver can never error"))
}
@@ -0,0 +1,62 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use libp2p::{self, Transport, secio};
use libp2p::core::{MuxedTransport, either, upgrade};
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
/// Builds the transport that serves as a common ground for all connections.
pub fn build_transport(
core: Handle,
unencrypted_allowed: UnencryptedAllowed,
local_private_key: secio::SecioKeyPair
) -> impl MuxedTransport<Output = impl AsyncRead + AsyncWrite> + Clone {
libp2p::CommonTransport::new(core)
.with_upgrade({
let secio = secio::SecioConfig {
key: local_private_key,
};
let mut plaintext = upgrade::toggleable(upgrade::PlainTextConfig);
match unencrypted_allowed {
UnencryptedAllowed::Allowed => plaintext.disable(),
UnencryptedAllowed::Denied => (),
};
// TODO: this `EitherOutput` thing shows that libp2p's API could be improved
upgrade::or(
upgrade::map(plaintext, |out|
(either::EitherOutput::First(out), None)
),
upgrade::map(secio, |out: secio::SecioOutput<_>|
(either::EitherOutput::Second(out.stream),
Some(out.remote_key))
),
)
})
// TODO: check that the public key matches what is reported by identify
.map(|(socket, _key), _| socket)
.with_upgrade(libp2p::mplex::MultiplexConfig::new())
.into_connection_reuse()
}
/// Specifies whether unencrypted communications are allowed or denied.
#[derive(Debug, Copy, Clone)]
pub enum UnencryptedAllowed {
Allowed,
Denied,
}
@@ -0,0 +1,149 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
extern crate parking_lot;
extern crate ethcore_bytes;
extern crate ethcore_io as io;
extern crate ethcore_logger;
extern crate ethcore_network;
extern crate substrate_network_libp2p;
extern crate ethkey;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::thread;
use std::time::*;
use parking_lot::Mutex;
use ethcore_bytes::Bytes;
use ethcore_network::*;
use substrate_network_libp2p::NetworkService;
use ethkey::{Random, Generator};
use io::TimerToken;
pub struct TestProtocol {
drop_session: bool,
pub packet: Mutex<Bytes>,
pub got_timeout: AtomicBool,
pub got_disconnect: AtomicBool,
}
impl TestProtocol {
pub fn new(drop_session: bool) -> Self {
TestProtocol {
packet: Mutex::new(Vec::new()),
got_timeout: AtomicBool::new(false),
got_disconnect: AtomicBool::new(false),
drop_session: drop_session,
}
}
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc<TestProtocol> {
let handler = Arc::new(TestProtocol::new(drop_session));
service.register_protocol(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)]);
handler
}
pub fn got_packet(&self) -> bool {
self.packet.lock()[..] == b"hello"[..]
}
pub fn got_timeout(&self) -> bool {
self.got_timeout.load(AtomicOrdering::Relaxed)
}
pub fn got_disconnect(&self) -> bool {
self.got_disconnect.load(AtomicOrdering::Relaxed)
}
}
impl NetworkProtocolHandler for TestProtocol {
fn initialize(&self, io: &NetworkContext) {
io.register_timer(0, Duration::from_millis(10)).unwrap();
}
fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) {
assert_eq!(packet_id, 33);
self.packet.lock().extend(data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
if self.drop_session {
io.disconnect_peer(*peer)
} else {
io.respond(33, "hello".to_owned().into_bytes()).unwrap();
}
}
fn disconnected(&self, _io: &NetworkContext, _peer: &PeerId) {
self.got_disconnect.store(true, AtomicOrdering::Relaxed);
}
/// Timer function called after a timeout created with `NetworkContext::timeout`.
fn timeout(&self, _io: &NetworkContext, timer: TimerToken) {
assert_eq!(timer, 0);
self.got_timeout.store(true, AtomicOrdering::Relaxed);
}
}
#[test]
fn net_service() {
let service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
service.start().unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)]);
}
#[test]
fn net_start_stop() {
let config = NetworkConfiguration::new_local();
let service = NetworkService::new(config, None).unwrap();
service.start().unwrap();
service.stop();
service.start().unwrap();
}
#[test]
#[ignore] // TODO: how is this test even supposed to work?
fn net_disconnect() {
let key1 = Random.generate().unwrap();
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = vec![ ];
let mut service1 = NetworkService::new(config1, None).unwrap();
service1.start().unwrap();
let handler1 = TestProtocol::register(&mut service1, false);
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.external_url().unwrap() ];
let mut service2 = NetworkService::new(config2, None).unwrap();
service2.start().unwrap();
let handler2 = TestProtocol::register(&mut service2, true);
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
thread::sleep(Duration::from_millis(50));
}
assert!(handler1.got_disconnect());
assert!(handler2.got_disconnect());
}
#[test]
fn net_timeout() {
let config = NetworkConfiguration::new_local();
let mut service = NetworkService::new(config, None).unwrap();
service.start().unwrap();
let handler = TestProtocol::register(&mut service, false);
while !handler.got_timeout() {
thread::sleep(Duration::from_millis(50));
}
}
+1 -1
View File
@@ -19,7 +19,6 @@ serde_json = "1.0"
futures = "0.1.17"
linked-hash-map = "0.5"
ethcore-network = { git = "https://github.com/paritytech/parity.git" }
ethcore-network-devp2p = { git = "https://github.com/paritytech/parity.git" }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ed25519 = { path = "../../substrate/ed25519" }
substrate-primitives = { path = "../../substrate/primitives" }
@@ -29,6 +28,7 @@ substrate-runtime-support = { path = "../../substrate/runtime-support" }
substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" }
substrate-bft = { path = "../../substrate/bft" }
substrate-codec = { path = "../../substrate/codec" }
substrate-network-libp2p = { path = "../../substrate/network-libp2p" }
[dev-dependencies]
env_logger = "0.4"
+1 -1
View File
@@ -19,7 +19,6 @@
//! Substrate-specific P2P networking: synchronizing blocks, propagating BFT messages.
//! Allows attachment of an optional subprotocol for chain-specific requests.
extern crate ethcore_network_devp2p as network_devp2p;
extern crate ethcore_network as network;
extern crate ethcore_io as core_io;
extern crate linked_hash_map;
@@ -30,6 +29,7 @@ extern crate substrate_serializer as ser;
extern crate substrate_client as client;
extern crate substrate_runtime_support as runtime_support;
extern crate substrate_runtime_primitives as runtime_primitives;
extern crate substrate_network_libp2p as network_libp2p;
extern crate substrate_bft;
extern crate substrate_codec as codec;
extern crate serde;
+2 -3
View File
@@ -21,7 +21,7 @@ use std::time::Duration;
use futures::sync::{oneshot, mpsc};
use network::{NetworkProtocolHandler, NetworkContext, PeerId, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use network_devp2p::{NetworkService};
use network_libp2p::{NetworkService};
use core_io::{TimerToken};
use io::NetSyncIo;
use protocol::{Protocol, ProtocolContext, Context, ProtocolStatus, PeerInfo as ProtocolPeerInfo};
@@ -199,8 +199,7 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
Err(err) => warn!("Error starting network: {}", err),
_ => {},
};
self.network.register_protocol(self.handler.clone(), self.protocol_id, &[(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)])
.unwrap_or_else(|e| warn!("Error registering polkadot protocol: {:?}", e));
self.network.register_protocol(self.handler.clone(), self.protocol_id, &[(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)]);
}
fn stop(&self) {