Some networking cleanups (#504)

* Some networking cleanups

* Fix tests

* Fix wrong port in new_local
This commit is contained in:
Pierre Krieger
2018-08-08 20:05:40 +02:00
committed by Gav Wood
parent 96b3a8f92f
commit 6f4a401afa
10 changed files with 51 additions and 180 deletions
+1 -7
View File
@@ -978,11 +978,6 @@ dependencies = [
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ipnetwork"
version = "0.12.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "itertools"
version = "0.5.10"
@@ -2679,6 +2674,7 @@ dependencies = [
"substrate-client 0.1.0",
"substrate-extrinsic-pool 0.1.0",
"substrate-network 0.1.0",
"substrate-network-libp2p 0.1.0",
"substrate-runtime-primitives 0.1.0",
"substrate-service 0.3.0",
"substrate-telemetry 0.3.0",
@@ -2867,7 +2863,6 @@ dependencies = [
"ethkey 0.3.0 (git+https://github.com/paritytech/parity.git)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"ipnetwork 0.12.8 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
"libp2p 0.1.0 (git+https://github.com/tomaka/libp2p-rs?branch=polkadot-2)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -4054,7 +4049,6 @@ dependencies = [
"checksum integer-sqrt 0.1.0 (git+https://github.com/paritytech/integer-sqrt-rs.git)" = "<none>"
"checksum interleaved-ordered 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "141340095b15ed7491bd3d4ced9d20cebfb826174b6bb03386381f62b01e3d77"
"checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08"
"checksum ipnetwork 0.12.8 (registry+https://github.com/rust-lang/crates.io-index)" = "70783119ac90828aaba91eae39db32c6c1b8838deea3637e5238efa0130801ab"
"checksum itertools 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4833d6978da405305126af4ac88569b5d71ff758581ce5a987dbfa3755f694fc"
"checksum itoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c069bbec61e1ca5a596166e55dfe4773ff745c3d16b700013bcaff9a6df2c682"
"checksum jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)" = "<none>"
+1
View File
@@ -25,6 +25,7 @@ exit-future = "0.1"
substrate-client = { path = "../../substrate/client" }
substrate-extrinsic-pool = { path = "../../substrate/extrinsic-pool" }
substrate-network = { path = "../../substrate/network" }
substrate-network-libp2p = { path = "../../substrate/network-libp2p" }
substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" }
substrate-service = { path = "../../substrate/service" }
substrate-telemetry = { path = "../../substrate/telemetry" }
+7 -2
View File
@@ -33,6 +33,7 @@ extern crate backtrace;
extern crate substrate_client as client;
extern crate substrate_network as network;
extern crate substrate_network_libp2p as network_libp2p;
extern crate substrate_runtime_primitives as runtime_primitives;
extern crate substrate_extrinsic_pool;
extern crate substrate_service as service;
@@ -54,6 +55,7 @@ pub mod error;
pub mod informant;
mod panic_hook;
use network_libp2p::AddrComponent;
use runtime_primitives::traits::As;
use service::{
ServiceFactory, FactoryFullConfiguration, RuntimeGenesis,
@@ -61,8 +63,9 @@ use service::{
};
use std::io::{Write, Read, stdin, stdout};
use std::iter;
use std::fs::File;
use std::net::SocketAddr;
use std::net::{Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf};
use names::{Generator, Name};
use regex::Regex;
@@ -281,7 +284,9 @@ where
None => 30333,
};
config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
config.network.listen_address = iter::once(AddrComponent::IP4(Ipv4Addr::new(0, 0, 0, 0)))
.chain(iter::once(AddrComponent::TCP(port)))
.collect();
config.network.public_address = None;
config.network.client_version = config.client_id();
config.network.use_secret = match matches.value_of("node-key").map(|s| s.parse()) {
@@ -15,7 +15,6 @@ libp2p = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2", d
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ethkey = { git = "https://github.com/paritytech/parity.git" }
ethereum-types = "0.3"
ipnetwork = "0.12.6"
parking_lot = "0.5"
libc = "0.2"
log = "0.3"
@@ -32,7 +32,6 @@ extern crate varint;
extern crate ethcore_io as io;
extern crate ethereum_types;
extern crate ipnetwork;
#[macro_use]
extern crate error_chain;
@@ -44,6 +43,7 @@ extern crate assert_matches;
pub use connection_filter::{ConnectionFilter, ConnectionDirection};
pub use io::TimerToken;
pub use error::{Error, ErrorKind, DisconnectReason};
pub use libp2p::{Multiaddr, multiaddr::AddrComponent};
pub use traits::*;
mod connection_filter;
@@ -229,7 +229,7 @@ impl NetworkState {
peer_by_nodeid: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()),
info_by_peer: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()),
}),
reserved_only: atomic::AtomicBool::new(false),
reserved_only: atomic::AtomicBool::new(config.non_reserved_mode == NonReservedPeerMode::Deny),
reserved_peers,
next_node_index: atomic::AtomicUsize::new(0),
disabled_nodes: Mutex::new(Default::default()),
@@ -30,11 +30,10 @@ use libp2p::core::{Endpoint, PeerId as PeerstorePeerId, PublicKey};
use libp2p::core::{SwarmController, UniqueConnecState};
use libp2p::ping;
use libp2p::transport_timeout::TransportTimeout;
use {PacketId, SessionInfo, ConnectionFilter, TimerToken};
use {PacketId, SessionInfo, TimerToken};
use rand;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::mpsc as sync_mpsc;
use std::thread;
@@ -104,17 +103,13 @@ impl NetworkService {
/// generic here is too much and crashes the Rust compiler.
pub fn new(
config: NetworkConfiguration,
protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>,
filter: Option<Arc<ConnectionFilter>>
protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>
) -> Result<NetworkService, Error> {
// TODO: for now `filter` is always `None` ; remove it from the code or implement it
assert!(filter.is_none());
let network_state = NetworkState::new(&config)?;
let local_peer_id = network_state.local_public_key().clone()
.into_peer_id();
let mut listen_addr = config_to_listen_addr(&config);
let mut listen_addr = config.listen_address.clone();
listen_addr.append(AddrComponent::P2P(local_peer_id.clone().into_bytes()));
info!(target: "sub-libp2p", "Local node address is: {}", listen_addr);
@@ -134,6 +129,11 @@ impl NetworkService {
let (close_tx, close_rx) = oneshot::channel();
let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded();
let mut listened_addrs = Vec::new();
if let Some(ref addr) = config.public_address {
listened_addrs.push(addr.clone());
}
let shared = Arc::new(Shared {
network_state,
protocols: RegisteredProtocols(protocols.into_iter()
@@ -146,7 +146,7 @@ impl NetworkService {
config,
timeouts_register_tx,
original_listened_addr: RwLock::new(None),
listened_addrs: RwLock::new(Vec::new()),
listened_addrs: RwLock::new(listened_addrs),
});
// Initialize all the protocols now.
@@ -446,20 +446,18 @@ fn init_thread(
};
// Listen on multiaddress.
// TODO: change the network config to directly contain a `Multiaddr`
{
let listen_addr = config_to_listen_addr(&shared.config);
debug!(target: "sub-libp2p", "Libp2p listening on {}", listen_addr);
match swarm_controller.listen_on(listen_addr.clone()) {
Ok(new_addr) => {
*shared.original_listened_addr.write() = Some(new_addr.clone());
},
Err(_) => {
warn!(target: "sub-libp2p", "Can't listen on {}, protocol not supported", listen_addr);
return Err(ErrorKind::BadProtocol.into())
},
}
match swarm_controller.listen_on(shared.config.listen_address.clone()) {
Ok(new_addr) => {
debug!(target: "sub-libp2p", "Libp2p listening on {}", new_addr);
*shared.original_listened_addr.write() = Some(new_addr.clone());
},
Err(_) => {
warn!(target: "sub-libp2p", "Can't listen on {}, protocol not supported",
shared.config.listen_address);
return Err(ErrorKind::BadProtocol.into())
},
}
// Explicitely connect to _all_ the boostrap nodes as a temporary measure.
for bootnode in shared.config.boot_nodes.iter() {
match shared.network_state.add_peer(bootnode) {
@@ -841,23 +839,6 @@ fn handle_custom_connection(
future::Either::B(final_fut)
}
/// Builds the multiaddress corresponding to the address we need to listen to
/// according to the config.
// TODO: put the `Multiaddr` directly in the `NetworkConfiguration`
fn config_to_listen_addr(config: &NetworkConfiguration) -> Multiaddr {
if let Some(addr) = config.listen_address {
let ip = match addr.ip() {
IpAddr::V4(addr) => AddrComponent::IP4(addr),
IpAddr::V6(addr) => AddrComponent::IP6(addr),
};
iter::once(ip).chain(iter::once(AddrComponent::TCP(addr.port()))).collect()
} else {
let host = AddrComponent::IP4(Ipv4Addr::new(0, 0, 0, 0));
let port = AddrComponent::TCP(0);
iter::once(host).chain(iter::once(port)).collect()
}
}
/// Randomly discovers peers to connect to.
/// This works by running a round at a regular interval, and skipping if we
/// reached `min_peers`. When we are over `min_peers`, we stop trying to dial
@@ -1402,6 +1383,6 @@ mod tests {
#[test]
fn builds_and_finishes_in_finite_time() {
// Checks that merely starting the network doesn't end up in an infinite loop.
let _service = NetworkService::new(Default::default(), vec![], None).unwrap();
let _service = NetworkService::new(Default::default(), vec![]).unwrap();
}
}
+13 -121
View File
@@ -16,13 +16,12 @@
use std::fmt;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::str::{self, FromStr};
use std::sync::Arc;
use std::iter;
use std::net::Ipv4Addr;
use std::str;
use std::time::Duration;
use io::TimerToken;
use ipnetwork::{IpNetwork, IpNetworkError};
use libp2p::{multiaddr::AddrComponent, Multiaddr};
use error::Error;
use ethkey::Secret;
use ethereum_types::H512;
@@ -36,40 +35,8 @@ pub type ProtocolId = [u8; 3];
pub type NodeId = H512;
/// Local (temporary) peer session ID.
/// RENAME TO NodeIndex
pub type NodeIndex = usize;
/// Messages used to communitate with the event loop from other threads.
#[derive(Clone)]
pub enum NetworkIoMessage {
/// Register a new protocol handler.
AddHandler {
/// Handler shared instance.
handler: Arc<NetworkProtocolHandler + Sync>,
/// Protocol Id.
protocol: ProtocolId,
/// Supported protocol versions and number of packet IDs reserved by the protocol (packet count).
versions: Vec<(u8, u8)>,
},
/// Register a new protocol timer
AddTimer {
/// Protocol Id.
protocol: ProtocolId,
/// Timer token.
token: TimerToken,
/// Timer delay.
delay: Duration,
},
/// Initliaze public interface.
InitPublicInterface,
/// Disconnect a peer.
Disconnect(NodeIndex),
/// Disconnect and temporary disable peer.
DisablePeer(NodeIndex),
/// Network has been started with the host as the given enode.
NetworkStarted(String),
}
/// Shared session information
#[derive(Debug, Clone)]
pub struct SessionInfo {
@@ -138,15 +105,9 @@ pub struct NetworkConfiguration {
/// Directory path to store network-specific configuration. None means nothing will be saved
pub net_config_path: Option<String>,
/// IP address to listen for incoming connections. Listen to all connections by default
pub listen_address: Option<SocketAddr>,
pub listen_address: Multiaddr,
/// IP address to advertise. Detected automatically if none.
pub public_address: Option<SocketAddr>,
/// Port for UDP connections, same as TCP by default
pub udp_port: Option<u16>,
/// Enable NAT configuration
pub nat_enabled: bool,
/// Enable discovery
pub discovery_enabled: bool,
pub public_address: Option<Multiaddr>,
/// List of initial node addresses
pub boot_nodes: Vec<String>,
/// Use provided node key instead of default
@@ -155,16 +116,10 @@ pub struct NetworkConfiguration {
pub min_peers: u32,
/// Maximum allowed number of peers
pub max_peers: u32,
/// Maximum handshakes
pub max_handshakes: u32,
/// Reserved protocols. Peers with <key> protocol get additional <value> connection slots.
pub reserved_protocols: HashMap<ProtocolId, u32>,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
/// IP filter
pub ip_filter: IpFilter,
/// Client identifier
pub client_version: String,
}
@@ -181,36 +136,26 @@ impl NetworkConfiguration {
NetworkConfiguration {
config_path: None,
net_config_path: None,
listen_address: None,
listen_address: iter::once(AddrComponent::IP4(Ipv4Addr::new(0, 0, 0, 0)))
.chain(iter::once(AddrComponent::TCP(30333)))
.collect(),
public_address: None,
udp_port: None,
nat_enabled: true,
discovery_enabled: true,
boot_nodes: Vec::new(),
use_secret: None,
min_peers: 25,
max_peers: 50,
max_handshakes: 64,
reserved_protocols: HashMap::new(),
ip_filter: IpFilter::default(),
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
client_version: "Parity-network".into(),
}
}
/// Create new default configuration with specified listen port.
pub fn new_with_port(port: u16) -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)));
config
}
/// Create new default configuration for localhost-only connection with random port (usefull for testing)
/// Create new default configuration for localhost-only connection with random port (useful for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)));
config.nat_enabled = false;
config.listen_address = iter::once(AddrComponent::IP4(Ipv4Addr::new(127, 0, 0, 1)))
.chain(iter::once(AddrComponent::TCP(0)))
.collect();
config
}
}
@@ -348,56 +293,3 @@ impl NonReservedPeerMode {
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IpFilter {
pub predefined: AllowIP,
pub custom_allow: Vec<IpNetwork>,
pub custom_block: Vec<IpNetwork>,
}
impl Default for IpFilter {
fn default() -> Self {
IpFilter {
predefined: AllowIP::All,
custom_allow: vec![],
custom_block: vec![],
}
}
}
impl IpFilter {
/// Attempt to parse the peer mode from a string.
pub fn parse(s: &str) -> Result<IpFilter, IpNetworkError> {
let mut filter = IpFilter::default();
for f in s.split_whitespace() {
match f {
"all" => filter.predefined = AllowIP::All,
"private" => filter.predefined = AllowIP::Private,
"public" => filter.predefined = AllowIP::Public,
"none" => filter.predefined = AllowIP::None,
custom => {
if custom.starts_with("-") {
filter.custom_block.push(IpNetwork::from_str(&custom.to_owned().split_off(1))?)
} else {
filter.custom_allow.push(IpNetwork::from_str(custom)?)
}
}
}
}
Ok(filter)
}
}
/// IP fiter
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AllowIP {
/// Connect to any address
All,
/// Connect to private network only
Private,
/// Connect to public network only
Public,
/// Block all addresses
None,
}
@@ -95,8 +95,7 @@ impl NetworkProtocolHandler for TestProtocol {
fn net_service() {
let _service = NetworkService::new(
NetworkConfiguration::new_local(),
vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])],
None
vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])]
).expect("Error creating network service");
}
@@ -108,11 +107,11 @@ fn net_disconnect() {
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = vec![ ];
let handler1 = Arc::new(TestProtocol::new(false));
let service1 = NetworkService::new(config1, vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
let service1 = NetworkService::new(config1, vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.external_url().unwrap() ];
let handler2 = Arc::new(TestProtocol::new(true));
let _service2 = NetworkService::new(config2, vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
let _service2 = NetworkService::new(config2, vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
thread::sleep(Duration::from_millis(50));
}
@@ -124,7 +123,7 @@ fn net_disconnect() {
fn net_timeout() {
let config = NetworkConfiguration::new_local();
let handler = Arc::new(TestProtocol::new(false));
let _service = NetworkService::new(config, vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
let _service = NetworkService::new(config, vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
while !handler.got_timeout() {
thread::sleep(Duration::from_millis(50));
}
+1 -1
View File
@@ -174,7 +174,7 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
});
let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)];
let protocols = vec![(handler.clone() as Arc<_>, protocol_id, &versions[..])];
let service = match NetworkService::new(params.network_config.clone(), protocols, None) {
let service = match NetworkService::new(params.network_config.clone(), protocols) {
Ok(service) => service,
Err(err) => {
match err.kind() {