// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{io, thread};
use log::{warn, debug, error, trace, info};
use futures::{Async, Future, Stream, stream, sync::oneshot, sync::mpsc};
use parking_lot::{Mutex, RwLock};
use network_libp2p::{ProtocolId, NetworkConfiguration, Severity};
use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent};
use network_libp2p::{RegisteredProtocol, NetworkState};
use peerset::PeersetHandle;
use consensus::import_queue::{ImportQueue, Link};
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};

use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::message::Message;
use crate::protocol::{self, Context, FromNetworkMsg, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo};
use crate::config::Params;
use crate::error::Error;
use crate::specialization::NetworkSpecialization;

use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError};
use tokio::prelude::task::AtomicTask;
use tokio::runtime::Builder as RuntimeBuilder;

pub use network_libp2p::PeerId;

/// Type that represents fetch completion future.
pub type FetchFuture = oneshot::Receiver>; /// Sync status pub trait SyncProvider: Send + Sync { /// Get a stream of sync statuses. fn status(&self) -> mpsc::UnboundedReceiver>; /// Get network state. fn network_state(&self) -> NetworkState; /// Get currently connected peers fn peers(&self) -> Vec<(PeerId, PeerInfo)>; /// Are we in the process of downloading the chain? fn is_major_syncing(&self) -> bool; } /// Minimum Requirements for a Hash within Networking pub trait ExHashT: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static { } impl ExHashT for T where T: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static { } /// Transaction pool interface pub trait TransactionPool: Send + Sync { /// Get transactions from the pool that are ready to be propagated. fn transactions(&self) -> Vec<(H, B::Extrinsic)>; /// Import a transaction into the pool. fn import(&self, transaction: &B::Extrinsic) -> Option; /// Notify the pool about transactions broadcast. fn on_broadcasted(&self, propagations: HashMap>); } /// A link implementation that connects to the network. 
#[derive(Clone)] pub struct NetworkLink> { /// The protocol sender pub(crate) protocol_sender: Sender>, /// The network sender pub(crate) network_sender: NetworkChan, } impl> Link for NetworkLink { fn block_imported(&self, hash: &B::Hash, number: NumberFor) { let _ = self.protocol_sender.send(ProtocolMsg::BlockImportedSync(hash.clone(), number)); } fn blocks_processed(&self, processed_blocks: Vec, has_error: bool) { let _ = self.protocol_sender.send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error)); } fn justification_imported(&self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { let _ = self.protocol_sender.send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); if !success { let reason = Severity::Bad(format!("Invalid justification provided for #{}", hash).to_string()); let _ = self.network_sender.send(NetworkMsg::ReportPeer(who, reason)); } } fn clear_justification_requests(&self) { let _ = self.protocol_sender.send(ProtocolMsg::ClearJustificationRequests); } fn request_justification(&self, hash: &B::Hash, number: NumberFor) { let _ = self.protocol_sender.send(ProtocolMsg::RequestJustification(hash.clone(), number)); } fn useless_peer(&self, who: PeerId, reason: &str) { trace!(target:"sync", "Useless peer {}, {}", who, reason); self.network_sender.send(NetworkMsg::ReportPeer(who, Severity::Useless(reason.to_string()))); } fn note_useless_and_restart_sync(&self, who: PeerId, reason: &str) { trace!(target:"sync", "Bad peer {}, {}", who, reason); // is this actually malign or just useless? self.network_sender.send(NetworkMsg::ReportPeer(who, Severity::Useless(reason.to_string()))); let _ = self.protocol_sender.send(ProtocolMsg::RestartSync); } fn restart(&self) { let _ = self.protocol_sender.send(ProtocolMsg::RestartSync); } } /// A cloneable handle for reporting cost/benefits of peers. #[derive(Clone)] pub struct ReportHandle { inner: PeersetHandle, // wraps it so we don't have to worry about breaking API. 
} impl ReportHandle { /// Report a given peer as either beneficial (+) or costly (-) according to the /// given scalar. pub fn report_peer(&self, who: PeerId, cost_benefit: i32) { self.inner.report_peer(who, cost_benefit); } } /// Substrate network service. Handles network IO and manages connectivity. pub struct Service> { /// Sinks to propagate status updates. status_sinks: Arc>>>>, /// Are we connected to any peer? is_offline: Arc, /// Are we actively catching up with the chain? is_major_syncing: Arc, /// Peers whom we are connected with. peers: Arc>>>, /// Network service network: Arc>>>, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which /// nodes it should be connected to or not. peerset: PeersetHandle, /// Protocol sender protocol_sender: Sender>, /// Sender for messages to the background service task, and handle for the background thread. /// Dropping the sender should close the task and the thread. /// This is an `Option` because we need to extract it in the destructor. bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>, } impl> Service { /// Creates and register protocol with the network service pub fn new( params: Params, protocol_id: ProtocolId, import_queue: Box>, ) -> Result<(Arc>, NetworkChan), Error> { let (network_chan, network_port) = network_channel(); let status_sinks = Arc::new(Mutex::new(Vec::new())); // Start in off-line mode, since we're not connected to any nodes yet. 
let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let peers: Arc>>> = Arc::new(Default::default()); let (protocol_sender, network_to_protocol_sender) = Protocol::new( status_sinks.clone(), is_offline.clone(), is_major_syncing.clone(), peers.clone(), network_chan.clone(), params.config, params.chain, import_queue.clone(), params.on_demand, params.transaction_pool, params.specialization, )?; let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect(); let registered = RegisteredProtocol::new(protocol_id, &versions); let (thread, network, peerset) = start_thread( network_to_protocol_sender, network_port, params.network_config, registered, )?; let service = Arc::new(Service { status_sinks, is_offline, is_major_syncing, peers, peerset, network, protocol_sender: protocol_sender.clone(), bg_thread: Some(thread), }); // connect the import-queue to the network service. let link = NetworkLink { protocol_sender, network_sender: network_chan.clone(), }; import_queue.start(Box::new(link))?; Ok((service, network_chan)) } /// Returns the downloaded bytes per second averaged over the past few seconds. #[inline] pub fn average_download_per_sec(&self) -> u64 { self.network.lock().average_download_per_sec() } /// Returns the uploaded bytes per second averaged over the past few seconds. #[inline] pub fn average_upload_per_sec(&self) -> u64 { self.network.lock().average_upload_per_sec() } /// Returns the network identity of the node. pub fn local_peer_id(&self) -> PeerId { self.network.lock().peer_id().clone() } /// Called when a new block is imported by the client. pub fn on_block_imported(&self, hash: B::Hash, header: B::Header) { let _ = self .protocol_sender .send(ProtocolMsg::BlockImported(hash, header)); } /// Called when a new block is finalized by the client. 
pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) { let _ = self .protocol_sender .send(ProtocolMsg::BlockFinalized(hash, header)); } /// Called when new transactons are imported by the client. pub fn trigger_repropagate(&self) { let _ = self.protocol_sender.send(ProtocolMsg::PropagateExtrinsics); } /// Make sure an important block is propagated to peers. /// /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. pub fn announce_block(&self, hash: B::Hash) { let _ = self.protocol_sender.send(ProtocolMsg::AnnounceBlock(hash)); } /// Send a consensus message through the gossip pub fn gossip_consensus_message( &self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec, recipient: GossipMessageRecipient, ) { let _ = self .protocol_sender .send(ProtocolMsg::GossipConsensusMessage( topic, engine_id, message, recipient, )); } /// Return a cloneable handle for reporting peers' benefits or misbehavior. pub fn report_handle(&self) -> ReportHandle { ReportHandle { inner: self.peerset.clone() } } /// Report a given peer as either beneficial (+) or costly (-) according to the /// given scalar. pub fn report_peer(&self, who: PeerId, cost_benefit: i32) { self.peerset.report_peer(who, cost_benefit); } /// Execute a closure with the chain-specific network specialization. pub fn with_spec(&self, f: F) where F: FnOnce(&mut S, &mut Context) + Send + 'static { let _ = self .protocol_sender .send(ProtocolMsg::ExecuteWithSpec(Box::new(f))); } /// Execute a closure with the consensus gossip. pub fn with_gossip(&self, f: F) where F: FnOnce(&mut ConsensusGossip, &mut Context) + Send + 'static { let _ = self .protocol_sender .send(ProtocolMsg::ExecuteWithGossip(Box::new(f))); } /// Are we in the process of downloading the chain? /// Used by both SyncProvider and SyncOracle. 
fn is_major_syncing(&self) -> bool { self.is_major_syncing.load(Ordering::Relaxed) } } impl> ::consensus::SyncOracle for Service { fn is_major_syncing(&self) -> bool { self.is_major_syncing() } fn is_offline(&self) -> bool { self.is_offline.load(Ordering::Relaxed) } } impl> Drop for Service { fn drop(&mut self) { if let Some((sender, join)) = self.bg_thread.take() { let _ = sender.send(()); if let Err(e) = join.join() { error!("Error while waiting on background thread: {:?}", e); } } } } impl> SyncProvider for Service { fn is_major_syncing(&self) -> bool { self.is_major_syncing() } /// Get sync status fn status(&self) -> mpsc::UnboundedReceiver> { let (sink, stream) = mpsc::unbounded(); self.status_sinks.lock().push(sink); stream } fn network_state(&self) -> NetworkState { self.network.lock().state() } fn peers(&self) -> Vec<(PeerId, PeerInfo)> { let peers = (*self.peers.read()).clone(); peers.into_iter().map(|(idx, connected)| (idx, connected.peer_info)).collect() } } /// Trait for managing network pub trait ManageNetwork { /// Set to allow unreserved peers to connect fn accept_unreserved_peers(&self); /// Set to deny unreserved peers to connect fn deny_unreserved_peers(&self); /// Remove reservation for the peer fn remove_reserved_peer(&self, peer: PeerId); /// Add reserved peer fn add_reserved_peer(&self, peer: String) -> Result<(), String>; } impl> ManageNetwork for Service { fn accept_unreserved_peers(&self) { self.peerset.set_reserved_only(false); } fn deny_unreserved_peers(&self) { self.peerset.set_reserved_only(true); } fn remove_reserved_peer(&self, peer: PeerId) { self.peerset.remove_reserved_peer(peer); } fn add_reserved_peer(&self, peer: String) -> Result<(), String> { let (peer_id, addr) = parse_str_addr(&peer).map_err(|e| format!("{:?}", e))?; self.peerset.add_reserved_peer(peer_id.clone()); self.network.lock().add_known_address(peer_id, addr); Ok(()) } } /// Create a NetworkPort/Chan pair. 
pub fn network_channel() -> (NetworkChan, NetworkPort) { let (network_sender, network_receiver) = channel::unbounded(); let task_notify = Arc::new(AtomicTask::new()); let network_port = NetworkPort::new(network_receiver, task_notify.clone()); let network_chan = NetworkChan::new(network_sender, task_notify); (network_chan, network_port) } /// A sender of NetworkMsg that notifies a task when a message has been sent. #[derive(Clone)] pub struct NetworkChan { sender: Sender>, task_notify: Arc, } impl NetworkChan { /// Create a new network chan. pub fn new(sender: Sender>, task_notify: Arc) -> Self { NetworkChan { sender, task_notify, } } /// Send a messaging, to be handled on a stream. Notify the task handling the stream. pub fn send(&self, msg: NetworkMsg) { let _ = self.sender.send(msg); self.task_notify.notify(); } } impl Drop for NetworkChan { /// Notifying the task when a sender is dropped(when all are dropped, the stream is finished). fn drop(&mut self) { self.task_notify.notify(); } } /// A receiver of NetworkMsg that makes the protocol-id available with each message. pub struct NetworkPort { receiver: Receiver>, task_notify: Arc, } impl NetworkPort { /// Create a new network port for a given protocol-id. pub fn new(receiver: Receiver>, task_notify: Arc) -> Self { Self { receiver, task_notify, } } /// Receive a message, if any is currently-enqueued. /// Register the current tokio task for notification when a new message is available. pub fn take_one_message(&self) -> Result>, ()> { self.task_notify.register(); match self.receiver.try_recv() { Ok(msg) => Ok(Some(msg)), Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Disconnected) => Err(()), } } /// Get a reference to the underlying crossbeam receiver. #[cfg(any(test, feature = "test-helpers"))] pub fn receiver(&self) -> &Receiver> { &self.receiver } } /// Messages to be handled by NetworkService. #[derive(Debug)] pub enum NetworkMsg { /// Send an outgoing custom message. 
Outgoing(PeerId, Message), /// Report a peer. ReportPeer(PeerId, Severity), /// Synchronization response. #[cfg(any(test, feature = "test-helpers"))] Synchronized, } /// Starts the background thread that handles the networking. fn start_thread( protocol_sender: Sender>, network_port: NetworkPort, config: NetworkConfiguration, registered: RegisteredProtocol>, ) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>>>, PeersetHandle), Error> { // Start the main service. let (service, peerset) = match start_service(config, registered) { Ok((service, peerset)) => (Arc::new(Mutex::new(service)), peerset), Err(err) => { warn!("Error starting network: {}", err); return Err(err.into()) }, }; let (close_tx, close_rx) = oneshot::channel(); let service_clone = service.clone(); let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?; let peerset_clone = peerset.clone(); let thread = thread::Builder::new().name("network".to_string()).spawn(move || { let fut = run_thread(protocol_sender, service_clone, network_port, peerset_clone) .select(close_rx.then(|_| Ok(()))) .map(|(val, _)| val) .map_err(|(err,_ )| err); // Note that we use `block_on` and not `block_on_all` because we want to kill the thread // instantly if `close_rx` receives something. match runtime.block_on(fut) { Ok(()) => debug!(target: "sub-libp2p", "Networking thread finished"), Err(err) => error!(target: "sub-libp2p", "Error while running libp2p: {:?}", err), }; })?; Ok(((close_tx, thread), service, peerset)) } /// Runs the background thread that handles the networking. fn run_thread( protocol_sender: Sender>, network_service: Arc>>>, network_port: NetworkPort, peerset: PeersetHandle, ) -> impl Future { let network_service_2 = network_service.clone(); // Protocol produces a stream of messages about what happens in sync. 
let protocol = stream::poll_fn(move || { match network_port.take_one_message() { Ok(Some(message)) => Ok(Async::Ready(Some(message))), Ok(None) => Ok(Async::NotReady), Err(_) => Err(()) } }).for_each(move |msg| { // Handle message from Protocol. match msg { NetworkMsg::Outgoing(who, outgoing_message) => { network_service_2 .lock() .send_custom_message(&who, outgoing_message); }, NetworkMsg::ReportPeer(who, severity) => { match severity { Severity::Bad(message) => { info!(target: "sync", "Banning {:?} because {:?}", who, message); network_service_2.lock().drop_node(&who); // temporary: make sure the peer gets dropped from the peerset peerset.report_peer(who, i32::min_value()); }, Severity::Useless(message) => { debug!(target: "sync", "Dropping {:?} because {:?}", who, message); network_service_2.lock().drop_node(&who) }, Severity::Timeout => { debug!(target: "sync", "Dropping {:?} because it timed out", who); network_service_2.lock().drop_node(&who) }, } }, #[cfg(any(test, feature = "test-helpers"))] NetworkMsg::Synchronized => (), } Ok(()) }) .then(|res| { match res { Ok(()) => (), Err(_) => error!("Protocol disconnected"), }; Ok(()) }); // The network service produces events about what happens on the network. Let's process them. let network = stream::poll_fn(move || network_service.lock().poll()).for_each(move |event| { match event { NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. } => { debug_assert!( version <= protocol::CURRENT_VERSION as u8 && version >= protocol::MIN_VERSION as u8 ); let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, debug_info)); } NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. } => { let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(peer_id, debug_info)); } NetworkServiceEvent::CustomMessage { peer_id, message, .. 
} => { let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(peer_id, message)); return Ok(()) } NetworkServiceEvent::Clogged { peer_id, messages, .. } => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { debug!(target: "sync", "{:?}", msg); let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(peer_id.clone(), Some(msg))); } } }; Ok(()) }); // Merge all futures into one. let futures: Vec + Send>> = vec![ Box::new(protocol) as Box<_>, Box::new(network) as Box<_> ]; futures::select_all(futures) .and_then(move |_| { debug!("Networking ended"); Ok(()) }) .map_err(|(r, _, _)| r) }