// Copyright 2017-2018 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . use std::collections::HashMap; use std::sync::Arc; use std::{io, thread}; use std::time::Duration; use futures::{self, Future, Stream, stream, sync::oneshot}; use parking_lot::Mutex; use network_libp2p::{ProtocolId, PeerId, NetworkConfiguration, ErrorKind}; use network_libp2p::{start_service, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; use network_libp2p::{RegisteredProtocol, parse_str_addr, Protocol as Libp2pProtocol}; use io::NetSyncIo; use protocol::{self, Protocol, ProtocolContext, Context, ProtocolStatus}; use config::{ProtocolConfig}; use error::Error; use chain::Client; use specialization::Specialization; use on_demand::OnDemandService; use import_queue::ImportQueue; use runtime_primitives::traits::{Block as BlockT}; use tokio::{runtime::Runtime, timer::Interval}; /// Type that represents fetch completion future. pub type FetchFuture = oneshot::Receiver>; const TICK_TIMEOUT: Duration = Duration::from_millis(1000); const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000); bitflags! { /// Node roles bitmask. pub struct Roles: u8 { /// No network. const NONE = 0b00000000; /// Full node, does not participate in consensus. const FULL = 0b00000001; /// Light client node. const LIGHT = 0b00000010; /// Act as an authority const AUTHORITY = 0b00000100; } } impl ::codec::Encode for Roles { fn encode_to(&self, dest: &mut T) { dest.push_byte(self.bits()) } } impl ::codec::Decode for Roles { fn decode(input: &mut I) -> Option { Self::from_bits(input.read_byte()?) } } /// Sync status pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> ProtocolStatus; /// Get this node id if available. fn node_id(&self) -> Option; } pub trait ExHashT: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {} impl ExHashT for T where T: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {} /// Transaction pool interface pub trait TransactionPool: Send + Sync { /// Get transactions from the pool that are ready to be propagated. fn transactions(&self) -> Vec<(H, B::Extrinsic)>; /// Import a transaction into the pool. fn import(&self, transaction: &B::Extrinsic) -> Option; /// Notify the pool about transactions broadcast. fn on_broadcasted(&self, propagations: HashMap>); } /// Service able to execute closure in the network context. pub trait ExecuteInContext: Send + Sync { /// Execute closure in network context. fn execute_in_context)>(&self, closure: F); } /// Service initialization parameters. pub struct Params { /// Configuration. pub config: ProtocolConfig, /// Network layer configuration. pub network_config: NetworkConfiguration, /// Substrate relay chain access point. pub chain: Arc>, /// On-demand service reference. pub on_demand: Option>>, /// Transaction pool. pub transaction_pool: Arc>, /// Protocol specialization. pub specialization: S, } /// Substrate network service. Handles network IO and manages connectivity. pub struct Service, H: ExHashT> { /// Network service network: Arc>, /// Protocol handler handler: Arc>, /// Protocol ID. protocol_id: ProtocolId, /// Sender for messages to the background service task, and handle for the background thread. /// Dropping the sender should close the task and the thread. /// This is an `Option` because we need to extract it in the destructor. bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>, } impl, H: ExHashT> Service { /// Creates and register protocol with the network service pub fn new>( params: Params, protocol_id: ProtocolId, import_queue: I, ) -> Result>, Error> { let chain = params.chain.clone(); let import_queue = Arc::new(import_queue); let handler = Arc::new(Protocol::new( params.config, params.chain, import_queue.clone(), params.on_demand, params.transaction_pool, params.specialization, )?); let versions = [(protocol::CURRENT_VERSION as u8)]; let registered = RegisteredProtocol::new(protocol_id, &versions[..]); let (thread, network) = start_thread(params.network_config, handler.clone(), registered)?; let sync = Arc::new(Service { network, protocol_id, handler, bg_thread: Some(thread), }); import_queue.start( Arc::downgrade(sync.handler.sync()), Arc::downgrade(&sync), Arc::downgrade(&chain) )?; Ok(sync) } /// Called when a new block is imported by the client. pub fn on_block_imported(&self, hash: B::Hash, header: &B::Header) { self.handler.on_block_imported(&mut NetSyncIo::new(&self.network, self.protocol_id), hash, header) } /// Called when new transactons are imported by the client. pub fn trigger_repropagate(&self) { self.handler.propagate_extrinsics(&mut NetSyncIo::new(&self.network, self.protocol_id)); } /// Execute a closure with the chain-specific network specialization. pub fn with_spec(&self, f: F) -> U where F: FnOnce(&mut S, &mut Context) -> U { self.handler.with_spec(&mut NetSyncIo::new(&self.network, self.protocol_id), f) } } impl, H: ExHashT> ::consensus::SyncOracle for Service { fn is_major_syncing(&self) -> bool { self.handler.sync().read().status().is_major_syncing() } } impl, H:ExHashT> Drop for Service { fn drop(&mut self) { self.handler.stop(); if let Some((sender, join)) = self.bg_thread.take() { let _ = sender.send(()); if let Err(e) = join.join() { error!("Error while waiting on background thread: {:?}", e); } } } } impl, H: ExHashT> ExecuteInContext for Service { fn execute_in_context)>(&self, closure: F) { closure(&mut ProtocolContext::new(self.handler.context_data(), &mut NetSyncIo::new(&self.network, self.protocol_id))) } } impl, H: ExHashT> SyncProvider for Service { /// Get sync status fn status(&self) -> ProtocolStatus { self.handler.status() } fn node_id(&self) -> Option { let network = self.network.lock(); let ret = network .listeners() .next() .map(|addr| { let mut addr = addr.clone(); addr.append(Libp2pProtocol::P2p(network.peer_id().clone().into())); addr.to_string() }); ret } } /// Trait for managing network pub trait ManageNetwork: Send + Sync { /// Set to allow unreserved peers to connect fn accept_unreserved_peers(&self); /// Set to deny unreserved peers to connect fn deny_unreserved_peers(&self); /// Remove reservation for the peer fn remove_reserved_peer(&self, peer: PeerId); /// Add reserved peer fn add_reserved_peer(&self, peer: String) -> Result<(), String>; } impl, H: ExHashT> ManageNetwork for Service { fn accept_unreserved_peers(&self) { self.network.lock().accept_unreserved_peers(); } fn deny_unreserved_peers(&self) { // This method can disconnect nodes, in which case we have to properly close them in the // protocol. let disconnected = self.network.lock().deny_unreserved_peers(); let mut net_sync = NetSyncIo::new(&self.network, self.protocol_id); for node_index in disconnected { self.handler.on_peer_disconnected(&mut net_sync, node_index) } } fn remove_reserved_peer(&self, peer: PeerId) { // This method can disconnect a node, in which case we have to properly close it in the // protocol. let disconnected = self.network.lock().remove_reserved_peer(peer); if let Some(node_index) = disconnected { let mut net_sync = NetSyncIo::new(&self.network, self.protocol_id); self.handler.on_peer_disconnected(&mut net_sync, node_index) } } fn add_reserved_peer(&self, peer: String) -> Result<(), String> { let (addr, peer_id) = parse_str_addr(&peer).map_err(|e| format!("{:?}", e))?; self.network.lock().add_reserved_peer(addr, peer_id); Ok(()) } } /// Starts the background thread that handles the networking. fn start_thread, H: ExHashT>( config: NetworkConfiguration, protocol: Arc>, registered: RegisteredProtocol, ) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>), Error> { let protocol_id = registered.id(); // Start the main service. let service = match start_service(config, Some(registered)) { Ok(service) => Arc::new(Mutex::new(service)), Err(err) => { match err.kind() { ErrorKind::Io(ref e) if e.kind() == io::ErrorKind::AddrInUse => warn!("Network port is already in use, make sure that another instance of Substrate client is not running or change the port using the --port option."), _ => warn!("Error starting network: {}", err), }; return Err(err.into()) }, }; let (close_tx, close_rx) = oneshot::channel(); let service_clone = service.clone(); let mut runtime = Runtime::new()?; let thread = thread::Builder::new().name("network".to_string()).spawn(move || { let fut = run_thread(service_clone, protocol, protocol_id) .select(close_rx.then(|_| Ok(()))) .map(|(val, _)| val) .map_err(|(err,_ )| err); // Note that we use `block_on` and not `block_on_all` because we want to kill the thread // instantly if `close_rx` receives something. match runtime.block_on(fut) { Ok(()) => debug!(target: "sub-libp2p", "Networking thread finished"), Err(err) => error!(target: "sub-libp2p", "Error while running libp2p: {:?}", err), }; })?; Ok(((close_tx, thread), service)) } /// Runs the background thread that handles the networking. fn run_thread, H: ExHashT>( network_service: Arc>, protocol: Arc>, protocol_id: ProtocolId, ) -> impl Future { // Interval for performing maintenance on the protocol handler. let tick = Interval::new_interval(TICK_TIMEOUT) .for_each({ let protocol = protocol.clone(); let network_service = network_service.clone(); move |_| { protocol.tick(&mut NetSyncIo::new(&network_service, protocol_id)); Ok(()) } }) .then(|res| { match res { Ok(()) => (), Err(err) => error!("Error in the propagation timer: {:?}", err), }; Ok(()) }); // Interval at which we gossip extrinsics over the network. let propagate = Interval::new_interval(PROPAGATE_TIMEOUT) .for_each({ let protocol = protocol.clone(); let network_service = network_service.clone(); move |_| { protocol.propagate_extrinsics(&mut NetSyncIo::new(&network_service, protocol_id)); Ok(()) } }) .then(|res| { match res { Ok(()) => (), Err(err) => error!("Error in the propagation timer: {:?}", err), }; Ok(()) }); // The network service produces events about what happens on the network. Let's process them. let network_service2 = network_service.clone(); let network = stream::poll_fn(move || network_service2.lock().poll()).for_each(move |event| { let mut net_sync = NetSyncIo::new(&network_service, protocol_id); match event { NetworkServiceEvent::NodeClosed { node_index, closed_custom_protocols } => { if !closed_custom_protocols.is_empty() { debug_assert_eq!(closed_custom_protocols, &[protocol_id]); protocol.on_peer_disconnected(&mut net_sync, node_index); } } NetworkServiceEvent::ClosedCustomProtocols { node_index, protocols } => { if !protocols.is_empty() { debug_assert_eq!(protocols, &[protocol_id]); protocol.on_peer_disconnected(&mut net_sync, node_index); } } NetworkServiceEvent::OpenedCustomProtocol { node_index, version, .. } => { debug_assert_eq!(version, protocol::CURRENT_VERSION as u8); protocol.on_peer_connected(&mut net_sync, node_index); } NetworkServiceEvent::ClosedCustomProtocol { node_index, .. } => { protocol.on_peer_disconnected(&mut net_sync, node_index); } NetworkServiceEvent::CustomMessage { node_index, data, .. } => { protocol.handle_packet(&mut net_sync, node_index, &data); } }; Ok(()) }); // Merge all futures into one. let futures: Vec + Send>> = vec![ Box::new(tick) as Box<_>, Box::new(propagate) as Box<_>, Box::new(network) as Box<_> ]; futures::select_all(futures) .and_then(move |_| { debug!("Networking ended"); Ok(()) }) .map_err(|(r, _, _)| r) }