diff --git a/substrate/core/finality-grandpa/src/communication/mod.rs b/substrate/core/finality-grandpa/src/communication/mod.rs index 0633fcec1f..cbcfef0d41 100644 --- a/substrate/core/finality-grandpa/src/communication/mod.rs +++ b/substrate/core/finality-grandpa/src/communication/mod.rs @@ -125,9 +125,10 @@ pub(crate) fn global_topic(set_id: u64) -> B::Hash { <::Hashing as HashT>::hash(format!("{}-GLOBAL", set_id).as_bytes()) } -impl Network for Arc> where +impl Network for Arc> where B: BlockT, S: network::specialization::NetworkSpecialization, + H: network::ExHashT, { type In = NetworkStream; diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 88c252bb32..44ca7eab4c 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -169,6 +169,7 @@ mod discovery; mod on_demand_layer; #[macro_use] mod protocol; +mod protocol_behaviour; mod service; mod transport; diff --git a/substrate/core/network/src/protocol_behaviour.rs b/substrate/core/network/src/protocol_behaviour.rs new file mode 100644 index 0000000000..81c0502f60 --- /dev/null +++ b/substrate/core/network/src/protocol_behaviour.rs @@ -0,0 +1,463 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Implementation of libp2p's `NetworkBehaviour` trait that handles everything Substrate-specific. + +use crate::{ExHashT, DiscoveryNetBehaviour, ProtocolId}; +use crate::custom_proto::{CustomProto, CustomProtoOut}; +use crate::chain::{Client, FinalityProofProvider}; +use crate::protocol::{self, CustomMessageOutcome, Protocol, ProtocolConfig, sync::SyncState}; +use crate::protocol::{PeerInfo, NetworkOut, message::Message, on_demand::RequestData}; +use crate::protocol::consensus_gossip::MessageRecipient as GossipMessageRecipient; +use crate::protocol::specialization::NetworkSpecialization; +use crate::service::TransactionPool; + +use client::light::fetcher::FetchChecker; +use futures::prelude::*; +use consensus::import_queue::SharedFinalityProofRequestBuilder; +use log::debug; +use libp2p::{PeerId, Multiaddr}; +use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; +use libp2p::core::protocols_handler::{ProtocolsHandler, IntoProtocolsHandler}; +use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; +use std::sync::Arc; + +/// Implementation of `NetworkBehaviour` that handles everything related to Substrate and Polkadot. +pub struct ProtocolBehaviour, H: ExHashT> { + /// Handles opening the unique substream and sending and receiving raw messages. + behaviour: CustomProto, Substream>, + /// Handles the logic behind the raw messages that we receive. + protocol: Protocol, + /// Used to report reputation changes. + peerset_handle: peerset::PeersetHandle, + transaction_pool: Arc>, + /// When asked for a proof of finality, we use this struct to build one. + finality_proof_provider: Option>>, +} + +impl, H: ExHashT> ProtocolBehaviour { + /// Builds a new `ProtocolBehaviour`. + pub fn new( + config: ProtocolConfig, + chain: Arc>, + checker: Arc>, + specialization: S, + transaction_pool: Arc>, + finality_proof_provider: Option>>, + protocol_id: ProtocolId, + versions: &[u8], + peerset: peerset::Peerset, + peerset_handle: peerset::PeersetHandle, + ) -> crate::error::Result { + let protocol = Protocol::new(config, chain, checker, specialization)?; + let behaviour = CustomProto::new(protocol_id, versions, peerset); + + Ok(ProtocolBehaviour { + protocol, + behaviour, + peerset_handle, + transaction_pool, + finality_proof_provider, + }) + } + + /// Returns the list of all the peers we have an open channel to. + pub fn open_peers(&self) -> impl Iterator { + self.behaviour.open_peers() + } + + /// Returns true if we have a channel open with this node. + pub fn is_open(&self, peer_id: &PeerId) -> bool { + self.behaviour.is_open(peer_id) + } + + /// Disconnects the given peer if we are connected to it. + pub fn disconnect_peer(&mut self, peer_id: &PeerId) { + self.behaviour.disconnect_peer(peer_id) + } + + /// Adjusts the reputation of a node. + pub fn report_peer(&mut self, who: PeerId, reputation: i32) { + self.peerset_handle.report_peer(who, reputation) + } + + /// Returns true if we try to open protocols with the given peer. + pub fn is_enabled(&self, peer_id: &PeerId) -> bool { + self.behaviour.is_enabled(peer_id) + } + + /// Sends a message to a peer. + /// + /// Has no effect if the custom protocol is not open with the given peer. + /// + /// Also note that even we have a valid open substream, it may in fact be already closed + /// without us knowing, in which case the packet will not be received. + pub fn send_packet(&mut self, target: &PeerId, message: Message) { + self.behaviour.send_packet(target, message) + } + + /// Returns the state of the peerset manager, for debugging purposes. + pub fn peerset_debug_info(&mut self) -> serde_json::Value { + self.behaviour.peerset_debug_info() + } + + /// Returns the number of peers we're connected to. + pub fn num_connected_peers(&self) -> usize { + self.protocol.num_connected_peers() + } + + /// Returns the number of peers we're connected to and that are being queried. + pub fn num_active_peers(&self) -> usize { + self.protocol.num_active_peers() + } + + /// Current global sync state. + pub fn sync_state(&self) -> SyncState { + self.protocol.sync_state() + } + + /// Target sync block number. + pub fn best_seen_block(&self) -> Option> { + self.protocol.best_seen_block() + } + + /// Number of peers participating in syncing. + pub fn num_sync_peers(&self) -> u32 { + self.protocol.num_sync_peers() + } + + /// Starts a new data demand request. + /// + /// The parameter contains a `Sender` where the result, once received, must be sent. + pub(crate) fn add_on_demand_request(&mut self, rq: RequestData) { + self.protocol.add_on_demand_request( + &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + rq + ); + } + + /// Returns information about all the peers we are connected to after the handshake message. + pub fn peers_info(&self) -> impl Iterator)> { + self.protocol.peers_info() + } + + /// Locks `self` and gives access to the protocol and a context that can be used in order to + /// use `consensus_gossip_lock` or `specialization_lock`. + /// + /// **Important**: ONLY USE THIS FUNCTION TO CALL `consensus_gossip_lock` or `specialization_lock`. + /// This function is a very bad API. + pub fn protocol_context_lock<'a>( + &'a mut self, + ) -> (&'a mut Protocol, LocalNetworkOut<'a, B>) { + let net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }; + (&mut self.protocol, net_out) + } + + /// Gossip a consensus message to the network. + pub fn gossip_consensus_message( + &mut self, + topic: B::Hash, + engine_id: ConsensusEngineId, + message: Vec, + recipient: GossipMessageRecipient, + ) { + self.protocol.gossip_consensus_message( + &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + topic, + engine_id, + message, + recipient + ); + } + + /// Call when we must propagate ready extrinsics to peers. + pub fn propagate_extrinsics(&mut self) { + self.protocol.propagate_extrinsics( + &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + &*self.transaction_pool + ) + } + + /// Make sure an important block is propagated to peers. + /// + /// In chain-based consensus, we often need to make sure non-best forks are + /// at least temporarily synced. + pub fn announce_block(&mut self, hash: B::Hash) { + self.protocol.announce_block( + &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + hash + ) + } + + /// Call this when a block has been imported in the import queue and we should announce it on + /// the network. + pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { + self.protocol.on_block_imported( + &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + hash, + header + ) + } + + /// Call this when a block has been finalized. The sync layer may have some additional + /// requesting to perform. + pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { + self.protocol.on_block_finalized( + &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + hash, + header + ) + } + + /// Request a justification for the given block. + /// + /// Uses `protocol` to queue a new justification request and tries to dispatch all pending + /// requests. + pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + self.protocol.request_justification( + &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + hash, + number + ) + } + + /// Clears all pending justification requests. + pub fn clear_justification_requests(&mut self) { + self.protocol.clear_justification_requests() + } + + /// A batch of blocks have been processed, with or without errors. + /// Call this when a batch of blocks have been processed by the import queue, with or without + /// errors. + pub fn blocks_processed( + &mut self, + processed_blocks: Vec, + has_error: bool, + ) { + self.protocol.blocks_processed( + &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + processed_blocks, + has_error, + ) + } + + /// Restart the sync process. + pub fn restart(&mut self) { + let mut net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }; + self.protocol.restart(&mut net_out); + } + + /// Notify about successful import of the given block. + pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { + self.protocol.block_imported(hash, number) + } + + pub fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { + self.protocol.set_finality_proof_request_builder(request_builder) + } + + /// Call this when a justification has been processed by the import queue, with or without + /// errors. + pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + self.protocol.justification_import_result(hash, number, success) + } + + /// Request a finality proof for the given block. + /// + /// Queues a new finality proof request and tries to dispatch all pending requests. + pub fn request_finality_proof( + &mut self, + hash: &B::Hash, + number: NumberFor, + ) { + self.protocol.request_finality_proof( + &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + &hash, + number, + ); + } + + pub fn finality_proof_import_result( + &mut self, + request_block: (B::Hash, NumberFor), + finalization_result: Result<(B::Hash, NumberFor), ()>, + ) { + self.protocol.finality_proof_import_result(request_block, finalization_result) + } + + pub fn tick(&mut self) { + self.protocol.tick(&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }); + } +} + +impl, H: ExHashT> NetworkBehaviour for +ProtocolBehaviour { + type ProtocolsHandler = , Substream> as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = CustomMessageOutcome; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.behaviour.new_handler() + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + self.behaviour.addresses_of_peer(peer_id) + } + + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + self.behaviour.inject_connected(peer_id, endpoint) + } + + fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + self.behaviour.inject_disconnected(peer_id, endpoint) + } + + fn inject_node_event( + &mut self, + peer_id: PeerId, + event: <::Handler as ProtocolsHandler>::OutEvent, + ) { + self.behaviour.inject_node_event(peer_id, event) + } + + fn poll( + &mut self, + params: &mut PollParameters, + ) -> Async< + NetworkBehaviourAction< + <::Handler as ProtocolsHandler>::InEvent, + Self::OutEvent + > + > { + let mut net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }; + match self.protocol.poll(&mut net_out, &*self.transaction_pool) { + Ok(Async::Ready(v)) => void::unreachable(v), + Ok(Async::NotReady) => {} + Err(err) => void::unreachable(err), + } + + let event = match self.behaviour.poll(params) { + Async::NotReady => return Async::NotReady, + Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev, + Async::Ready(NetworkBehaviourAction::DialAddress { address }) => + return Async::Ready(NetworkBehaviourAction::DialAddress { address }), + Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => + return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => + return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), + Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => + return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), + }; + + let mut network_out = LocalNetworkOut { + inner: &mut self.behaviour, + peerset_handle: &self.peerset_handle, + }; + + let outcome = match event { + CustomProtoOut::CustomProtocolOpen { peer_id, version, .. } => { + debug_assert!( + version <= protocol::CURRENT_VERSION as u8 + && version >= protocol::MIN_VERSION as u8 + ); + self.protocol.on_peer_connected(&mut network_out, peer_id); + CustomMessageOutcome::None + } + CustomProtoOut::CustomProtocolClosed { peer_id, .. } => { + self.protocol.on_peer_disconnected(&mut network_out, peer_id); + CustomMessageOutcome::None + }, + CustomProtoOut::CustomMessage { peer_id, message } => + self.protocol.on_custom_message( + &mut network_out, + &*self.transaction_pool, + peer_id, + message, + self.finality_proof_provider.as_ref().map(|p| &**p) + ), + CustomProtoOut::Clogged { peer_id, messages } => { + debug!(target: "sync", "{} clogging messages:", messages.len()); + for msg in messages.into_iter().take(5) { + debug!(target: "sync", "{:?}", msg); + self.protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg)); + } + CustomMessageOutcome::None + } + }; + + if let CustomMessageOutcome::None = outcome { + Async::NotReady + } else { + Async::Ready(NetworkBehaviourAction::GenerateEvent(outcome)) + } + } + + fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { + self.behaviour.inject_replaced(peer_id, closed_endpoint, new_endpoint) + } + + fn inject_addr_reach_failure( + &mut self, + peer_id: Option<&PeerId>, + addr: &Multiaddr, + error: &dyn std::error::Error + ) { + self.behaviour.inject_addr_reach_failure(peer_id, addr, error) + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + self.behaviour.inject_dial_failure(peer_id) + } + + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + self.behaviour.inject_new_listen_addr(addr) + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + self.behaviour.inject_expired_listen_addr(addr) + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + self.behaviour.inject_new_external_addr(addr) + } +} + +impl, H: ExHashT> DiscoveryNetBehaviour + for ProtocolBehaviour { + fn add_discovered_nodes(&mut self, peer_ids: impl Iterator) { + self.behaviour.add_discovered_nodes(peer_ids) + } +} + +/// Has to be public for stupid API reasons. This should be made private again ASAP. +pub struct LocalNetworkOut<'a, B: BlockT> { + inner: &'a mut CustomProto, Substream>, + peerset_handle: &'a peerset::PeersetHandle, +} + +impl<'a, B: BlockT> NetworkOut for LocalNetworkOut<'a, B> { + fn report_peer(&mut self, who: PeerId, reputation: i32) { + self.peerset_handle.report_peer(who, reputation) + } + + fn disconnect_peer(&mut self, who: PeerId) { + self.inner.disconnect_peer(&who) + } + + fn send_message(&mut self, who: PeerId, message: Message) { + self.inner.send_packet(&who, message) + } +} diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 34cc9797f1..eb28573d95 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -20,26 +20,24 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; -use log::{warn, debug, error, info}; +use log::{warn, error, info}; use libp2p::core::swarm::NetworkBehaviour; use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox}; use futures::{prelude::*, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; -use crate::custom_proto::{CustomProto, CustomProtoOut}; -use crate::{behaviour::Behaviour, parse_str_addr, ProtocolId}; +use crate::protocol_behaviour::ProtocolBehaviour; +use crate::{behaviour::Behaviour, parse_str_addr}; use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer}; -use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode, config::NetworkConfiguration}; +use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode}; use peerset::PeersetHandle; use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder}; use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; use crate::AlwaysBadChecker; -use crate::chain::FinalityProofProvider; use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::protocol::message::Message; use crate::protocol::on_demand::RequestData; -use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer}; -use crate::protocol::{PeerInfo, NetworkOut}; +use crate::protocol::{self, Context, CustomMessageOutcome, ConnectedPeer, PeerInfo}; use crate::protocol::sync::SyncState; use crate::config::Params; use crate::error::Error; @@ -88,7 +86,7 @@ impl ReportHandle { } /// Substrate network service. Handles network IO and manages connectivity. -pub struct NetworkService> { +pub struct NetworkService, H: ExHashT> { /// Are we connected to any peer? is_offline: Arc, /// Are we actively catching up with the chain? @@ -98,7 +96,7 @@ pub struct NetworkService> { /// Channel for networking messages processed by the background thread. network_chan: mpsc::UnboundedSender>, /// Network service - network: Arc>>, + network: Arc>>, /// Bandwidth logging system. Can be queried to know the average bandwidth consumed. bandwidth: Arc, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which @@ -120,28 +118,106 @@ impl, H: ExHashT> NetworkWorker let (network_chan, network_port) = mpsc::unbounded(); let (protocol_sender, protocol_rx) = mpsc::unbounded(); + if let Some(ref path) = params.network_config.net_config_path { + fs::create_dir_all(Path::new(path))?; + } + + // List of multiaddresses that we know in the network. + let mut known_addresses = Vec::new(); + let mut bootnodes = Vec::new(); + let mut reserved_nodes = Vec::new(); + + // Process the bootnodes. + for bootnode in params.network_config.boot_nodes.iter() { + match parse_str_addr(bootnode) { + Ok((peer_id, addr)) => { + bootnodes.push(peer_id.clone()); + known_addresses.push((peer_id, addr)); + }, + Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode), + } + } + + // Initialize the reserved peers. + for reserved in params.network_config.reserved_nodes.iter() { + if let Ok((peer_id, addr)) = parse_str_addr(reserved) { + reserved_nodes.push(peer_id.clone()); + known_addresses.push((peer_id, addr)); + } else { + warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved); + } + } + + // Build the peerset. + let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset::PeersetConfig { + in_peers: params.network_config.in_peers, + out_peers: params.network_config.out_peers, + bootnodes, + reserved_only: params.network_config.non_reserved_mode == NonReservedPeerMode::Deny, + reserved_nodes, + }); + + // Private and public keys configuration. + if let NodeKeyConfig::Secp256k1(_) = params.network_config.node_key { + warn!(target: "sub-libp2p", "Secp256k1 keys are deprecated in favour of ed25519"); + } + let local_identity = params.network_config.node_key.clone().into_keypair()?; + let local_public = local_identity.public(); + let local_peer_id = local_public.clone().into_peer_id(); + info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58()); + // Start in off-line mode, since we're not connected to any nodes yet. let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let peers: Arc>>> = Arc::new(Default::default()); - let protocol = Protocol::new( + let protocol = ProtocolBehaviour::new( protocol::ProtocolConfig { roles: params.roles }, params.chain, params.on_demand.as_ref().map(|od| od.checker().clone()) .unwrap_or(Arc::new(AlwaysBadChecker)), params.specialization, + params.transaction_pool, + params.finality_proof_provider, + params.protocol_id, + &((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect::>(), + peerset, + peerset_handle.clone(), )?; - let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect(); - // Start the main service. - let (network, bandwidth, peerset) = - match start_service::(params.network_config, params.protocol_id, &versions) { - Ok((network, bandwidth, peerset)) => (Arc::new(Mutex::new(network)), bandwidth, peerset), - Err(err) => { - warn!("Error starting network: {}", err); - return Err(err.into()) - }, - }; + // Build the swarm. + let (mut swarm, bandwidth) = { + let user_agent = format!( + "{} ({})", + params.network_config.client_version, + params.network_config.node_name + ); + let behaviour = Behaviour::new( + protocol, + user_agent, + local_public, + known_addresses, + params.network_config.enable_mdns + ); + let (transport, bandwidth) = transport::build_transport( + local_identity, + params.network_config.wasm_external_transport + ); + (Swarm::::new(transport, behaviour, local_peer_id.clone()), bandwidth) + }; + + // Listen on multiaddresses. + for addr in ¶ms.network_config.listen_addresses { + if let Err(err) = Swarm::::listen_on(&mut swarm, addr.clone()) { + warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err) + } + } + + // Add external addresses. + for addr in ¶ms.network_config.public_addresses { + Swarm::::add_external_address(&mut swarm, addr.clone()); + } + + let network = Arc::new(Mutex::new(swarm)); let service = Arc::new(NetworkService { bandwidth, @@ -149,7 +225,7 @@ impl, H: ExHashT> NetworkWorker is_major_syncing: is_major_syncing.clone(), network_chan, peers: peers.clone(), - peerset: peerset.clone(), + peerset: peerset_handle.clone(), network: network.clone(), protocol_sender: protocol_sender.clone(), }); @@ -158,13 +234,10 @@ impl, H: ExHashT> NetworkWorker is_offline, is_major_syncing, network_service: network, - peerset, + peerset: peerset_handle, service, - protocol, peers, import_queue: params.import_queue, - transaction_pool: params.transaction_pool, - finality_proof_provider: params.finality_proof_provider, network_port, protocol_rx, on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()), @@ -184,40 +257,40 @@ impl, H: ExHashT> NetworkWorker /// Returns the number of peers we're connected to. pub fn num_connected_peers(&self) -> usize { - self.protocol.num_connected_peers() + self.network_service.lock().user_protocol_mut().num_connected_peers() } /// Returns the number of peers we're connected to and that are being queried. pub fn num_active_peers(&self) -> usize { - self.protocol.num_active_peers() + self.network_service.lock().user_protocol_mut().num_active_peers() } /// Current global sync state. pub fn sync_state(&self) -> SyncState { - self.protocol.sync_state() + self.network_service.lock().user_protocol_mut().sync_state() } /// Target sync block number. pub fn best_seen_block(&self) -> Option> { - self.protocol.best_seen_block() + self.network_service.lock().user_protocol_mut().best_seen_block() } /// Number of peers participating in syncing. pub fn num_sync_peers(&self) -> u32 { - self.protocol.num_sync_peers() + self.network_service.lock().user_protocol_mut().num_sync_peers() } /// Return a `NetworkService` that can be shared through the code base and can be used to /// manipulate the worker. - pub fn service(&self) -> &Arc> { + pub fn service(&self) -> &Arc> { &self.service } } -impl> NetworkService { +impl, H: ExHashT> NetworkService { /// Returns the network identity of the node. pub fn local_peer_id(&self) -> PeerId { - Swarm::::local_peer_id(&*self.network.lock()).clone() + Swarm::::local_peer_id(&*self.network.lock()).clone() } /// Called when a new block is imported by the client. @@ -275,13 +348,6 @@ impl> NetworkService { let _ = self.network_chan.unbounded_send(NetworkMsg::DisconnectPeer(who)); } - /// Send a message to the given peer. Has no effect if we're not connected to this peer. - /// - /// This method is extremely poor in terms of API and should be eventually removed. - pub fn send_request(&self, who: PeerId, message: Message) { - let _ = self.network_chan.unbounded_send(NetworkMsg::Outgoing(who, message)); - } - /// Execute a closure with the chain-specific network specialization. pub fn with_spec(&self, f: F) where F: FnOnce(&mut S, &mut dyn Context) + Send + 'static @@ -304,7 +370,9 @@ impl> NetworkService { pub fn is_major_syncing(&self) -> bool { self.is_major_syncing.load(Ordering::Relaxed) } +} +impl, H: ExHashT> NetworkService { /// Get network state. pub fn network_state(&self) -> NetworkState { let mut swarm = self.network.lock(); @@ -326,7 +394,8 @@ impl> NetworkService { Some((peer_id.to_base58(), NetworkStatePeer { endpoint, - version_string: swarm.node(peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), + version_string: swarm.node(peer_id) + .and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()), enabled: swarm.user_protocol().is_enabled(&peer_id), open: swarm.user_protocol().is_open(&peer_id), @@ -341,7 +410,8 @@ impl> NetworkService { .cloned().collect::>(); list.into_iter().map(move |peer_id| { (peer_id.to_base58(), NetworkStateNotConnectedPeer { - version_string: swarm.node(&peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), + version_string: swarm.node(&peer_id) + .and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()), known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id) .into_iter().collect(), @@ -350,9 +420,9 @@ impl> NetworkService { }; NetworkState { - peer_id: Swarm::::local_peer_id(&swarm).to_base58(), - listened_addresses: Swarm::::listeners(&swarm).cloned().collect(), - external_addresses: Swarm::::external_addresses(&swarm).cloned().collect(), + peer_id: Swarm::::local_peer_id(&swarm).to_base58(), + listened_addresses: Swarm::::listeners(&swarm).cloned().collect(), + external_addresses: Swarm::::external_addresses(&swarm).cloned().collect(), average_download_per_sec: self.bandwidth.average_download_per_sec(), average_upload_per_sec: self.bandwidth.average_upload_per_sec(), connected_peers, @@ -371,7 +441,8 @@ impl> NetworkService { } } -impl> ::consensus::SyncOracle for NetworkService { +impl, H: ExHashT> + ::consensus::SyncOracle for NetworkService { fn is_major_syncing(&self) -> bool { self.is_major_syncing() } @@ -393,7 +464,7 @@ pub trait ManageNetwork { fn add_reserved_peer(&self, peer: String) -> Result<(), String>; } -impl> ManageNetwork for NetworkService { +impl, H: ExHashT> ManageNetwork for NetworkService { fn accept_unreserved_peers(&self) { self.peerset.set_reserved_only(false); } @@ -498,14 +569,11 @@ impl, &mut dyn Context)> GossipT pub struct NetworkWorker, H: ExHashT> { is_offline: Arc, is_major_syncing: Arc, - protocol: Protocol, /// The network service that can be extracted and shared through the codebase. - service: Arc>, - network_service: Arc>>, + service: Arc>, + network_service: Arc>>, peers: Arc>>>, import_queue: Box>, - transaction_pool: Arc>, - finality_proof_provider: Option>>, network_port: mpsc::UnboundedReceiver>, protocol_rx: mpsc::UnboundedReceiver>, peerset: PeersetHandle, @@ -520,48 +588,33 @@ impl, H: ExHashT> Future for Ne type Error = io::Error; fn poll(&mut self) -> Poll { - // Implementation of `protocol::NetworkOut` trait using the available local variables. - struct Context<'a, B: BlockT>(&'a mut Swarm, &'a PeersetHandle); - impl<'a, B: BlockT> NetworkOut for Context<'a, B> { - fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.1.report_peer(who, reputation) - } - fn disconnect_peer(&mut self, who: PeerId) { - self.0.user_protocol_mut().disconnect_peer(&who) - } - fn send_message(&mut self, who: PeerId, message: Message) { - self.0.user_protocol_mut().send_packet(&who, message) - } - } - // Implementation of `import_queue::Link` trait using the available local variables. struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> { - protocol: &'a mut Protocol, - context: Context<'a, B>, + protocol: &'a mut Swarm, } impl<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> Link for NetworkLink<'a, B, S, H> { fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.block_imported(&hash, number) + self.protocol.user_protocol_mut().block_imported(&hash, number) } fn blocks_processed(&mut self, hashes: Vec, has_error: bool) { - self.protocol.blocks_processed(&mut self.context, hashes, has_error) + self.protocol.user_protocol_mut().blocks_processed(hashes, has_error) } fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { - self.protocol.justification_import_result(hash.clone(), number, success); + self.protocol.user_protocol_mut().justification_import_result(hash.clone(), number, success); if !success { info!("Invalid justification provided by {} for #{}", who, hash); - self.context.0.user_protocol_mut().disconnect_peer(&who); - self.context.1.report_peer(who, i32::min_value()); + self.protocol.user_protocol_mut().disconnect_peer(&who); + self.protocol.user_protocol_mut().report_peer(who, i32::min_value()); } } fn clear_justification_requests(&mut self) { - self.protocol.clear_justification_requests() + self.protocol.user_protocol_mut().clear_justification_requests() } fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.request_justification(&mut self.context, hash, number) + self.protocol.user_protocol_mut().request_justification(hash, number) } fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.request_finality_proof(&mut self.context, hash, number) + self.protocol.user_protocol_mut().request_finality_proof(hash, number) } fn finality_proof_imported( &mut self, @@ -570,54 +623,45 @@ impl, H: ExHashT> Future for Ne finalization_result: Result<(B::Hash, NumberFor), ()>, ) { let success = finalization_result.is_ok(); - self.protocol.finality_proof_import_result(request_block, finalization_result); + self.protocol.user_protocol_mut().finality_proof_import_result(request_block, finalization_result); if !success { info!("Invalid finality proof provided by {} for #{}", who, request_block.0); - self.context.0.user_protocol_mut().disconnect_peer(&who); - self.context.1.report_peer(who, i32::min_value()); + self.protocol.user_protocol_mut().disconnect_peer(&who); + self.protocol.user_protocol_mut().report_peer(who, i32::min_value()); } } fn report_peer(&mut self, who: PeerId, reputation_change: i32) { - self.context.1.report_peer(who, reputation_change) + self.protocol.user_protocol_mut().report_peer(who, reputation_change) } fn restart(&mut self) { - self.protocol.restart(&mut self.context) + self.protocol.user_protocol_mut().restart() } fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder) { - self.protocol.set_finality_proof_request_builder(builder) + self.protocol.user_protocol_mut().set_finality_proof_request_builder(builder) } } { let mut network_service = self.network_service.lock(); let mut link = NetworkLink { - protocol: &mut self.protocol, - context: Context(&mut network_service, &self.peerset), + protocol: &mut network_service, }; self.import_queue.poll_actions(&mut link); } while let Ok(Async::Ready(_)) = self.connected_peers_interval.poll() { - let infos = self.protocol.peers_info().map(|(id, info)| { + let mut network_service = self.network_service.lock(); + let infos = network_service.user_protocol_mut().peers_info().map(|(id, info)| { (id.clone(), ConnectedPeer { peer_info: info.clone() }) }).collect(); *self.peers.write() = infos; } - { - let mut network_service = self.network_service.lock(); - let mut ctxt = Context(&mut *network_service, &self.peerset); - match self.protocol.poll(&mut ctxt, &*self.transaction_pool) { - Ok(Async::Ready(v)) => void::unreachable(v), - Ok(Async::NotReady) => {} - Err(err) => void::unreachable(err), - } - } - // Check for new incoming on-demand requests. if let Some(on_demand_in) = self.on_demand_in.as_mut() { while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() { - self.protocol.add_on_demand_request(&mut Context(&mut self.network_service.lock(), &self.peerset), rq); + let mut network_service = self.network_service.lock(); + network_service.user_protocol_mut().add_on_demand_request(rq); } } @@ -646,47 +690,49 @@ impl, H: ExHashT> Future for Ne }; let mut network_service = self.network_service.lock(); - let mut network_out = Context(&mut network_service, &self.peerset); match msg { ProtocolMsg::BlockImported(hash, header) => - self.protocol.on_block_imported(&mut network_out, hash, &header), + network_service.user_protocol_mut().on_block_imported(hash, &header), ProtocolMsg::BlockFinalized(hash, header) => - self.protocol.on_block_finalized(&mut network_out, hash, &header), + network_service.user_protocol_mut().on_block_finalized(hash, &header), ProtocolMsg::ExecuteWithSpec(task) => { - let (mut context, spec) = self.protocol.specialization_lock(&mut network_out); + let (protocol, mut net_out) = network_service.user_protocol_mut().protocol_context_lock(); + let (mut context, spec) = protocol.specialization_lock(&mut net_out); task.call_box(spec, &mut context); }, ProtocolMsg::ExecuteWithGossip(task) => { - let (mut context, gossip) = self.protocol.consensus_gossip_lock(&mut network_out); + let (protocol, mut net_out) = network_service.user_protocol_mut().protocol_context_lock(); + let (mut context, gossip) = protocol.consensus_gossip_lock(&mut net_out); task.call_box(gossip, &mut context); } ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => - self.protocol.gossip_consensus_message(&mut network_out, topic, engine_id, message, recipient), + network_service.user_protocol_mut().gossip_consensus_message(topic, engine_id, message, recipient), ProtocolMsg::BlocksProcessed(hashes, has_error) => - self.protocol.blocks_processed(&mut network_out, hashes, has_error), + network_service.user_protocol_mut().blocks_processed(hashes, has_error), ProtocolMsg::RestartSync => - self.protocol.restart(&mut network_out), + network_service.user_protocol_mut().restart(), ProtocolMsg::AnnounceBlock(hash) => - self.protocol.announce_block(&mut network_out, hash), + network_service.user_protocol_mut().announce_block(hash), ProtocolMsg::BlockImportedSync(hash, number) => - self.protocol.block_imported(&hash, number), + network_service.user_protocol_mut().block_imported(&hash, number), ProtocolMsg::ClearJustificationRequests => - self.protocol.clear_justification_requests(), + network_service.user_protocol_mut().clear_justification_requests(), ProtocolMsg::RequestJustification(hash, number) => - self.protocol.request_justification(&mut network_out, &hash, number), + network_service.user_protocol_mut().request_justification(&hash, number), ProtocolMsg::JustificationImportResult(hash, number, success) => - self.protocol.justification_import_result(hash, number, success), + network_service.user_protocol_mut().justification_import_result(hash, number, success), ProtocolMsg::SetFinalityProofRequestBuilder(builder) => - self.protocol.set_finality_proof_request_builder(builder), + network_service.user_protocol_mut().set_finality_proof_request_builder(builder), ProtocolMsg::RequestFinalityProof(hash, number) => - self.protocol.request_finality_proof(&mut network_out, &hash, number), + network_service.user_protocol_mut().request_finality_proof(&hash, number), ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => - self.protocol.finality_proof_import_result(requested_block, finalziation_result), + network_service.user_protocol_mut() + .finality_proof_import_result(requested_block, finalziation_result), ProtocolMsg::PropagateExtrinsics => - self.protocol.propagate_extrinsics(&mut network_out, &*self.transaction_pool), + network_service.user_protocol_mut().propagate_extrinsics(), #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Tick => self.protocol.tick(&mut network_out), + ProtocolMsg::Tick => network_service.user_protocol_mut().tick(), #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Synchronize => {}, } @@ -695,39 +741,11 @@ impl, H: ExHashT> Future for Ne loop { let mut network_service = self.network_service.lock(); let poll_value = network_service.poll(); - let mut network_out = Context(&mut network_service, &self.peerset); let outcome = match poll_value { Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { peer_id, version, .. }))) => { - debug_assert!( - version <= protocol::CURRENT_VERSION as u8 - && version >= protocol::MIN_VERSION as u8 - ); - self.protocol.on_peer_connected(&mut network_out, peer_id); - CustomMessageOutcome::None - } - Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { peer_id, .. }))) => { - self.protocol.on_peer_disconnected(&mut network_out, peer_id); - CustomMessageOutcome::None - }, - Ok(Async::Ready(Some(CustomProtoOut::CustomMessage { peer_id, message }))) => - self.protocol.on_custom_message( - &mut network_out, - &*self.transaction_pool, - peer_id, - message, - self.finality_proof_provider.as_ref().map(|p| &**p) - ), - Ok(Async::Ready(Some(CustomProtoOut::Clogged { peer_id, messages, .. }))) => { - debug!(target: "sync", "{} clogging messages:", messages.len()); - for msg in messages.into_iter().take(5) { - debug!(target: "sync", "{:?}", msg); - self.protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg)); - } - CustomMessageOutcome::None - } - Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(outcome))) => outcome, + Ok(Async::Ready(None)) => CustomMessageOutcome::None, Err(err) => { error!(target: "sync", "Error in the network: {:?}", err); return Err(err) @@ -745,8 +763,9 @@ impl, H: ExHashT> Future for Ne } } - self.is_offline.store(self.protocol.num_connected_peers() == 0, Ordering::Relaxed); - self.is_major_syncing.store(match self.protocol.sync_state() { + let mut network_service = self.network_service.lock(); + self.is_offline.store(network_service.user_protocol_mut().num_connected_peers() == 0, Ordering::Relaxed); + self.is_major_syncing.store(match network_service.user_protocol_mut().sync_state() { SyncState::Idle => false, SyncState::Downloading => true, }, Ordering::Relaxed); @@ -756,91 +775,7 @@ impl, H: ExHashT> Future for Ne } /// The libp2p swarm, customized for our needs. -type Swarm = libp2p::core::Swarm< +type Swarm = libp2p::core::Swarm< Boxed<(PeerId, StreamMuxerBox), io::Error>, - Behaviour, Substream>, CustomProtoOut>, Substream> + Behaviour, CustomMessageOutcome, Substream> >; - -/// Starts the substrate libp2p service. -/// -/// Returns a stream that must be polled regularly in order for the networking to function. -fn start_service>( - config: NetworkConfiguration, - protocol_id: Pid, - versions: &[u8], -) -> Result<(Swarm, Arc, peerset::PeersetHandle), io::Error> { - - if let Some(ref path) = config.net_config_path { - fs::create_dir_all(Path::new(path))?; - } - - // List of multiaddresses that we know in the network. - let mut known_addresses = Vec::new(); - let mut bootnodes = Vec::new(); - let mut reserved_nodes = Vec::new(); - - // Process the bootnodes. - for bootnode in config.boot_nodes.iter() { - match parse_str_addr(bootnode) { - Ok((peer_id, addr)) => { - bootnodes.push(peer_id.clone()); - known_addresses.push((peer_id, addr)); - }, - Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode), - } - } - - // Initialize the reserved peers. - for reserved in config.reserved_nodes.iter() { - if let Ok((peer_id, addr)) = parse_str_addr(reserved) { - reserved_nodes.push(peer_id.clone()); - known_addresses.push((peer_id, addr)); - } else { - warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved); - } - } - - // Build the peerset. - let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset::PeersetConfig { - in_peers: config.in_peers, - out_peers: config.out_peers, - bootnodes, - reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny, - reserved_nodes, - }); - - // Private and public keys configuration. - if let NodeKeyConfig::Secp256k1(_) = config.node_key { - warn!(target: "sub-libp2p", "Secp256k1 keys are deprecated in favour of ed25519"); - } - let local_identity = config.node_key.clone().into_keypair()?; - let local_public = local_identity.public(); - let local_peer_id = local_public.clone().into_peer_id(); - info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58()); - - // Build the swarm. - let (mut swarm, bandwidth) = { - let user_agent = format!("{} ({})", config.client_version, config.node_name); - let proto = CustomProto::new(protocol_id, versions, peerset); - let behaviour = Behaviour::new(proto, user_agent, local_public, known_addresses, config.enable_mdns); - let (transport, bandwidth) = transport::build_transport( - local_identity, - config.wasm_external_transport - ); - (Swarm::::new(transport, behaviour, local_peer_id.clone()), bandwidth) - }; - - // Listen on multiaddresses. - for addr in &config.listen_addresses { - if let Err(err) = Swarm::::listen_on(&mut swarm, addr.clone()) { - warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err) - } - } - - // Add external addresses. - for addr in &config.public_addresses { - Swarm::::add_external_address(&mut swarm, addr.clone()); - } - - Ok((swarm, bandwidth, peerset_handle)) -} diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index 85ab9feb99..a41f9e94ff 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -38,9 +38,13 @@ use futures::sync::mpsc; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. -/// Network service type for a factory. -pub type NetworkService = - network::NetworkService<::Block, ::NetworkProtocol>; + +/// Network service type for `Components`. +pub type NetworkService = network::NetworkService< + ComponentBlock, + <::Factory as ServiceFactory>::NetworkProtocol, + ComponentExHash +>; /// Code executor type for a factory. pub type CodeExecutor = NativeExecutor<::RuntimeDispatch>; diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 85a39cf6c2..f66b083fd0 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -75,7 +75,7 @@ const DEFAULT_PROTOCOL_ID: &str = "sup"; pub struct Service { client: Arc>, select_chain: Option<::SelectChain>, - network: Arc>, + network: Arc>, /// Sinks to propagate network status updates. network_status_sinks: Arc>>>>>, transaction_pool: Arc>, @@ -498,7 +498,7 @@ impl Service where Components: components::Components { } /// Get shared network instance. - pub fn network(&self) -> Arc> { + pub fn network(&self) -> Arc> { self.network.clone() } @@ -684,7 +684,7 @@ impl network::TransactionPool, ComponentBlock< /// Builds a never-ending `Future` that answers the RPC requests coming on the receiver. fn build_system_rpc_handler( - network: Arc>, + network: Arc>, rx: mpsc::UnboundedReceiver>>, should_have_peers: bool, ) -> impl Future {