diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index ac6f721d81..32bcf235ab 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -57,7 +57,7 @@ dependencies = [ "parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1180,7 +1180,7 @@ dependencies = [ "tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "want 0.0.6 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -4012,7 +4012,7 @@ dependencies = [ "substrate-inherents 2.0.0", "substrate-primitives 2.0.0", "substrate-test-client 2.0.0", - "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -4180,6 +4180,7 @@ dependencies = [ "substrate-primitives 2.0.0", "substrate-test-client 2.0.0", "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -4206,7 +4207,7 @@ dependencies = [ "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "unsigned-varint 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "zeroize 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4722,7 +4723,7 @@ dependencies = [ "tokio-sync 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-trace-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4845,7 +4846,7 @@ dependencies = [ [[package]] name = "tokio-timer" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5272,7 +5273,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", "send_wrapper 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "wasm-bindgen 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "web-sys 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -5838,7 +5839,7 @@ dependencies = [ "checksum tokio-sync 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "5b2f843ffdf8d6e1f90bddd48da43f99ab071660cd92b7ec560ef3cdfd7a409a" "checksum tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1d14b10654be682ac43efee27401d792507e30fd8d26389e1da3b185de2e4119" "checksum tokio-threadpool 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "72558af20be886ea124595ea0f806dd5703b8958e4705429dd58b3d8231f72f2" -"checksum tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "2910970404ba6fa78c5539126a9ae2045d62e3713041e447f695f41405a120c6" +"checksum tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "f2106812d500ed25a4f38235b9cae8f78a09edf43203e16e59c3b769a342a60e" "checksum tokio-tls 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "354b8cd83825b3c20217a9dc174d6a0c67441a2fae5c41bcb1ea6679f6ae0f7c" "checksum tokio-trace-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "350c9edade9830dc185ae48ba45667a445ab59f6167ef6d0254ec9d2430d9dd3" "checksum tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "66268575b80f4a4a710ef83d087fdfeeabdce9b74c797535fbac18a2cb906e92" diff --git a/substrate/core/finality-grandpa/src/communication/mod.rs b/substrate/core/finality-grandpa/src/communication/mod.rs index e4343352d9..05033c2c41 100644 --- a/substrate/core/finality-grandpa/src/communication/mod.rs +++ b/substrate/core/finality-grandpa/src/communication/mod.rs @@ -39,7 +39,7 @@ use substrate_primitives::{ed25519, Pair}; use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO}; use runtime_primitives::ConsensusEngineId; use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; -use network::{consensus_gossip as network_gossip, Service as NetworkService}; +use network::{consensus_gossip as network_gossip, NetworkService}; use network_gossip::ConsensusMessage; use crate::{Error, Message, SignedMessage, Commit, CompactCommit}; diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index d3b7a71f03..6576ccabbf 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -27,7 +27,8 @@ runtime_primitives = { package = "sr-primitives", path = "../../core/sr-primitiv parity-codec = { version = "3.3", features = ["derive"] } network_libp2p = { package = "substrate-network-libp2p", path = "../../core/network-libp2p" } peerset = { package = "substrate-peerset", path = "../../core/peerset" } -tokio = "0.1.11" +tokio-timer = "0.2.11" +tokio = { version = "0.1.11", optional = true } keyring = { package = "substrate-keyring", path = "../../core/keyring", optional = true } test_client = { package = "substrate-test-client", path = "../../core/test-client", optional = true } void = "1.0" @@ -37,7 +38,8 @@ env_logger = { version = "0.6" } keyring = { package = "substrate-keyring", path = "../../core/keyring" } test_client = { package = "substrate-test-client", path = "../../core/test-client" } consensus = { package = "substrate-consensus-common", path = "../../core/consensus/common", features = ["test-helpers"] } +tokio = "0.1.11" [features] default = [] -test-helpers = ["keyring", "test_client", "consensus/test-helpers"] +test-helpers = ["keyring", "test_client", "consensus/test-helpers", "tokio"] diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index e3ed56d5ad..e2def15425 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -43,8 +43,8 @@ pub mod test; pub use chain::{Client as ClientHandle, FinalityProofProvider}; pub use service::{ - Service, FetchFuture, TransactionPool, ManageNetwork, NetworkMsg, - SyncProvider, ExHashT, ReportHandle, + NetworkService, NetworkWorker, FetchFuture, TransactionPool, ManageNetwork, + NetworkMsg, SyncProvider, ExHashT, ReportHandle, }; pub use protocol::{ProtocolStatus, PeerInfo, Context}; pub use sync::{Status as SyncStatus, SyncState}; diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index d0b14bada7..2d7799b5bb 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -80,9 +80,9 @@ const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12); // Lock must always be taken in order declared here. pub struct Protocol, H: ExHashT> { /// Interval at which we call `tick`. - tick_timeout: tokio::timer::Interval, + tick_timeout: tokio_timer::Interval, /// Interval at which we call `propagate_extrinsics`. - propagate_timeout: tokio::timer::Interval, + propagate_timeout: tokio_timer::Interval, config: ProtocolConfig, /// Handler for on-demand requests. on_demand_core: OnDemandCore, @@ -395,8 +395,8 @@ impl, H: ExHashT> Protocol { let info = chain.info()?; let sync = ChainSync::new(config.roles, &info); Ok(Protocol { - tick_timeout: tokio::timer::Interval::new_interval(TICK_TIMEOUT), - propagate_timeout: tokio::timer::Interval::new_interval(PROPAGATE_TIMEOUT), + tick_timeout: tokio_timer::Interval::new_interval(TICK_TIMEOUT), + propagate_timeout: tokio_timer::Interval::new_interval(PROPAGATE_TIMEOUT), config: config, context_data: ContextData { peers: HashMap::new(), diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 805fb55ddb..c7a32d472f 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -15,15 +15,16 @@ // along with Substrate. If not, see . use std::collections::HashMap; +use std::io; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use std::{io, thread, time::Duration}; +use std::time::Duration; use log::{warn, debug, error, info}; -use futures::{Async, Future, Stream, sync::oneshot, sync::mpsc}; +use futures::{prelude::*, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; -use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; -use network_libp2p::{NetworkConfiguration, RegisteredProtocol, NetworkState}; +use network_libp2p::{start_service, parse_str_addr, Service as Libp2pNetService, ServiceEvent as Libp2pNetServiceEvent}; +use network_libp2p::{RegisteredProtocol, NetworkState}; use peerset::PeersetHandle; use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder}; use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; @@ -39,8 +40,6 @@ use crate::config::Params; use crate::error::Error; use crate::specialization::NetworkSpecialization; -use tokio::runtime::Builder as RuntimeBuilder; - /// Interval at which we send status updates on the SyncProvider status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); /// Interval at which we update the `peers` field on the main thread. @@ -176,7 +175,7 @@ impl ReportHandle { } /// Substrate network service. Handles network IO and manages connectivity. -pub struct Service> { +pub struct NetworkService> { /// Sinks to propagate status updates. status_sinks: Arc>>>>, /// Are we connected to any peer? @@ -188,23 +187,23 @@ pub struct Service> { /// Channel for networking messages processed by the background thread. network_chan: mpsc::UnboundedSender>, /// Network service - network: Arc>>>, + network: Arc>>>, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which /// nodes it should be connected to or not. peerset: PeersetHandle, /// Protocol sender protocol_sender: mpsc::UnboundedSender>, - /// Sender for messages to the background service task, and handle for the background thread. - /// Dropping the sender should close the task and the thread. - /// This is an `Option` because we need to extract it in the destructor. - bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>, } -impl> Service { - /// Creates and register protocol with the network service - pub fn new( +impl, H: ExHashT> NetworkWorker { + /// Creates the network service. + /// + /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order + /// for the network processing to advance. From it, you can extract a `NetworkService` using + /// `worker.service()`. The `NetworkService` can be shared through the codebase. + pub fn new( params: Params, - ) -> Result>, Error> { + ) -> Result, Error> { let (network_chan, network_port) = mpsc::unbounded(); let (protocol_sender, protocol_rx) = mpsc::unbounded(); let status_sinks = Arc::new(Mutex::new(Vec::new())); @@ -229,35 +228,55 @@ impl> Service { )?; let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect(); let registered = RegisteredProtocol::new(params.protocol_id, &versions); - let (thread, network, peerset) = start_thread( - is_offline.clone(), - is_major_syncing.clone(), - protocol, - peers.clone(), - params.import_queue, - params.transaction_pool, - params.finality_proof_provider, - network_port, - protocol_rx, - status_sinks.clone(), - params.network_config, - registered, - params.on_demand.and_then(|od| od.extract_receiver()), - )?; - Ok(Arc::new(Service { - status_sinks, + // Start the main service. + let (network, peerset) = match start_service(params.network_config, registered) { + Ok((network, peerset)) => (Arc::new(Mutex::new(network)), peerset), + Err(err) => { + warn!("Error starting network: {}", err); + return Err(err.into()) + }, + }; + + let service = Arc::new(NetworkService { + status_sinks: status_sinks.clone(), + is_offline: is_offline.clone(), + is_major_syncing: is_major_syncing.clone(), + network_chan, + peers: peers.clone(), + peerset: peerset.clone(), + network: network.clone(), + protocol_sender: protocol_sender.clone(), + }); + + Ok(NetworkWorker { is_offline, is_major_syncing, - network_chan, - peers, + network_service: network, peerset, - network, - protocol_sender, - bg_thread: Some(thread), - })) + service, + protocol, + peers, + import_queue: params.import_queue, + transaction_pool: params.transaction_pool, + finality_proof_provider: params.finality_proof_provider, + network_port, + protocol_rx, + status_sinks, + on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()), + status_interval: tokio_timer::Interval::new_interval(STATUS_INTERVAL), + connected_peers_interval: tokio_timer::Interval::new_interval(CONNECTED_PEERS_INTERVAL), + }) } + /// Return a `NetworkService` that can be shared through the code base and can be used to + /// manipulate the worker. + pub fn service(&self) -> &Arc> { + &self.service + } +} + +impl> NetworkService { /// Returns the downloaded bytes per second averaged over the past few seconds. #[inline] pub fn average_download_per_sec(&self) -> u64 { @@ -362,7 +381,7 @@ impl> Service { } } -impl> ::consensus::SyncOracle for Service { +impl> ::consensus::SyncOracle for NetworkService { fn is_major_syncing(&self) -> bool { self.is_major_syncing() } @@ -372,18 +391,7 @@ impl> ::consensus::SyncOracle f } } -impl> Drop for Service { - fn drop(&mut self) { - if let Some((sender, join)) = self.bg_thread.take() { - let _ = sender.send(()); - if let Err(e) = join.join() { - error!("Error while waiting on background thread: {:?}", e); - } - } - } -} - -impl> SyncProvider for Service { +impl> SyncProvider for NetworkService { fn is_major_syncing(&self) -> bool { self.is_major_syncing() } @@ -417,7 +425,7 @@ pub trait ManageNetwork { fn add_reserved_peer(&self, peer: String) -> Result<(), String>; } -impl> ManageNetwork for Service { +impl> ManageNetwork for NetworkService { fn accept_unreserved_peers(&self) { self.peerset.set_reserved_only(false); } @@ -438,7 +446,7 @@ impl> ManageNetwork for Service } } -/// Messages to be handled by NetworkService. +/// Messages to be handled by Libp2pNetService. #[derive(Debug)] pub enum NetworkMsg { /// Send an outgoing custom message. @@ -516,11 +524,16 @@ impl, &mut Context)> GossipTask< } } -/// Starts the background thread that handles the networking. -fn start_thread, H: ExHashT>( +/// Future tied to the `Network` service and that must be polled in order for the network to +/// advance. +#[must_use = "The NetworkWorker must be polled in order for the network to work"] +pub struct NetworkWorker, H: ExHashT> { is_offline: Arc, is_major_syncing: Arc, protocol: Protocol, + /// The network service that can be extracted and shared through the codebase. + service: Arc>, + network_service: Arc>>>, peers: Arc>>>, import_queue: Box>, transaction_pool: Arc>, @@ -528,124 +541,68 @@ fn start_thread, H: ExHashT>( network_port: mpsc::UnboundedReceiver>, protocol_rx: mpsc::UnboundedReceiver>, status_sinks: Arc>>>>, - config: NetworkConfiguration, - registered: RegisteredProtocol>, + peerset: PeersetHandle, on_demand_in: Option>>, -) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>>>, PeersetHandle), Error> { - // Start the main service. - let (service, peerset) = match start_service(config, registered) { - Ok((service, peerset)) => (Arc::new(Mutex::new(service)), peerset), - Err(err) => { - warn!("Error starting network: {}", err); - return Err(err.into()) - }, - }; - let (close_tx, close_rx) = oneshot::channel(); - let service_clone = service.clone(); - let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?; - let peerset_clone = peerset.clone(); - let thread = thread::Builder::new().name("network".to_string()).spawn(move || { - let fut = run_thread( - is_offline, - is_major_syncing, - protocol, - service_clone, - peers, - import_queue, - transaction_pool, - finality_proof_provider, - network_port, - protocol_rx, - status_sinks, - peerset_clone, - on_demand_in - ) - .select(close_rx.then(|_| Ok(()))) - .map(|(val, _)| val) - .map_err(|(err,_ )| err); - - // Note that we use `block_on` and not `block_on_all` because we want to kill the thread - // instantly if `close_rx` receives something. - match runtime.block_on(fut) { - Ok(()) => debug!(target: "sub-libp2p", "Networking thread finished"), - Err(err) => error!(target: "sub-libp2p", "Error while running libp2p: {:?}", err), - }; - })?; - - Ok(((close_tx, thread), service, peerset)) + /// Interval at which we send status updates on the `status_sinks`. + status_interval: tokio_timer::Interval, + /// Interval at which we update the `connected_peers` Arc. + connected_peers_interval: tokio_timer::Interval, } -/// Runs the background thread that handles the networking. -fn run_thread, H: ExHashT>( - is_offline: Arc, - is_major_syncing: Arc, - mut protocol: Protocol, - network_service: Arc>>>, - peers: Arc>>>, - import_queue: Box>, - transaction_pool: Arc>, - finality_proof_provider: Option>>, - mut network_port: mpsc::UnboundedReceiver>, - mut protocol_rx: mpsc::UnboundedReceiver>, - status_sinks: Arc>>>>, - peerset: PeersetHandle, - mut on_demand_in: Option>>, -) -> impl Future { - // Implementation of `protocol::NetworkOut` using the available local variables. - struct Ctxt<'a, B: BlockT>(&'a mut NetworkService>, &'a PeersetHandle); - impl<'a, B: BlockT> NetworkOut for Ctxt<'a, B> { - fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.1.report_peer(who, reputation) - } - fn disconnect_peer(&mut self, who: PeerId) { - self.0.drop_node(&who) - } - fn send_message(&mut self, who: PeerId, message: Message) { - self.0.send_custom_message(&who, message) - } - } +impl, H: ExHashT> Future for NetworkWorker { + type Item = (); + type Error = io::Error; - // Interval at which we send status updates on the `status_sinks`. - let mut status_interval = tokio::timer::Interval::new_interval(STATUS_INTERVAL); - // Interval at which we update the `connected_peers` Arc. - let mut connected_peers_interval = tokio::timer::Interval::new_interval(CONNECTED_PEERS_INTERVAL); - - futures::future::poll_fn(move || { - while let Ok(Async::Ready(_)) = status_interval.poll() { - let status = protocol.status(); - status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); + fn poll(&mut self) -> Poll { + // Implementation of `protocol::NetworkOut` using the available local variables. + struct Context<'a, B: BlockT>(&'a mut Libp2pNetService>, &'a PeersetHandle); + impl<'a, B: BlockT> NetworkOut for Context<'a, B> { + fn report_peer(&mut self, who: PeerId, reputation: i32) { + self.1.report_peer(who, reputation) + } + fn disconnect_peer(&mut self, who: PeerId) { + self.0.drop_node(&who) + } + fn send_message(&mut self, who: PeerId, message: Message) { + self.0.send_custom_message(&who, message) + } } - while let Ok(Async::Ready(_)) = connected_peers_interval.poll() { - let infos = protocol.peers_info().map(|(id, info)| { + while let Ok(Async::Ready(_)) = self.status_interval.poll() { + let status = self.protocol.status(); + self.status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); + } + + while let Ok(Async::Ready(_)) = self.connected_peers_interval.poll() { + let infos = self.protocol.peers_info().map(|(id, info)| { (id.clone(), ConnectedPeer { peer_info: info.clone() }) }).collect(); - *peers.write() = infos; + *self.peers.write() = infos; } - match protocol.poll(&mut Ctxt(&mut network_service.lock(), &peerset), &*transaction_pool) { + match self.protocol.poll(&mut Context(&mut self.network_service.lock(), &self.peerset), &*self.transaction_pool) { Ok(Async::Ready(v)) => void::unreachable(v), Ok(Async::NotReady) => {} Err(err) => void::unreachable(err), } // Check for new incoming on-demand requests. - if let Some(on_demand_in) = on_demand_in.as_mut() { + if let Some(on_demand_in) = self.on_demand_in.as_mut() { while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() { - protocol.add_on_demand_request(&mut Ctxt(&mut network_service.lock(), &peerset), rq); + self.protocol.add_on_demand_request(&mut Context(&mut self.network_service.lock(), &self.peerset), rq); } } loop { - match network_port.poll() { + match self.network_port.poll() { Ok(Async::NotReady) => break, Ok(Async::Ready(Some(NetworkMsg::Outgoing(who, outgoing_message)))) => - network_service.lock().send_custom_message(&who, outgoing_message), + self.network_service.lock().send_custom_message(&who, outgoing_message), Ok(Async::Ready(Some(NetworkMsg::ReportPeer(who, reputation)))) => - peerset.report_peer(who, reputation), + self.peerset.report_peer(who, reputation), Ok(Async::Ready(Some(NetworkMsg::DisconnectPeer(who)))) => - network_service.lock().drop_node(&who), + self.network_service.lock().drop_node(&who), #[cfg(any(test, feature = "test-helpers"))] Ok(Async::Ready(Some(NetworkMsg::Synchronized))) => {} @@ -655,91 +612,91 @@ fn run_thread, H: ExHashT>( } loop { - let msg = match protocol_rx.poll() { + let msg = match self.protocol_rx.poll() { Ok(Async::Ready(Some(msg))) => msg, Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), Ok(Async::NotReady) => break, }; - let mut network_service = network_service.lock(); - let mut network_out = Ctxt(&mut network_service, &peerset); + let mut network_service = self.network_service.lock(); + let mut network_out = Context(&mut network_service, &self.peerset); match msg { ProtocolMsg::BlockImported(hash, header) => - protocol.on_block_imported(&mut network_out, hash, &header), + self.protocol.on_block_imported(&mut network_out, hash, &header), ProtocolMsg::BlockFinalized(hash, header) => - protocol.on_block_finalized(&mut network_out, hash, &header), + self.protocol.on_block_finalized(&mut network_out, hash, &header), ProtocolMsg::ExecuteWithSpec(task) => { - let (mut context, spec) = protocol.specialization_lock(&mut network_out); + let (mut context, spec) = self.protocol.specialization_lock(&mut network_out); task.call_box(spec, &mut context); }, ProtocolMsg::ExecuteWithGossip(task) => { - let (mut context, gossip) = protocol.consensus_gossip_lock(&mut network_out); + let (mut context, gossip) = self.protocol.consensus_gossip_lock(&mut network_out); task.call_box(gossip, &mut context); } ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => - protocol.gossip_consensus_message(&mut network_out, topic, engine_id, message, recipient), + self.protocol.gossip_consensus_message(&mut network_out, topic, engine_id, message, recipient), ProtocolMsg::BlocksProcessed(hashes, has_error) => - protocol.blocks_processed(&mut network_out, hashes, has_error), + self.protocol.blocks_processed(&mut network_out, hashes, has_error), ProtocolMsg::RestartSync => - protocol.restart(&mut network_out), + self.protocol.restart(&mut network_out), ProtocolMsg::AnnounceBlock(hash) => - protocol.announce_block(&mut network_out, hash), + self.protocol.announce_block(&mut network_out, hash), ProtocolMsg::BlockImportedSync(hash, number) => - protocol.block_imported(&hash, number), + self.protocol.block_imported(&hash, number), ProtocolMsg::ClearJustificationRequests => - protocol.clear_justification_requests(), + self.protocol.clear_justification_requests(), ProtocolMsg::RequestJustification(hash, number) => - protocol.request_justification(&mut network_out, &hash, number), + self.protocol.request_justification(&mut network_out, &hash, number), ProtocolMsg::JustificationImportResult(hash, number, success) => - protocol.justification_import_result(hash, number, success), + self.protocol.justification_import_result(hash, number, success), ProtocolMsg::SetFinalityProofRequestBuilder(builder) => - protocol.set_finality_proof_request_builder(builder), + self.protocol.set_finality_proof_request_builder(builder), ProtocolMsg::RequestFinalityProof(hash, number) => - protocol.request_finality_proof(&mut network_out, &hash, number), + self.protocol.request_finality_proof(&mut network_out, &hash, number), ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => - protocol.finality_proof_import_result(requested_block, finalziation_result), + self.protocol.finality_proof_import_result(requested_block, finalziation_result), ProtocolMsg::PropagateExtrinsics => - protocol.propagate_extrinsics(&mut network_out, &*transaction_pool), + self.protocol.propagate_extrinsics(&mut network_out, &*self.transaction_pool), #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Tick => protocol.tick(&mut network_out), + ProtocolMsg::Tick => self.protocol.tick(&mut network_out), #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Synchronize => {}, } } loop { - let mut network_service = network_service.lock(); + let mut network_service = self.network_service.lock(); let poll_value = network_service.poll(); - let mut network_out = Ctxt(&mut network_service, &peerset); + let mut network_out = Context(&mut network_service, &self.peerset); let outcome = match poll_value { Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => { + Ok(Async::Ready(Some(Libp2pNetServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => { debug_assert!( version <= protocol::CURRENT_VERSION as u8 && version >= protocol::MIN_VERSION as u8 ); - protocol.on_peer_connected(&mut network_out, peer_id, debug_info); + self.protocol.on_peer_connected(&mut network_out, peer_id, debug_info); CustomMessageOutcome::None } - Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => { - protocol.on_peer_disconnected(&mut network_out, peer_id, debug_info); + Ok(Async::Ready(Some(Libp2pNetServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => { + self.protocol.on_peer_disconnected(&mut network_out, peer_id, debug_info); CustomMessageOutcome::None }, - Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) => - protocol.on_custom_message( + Ok(Async::Ready(Some(Libp2pNetServiceEvent::CustomMessage { peer_id, message, .. }))) => + self.protocol.on_custom_message( &mut network_out, - &*transaction_pool, + &*self.transaction_pool, peer_id, message, - finality_proof_provider.as_ref().map(|p| &**p) + self.finality_proof_provider.as_ref().map(|p| &**p) ), - Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => { + Ok(Async::Ready(Some(Libp2pNetServiceEvent::Clogged { peer_id, messages, .. }))) => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { debug!(target: "sync", "{:?}", msg); - protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg)); + self.protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg)); } CustomMessageOutcome::None } @@ -752,18 +709,18 @@ fn run_thread, H: ExHashT>( match outcome { CustomMessageOutcome::BlockImport(origin, blocks) => - import_queue.import_blocks(origin, blocks), + self.import_queue.import_blocks(origin, blocks), CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => - import_queue.import_justification(origin, hash, nb, justification), + self.import_queue.import_justification(origin, hash, nb, justification), CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) => - import_queue.import_finality_proof(origin, hash, nb, proof), + self.import_queue.import_finality_proof(origin, hash, nb, proof), CustomMessageOutcome::None => {} } } - is_offline.store(protocol.is_offline(), Ordering::Relaxed); - is_major_syncing.store(protocol.is_major_syncing(), Ordering::Relaxed); + self.is_offline.store(self.protocol.is_offline(), Ordering::Relaxed); + self.is_major_syncing.store(self.protocol.is_major_syncing(), Ordering::Relaxed); Ok(Async::NotReady) - }) + } } diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index 202bd15b2a..238d84b522 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -38,7 +38,8 @@ use parking_lot::Mutex; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. /// Network service type for a factory. -pub type NetworkService = network::Service<::Block, ::NetworkProtocol>; +pub type NetworkService = + network::NetworkService<::Block, ::NetworkProtocol>; /// Code executor type for a factory. pub type CodeExecutor = NativeExecutor<::RuntimeDispatch>; diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index d2b4a66c5b..1eb59872ce 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -203,7 +203,13 @@ impl Service { }; let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); - let network = network::Service::new(network_params)?; + let network_mut = network::NetworkWorker::new(network_params)?; + let network = network_mut.service().clone(); + + task_executor.spawn(network_mut + .map_err(|_| ()) + .select(exit.clone()) + .then(|_| Ok(()))); let inherents_pool = Arc::new(InherentsPool::default()); let offchain_workers = if config.offchain_worker {