NetworkService::new starts the network (#462)

This commit is contained in:
Pierre Krieger
2018-08-06 11:57:08 +02:00
committed by Benjamin Kampmann
parent 21346f34f8
commit 11b9496a6a
4 changed files with 105 additions and 153 deletions
@@ -17,7 +17,7 @@
use bytes::Bytes;
use {Error, ErrorKind, NetworkConfiguration, NetworkProtocolHandler};
use {NonReservedPeerMode, NetworkContext, Severity, NodeIndex, ProtocolId};
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use libp2p;
use libp2p::multiaddr::{AddrComponent, Multiaddr};
use libp2p::kad::{KadSystem, KadConnecConfig, KadSystemConfig};
@@ -56,10 +56,10 @@ pub struct NetworkService {
shared: Arc<Shared>,
/// Holds the networking-running background thread alive. The `Option` is
/// `None` if the service is stopped.
/// only set to `None` in the destructor.
/// Sending a message on the channel will trigger the end of the
/// background thread. We can then wait on the join handle.
bg_thread: Mutex<Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>>,
bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>,
}
/// Common struct shared throughout all the components of the service.
@@ -79,17 +79,18 @@ struct Shared {
/// List of protocols available on the network. It is a logic error to
/// remove protocols from this list, and the code may assume that protocols
/// stay at the same index forever.
protocols: RwLock<RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>>,
protocols: RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>,
/// Use this channel to send a timeout request to the background thread's
/// events loop. After the timeout, elapsed, it will call `timeout` on the
/// `NetworkProtocolHandler`. This can be closed if the background thread
/// is not running. The sender will be overwritten every time we start
/// the service.
timeouts_register_tx: RwLock<mpsc::UnboundedSender<(Duration, (Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, TimerToken))>>,
timeouts_register_tx: mpsc::UnboundedSender<(Duration, (Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, TimerToken))>,
/// Original address from the configuration, after being adjusted by the `Transport`.
/// Contains `None` if the network hasn't started yet.
// TODO: because we create the `Shared` before starting to listen, this
// has to be set later ; sort this out
original_listened_addr: RwLock<Option<Multiaddr>>,
/// Contains the addresses we known about ourselves.
@@ -97,9 +98,13 @@ struct Shared {
}
impl NetworkService {
/// Starts IO event loop
/// Starts the networking service.
///
/// Note that we could use an iterator for `protocols`, but having a
/// generic here is too much and crashes the Rust compiler.
pub fn new(
config: NetworkConfiguration,
protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>,
filter: Option<Arc<ConnectionFilter>>
) -> Result<NetworkService, Error> {
// TODO: for now `filter` is always `None` ; remove it from the code or implement it
@@ -121,56 +126,6 @@ impl NetworkService {
known_initial_peers: network_state.known_peers(),
});
let shared = Arc::new(Shared {
network_state,
protocols: RwLock::new(Default::default()),
kad_system,
kad_upgrade: KadConnecConfig::new(),
config,
timeouts_register_tx: RwLock::new(mpsc::unbounded().0),
original_listened_addr: RwLock::new(None),
listened_addrs: RwLock::new(Vec::new()),
});
Ok(NetworkService {
shared,
bg_thread: Mutex::new(None),
})
}
/// Returns network configuration.
pub fn config(&self) -> &NetworkConfiguration {
&self.shared.config
}
pub fn external_url(&self) -> Option<String> {
// TODO: in the context of libp2p, it is hard to define what an external
// URL is, as different nodes can have multiple different ways to
// reach us
self.shared.original_listened_addr.read().as_ref()
.map(|addr|
format!("{}/p2p/{}", addr, self.shared.kad_system.local_peer_id().to_base58())
)
}
/// Start network IO.
/// Note that we could use an iterator for `protocols`, but having a
/// generic here is too much and crashes the Rust compiler.
// TODO (design): the notion of having a `NetworkService` alive should mean
// that it is running ; the `start` and `stop` functions are bad design
pub fn start(
&self,
protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>
) -> Result<(), (Error, Option<SocketAddr>)> {
// TODO: check that service is started already?
*self.shared.protocols.write() = RegisteredProtocols(
protocols.into_iter()
.map(|(handler, protocol, versions)|
RegisteredProtocol::new(handler.clone(), protocol, versions))
.collect()
);
// Channel we use to signal success or failure of the bg thread
// initialization process.
let (init_tx, init_rx) = sync_mpsc::channel();
@@ -178,18 +133,34 @@ impl NetworkService {
// should stop
let (close_tx, close_rx) = oneshot::channel();
let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded();
*self.shared.timeouts_register_tx.write() = timeouts_register_tx;
let shared = Arc::new(Shared {
network_state,
protocols: RegisteredProtocols(protocols.into_iter()
.map(|(handler, protocol, versions)|
RegisteredProtocol::new(handler.clone(), protocol, versions))
.collect()
),
kad_system,
kad_upgrade: KadConnecConfig::new(),
config,
timeouts_register_tx,
original_listened_addr: RwLock::new(None),
listened_addrs: RwLock::new(Vec::new()),
});
// Initialize all the protocols now.
for protocol in self.shared.protocols.read().0.iter() {
// TODO: what about failure to initialize? we can't uninitialize a protocol
// TODO: remove this `initialize` method eventually, as it's only used for timers
for protocol in shared.protocols.0.iter() {
protocol.custom_data().initialize(&NetworkContextImpl {
inner: self.shared.clone(),
inner: shared.clone(),
protocol: protocol.id().clone(),
current_peer: None,
});
}
let shared = self.shared.clone();
let shared_clone = shared.clone();
let join_handle = thread::spawn(move || {
// Tokio runtime that is going to run everything in this thread.
let mut runtime = match current_thread::Runtime::new() {
@@ -200,8 +171,7 @@ impl NetworkService {
}
};
let fut = match init_thread(shared,
timeouts_register_rx, close_rx) {
let fut = match init_thread(shared_clone, timeouts_register_rx, close_rx) {
Ok(future) => {
debug!(target: "sub-libp2p", "Successfully started networking service");
let _ = init_tx.send(Ok(()));
@@ -219,23 +189,27 @@ impl NetworkService {
}
});
init_rx.recv().expect("libp2p background thread panicked")
.map_err(|err| (err, self.shared.config.listen_address.clone()))?;
init_rx.recv().expect("libp2p background thread panicked")?;
*self.bg_thread.lock() = Some((close_tx, join_handle));
Ok(())
Ok(NetworkService {
shared,
bg_thread: Some((close_tx, join_handle)),
})
}
/// Stop network IO.
pub fn stop(&self) {
if let Some((close_tx, join)) = self.bg_thread.lock().take() {
let _ = close_tx.send(());
if let Err(e) = join.join() {
warn!(target: "sub-libp2p", "error while waiting on libp2p background thread: {:?}", e);
}
}
/// Returns network configuration.
pub fn config(&self) -> &NetworkConfiguration {
&self.shared.config
}
debug_assert!(!self.shared.network_state.has_connected_peer());
pub fn external_url(&self) -> Option<String> {
// TODO: in the context of libp2p, it is hard to define what an external
// URL is, as different nodes can have multiple different ways to
// reach us
self.shared.original_listened_addr.read().as_ref()
.map(|addr|
format!("{}/p2p/{}", addr, self.shared.kad_system.local_peer_id().to_base58())
)
}
/// Get a list of all connected peers by id.
@@ -269,7 +243,7 @@ impl NetworkService {
pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, action: F)
-> Option<T>
where F: FnOnce(&NetworkContext) -> T {
if !self.shared.protocols.read().has_protocol(protocol) {
if !self.shared.protocols.has_protocol(protocol) {
return None
}
@@ -283,7 +257,14 @@ impl NetworkService {
impl Drop for NetworkService {
fn drop(&mut self) {
self.stop()
if let Some((close_tx, join)) = self.bg_thread.take() {
let _ = close_tx.send(());
if let Err(e) = join.join() {
warn!(target: "sub-libp2p", "error while waiting on libp2p background thread: {:?}", e);
}
}
debug_assert!(!self.shared.network_state.has_connected_peer());
}
}
@@ -306,7 +287,7 @@ impl NetworkContext for NetworkContextImpl {
packet_id: PacketId,
data: Vec<u8>
) {
debug_assert!(self.inner.protocols.read().has_protocol(protocol),
debug_assert!(self.inner.protocols.has_protocol(protocol),
"invalid protocol id requested in the API of the libp2p networking");
// TODO: could be "optimized" by building `message` only after checking the validity of
// the peer, but that's probably not worth the effort
@@ -360,12 +341,11 @@ impl NetworkContext for NetworkContextImpl {
fn register_timer(&self, token: usize, duration: Duration)
-> Result<(), Error> {
let handler = self.inner.protocols
.read()
.find_protocol(self.protocol)
.ok_or(ErrorKind::BadProtocol)?
.custom_data()
.clone();
self.inner.timeouts_register_tx.read()
self.inner.timeouts_register_tx
.unbounded_send((duration, (handler, self.protocol, token)))
.map_err(|err| ErrorKind::Io(IoError::new(IoErrorKind::Other, err)))?;
Ok(())
@@ -485,7 +465,7 @@ fn init_thread(
match shared.network_state.add_peer(bootnode) {
Ok(who) => {
trace!(target: "sub-libp2p", "Dialing bootnode {:?}", who);
for proto in shared.protocols.read().0.clone().into_iter() {
for proto in shared.protocols.0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
transport.clone(),
@@ -510,7 +490,7 @@ fn init_thread(
if let Ok(addr) = multi {
trace!(target: "sub-libp2p", "Missing NodeIndex for Bootnode {:}. Querying", bootnode);
for proto in shared.protocols.read().0.clone().into_iter() {
for proto in shared.protocols.0.clone().into_iter() {
connect_with_query_peer_id(
shared.clone(),
transport.clone(),
@@ -1025,7 +1005,7 @@ fn connect_to_nodes<T, To, St, C>(
// Try to dial that node for each registered protocol. Since dialing
// upgrades the connection to use multiplexing, dialing multiple times
// should automatically open multiple substreams.
for proto in shared.protocols.read().0.clone().into_iter() {
for proto in shared.protocols.0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
base_transport.clone(),
@@ -1398,7 +1378,7 @@ where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
type UpgradeIdentifier = <RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>> as ConnectionUpgrade<C, Maf>>::UpgradeIdentifier;
fn protocol_names(&self) -> Self::NamesIter {
ConnectionUpgrade::<C, Maf>::protocol_names(&*self.0.protocols.read())
ConnectionUpgrade::<C, Maf>::protocol_names(&self.0.protocols)
}
type Output = <RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>> as ConnectionUpgrade<C, Maf>>::Output;
@@ -1409,7 +1389,7 @@ where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, endpoint: Endpoint,
remote_addr: Maf) -> Self::Future
{
self.0.protocols.read()
self.0.protocols
.clone()
.upgrade(socket, id, endpoint, remote_addr)
}
@@ -1422,7 +1402,6 @@ mod tests {
#[test]
fn builds_and_finishes_in_finite_time() {
// Checks that merely starting the network doesn't end up in an infinite loop.
let service = NetworkService::new(Default::default(), None).unwrap();
service.start(vec![]).map_err(|(err, _)| err).unwrap();
let _service = NetworkService::new(Default::default(), vec![], None).unwrap();
}
}
@@ -93,17 +93,11 @@ impl NetworkProtocolHandler for TestProtocol {
#[test]
fn net_service() {
let service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
service.start(vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])]).unwrap();
}
#[test]
fn net_start_stop() {
let config = NetworkConfiguration::new_local();
let service = NetworkService::new(config, None).unwrap();
service.start(vec![]).unwrap();
service.stop();
service.start(vec![]).unwrap();
let _service = NetworkService::new(
NetworkConfiguration::new_local(),
vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])],
None
).expect("Error creating network service");
}
#[test]
@@ -113,14 +107,12 @@ fn net_disconnect() {
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = vec![ ];
let service1 = NetworkService::new(config1, None).unwrap();
let handler1 = Arc::new(TestProtocol::new(false));
service1.start(vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
let service1 = NetworkService::new(config1, vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.external_url().unwrap() ];
let service2 = NetworkService::new(config2, None).unwrap();
let handler2 = Arc::new(TestProtocol::new(true));
service2.start(vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
let _service2 = NetworkService::new(config2, vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
thread::sleep(Duration::from_millis(50));
}
@@ -131,9 +123,8 @@ fn net_disconnect() {
#[test]
fn net_timeout() {
let config = NetworkConfiguration::new_local();
let service = NetworkService::new(config, None).unwrap();
let handler = Arc::new(TestProtocol::new(false));
service.start(vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
let _service = NetworkService::new(config, vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
while !handler.got_timeout() {
thread::sleep(Duration::from_millis(50));
}
+25 -40
View File
@@ -149,21 +149,34 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
/// Creates and register protocol with the network service
pub fn new(params: Params<B, S>, protocol_id: ProtocolId) -> Result<Arc<Service<B, S>>, Error> {
let chain = params.chain.clone();
let service = NetworkService::new(params.network_config.clone(), None)?;
let import_queue = Arc::new(AsyncImportQueue::new());
let handler = Arc::new(ProtocolHandler {
protocol: Protocol::new(
params.config,
params.chain,
import_queue.clone(),
params.on_demand,
params.transaction_pool,
params.specialization,
)?,
});
let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)];
let protocols = vec![(handler.clone() as Arc<_>, protocol_id, &versions[..])];
let service = match NetworkService::new(params.network_config.clone(), protocols, None) {
Ok(service) => service,
Err(err) => {
match err.kind() {
ErrorKind::Io(ref e) if e.kind() == io::ErrorKind::AddrInUse =>
warn!("Network port is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option."),
_ => warn!("Error starting network: {}", err),
};
return Err(err.into())
},
};
let sync = Arc::new(Service {
network: service,
protocol_id,
handler: Arc::new(ProtocolHandler {
protocol: Protocol::new(
params.config,
params.chain,
import_queue.clone(),
params.on_demand,
params.transaction_pool,
params.specialization,
)?,
}),
handler,
});
import_queue.start(
@@ -201,27 +214,11 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
res
}
fn start(&self) {
let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)];
let protocols = vec![(self.handler.clone() as Arc<_>, self.protocol_id, &versions[..])];
match self.network.start(protocols).map_err(|e| e.0.into()) {
Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse =>
warn!("Network port is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option."),
Err(err) => warn!("Error starting network: {}", err),
_ => {},
};
}
fn stop(&self) {
self.handler.protocol.stop();
self.network.stop();
}
}
impl<B: BlockT + 'static, S: Specialization<B>> Drop for Service<B, S> {
fn drop(&mut self) {
self.stop();
self.handler.protocol.stop();
}
}
@@ -307,10 +304,6 @@ pub trait ManageNetwork: Send + Sync {
fn remove_reserved_peer(&self, peer: String) -> Result<(), String>;
/// Add reserved peer
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
/// Start network
fn start_network(&self);
/// Stop network
fn stop_network(&self);
}
@@ -330,12 +323,4 @@ impl<B: BlockT + 'static, S: Specialization<B>> ManageNetwork for Service<B, S>
fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
}
fn start_network(&self) {
self.start();
}
fn stop_network(&self) {
self.stop();
}
}
+4 -7
View File
@@ -60,7 +60,6 @@ use std::sync::Arc;
use futures::prelude::*;
use keystore::Store as Keystore;
use client::BlockchainEvents;
use network::ManageNetwork;
use runtime_primitives::traits::{Header, As};
use exit_future::Signal;
use tokio::runtime::TaskExecutor;
@@ -83,7 +82,7 @@ pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend,
/// Substrate service.
pub struct Service<Components: components::Components> {
client: Arc<ComponentClient<Components>>,
network: Arc<components::NetworkService<Components::Factory>>,
network: Option<Arc<components::NetworkService<Components::Factory>>>,
extrinsic_pool: Arc<Components::ExtrinsicPool>,
keystore: Keystore,
exit: ::exit_future::Exit,
@@ -159,8 +158,6 @@ impl<Components> Service<Components>
let network = network::Service::new(network_params, Components::Factory::NETWORK_PROTOCOL_ID)?;
on_demand.map(|on_demand| on_demand.set_service_link(Arc::downgrade(&network)));
network.start_network();
{
// block notifications
let network = network.clone();
@@ -243,7 +240,7 @@ impl<Components> Service<Components>
Ok(Service {
client: client,
network: network,
network: Some(network),
extrinsic_pool: extrinsic_pool,
signal: Some(signal),
keystore: keystore,
@@ -261,7 +258,7 @@ impl<Components> Service<Components>
/// Get shared network instance.
pub fn network(&self) -> Arc<components::NetworkService<Components::Factory>> {
self.network.clone()
self.network.as_ref().expect("self.network always Some").clone()
}
/// Get shared extrinsic pool instance.
@@ -284,7 +281,7 @@ impl<Components> Drop for Service<Components> where Components: components::Comp
fn drop(&mut self) {
debug!(target: "service", "Substrate service shutdown");
self.network.stop_network();
drop(self.network.take());
if let Some(signal) = self.signal.take() {
signal.fire();