diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 30c2077de6..c6ae27cb16 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -17,9 +17,9 @@ use futures::{prelude::*, sync::mpsc}; use network_libp2p::PeerId; use primitives::storage::StorageKey; -use runtime_primitives::{generic::BlockId, ConsensusEngineId}; +use consensus::{import_queue::IncomingBlock, import_queue::Origin, BlockOrigin}; +use runtime_primitives::{generic::BlockId, ConsensusEngineId, Justification}; use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero}; -use consensus::import_queue::ImportQueue; use crate::message::{self, BlockRequest as BlockRequestMessage, Message}; use crate::message::generic::{Message as GenericMessage, ConsensusMessage}; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; @@ -32,7 +32,6 @@ use parking_lot::RwLock; use rustc_hex::ToHex; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use std::sync::atomic::AtomicBool; use std::{cmp, num::NonZeroUsize, time}; use log::{trace, debug, warn, error}; use crate::chain::Client; @@ -276,10 +275,6 @@ pub enum ProtocolMsg> { ExecuteWithGossip(Box + Send + 'static>), /// Incoming gossip consensus message. GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec, GossipMessageRecipient), - /// Tell protocol to abort sync (does not stop protocol). - /// Only used in tests. - #[cfg(any(test, feature = "test-helpers"))] - Abort, /// Tell protocol to perform regular maintenance. #[cfg(any(test, feature = "test-helpers"))] Tick, @@ -291,20 +286,17 @@ pub enum ProtocolMsg> { impl, H: ExHashT> Protocol { /// Create a new instance. pub fn new( - is_offline: Arc, - is_major_syncing: Arc, connected_peers: Arc>>>, network_chan: NetworkChan, config: ProtocolConfig, chain: Arc>, - import_queue: Box>, on_demand: Option>>, transaction_pool: Arc>, specialization: S, ) -> error::Result<(Protocol, mpsc::UnboundedSender>)> { let (protocol_sender, port) = mpsc::unbounded(); let info = chain.info()?; - let sync = ChainSync::new(is_offline, is_major_syncing, config.roles, &info, import_queue); + let sync = ChainSync::new(config.roles, &info); let protocol = Protocol { network_chan, port, @@ -341,6 +333,14 @@ impl, H: ExHashT> Protocol { .count(), } } + + pub fn is_major_syncing(&self) -> bool { + self.sync.status().is_major_syncing() + } + + pub fn is_offline(&self) -> bool { + self.sync.status().is_offline() + } } impl, H: ExHashT> Future for Protocol { @@ -358,10 +358,7 @@ impl, H: ExHashT> Future for Protocol { - self.stop(); - return Ok(Async::Ready(())) - } + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), Ok(Async::Ready(Some(msg))) => if !self.handle_client_msg(msg) { return Ok(Async::Ready(())) } @@ -415,8 +412,6 @@ impl, H: ExHashT> Protocol { #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Tick => self.tick(), #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Abort => self.abort(), - #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Synchronize => { trace!(target: "sync", "handle_client_msg: received Synchronize msg"); self.network_chan.send(NetworkMsg::Synchronized) @@ -457,14 +452,15 @@ impl, H: ExHashT> Protocol { } } - pub fn on_custom_message(&mut self, who: PeerId, message: Message) { + pub fn on_custom_message(&mut self, who: PeerId, message: Message) -> CustomMessageOutcome { match message { GenericMessage::Status(s) => self.on_status_message(who, s), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { if let Some(request) = self.handle_response(who.clone(), &r) { - self.on_block_response(who.clone(), request, r); + let outcome = self.on_block_response(who.clone(), request, r); self.update_peer_info(&who); + return outcome } }, GenericMessage::BlockAnnounce(announce) => { @@ -495,6 +491,8 @@ impl, H: ExHashT> Protocol { &mut Some(other), ), } + + CustomMessageOutcome::None } fn send_message(&mut self, who: PeerId, message: Message) { @@ -643,7 +641,7 @@ impl, H: ExHashT> Protocol { peer: PeerId, request: message::BlockRequest, response: message::BlockResponse, - ) { + ) -> CustomMessageOutcome { let blocks_range = match ( response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), @@ -658,14 +656,26 @@ impl, H: ExHashT> Protocol { // TODO [andre]: move this logic to the import queue so that // justifications are imported asynchronously (#1482) if request.fields == message::BlockAttributes::JUSTIFICATION { - self.sync.on_block_justification_data( + let outcome = self.sync.on_block_justification_data( &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response, ); + + if let Some((origin, hash, nb, just)) = outcome { + CustomMessageOutcome::JustificationImport(origin, hash, nb, just) + } else { + CustomMessageOutcome::None + } + } else { - self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response); + let outcome = self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response); + if let Some((origin, blocks)) = outcome { + CustomMessageOutcome::BlockImport(origin, blocks) + } else { + CustomMessageOutcome::None + } } } @@ -891,22 +901,6 @@ impl, H: ExHashT> Protocol { } } - fn abort(&mut self) { - self.sync.clear(); - self.specialization.on_abort(); - self.context_data.peers.clear(); - self.handshaking_peers.clear(); - self.consensus_gossip.abort(); - } - - fn stop(&mut self) { - // stop processing import requests first (without holding a sync lock) - self.sync.stop(); - - // and then clear all the sync data - self.abort(); - } - fn on_block_announce(&mut self, who: PeerId, announce: message::BlockAnnounce) { let header = announce.header; let hash = header.hash(); @@ -1107,6 +1101,14 @@ impl, H: ExHashT> Protocol { } } +/// Outcome of an incoming custom message. +#[derive(Debug)] +pub enum CustomMessageOutcome { + BlockImport(BlockOrigin, Vec>), + JustificationImport(Origin, B::Hash, NumberFor, Justification), + None, +} + fn send_message( peers: &mut HashMap>, network_chan: &NetworkChan, diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 4c0afde42d..f681cc8fe9 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -31,7 +31,7 @@ use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::message::Message; -use crate::protocol::{self, Context, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo}; +use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo}; use crate::config::Params; use crate::error::Error; use crate::specialization::NetworkSpecialization; @@ -175,13 +175,10 @@ impl> Service { let is_major_syncing = Arc::new(AtomicBool::new(false)); let peers: Arc>>> = Arc::new(Default::default()); let (protocol, protocol_sender) = Protocol::new( - is_offline.clone(), - is_major_syncing.clone(), peers.clone(), network_chan.clone(), params.config, params.chain, - import_queue.clone(), params.on_demand, params.transaction_pool, params.specialization, @@ -189,7 +186,10 @@ impl> Service { let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect(); let registered = RegisteredProtocol::new(protocol_id, &versions); let (thread, network, peerset) = start_thread( + is_offline.clone(), + is_major_syncing.clone(), protocol, + import_queue.clone(), network_port, status_sinks.clone(), params.network_config, @@ -475,7 +475,10 @@ pub enum NetworkMsg { /// Starts the background thread that handles the networking. fn start_thread, H: ExHashT>( + is_offline: Arc, + is_major_syncing: Arc, protocol: Protocol, + import_queue: Box>, network_port: NetworkPort, status_sinks: Arc>>>>, config: NetworkConfiguration, @@ -495,7 +498,7 @@ fn start_thread, H: ExHashT>( let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?; let peerset_clone = peerset.clone(); let thread = thread::Builder::new().name("network".to_string()).spawn(move || { - let fut = run_thread(protocol, service_clone, network_port, status_sinks, peerset_clone) + let fut = run_thread(is_offline, is_major_syncing, protocol, service_clone, import_queue, network_port, status_sinks, peerset_clone) .select(close_rx.then(|_| Ok(()))) .map(|(val, _)| val) .map_err(|(err,_ )| err); @@ -513,8 +516,11 @@ fn start_thread, H: ExHashT>( /// Runs the background thread that handles the networking. fn run_thread, H: ExHashT>( + is_offline: Arc, + is_major_syncing: Arc, mut protocol: Protocol, network_service: Arc>>>, + import_queue: Box>, network_port: NetworkPort, status_sinks: Arc>>>>, peerset: PeersetHandle, @@ -551,7 +557,7 @@ fn run_thread, H: ExHashT>( } loop { - match network_service.lock().poll() { + let outcome = match network_service.lock().poll() { Ok(Async::NotReady) => break, Ok(Async::Ready(Some(NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => { debug_assert!( @@ -559,9 +565,12 @@ fn run_thread, H: ExHashT>( && version >= protocol::MIN_VERSION as u8 ); protocol.on_peer_connected(peer_id, debug_info); + CustomMessageOutcome::None } - Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => - protocol.on_peer_disconnected(peer_id, debug_info), + Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => { + protocol.on_peer_disconnected(peer_id, debug_info); + CustomMessageOutcome::None + }, Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) => protocol.on_custom_message(peer_id, message), Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => { @@ -570,15 +579,27 @@ fn run_thread, H: ExHashT>( debug!(target: "sync", "{:?}", msg); protocol.on_clogged_peer(peer_id.clone(), Some(msg)); } + CustomMessageOutcome::None } Ok(Async::Ready(None)) => return Ok(Async::Ready(())), Err(err) => { error!(target: "sync", "Error in the network: {:?}", err); return Err(err) } + }; + + match outcome { + CustomMessageOutcome::BlockImport(origin, blocks) => + import_queue.import_blocks(origin, blocks), + CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => + import_queue.import_justification(origin, hash, nb, justification), + CustomMessageOutcome::None => {} } } + is_offline.store(protocol.is_offline(), Ordering::Relaxed); + is_major_syncing.store(protocol.is_major_syncing(), Ordering::Relaxed); + Ok(Async::NotReady) }) } diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index 16e8609858..151d13e829 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -14,6 +14,22 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +//! Contains the state of the chain synchronization process +//! +//! At any given point in time, a running node tries as much as possible to be at the head of the +//! chain. This module handles the logic of which blocks to request from remotes, and processing +//! responses. It yields blocks to check and potentially move to the database. +//! +//! # Usage +//! +//! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on +//! the network, or whenever a block has been successfully verified, call the appropriate method in +//! order to update it. You must also regularly call `tick()`. +//! +//! To each of these methods, you must pass a `Context` object that the `ChainSync` will use to +//! send its new outgoing requests. +//! + use std::cmp::max; use std::collections::{HashMap, VecDeque}; use std::time::{Duration, Instant}; @@ -22,8 +38,7 @@ use crate::protocol::Context; use fork_tree::ForkTree; use network_libp2p::PeerId; use client::{BlockStatus, ClientInfo}; -use consensus::BlockOrigin; -use consensus::import_queue::{ImportQueue, IncomingBlock}; +use consensus::{BlockOrigin, import_queue::IncomingBlock}; use client::error::Error as ClientError; use crate::blocks::BlockCollection; use runtime_primitives::Justification; @@ -32,8 +47,6 @@ use runtime_primitives::generic::BlockId; use crate::message; use crate::config::Roles; use std::collections::HashSet; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; // Maximum blocks to request in a single packet. const MAX_BLOCKS_TO_REQUEST: usize = 128; @@ -285,20 +298,21 @@ impl PendingJustifications { /// Processes the response for the request previously sent to the given /// peer. Queues a retry in case the given justification /// was `None`. + /// + /// Returns `Some` if this produces a justification that must be imported in the import queue. + #[must_use] fn on_response( &mut self, who: PeerId, justification: Option, - import_queue: &ImportQueue, - ) { + ) -> Option<(PeerId, B::Hash, NumberFor, Justification)> { // we assume that the request maps to the given response, this is // currently enforced by the outer network protocol before passing on // messages to chain sync. if let Some(request) = self.peer_requests.remove(&who) { if let Some(justification) = justification { - import_queue.import_justification(who.clone(), request.0, request.1, justification); self.importing_requests.insert(request); - return + return Some((who, request.0, request.1, justification)) } self.previous_requests @@ -308,6 +322,8 @@ impl PendingJustifications { self.pending_requests.push_front(request); } + + None } /// Removes any pending justification requests for blocks lower than the @@ -355,12 +371,8 @@ pub struct ChainSync { best_queued_hash: B::Hash, required_block_attributes: message::BlockAttributes, justifications: PendingJustifications, - import_queue: Box>, queue_blocks: HashSet, best_importing_number: NumberFor, - is_stopping: AtomicBool, - is_offline: Arc, - is_major_syncing: Arc, } /// Reported sync state. @@ -400,13 +412,10 @@ impl Status { } impl ChainSync { - /// Create a new instance. + /// Create a new instance. Pass the initial known state of the chain. pub(crate) fn new( - is_offline: Arc, - is_major_syncing: Arc, role: Roles, info: &ClientInfo, - import_queue: Box> ) -> Self { let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION; if role.intersects(Roles::FULL | Roles::AUTHORITY) { @@ -421,12 +430,8 @@ impl ChainSync { best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number), justifications: PendingJustifications::new(), required_block_attributes, - import_queue, queue_blocks: Default::default(), best_importing_number: Zero::zero(), - is_stopping: Default::default(), - is_offline, - is_major_syncing, } } @@ -441,7 +446,7 @@ impl ChainSync { } } - /// Returns peer sync status (if any). + /// Returns the state of the sync of the given peer. Returns `None` if the peer is unknown. pub(crate) fn peer_info(&self, who: &PeerId) -> Option> { self.peers.get(who).map(|peer| { PeerInfo { @@ -462,15 +467,8 @@ impl ChainSync { } } - /// Handle new connected peer. + /// Handle new connected peer. Call this method whenever we connect to a new peer. pub(crate) fn new_peer(&mut self, protocol: &mut Context, who: PeerId) { - // Initialize some variables to determine if - // is_offline or is_major_syncing should be updated - // after processing this new peer. - let previous_len = self.peers.len(); - let previous_best_seen = self.best_seen_block(); - let previous_state = self.state(&previous_best_seen); - if let Some(info) = protocol.peer_info(&who) { let status = block_status(&*protocol.client(), &self.queue_blocks, info.best_hash); match (status, info.best_number) { @@ -538,22 +536,6 @@ impl ChainSync { } } } - - let current_best_seen = self.best_seen_block(); - let current_state = self.state(¤t_best_seen); - let current_len = self.peers.len(); - if previous_len == 0 && current_len > 0 { - // We were offline, and now we're connected to at least one peer. - self.is_offline.store(false, Ordering::Relaxed); - } - if previous_len < current_len { - // We added a peer, let's see if major_syncing should be updated. - match (previous_state, current_state) { - (SyncState::Idle, SyncState::Downloading) => self.is_major_syncing.store(true, Ordering::Relaxed), - (SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed), - _ => {}, - } - } } fn handle_ancestor_search_state( @@ -594,14 +576,20 @@ impl ChainSync { } } - /// Handle new block data. + /// Handle a response from the remote to a block request that we made. + /// + /// `request` must be the original request that triggered `response`. + /// + /// If this corresponds to a valid block, this outputs the block that must be imported in the + /// import queue. + #[must_use] pub(crate) fn on_block_data( &mut self, protocol: &mut Context, who: PeerId, request: message::BlockRequest, response: message::BlockResponse - ) { + ) -> Option<(BlockOrigin, Vec>)> { let new_blocks: Vec> = if let Some(ref mut peer) = self.peers.get_mut(&who) { let mut blocks = response.blocks; if request.direction == message::Direction::Descending { @@ -649,13 +637,13 @@ impl ChainSync { debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); protocol.report_peer(who.clone(), i32::min_value()); protocol.disconnect_peer(who); - return; + return None }, (_, Err(e)) => { info!("Error answering legitimate blockchain query: {:?}", e); protocol.report_peer(who.clone(), ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE); protocol.disconnect_peer(who); - return; + return None }, }; if block_hash_match && peer.common_number < num { @@ -665,12 +653,12 @@ impl ChainSync { trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); protocol.report_peer(who.clone(), GENESIS_MISMATCH_REPUTATION_CHANGE); protocol.disconnect_peer(who); - return; + return None } if let Some((next_state, next_block_num)) = Self::handle_ancestor_search_state(state, num, block_hash_match) { peer.state = PeerSyncState::AncestorSearch(next_block_num, next_state); Self::request_ancestry(protocol, who, next_block_num); - return; + return None } else { peer.state = PeerSyncState::Available; vec![] @@ -702,17 +690,23 @@ impl ChainSync { self.queue_blocks .extend(new_blocks.iter().map(|b| b.hash.clone())); self.best_importing_number = max(new_best_importing_number, self.best_importing_number); - self.import_queue.import_blocks(origin, new_blocks); + Some((origin, new_blocks)) } - /// Handle new justification data. + /// Handle a response from the remote to a justification request that we made. + /// + /// `request` must be the original request that triggered `response`. + /// + /// Returns `Some` if this produces a justification that must be imported into the import + /// queue. + #[must_use] pub(crate) fn on_block_justification_data( &mut self, protocol: &mut Context, who: PeerId, _request: message::BlockRequest, response: message::BlockResponse, - ) { + ) -> Option<(PeerId, B::Hash, NumberFor, Justification)> { if let Some(ref mut peer) = self.peers.get_mut(&who) { if let PeerSyncState::DownloadingJustification(hash) = peer.state { peer.state = PeerSyncState::Available; @@ -725,13 +719,12 @@ impl ChainSync { who, hash, response.hash); protocol.report_peer(who.clone(), i32::min_value()); protocol.disconnect_peer(who); - return; + return None; } - self.justifications.on_response( + return self.justifications.on_response( who, response.justification, - &*self.import_queue, ); }, None => { @@ -741,16 +734,18 @@ impl ChainSync { who, hash, ); - return; + return None; }, } } } self.maintain_sync(protocol); + None } - /// A batch of blocks have been processed, with or without errors. + /// Call this when a batch of blocks have been processed by the import queue, with or without + /// errors. pub fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { for hash in processed_blocks { self.queue_blocks.remove(&hash); @@ -762,9 +757,6 @@ impl ChainSync { /// Maintain the sync process (download new blocks, fetch justifications). pub fn maintain_sync(&mut self, protocol: &mut Context) { - if self.is_stopping.load(Ordering::SeqCst) { - return - } let peers: Vec = self.peers.keys().map(|p| p.clone()).collect(); for peer in peers { self.download_new(protocol, peer); @@ -772,14 +764,16 @@ impl ChainSync { self.justifications.dispatch(&mut self.peers, protocol); } - /// Called periodically to perform any time-based actions. + /// Called periodically to perform any time-based actions. Must be called at a regular + /// interval. pub fn tick(&mut self, protocol: &mut Context) { self.justifications.dispatch(&mut self.peers, protocol); } /// Request a justification for the given block. /// - /// Queues a new justification request and tries to dispatch all pending requests. + /// Uses `protocol` to queue a new justification request and tries to dispatch all pending + /// requests. pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor, protocol: &mut Context) { self.justifications.queue_request( &(*hash, number), @@ -794,15 +788,12 @@ impl ChainSync { self.justifications.clear(); } + /// Call this when a justification has been processed by the import queue, with or without + /// errors. pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { self.justifications.justification_import_result(hash, number, success); } - pub fn stop(&self) { - self.is_stopping.store(true, Ordering::SeqCst); - self.import_queue.stop(); - } - /// Notify about successful import of the given block. pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { trace!(target: "sync", "Block imported successfully {} ({})", number, hash); @@ -820,19 +811,10 @@ impl ChainSync { } fn block_queued(&mut self, hash: &B::Hash, number: NumberFor) { - let best_seen = self.best_seen_block(); - let previous_state = self.state(&best_seen); if number > self.best_queued_number { self.best_queued_number = number; self.best_queued_hash = *hash; } - let current_state = self.state(&best_seen); - // If the latest queued block changed our state, update is_major_syncing. - match (previous_state, current_state) { - (SyncState::Idle, SyncState::Downloading) => self.is_major_syncing.store(true, Ordering::Relaxed), - (SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed), - _ => {}, - } // Update common blocks for (n, peer) in self.peers.iter_mut() { if let PeerSyncState::AncestorSearch(_, _) = peer.state { @@ -848,12 +830,13 @@ impl ChainSync { } } + /// Sets the new head of chain. pub(crate) fn update_chain_info(&mut self, best_header: &B::Header) { let hash = best_header.hash(); self.block_queued(&hash, best_header.number().clone()) } - /// Handle new block announcement. + /// Call when a node announces a new block. pub(crate) fn on_block_announce(&mut self, protocol: &mut Context, who: PeerId, hash: B::Hash, header: &B::Header) { let number = *header.number(); debug!(target: "sync", "Received block announcement with number {:?}", number); @@ -929,23 +912,10 @@ impl ChainSync { block_status(&*protocol.client(), &self.queue_blocks, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown) } - /// Handle disconnected peer. + /// Call when a peer has disconnected. pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context, who: PeerId) { - let previous_best_seen = self.best_seen_block(); - let previous_state = self.state(&previous_best_seen); self.blocks.clear_peer_download(&who); self.peers.remove(&who); - if self.peers.len() == 0 { - // We're not connected to any peer anymore. - self.is_offline.store(true, Ordering::Relaxed); - } - let current_best_seen = self.best_seen_block(); - let current_state = self.state(¤t_best_seen); - // We removed a peer, let's see if this put us in idle state and is_major_syncing should be updated. - match (previous_state, current_state) { - (SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed), - _ => {}, - } self.justifications.peer_disconnected(who); self.maintain_sync(protocol); } @@ -973,12 +943,6 @@ impl ChainSync { } } - /// Clear all sync data. - pub(crate) fn clear(&mut self) { - self.blocks.clear(); - self.peers.clear(); - } - // Download old block with known parent. fn download_stale(&mut self, protocol: &mut Context, who: PeerId, hash: &B::Hash) { if let Some(ref mut peer) = self.peers.get_mut(&who) { diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 5965868472..f8d0ce9e3b 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -23,7 +23,7 @@ mod sync; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use log::trace; use client; @@ -40,7 +40,7 @@ use crate::message::Message; use network_libp2p::PeerId; use parking_lot::{Mutex, RwLock}; use primitives::{H256, sr25519::Public as AuthorityId}; -use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolMsg}; +use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolMsg, CustomMessageOutcome}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor}; use runtime_primitives::{Justification, ConsensusEngineId}; @@ -427,11 +427,6 @@ impl> Peer { *finalized_hash = Some(info.chain.finalized_hash); } - /// Restart sync for a peer. - fn restart_sync(&self) { - self.net_proto_channel.send_from_client(ProtocolMsg::Abort); - } - /// Push a message into the gossip network and relay to peers. /// `TestNet::sync_step` needs to be called to ensure it's propagated. pub fn gossip_message( @@ -654,30 +649,46 @@ pub trait TestNetFactory: Sized { let (network_to_protocol_sender, mut network_to_protocol_rx) = mpsc::unbounded(); let (mut protocol, protocol_sender) = Protocol::new( - is_offline.clone(), - is_major_syncing.clone(), peers.clone(), network_sender.clone(), config.clone(), client.clone(), - import_queue.clone(), None, tx_pool, specialization, ).unwrap(); + let is_offline2 = is_offline.clone(); + let is_major_syncing2 = is_major_syncing.clone(); + let import_queue2 = import_queue.clone(); + std::thread::spawn(move || { tokio::runtime::current_thread::run(futures::future::poll_fn(move || { while let Async::Ready(msg) = network_to_protocol_rx.poll().unwrap() { - match msg { - Some(FromNetworkMsg::PeerConnected(peer_id, debug_msg)) => - protocol.on_peer_connected(peer_id, debug_msg), - Some(FromNetworkMsg::PeerDisconnected(peer_id, debug_msg)) => - protocol.on_peer_disconnected(peer_id, debug_msg), + let outcome = match msg { + Some(FromNetworkMsg::PeerConnected(peer_id, debug_msg)) => { + protocol.on_peer_connected(peer_id, debug_msg); + CustomMessageOutcome::None + }, + Some(FromNetworkMsg::PeerDisconnected(peer_id, debug_msg)) => { + protocol.on_peer_disconnected(peer_id, debug_msg); + CustomMessageOutcome::None + }, Some(FromNetworkMsg::CustomMessage(peer_id, message)) => protocol.on_custom_message(peer_id, message), - Some(FromNetworkMsg::Synchronize) => protocol.synchronize(), + Some(FromNetworkMsg::Synchronize) => { + protocol.synchronize(); + CustomMessageOutcome::None + }, None => return Ok(Async::Ready(())) + }; + + match outcome { + CustomMessageOutcome::BlockImport(origin, blocks) => + import_queue2.import_blocks(origin, blocks), + CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => + import_queue2.import_justification(origin, hash, nb, justification), + CustomMessageOutcome::None => {} } } @@ -685,6 +696,9 @@ pub trait TestNetFactory: Sized { return Ok(Async::Ready(())) } + is_offline2.store(protocol.is_offline(), Ordering::Relaxed); + is_major_syncing2.store(protocol.is_major_syncing(), Ordering::Relaxed); + Ok(Async::NotReady) })); }); @@ -803,11 +817,6 @@ pub trait TestNetFactory: Sized { self.peers().iter().for_each(|peer| peer.send_finality_notifications()) } - /// Restart sync for a peer. - fn restart_peer(&mut self, i: usize) { - self.peers()[i].restart_sync(); - } - /// Perform synchronization until complete, if provided the /// given nodes set are excluded from sync. fn sync_with(&mut self, disconnect: bool, disconnected: Option>) { diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index 9274722693..6d582d858f 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -33,7 +33,6 @@ fn test_ancestor_search_when_common_is(n: usize) { net.peer(1).push_blocks(100, false); net.peer(2).push_blocks(100, false); - net.restart_peer(0); net.sync(); assert!(net.peer(0).client.backend().as_in_memory().blockchain() .canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain())); @@ -96,6 +95,7 @@ fn sync_cycle_from_offline_to_syncing_to_offline() { net.peer(peer).on_disconnect(net.peer(other)); } } + net.sync(); assert!(net.peer(peer).is_offline()); assert!(!net.peer(peer).is_major_syncing()); } @@ -119,6 +119,7 @@ fn syncing_node_not_major_syncing_when_disconnected() { net.peer(1).on_disconnect(net.peer(2)); // Peer 1 is not major-syncing. + net.sync(); assert!(!net.peer(1).is_major_syncing()); } @@ -141,7 +142,6 @@ fn sync_from_two_peers_with_ancestry_search_works() { net.peer(0).push_blocks(10, true); net.peer(1).push_blocks(100, false); net.peer(2).push_blocks(100, false); - net.restart_peer(0); net.sync(); assert!(net.peer(0).client.backend().as_in_memory().blockchain() .canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain())); @@ -156,7 +156,6 @@ fn ancestry_search_works_when_backoff_is_one() { net.peer(1).push_blocks(2, false); net.peer(2).push_blocks(2, false); - net.restart_peer(0); net.sync(); assert!(net.peer(0).client.backend().as_in_memory().blockchain() .canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain())); @@ -171,7 +170,6 @@ fn ancestry_search_works_when_ancestor_is_genesis() { net.peer(1).push_blocks(100, false); net.peer(2).push_blocks(100, false); - net.restart_peer(0); net.sync(); assert!(net.peer(0).client.backend().as_in_memory().blockchain() .canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));