Rewrite sync and network layers to use futures (#2683)

* Remove background thread from network

* Primarily use tokio-timer instead of tokio

* Minor style fix

* Reintroduce tokio as an optional dependency

* Renames

* Line width

* Ctxt -> Context

* Merge NetworkWorker and NetworkService
This commit is contained in:
Pierre Krieger
2019-05-29 11:58:02 +02:00
committed by Arkadiy Paronyan
parent 643e309411
commit e9a4c80c40
8 changed files with 172 additions and 205 deletions
+2 -2
View File
@@ -43,8 +43,8 @@ pub mod test;
pub use chain::{Client as ClientHandle, FinalityProofProvider};
pub use service::{
Service, FetchFuture, TransactionPool, ManageNetwork, NetworkMsg,
SyncProvider, ExHashT, ReportHandle,
NetworkService, NetworkWorker, FetchFuture, TransactionPool, ManageNetwork,
NetworkMsg, SyncProvider, ExHashT, ReportHandle,
};
pub use protocol::{ProtocolStatus, PeerInfo, Context};
pub use sync::{Status as SyncStatus, SyncState};
+4 -4
View File
@@ -80,9 +80,9 @@ const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12);
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
/// Interval at which we call `tick`.
tick_timeout: tokio::timer::Interval,
tick_timeout: tokio_timer::Interval,
/// Interval at which we call `propagate_extrinsics`.
propagate_timeout: tokio::timer::Interval,
propagate_timeout: tokio_timer::Interval,
config: ProtocolConfig,
/// Handler for on-demand requests.
on_demand_core: OnDemandCore<B>,
@@ -395,8 +395,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let info = chain.info()?;
let sync = ChainSync::new(config.roles, &info);
Ok(Protocol {
tick_timeout: tokio::timer::Interval::new_interval(TICK_TIMEOUT),
propagate_timeout: tokio::timer::Interval::new_interval(PROPAGATE_TIMEOUT),
tick_timeout: tokio_timer::Interval::new_interval(TICK_TIMEOUT),
propagate_timeout: tokio_timer::Interval::new_interval(PROPAGATE_TIMEOUT),
config: config,
context_data: ContextData {
peers: HashMap::new(),
+143 -186
View File
@@ -15,15 +15,16 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{io, thread, time::Duration};
use std::time::Duration;
use log::{warn, debug, error, info};
use futures::{Async, Future, Stream, sync::oneshot, sync::mpsc};
use futures::{prelude::*, sync::oneshot, sync::mpsc};
use parking_lot::{Mutex, RwLock};
use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent};
use network_libp2p::{NetworkConfiguration, RegisteredProtocol, NetworkState};
use network_libp2p::{start_service, parse_str_addr, Service as Libp2pNetService, ServiceEvent as Libp2pNetServiceEvent};
use network_libp2p::{RegisteredProtocol, NetworkState};
use peerset::PeersetHandle;
use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder};
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
@@ -39,8 +40,6 @@ use crate::config::Params;
use crate::error::Error;
use crate::specialization::NetworkSpecialization;
use tokio::runtime::Builder as RuntimeBuilder;
/// Interval at which we send status updates on the SyncProvider status stream.
const STATUS_INTERVAL: Duration = Duration::from_millis(5000);
/// Interval at which we update the `peers` field on the main thread.
@@ -176,7 +175,7 @@ impl ReportHandle {
}
/// Substrate network service. Handles network IO and manages connectivity.
pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>> {
/// Sinks to propagate status updates.
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
/// Are we connected to any peer?
@@ -188,23 +187,23 @@ pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
/// Channel for networking messages processed by the background thread.
network_chan: mpsc::UnboundedSender<NetworkMsg<B>>,
/// Network service
network: Arc<Mutex<NetworkService<Message<B>>>>,
network: Arc<Mutex<Libp2pNetService<Message<B>>>>,
/// Peerset manager (PSM); manages the reputation of nodes and indicates the network which
/// nodes it should be connected to or not.
peerset: PeersetHandle,
/// Protocol sender
protocol_sender: mpsc::UnboundedSender<ProtocolMsg<B, S>>,
/// Sender for messages to the background service task, and handle for the background thread.
/// Dropping the sender should close the task and the thread.
/// This is an `Option` because we need to extract it in the destructor.
bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>,
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
/// Creates and register protocol with the network service
pub fn new<H: ExHashT>(
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker<B, S, H> {
/// Creates the network service.
///
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new(
params: Params<B, S, H>,
) -> Result<Arc<Service<B, S>>, Error> {
) -> Result<NetworkWorker<B, S, H>, Error> {
let (network_chan, network_port) = mpsc::unbounded();
let (protocol_sender, protocol_rx) = mpsc::unbounded();
let status_sinks = Arc::new(Mutex::new(Vec::new()));
@@ -229,35 +228,55 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
)?;
let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect();
let registered = RegisteredProtocol::new(params.protocol_id, &versions);
let (thread, network, peerset) = start_thread(
is_offline.clone(),
is_major_syncing.clone(),
protocol,
peers.clone(),
params.import_queue,
params.transaction_pool,
params.finality_proof_provider,
network_port,
protocol_rx,
status_sinks.clone(),
params.network_config,
registered,
params.on_demand.and_then(|od| od.extract_receiver()),
)?;
Ok(Arc::new(Service {
status_sinks,
// Start the main service.
let (network, peerset) = match start_service(params.network_config, registered) {
Ok((network, peerset)) => (Arc::new(Mutex::new(network)), peerset),
Err(err) => {
warn!("Error starting network: {}", err);
return Err(err.into())
},
};
let service = Arc::new(NetworkService {
status_sinks: status_sinks.clone(),
is_offline: is_offline.clone(),
is_major_syncing: is_major_syncing.clone(),
network_chan,
peers: peers.clone(),
peerset: peerset.clone(),
network: network.clone(),
protocol_sender: protocol_sender.clone(),
});
Ok(NetworkWorker {
is_offline,
is_major_syncing,
network_chan,
peers,
network_service: network,
peerset,
network,
protocol_sender,
bg_thread: Some(thread),
}))
service,
protocol,
peers,
import_queue: params.import_queue,
transaction_pool: params.transaction_pool,
finality_proof_provider: params.finality_proof_provider,
network_port,
protocol_rx,
status_sinks,
on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()),
status_interval: tokio_timer::Interval::new_interval(STATUS_INTERVAL),
connected_peers_interval: tokio_timer::Interval::new_interval(CONNECTED_PEERS_INTERVAL),
})
}
/// Return a `NetworkService` that can be shared through the code base and can be used to
/// manipulate the worker.
pub fn service(&self) -> &Arc<NetworkService<B, S>> {
&self.service
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
/// Returns the downloaded bytes per second averaged over the past few seconds.
#[inline]
pub fn average_download_per_sec(&self) -> u64 {
@@ -362,7 +381,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ::consensus::SyncOracle for Service<B, S> {
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ::consensus::SyncOracle for NetworkService<B, S> {
fn is_major_syncing(&self) -> bool {
self.is_major_syncing()
}
@@ -372,18 +391,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ::consensus::SyncOracle f
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Drop for Service<B, S> {
fn drop(&mut self) {
if let Some((sender, join)) = self.bg_thread.take() {
let _ = sender.send(());
if let Err(e) = join.join() {
error!("Error while waiting on background thread: {:?}", e);
}
}
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> SyncProvider<B> for Service<B, S> {
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> SyncProvider<B> for NetworkService<B, S> {
fn is_major_syncing(&self) -> bool {
self.is_major_syncing()
}
@@ -417,7 +425,7 @@ pub trait ManageNetwork {
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ManageNetwork for Service<B, S> {
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ManageNetwork for NetworkService<B, S> {
fn accept_unreserved_peers(&self) {
self.peerset.set_reserved_only(false);
}
@@ -438,7 +446,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ManageNetwork for Service
}
}
/// Messages to be handled by NetworkService.
/// Messages to be handled by Libp2pNetService.
#[derive(Debug)]
pub enum NetworkMsg<B: BlockT + 'static> {
/// Send an outgoing custom message.
@@ -516,11 +524,16 @@ impl<B: BlockT, F: FnOnce(&mut ConsensusGossip<B>, &mut Context<B>)> GossipTask<
}
}
/// Starts the background thread that handles the networking.
fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
/// Future tied to the `Network` service and that must be polled in order for the network to
/// advance.
#[must_use = "The NetworkWorker must be polled in order for the network to work"]
pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> {
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
protocol: Protocol<B, S, H>,
/// The network service that can be extracted and shared through the codebase.
service: Arc<NetworkService<B, S>>,
network_service: Arc<Mutex<Libp2pNetService<Message<B>>>>,
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
import_queue: Box<ImportQueue<B>>,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
@@ -528,124 +541,68 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>,
protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
config: NetworkConfiguration,
registered: RegisteredProtocol<Message<B>>,
peerset: PeersetHandle,
on_demand_in: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService<Message<B>>>>, PeersetHandle), Error> {
// Start the main service.
let (service, peerset) = match start_service(config, registered) {
Ok((service, peerset)) => (Arc::new(Mutex::new(service)), peerset),
Err(err) => {
warn!("Error starting network: {}", err);
return Err(err.into())
},
};
let (close_tx, close_rx) = oneshot::channel();
let service_clone = service.clone();
let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?;
let peerset_clone = peerset.clone();
let thread = thread::Builder::new().name("network".to_string()).spawn(move || {
let fut = run_thread(
is_offline,
is_major_syncing,
protocol,
service_clone,
peers,
import_queue,
transaction_pool,
finality_proof_provider,
network_port,
protocol_rx,
status_sinks,
peerset_clone,
on_demand_in
)
.select(close_rx.then(|_| Ok(())))
.map(|(val, _)| val)
.map_err(|(err,_ )| err);
// Note that we use `block_on` and not `block_on_all` because we want to kill the thread
// instantly if `close_rx` receives something.
match runtime.block_on(fut) {
Ok(()) => debug!(target: "sub-libp2p", "Networking thread finished"),
Err(err) => error!(target: "sub-libp2p", "Error while running libp2p: {:?}", err),
};
})?;
Ok(((close_tx, thread), service, peerset))
/// Interval at which we send status updates on the `status_sinks`.
status_interval: tokio_timer::Interval,
/// Interval at which we update the `connected_peers` Arc.
connected_peers_interval: tokio_timer::Interval,
}
/// Runs the background thread that handles the networking.
fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
mut protocol: Protocol<B, S, H>,
network_service: Arc<Mutex<NetworkService<Message<B>>>>,
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
import_queue: Box<ImportQueue<B>>,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>,
mut network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>,
mut protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
peerset: PeersetHandle,
mut on_demand_in: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
) -> impl Future<Item = (), Error = io::Error> {
// Implementation of `protocol::NetworkOut` using the available local variables.
struct Ctxt<'a, B: BlockT>(&'a mut NetworkService<Message<B>>, &'a PeersetHandle);
impl<'a, B: BlockT> NetworkOut<B> for Ctxt<'a, B> {
fn report_peer(&mut self, who: PeerId, reputation: i32) {
self.1.report_peer(who, reputation)
}
fn disconnect_peer(&mut self, who: PeerId) {
self.0.drop_node(&who)
}
fn send_message(&mut self, who: PeerId, message: Message<B>) {
self.0.send_custom_message(&who, message)
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for NetworkWorker<B, S, H> {
type Item = ();
type Error = io::Error;
// Interval at which we send status updates on the `status_sinks`.
let mut status_interval = tokio::timer::Interval::new_interval(STATUS_INTERVAL);
// Interval at which we update the `connected_peers` Arc.
let mut connected_peers_interval = tokio::timer::Interval::new_interval(CONNECTED_PEERS_INTERVAL);
futures::future::poll_fn(move || {
while let Ok(Async::Ready(_)) = status_interval.poll() {
let status = protocol.status();
status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok());
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Implementation of `protocol::NetworkOut` using the available local variables.
struct Context<'a, B: BlockT>(&'a mut Libp2pNetService<Message<B>>, &'a PeersetHandle);
impl<'a, B: BlockT> NetworkOut<B> for Context<'a, B> {
fn report_peer(&mut self, who: PeerId, reputation: i32) {
self.1.report_peer(who, reputation)
}
fn disconnect_peer(&mut self, who: PeerId) {
self.0.drop_node(&who)
}
fn send_message(&mut self, who: PeerId, message: Message<B>) {
self.0.send_custom_message(&who, message)
}
}
while let Ok(Async::Ready(_)) = connected_peers_interval.poll() {
let infos = protocol.peers_info().map(|(id, info)| {
while let Ok(Async::Ready(_)) = self.status_interval.poll() {
let status = self.protocol.status();
self.status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok());
}
while let Ok(Async::Ready(_)) = self.connected_peers_interval.poll() {
let infos = self.protocol.peers_info().map(|(id, info)| {
(id.clone(), ConnectedPeer { peer_info: info.clone() })
}).collect();
*peers.write() = infos;
*self.peers.write() = infos;
}
match protocol.poll(&mut Ctxt(&mut network_service.lock(), &peerset), &*transaction_pool) {
match self.protocol.poll(&mut Context(&mut self.network_service.lock(), &self.peerset), &*self.transaction_pool) {
Ok(Async::Ready(v)) => void::unreachable(v),
Ok(Async::NotReady) => {}
Err(err) => void::unreachable(err),
}
// Check for new incoming on-demand requests.
if let Some(on_demand_in) = on_demand_in.as_mut() {
if let Some(on_demand_in) = self.on_demand_in.as_mut() {
while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() {
protocol.add_on_demand_request(&mut Ctxt(&mut network_service.lock(), &peerset), rq);
self.protocol.add_on_demand_request(&mut Context(&mut self.network_service.lock(), &self.peerset), rq);
}
}
loop {
match network_port.poll() {
match self.network_port.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(NetworkMsg::Outgoing(who, outgoing_message)))) =>
network_service.lock().send_custom_message(&who, outgoing_message),
self.network_service.lock().send_custom_message(&who, outgoing_message),
Ok(Async::Ready(Some(NetworkMsg::ReportPeer(who, reputation)))) =>
peerset.report_peer(who, reputation),
self.peerset.report_peer(who, reputation),
Ok(Async::Ready(Some(NetworkMsg::DisconnectPeer(who)))) =>
network_service.lock().drop_node(&who),
self.network_service.lock().drop_node(&who),
#[cfg(any(test, feature = "test-helpers"))]
Ok(Async::Ready(Some(NetworkMsg::Synchronized))) => {}
@@ -655,91 +612,91 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
}
loop {
let msg = match protocol_rx.poll() {
let msg = match self.protocol_rx.poll() {
Ok(Async::Ready(Some(msg))) => msg,
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => break,
};
let mut network_service = network_service.lock();
let mut network_out = Ctxt(&mut network_service, &peerset);
let mut network_service = self.network_service.lock();
let mut network_out = Context(&mut network_service, &self.peerset);
match msg {
ProtocolMsg::BlockImported(hash, header) =>
protocol.on_block_imported(&mut network_out, hash, &header),
self.protocol.on_block_imported(&mut network_out, hash, &header),
ProtocolMsg::BlockFinalized(hash, header) =>
protocol.on_block_finalized(&mut network_out, hash, &header),
self.protocol.on_block_finalized(&mut network_out, hash, &header),
ProtocolMsg::ExecuteWithSpec(task) => {
let (mut context, spec) = protocol.specialization_lock(&mut network_out);
let (mut context, spec) = self.protocol.specialization_lock(&mut network_out);
task.call_box(spec, &mut context);
},
ProtocolMsg::ExecuteWithGossip(task) => {
let (mut context, gossip) = protocol.consensus_gossip_lock(&mut network_out);
let (mut context, gossip) = self.protocol.consensus_gossip_lock(&mut network_out);
task.call_box(gossip, &mut context);
}
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) =>
protocol.gossip_consensus_message(&mut network_out, topic, engine_id, message, recipient),
self.protocol.gossip_consensus_message(&mut network_out, topic, engine_id, message, recipient),
ProtocolMsg::BlocksProcessed(hashes, has_error) =>
protocol.blocks_processed(&mut network_out, hashes, has_error),
self.protocol.blocks_processed(&mut network_out, hashes, has_error),
ProtocolMsg::RestartSync =>
protocol.restart(&mut network_out),
self.protocol.restart(&mut network_out),
ProtocolMsg::AnnounceBlock(hash) =>
protocol.announce_block(&mut network_out, hash),
self.protocol.announce_block(&mut network_out, hash),
ProtocolMsg::BlockImportedSync(hash, number) =>
protocol.block_imported(&hash, number),
self.protocol.block_imported(&hash, number),
ProtocolMsg::ClearJustificationRequests =>
protocol.clear_justification_requests(),
self.protocol.clear_justification_requests(),
ProtocolMsg::RequestJustification(hash, number) =>
protocol.request_justification(&mut network_out, &hash, number),
self.protocol.request_justification(&mut network_out, &hash, number),
ProtocolMsg::JustificationImportResult(hash, number, success) =>
protocol.justification_import_result(hash, number, success),
self.protocol.justification_import_result(hash, number, success),
ProtocolMsg::SetFinalityProofRequestBuilder(builder) =>
protocol.set_finality_proof_request_builder(builder),
self.protocol.set_finality_proof_request_builder(builder),
ProtocolMsg::RequestFinalityProof(hash, number) =>
protocol.request_finality_proof(&mut network_out, &hash, number),
self.protocol.request_finality_proof(&mut network_out, &hash, number),
ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) =>
protocol.finality_proof_import_result(requested_block, finalziation_result),
self.protocol.finality_proof_import_result(requested_block, finalziation_result),
ProtocolMsg::PropagateExtrinsics =>
protocol.propagate_extrinsics(&mut network_out, &*transaction_pool),
self.protocol.propagate_extrinsics(&mut network_out, &*self.transaction_pool),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Tick => protocol.tick(&mut network_out),
ProtocolMsg::Tick => self.protocol.tick(&mut network_out),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Synchronize => {},
}
}
loop {
let mut network_service = network_service.lock();
let mut network_service = self.network_service.lock();
let poll_value = network_service.poll();
let mut network_out = Ctxt(&mut network_service, &peerset);
let mut network_out = Context(&mut network_service, &self.peerset);
let outcome = match poll_value {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => {
Ok(Async::Ready(Some(Libp2pNetServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => {
debug_assert!(
version <= protocol::CURRENT_VERSION as u8
&& version >= protocol::MIN_VERSION as u8
);
protocol.on_peer_connected(&mut network_out, peer_id, debug_info);
self.protocol.on_peer_connected(&mut network_out, peer_id, debug_info);
CustomMessageOutcome::None
}
Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => {
protocol.on_peer_disconnected(&mut network_out, peer_id, debug_info);
Ok(Async::Ready(Some(Libp2pNetServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => {
self.protocol.on_peer_disconnected(&mut network_out, peer_id, debug_info);
CustomMessageOutcome::None
},
Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) =>
protocol.on_custom_message(
Ok(Async::Ready(Some(Libp2pNetServiceEvent::CustomMessage { peer_id, message, .. }))) =>
self.protocol.on_custom_message(
&mut network_out,
&*transaction_pool,
&*self.transaction_pool,
peer_id,
message,
finality_proof_provider.as_ref().map(|p| &**p)
self.finality_proof_provider.as_ref().map(|p| &**p)
),
Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => {
Ok(Async::Ready(Some(Libp2pNetServiceEvent::Clogged { peer_id, messages, .. }))) => {
debug!(target: "sync", "{} clogging messages:", messages.len());
for msg in messages.into_iter().take(5) {
debug!(target: "sync", "{:?}", msg);
protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg));
self.protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg));
}
CustomMessageOutcome::None
}
@@ -752,18 +709,18 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
match outcome {
CustomMessageOutcome::BlockImport(origin, blocks) =>
import_queue.import_blocks(origin, blocks),
self.import_queue.import_blocks(origin, blocks),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) =>
import_queue.import_justification(origin, hash, nb, justification),
self.import_queue.import_justification(origin, hash, nb, justification),
CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) =>
import_queue.import_finality_proof(origin, hash, nb, proof),
self.import_queue.import_finality_proof(origin, hash, nb, proof),
CustomMessageOutcome::None => {}
}
}
is_offline.store(protocol.is_offline(), Ordering::Relaxed);
is_major_syncing.store(protocol.is_major_syncing(), Ordering::Relaxed);
self.is_offline.store(self.protocol.is_offline(), Ordering::Relaxed);
self.is_major_syncing.store(self.protocol.is_major_syncing(), Ordering::Relaxed);
Ok(Async::NotReady)
})
}
}