Isolate the code of ChainSync and turn it into a state machine (#2497)

* Move the is_offline and is_major_syncing logic to service.rs

* Move the ImportQueue to service.rs

* Remove stop() and abort()

* Add some more documentation to sync.rs
This commit is contained in:
Pierre Krieger
2019-05-12 19:23:45 +02:00
committed by Arkadiy Paronyan
parent cbe13c459b
commit 6c41d0b3ec
5 changed files with 164 additions and 170 deletions
+40 -38
View File
@@ -17,9 +17,9 @@
use futures::{prelude::*, sync::mpsc};
use network_libp2p::PeerId;
use primitives::storage::StorageKey;
use runtime_primitives::{generic::BlockId, ConsensusEngineId};
use consensus::{import_queue::IncomingBlock, import_queue::Origin, BlockOrigin};
use runtime_primitives::{generic::BlockId, ConsensusEngineId, Justification};
use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero};
use consensus::import_queue::ImportQueue;
use crate::message::{self, BlockRequest as BlockRequestMessage, Message};
use crate::message::generic::{Message as GenericMessage, ConsensusMessage};
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
@@ -32,7 +32,6 @@ use parking_lot::RwLock;
use rustc_hex::ToHex;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::{cmp, num::NonZeroUsize, time};
use log::{trace, debug, warn, error};
use crate::chain::Client;
@@ -276,10 +275,6 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
/// Incoming gossip consensus message.
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, GossipMessageRecipient),
/// Tell protocol to abort sync (does not stop protocol).
/// Only used in tests.
#[cfg(any(test, feature = "test-helpers"))]
Abort,
/// Tell protocol to perform regular maintenance.
#[cfg(any(test, feature = "test-helpers"))]
Tick,
@@ -291,20 +286,17 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Create a new instance.
pub fn new(
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
connected_peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
network_chan: NetworkChan<B>,
config: ProtocolConfig,
chain: Arc<Client<B>>,
import_queue: Box<ImportQueue<B>>,
on_demand: Option<Arc<OnDemandService<B>>>,
transaction_pool: Arc<TransactionPool<H, B>>,
specialization: S,
) -> error::Result<(Protocol<B, S, H>, mpsc::UnboundedSender<ProtocolMsg<B, S>>)> {
let (protocol_sender, port) = mpsc::unbounded();
let info = chain.info()?;
let sync = ChainSync::new(is_offline, is_major_syncing, config.roles, &info, import_queue);
let sync = ChainSync::new(config.roles, &info);
let protocol = Protocol {
network_chan,
port,
@@ -341,6 +333,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
.count(),
}
}
pub fn is_major_syncing(&self) -> bool {
self.sync.status().is_major_syncing()
}
pub fn is_offline(&self) -> bool {
self.sync.status().is_offline()
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Future for Protocol<B, S, H> {
@@ -358,10 +358,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Future for Protocol<B,
loop {
match self.port.poll() {
Ok(Async::Ready(None)) | Err(_) => {
self.stop();
return Ok(Async::Ready(()))
}
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(msg))) => if !self.handle_client_msg(msg) {
return Ok(Async::Ready(()))
}
@@ -415,8 +412,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Tick => self.tick(),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Abort => self.abort(),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Synchronize => {
trace!(target: "sync", "handle_client_msg: received Synchronize msg");
self.network_chan.send(NetworkMsg::Synchronized)
@@ -457,14 +452,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
pub fn on_custom_message(&mut self, who: PeerId, message: Message<B>) {
pub fn on_custom_message(&mut self, who: PeerId, message: Message<B>) -> CustomMessageOutcome<B> {
match message {
GenericMessage::Status(s) => self.on_status_message(who, s),
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
GenericMessage::BlockResponse(r) => {
if let Some(request) = self.handle_response(who.clone(), &r) {
self.on_block_response(who.clone(), request, r);
let outcome = self.on_block_response(who.clone(), request, r);
self.update_peer_info(&who);
return outcome
}
},
GenericMessage::BlockAnnounce(announce) => {
@@ -495,6 +491,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
&mut Some(other),
),
}
CustomMessageOutcome::None
}
fn send_message(&mut self, who: PeerId, message: Message<B>) {
@@ -643,7 +641,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
peer: PeerId,
request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
) {
) -> CustomMessageOutcome<B> {
let blocks_range = match (
response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
@@ -658,14 +656,26 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
// TODO [andre]: move this logic to the import queue so that
// justifications are imported asynchronously (#1482)
if request.fields == message::BlockAttributes::JUSTIFICATION {
self.sync.on_block_justification_data(
let outcome = self.sync.on_block_justification_data(
&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
peer,
request,
response,
);
if let Some((origin, hash, nb, just)) = outcome {
CustomMessageOutcome::JustificationImport(origin, hash, nb, just)
} else {
CustomMessageOutcome::None
}
} else {
self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response);
let outcome = self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response);
if let Some((origin, blocks)) = outcome {
CustomMessageOutcome::BlockImport(origin, blocks)
} else {
CustomMessageOutcome::None
}
}
}
@@ -891,22 +901,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
fn abort(&mut self) {
self.sync.clear();
self.specialization.on_abort();
self.context_data.peers.clear();
self.handshaking_peers.clear();
self.consensus_gossip.abort();
}
fn stop(&mut self) {
// stop processing import requests first (without holding a sync lock)
self.sync.stop();
// and then clear all the sync data
self.abort();
}
fn on_block_announce(&mut self, who: PeerId, announce: message::BlockAnnounce<B::Header>) {
let header = announce.header;
let hash = header.hash();
@@ -1107,6 +1101,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
/// Outcome of an incoming custom message.
#[derive(Debug)]
pub enum CustomMessageOutcome<B: BlockT> {
BlockImport(BlockOrigin, Vec<IncomingBlock<B>>),
JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
None,
}
fn send_message<B: BlockT, H: ExHashT>(
peers: &mut HashMap<PeerId, Peer<B, H>>,
network_chan: &NetworkChan<B>,
+29 -8
View File
@@ -31,7 +31,7 @@ use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::message::Message;
use crate::protocol::{self, Context, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo};
use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo};
use crate::config::Params;
use crate::error::Error;
use crate::specialization::NetworkSpecialization;
@@ -175,13 +175,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
let is_major_syncing = Arc::new(AtomicBool::new(false));
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>> = Arc::new(Default::default());
let (protocol, protocol_sender) = Protocol::new(
is_offline.clone(),
is_major_syncing.clone(),
peers.clone(),
network_chan.clone(),
params.config,
params.chain,
import_queue.clone(),
params.on_demand,
params.transaction_pool,
params.specialization,
@@ -189,7 +186,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect();
let registered = RegisteredProtocol::new(protocol_id, &versions);
let (thread, network, peerset) = start_thread(
is_offline.clone(),
is_major_syncing.clone(),
protocol,
import_queue.clone(),
network_port,
status_sinks.clone(),
params.network_config,
@@ -475,7 +475,10 @@ pub enum NetworkMsg<B: BlockT + 'static> {
/// Starts the background thread that handles the networking.
fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
protocol: Protocol<B, S, H>,
import_queue: Box<ImportQueue<B>>,
network_port: NetworkPort<B>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
config: NetworkConfiguration,
@@ -495,7 +498,7 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?;
let peerset_clone = peerset.clone();
let thread = thread::Builder::new().name("network".to_string()).spawn(move || {
let fut = run_thread(protocol, service_clone, network_port, status_sinks, peerset_clone)
let fut = run_thread(is_offline, is_major_syncing, protocol, service_clone, import_queue, network_port, status_sinks, peerset_clone)
.select(close_rx.then(|_| Ok(())))
.map(|(val, _)| val)
.map_err(|(err,_ )| err);
@@ -513,8 +516,11 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
/// Runs the background thread that handles the networking.
fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
mut protocol: Protocol<B, S, H>,
network_service: Arc<Mutex<NetworkService<Message<B>>>>,
import_queue: Box<ImportQueue<B>>,
network_port: NetworkPort<B>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
peerset: PeersetHandle,
@@ -551,7 +557,7 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
}
loop {
match network_service.lock().poll() {
let outcome = match network_service.lock().poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => {
debug_assert!(
@@ -559,9 +565,12 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
&& version >= protocol::MIN_VERSION as u8
);
protocol.on_peer_connected(peer_id, debug_info);
CustomMessageOutcome::None
}
Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) =>
protocol.on_peer_disconnected(peer_id, debug_info),
Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => {
protocol.on_peer_disconnected(peer_id, debug_info);
CustomMessageOutcome::None
},
Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) =>
protocol.on_custom_message(peer_id, message),
Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => {
@@ -570,15 +579,27 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
debug!(target: "sync", "{:?}", msg);
protocol.on_clogged_peer(peer_id.clone(), Some(msg));
}
CustomMessageOutcome::None
}
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Err(err) => {
error!(target: "sync", "Error in the network: {:?}", err);
return Err(err)
}
};
match outcome {
CustomMessageOutcome::BlockImport(origin, blocks) =>
import_queue.import_blocks(origin, blocks),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) =>
import_queue.import_justification(origin, hash, nb, justification),
CustomMessageOutcome::None => {}
}
}
is_offline.store(protocol.is_offline(), Ordering::Relaxed);
is_major_syncing.store(protocol.is_major_syncing(), Ordering::Relaxed);
Ok(Async::NotReady)
})
}
+63 -99
View File
@@ -14,6 +14,22 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Contains the state of the chain synchronization process
//!
//! At any given point in time, a running node tries as much as possible to be at the head of the
//! chain. This module handles the logic of which blocks to request from remotes, and processing
//! responses. It yields blocks to check and potentially move to the database.
//!
//! # Usage
//!
//! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on
//! the network, or whenever a block has been successfully verified, call the appropriate method in
//! order to update it. You must also regularly call `tick()`.
//!
//! To each of these methods, you must pass a `Context` object that the `ChainSync` will use to
//! send its new outgoing requests.
//!
use std::cmp::max;
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
@@ -22,8 +38,7 @@ use crate::protocol::Context;
use fork_tree::ForkTree;
use network_libp2p::PeerId;
use client::{BlockStatus, ClientInfo};
use consensus::BlockOrigin;
use consensus::import_queue::{ImportQueue, IncomingBlock};
use consensus::{BlockOrigin, import_queue::IncomingBlock};
use client::error::Error as ClientError;
use crate::blocks::BlockCollection;
use runtime_primitives::Justification;
@@ -32,8 +47,6 @@ use runtime_primitives::generic::BlockId;
use crate::message;
use crate::config::Roles;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
// Maximum blocks to request in a single packet.
const MAX_BLOCKS_TO_REQUEST: usize = 128;
@@ -285,20 +298,21 @@ impl<B: BlockT> PendingJustifications<B> {
/// Processes the response for the request previously sent to the given
/// peer. Queues a retry in case the given justification
/// was `None`.
///
/// Returns `Some` if this produces a justification that must be imported in the import queue.
#[must_use]
fn on_response(
&mut self,
who: PeerId,
justification: Option<Justification>,
import_queue: &ImportQueue<B>,
) {
) -> Option<(PeerId, B::Hash, NumberFor<B>, Justification)> {
// we assume that the request maps to the given response, this is
// currently enforced by the outer network protocol before passing on
// messages to chain sync.
if let Some(request) = self.peer_requests.remove(&who) {
if let Some(justification) = justification {
import_queue.import_justification(who.clone(), request.0, request.1, justification);
self.importing_requests.insert(request);
return
return Some((who, request.0, request.1, justification))
}
self.previous_requests
@@ -308,6 +322,8 @@ impl<B: BlockT> PendingJustifications<B> {
self.pending_requests.push_front(request);
}
None
}
/// Removes any pending justification requests for blocks lower than the
@@ -355,12 +371,8 @@ pub struct ChainSync<B: BlockT> {
best_queued_hash: B::Hash,
required_block_attributes: message::BlockAttributes,
justifications: PendingJustifications<B>,
import_queue: Box<ImportQueue<B>>,
queue_blocks: HashSet<B::Hash>,
best_importing_number: NumberFor<B>,
is_stopping: AtomicBool,
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
}
/// Reported sync state.
@@ -400,13 +412,10 @@ impl<B: BlockT> Status<B> {
}
impl<B: BlockT> ChainSync<B> {
/// Create a new instance.
/// Create a new instance. Pass the initial known state of the chain.
pub(crate) fn new(
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
role: Roles,
info: &ClientInfo<B>,
import_queue: Box<ImportQueue<B>>
) -> Self {
let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION;
if role.intersects(Roles::FULL | Roles::AUTHORITY) {
@@ -421,12 +430,8 @@ impl<B: BlockT> ChainSync<B> {
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
justifications: PendingJustifications::new(),
required_block_attributes,
import_queue,
queue_blocks: Default::default(),
best_importing_number: Zero::zero(),
is_stopping: Default::default(),
is_offline,
is_major_syncing,
}
}
@@ -441,7 +446,7 @@ impl<B: BlockT> ChainSync<B> {
}
}
/// Returns peer sync status (if any).
/// Returns the state of the sync of the given peer. Returns `None` if the peer is unknown.
pub(crate) fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>> {
self.peers.get(who).map(|peer| {
PeerInfo {
@@ -462,15 +467,8 @@ impl<B: BlockT> ChainSync<B> {
}
}
/// Handle new connected peer.
/// Handle new connected peer. Call this method whenever we connect to a new peer.
pub(crate) fn new_peer(&mut self, protocol: &mut Context<B>, who: PeerId) {
// Initialize some variables to determine if
// is_offline or is_major_syncing should be updated
// after processing this new peer.
let previous_len = self.peers.len();
let previous_best_seen = self.best_seen_block();
let previous_state = self.state(&previous_best_seen);
if let Some(info) = protocol.peer_info(&who) {
let status = block_status(&*protocol.client(), &self.queue_blocks, info.best_hash);
match (status, info.best_number) {
@@ -538,22 +536,6 @@ impl<B: BlockT> ChainSync<B> {
}
}
}
let current_best_seen = self.best_seen_block();
let current_state = self.state(&current_best_seen);
let current_len = self.peers.len();
if previous_len == 0 && current_len > 0 {
// We were offline, and now we're connected to at least one peer.
self.is_offline.store(false, Ordering::Relaxed);
}
if previous_len < current_len {
// We added a peer, let's see if major_syncing should be updated.
match (previous_state, current_state) {
(SyncState::Idle, SyncState::Downloading) => self.is_major_syncing.store(true, Ordering::Relaxed),
(SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed),
_ => {},
}
}
}
fn handle_ancestor_search_state(
@@ -594,14 +576,20 @@ impl<B: BlockT> ChainSync<B> {
}
}
/// Handle new block data.
/// Handle a response from the remote to a block request that we made.
///
/// `request` must be the original request that triggered `response`.
///
/// If this corresponds to a valid block, this outputs the block that must be imported in the
/// import queue.
#[must_use]
pub(crate) fn on_block_data(
&mut self,
protocol: &mut Context<B>,
who: PeerId,
request: message::BlockRequest<B>,
response: message::BlockResponse<B>
) {
) -> Option<(BlockOrigin, Vec<IncomingBlock<B>>)> {
let new_blocks: Vec<IncomingBlock<B>> = if let Some(ref mut peer) = self.peers.get_mut(&who) {
let mut blocks = response.blocks;
if request.direction == message::Direction::Descending {
@@ -649,13 +637,13 @@ impl<B: BlockT> ChainSync<B> {
debug!(target: "sync", "Invalid response when searching for ancestor from {}", who);
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
return;
return None
},
(_, Err(e)) => {
info!("Error answering legitimate blockchain query: {:?}", e);
protocol.report_peer(who.clone(), ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE);
protocol.disconnect_peer(who);
return;
return None
},
};
if block_hash_match && peer.common_number < num {
@@ -665,12 +653,12 @@ impl<B: BlockT> ChainSync<B> {
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
protocol.report_peer(who.clone(), GENESIS_MISMATCH_REPUTATION_CHANGE);
protocol.disconnect_peer(who);
return;
return None
}
if let Some((next_state, next_block_num)) = Self::handle_ancestor_search_state(state, num, block_hash_match) {
peer.state = PeerSyncState::AncestorSearch(next_block_num, next_state);
Self::request_ancestry(protocol, who, next_block_num);
return;
return None
} else {
peer.state = PeerSyncState::Available;
vec![]
@@ -702,17 +690,23 @@ impl<B: BlockT> ChainSync<B> {
self.queue_blocks
.extend(new_blocks.iter().map(|b| b.hash.clone()));
self.best_importing_number = max(new_best_importing_number, self.best_importing_number);
self.import_queue.import_blocks(origin, new_blocks);
Some((origin, new_blocks))
}
/// Handle new justification data.
/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
///
/// Returns `Some` if this produces a justification that must be imported into the import
/// queue.
#[must_use]
pub(crate) fn on_block_justification_data(
&mut self,
protocol: &mut Context<B>,
who: PeerId,
_request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
) {
) -> Option<(PeerId, B::Hash, NumberFor<B>, Justification)> {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingJustification(hash) = peer.state {
peer.state = PeerSyncState::Available;
@@ -725,13 +719,12 @@ impl<B: BlockT> ChainSync<B> {
who, hash, response.hash);
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
return;
return None;
}
self.justifications.on_response(
return self.justifications.on_response(
who,
response.justification,
&*self.import_queue,
);
},
None => {
@@ -741,16 +734,18 @@ impl<B: BlockT> ChainSync<B> {
who,
hash,
);
return;
return None;
},
}
}
}
self.maintain_sync(protocol);
None
}
/// A batch of blocks have been processed, with or without errors.
/// Call this when a batch of blocks have been processed by the import queue, with or without
/// errors.
pub fn blocks_processed(&mut self, processed_blocks: Vec<B::Hash>, has_error: bool) {
for hash in processed_blocks {
self.queue_blocks.remove(&hash);
@@ -762,9 +757,6 @@ impl<B: BlockT> ChainSync<B> {
/// Maintain the sync process (download new blocks, fetch justifications).
pub fn maintain_sync(&mut self, protocol: &mut Context<B>) {
if self.is_stopping.load(Ordering::SeqCst) {
return
}
let peers: Vec<PeerId> = self.peers.keys().map(|p| p.clone()).collect();
for peer in peers {
self.download_new(protocol, peer);
@@ -772,14 +764,16 @@ impl<B: BlockT> ChainSync<B> {
self.justifications.dispatch(&mut self.peers, protocol);
}
/// Called periodically to perform any time-based actions.
/// Called periodically to perform any time-based actions. Must be called at a regular
/// interval.
pub fn tick(&mut self, protocol: &mut Context<B>) {
self.justifications.dispatch(&mut self.peers, protocol);
}
/// Request a justification for the given block.
///
/// Queues a new justification request and tries to dispatch all pending requests.
/// Uses `protocol` to queue a new justification request and tries to dispatch all pending
/// requests.
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut Context<B>) {
self.justifications.queue_request(
&(*hash, number),
@@ -794,15 +788,12 @@ impl<B: BlockT> ChainSync<B> {
self.justifications.clear();
}
/// Call this when a justification has been processed by the import queue, with or without
/// errors.
pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
self.justifications.justification_import_result(hash, number, success);
}
pub fn stop(&self) {
self.is_stopping.store(true, Ordering::SeqCst);
self.import_queue.stop();
}
/// Notify about successful import of the given block.
pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
trace!(target: "sync", "Block imported successfully {} ({})", number, hash);
@@ -820,19 +811,10 @@ impl<B: BlockT> ChainSync<B> {
}
fn block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let best_seen = self.best_seen_block();
let previous_state = self.state(&best_seen);
if number > self.best_queued_number {
self.best_queued_number = number;
self.best_queued_hash = *hash;
}
let current_state = self.state(&best_seen);
// If the latest queued block changed our state, update is_major_syncing.
match (previous_state, current_state) {
(SyncState::Idle, SyncState::Downloading) => self.is_major_syncing.store(true, Ordering::Relaxed),
(SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed),
_ => {},
}
// Update common blocks
for (n, peer) in self.peers.iter_mut() {
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
@@ -848,12 +830,13 @@ impl<B: BlockT> ChainSync<B> {
}
}
/// Sets the new head of chain.
pub(crate) fn update_chain_info(&mut self, best_header: &B::Header) {
let hash = best_header.hash();
self.block_queued(&hash, best_header.number().clone())
}
/// Handle new block announcement.
/// Call when a node announces a new block.
pub(crate) fn on_block_announce(&mut self, protocol: &mut Context<B>, who: PeerId, hash: B::Hash, header: &B::Header) {
let number = *header.number();
debug!(target: "sync", "Received block announcement with number {:?}", number);
@@ -929,23 +912,10 @@ impl<B: BlockT> ChainSync<B> {
block_status(&*protocol.client(), &self.queue_blocks, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
}
/// Handle disconnected peer.
/// Call when a peer has disconnected.
pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: PeerId) {
let previous_best_seen = self.best_seen_block();
let previous_state = self.state(&previous_best_seen);
self.blocks.clear_peer_download(&who);
self.peers.remove(&who);
if self.peers.len() == 0 {
// We're not connected to any peer anymore.
self.is_offline.store(true, Ordering::Relaxed);
}
let current_best_seen = self.best_seen_block();
let current_state = self.state(&current_best_seen);
// We removed a peer, let's see if this put us in idle state and is_major_syncing should be updated.
match (previous_state, current_state) {
(SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed),
_ => {},
}
self.justifications.peer_disconnected(who);
self.maintain_sync(protocol);
}
@@ -973,12 +943,6 @@ impl<B: BlockT> ChainSync<B> {
}
}
/// Clear all sync data.
pub(crate) fn clear(&mut self) {
self.blocks.clear();
self.peers.clear();
}
// Download old block with known parent.
fn download_stale(&mut self, protocol: &mut Context<B>, who: PeerId, hash: &B::Hash) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
+30 -21
View File
@@ -23,7 +23,7 @@ mod sync;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use log::trace;
use client;
@@ -40,7 +40,7 @@ use crate::message::Message;
use network_libp2p::PeerId;
use parking_lot::{Mutex, RwLock};
use primitives::{H256, sr25519::Public as AuthorityId};
use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolMsg};
use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolMsg, CustomMessageOutcome};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor};
use runtime_primitives::{Justification, ConsensusEngineId};
@@ -427,11 +427,6 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
*finalized_hash = Some(info.chain.finalized_hash);
}
/// Restart sync for a peer.
fn restart_sync(&self) {
self.net_proto_channel.send_from_client(ProtocolMsg::Abort);
}
/// Push a message into the gossip network and relay to peers.
/// `TestNet::sync_step` needs to be called to ensure it's propagated.
pub fn gossip_message(
@@ -654,30 +649,46 @@ pub trait TestNetFactory: Sized {
let (network_to_protocol_sender, mut network_to_protocol_rx) = mpsc::unbounded();
let (mut protocol, protocol_sender) = Protocol::new(
is_offline.clone(),
is_major_syncing.clone(),
peers.clone(),
network_sender.clone(),
config.clone(),
client.clone(),
import_queue.clone(),
None,
tx_pool,
specialization,
).unwrap();
let is_offline2 = is_offline.clone();
let is_major_syncing2 = is_major_syncing.clone();
let import_queue2 = import_queue.clone();
std::thread::spawn(move || {
tokio::runtime::current_thread::run(futures::future::poll_fn(move || {
while let Async::Ready(msg) = network_to_protocol_rx.poll().unwrap() {
match msg {
Some(FromNetworkMsg::PeerConnected(peer_id, debug_msg)) =>
protocol.on_peer_connected(peer_id, debug_msg),
Some(FromNetworkMsg::PeerDisconnected(peer_id, debug_msg)) =>
protocol.on_peer_disconnected(peer_id, debug_msg),
let outcome = match msg {
Some(FromNetworkMsg::PeerConnected(peer_id, debug_msg)) => {
protocol.on_peer_connected(peer_id, debug_msg);
CustomMessageOutcome::None
},
Some(FromNetworkMsg::PeerDisconnected(peer_id, debug_msg)) => {
protocol.on_peer_disconnected(peer_id, debug_msg);
CustomMessageOutcome::None
},
Some(FromNetworkMsg::CustomMessage(peer_id, message)) =>
protocol.on_custom_message(peer_id, message),
Some(FromNetworkMsg::Synchronize) => protocol.synchronize(),
Some(FromNetworkMsg::Synchronize) => {
protocol.synchronize();
CustomMessageOutcome::None
},
None => return Ok(Async::Ready(()))
};
match outcome {
CustomMessageOutcome::BlockImport(origin, blocks) =>
import_queue2.import_blocks(origin, blocks),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) =>
import_queue2.import_justification(origin, hash, nb, justification),
CustomMessageOutcome::None => {}
}
}
@@ -685,6 +696,9 @@ pub trait TestNetFactory: Sized {
return Ok(Async::Ready(()))
}
is_offline2.store(protocol.is_offline(), Ordering::Relaxed);
is_major_syncing2.store(protocol.is_major_syncing(), Ordering::Relaxed);
Ok(Async::NotReady)
}));
});
@@ -803,11 +817,6 @@ pub trait TestNetFactory: Sized {
self.peers().iter().for_each(|peer| peer.send_finality_notifications())
}
/// Restart sync for a peer.
fn restart_peer(&mut self, i: usize) {
self.peers()[i].restart_sync();
}
/// Perform synchronization until complete, if provided the
/// given nodes set are excluded from sync.
fn sync_with(&mut self, disconnect: bool, disconnected: Option<HashSet<usize>>) {
+2 -4
View File
@@ -33,7 +33,6 @@ fn test_ancestor_search_when_common_is(n: usize) {
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.restart_peer(0);
net.sync();
assert!(net.peer(0).client.backend().as_in_memory().blockchain()
.canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));
@@ -96,6 +95,7 @@ fn sync_cycle_from_offline_to_syncing_to_offline() {
net.peer(peer).on_disconnect(net.peer(other));
}
}
net.sync();
assert!(net.peer(peer).is_offline());
assert!(!net.peer(peer).is_major_syncing());
}
@@ -119,6 +119,7 @@ fn syncing_node_not_major_syncing_when_disconnected() {
net.peer(1).on_disconnect(net.peer(2));
// Peer 1 is not major-syncing.
net.sync();
assert!(!net.peer(1).is_major_syncing());
}
@@ -141,7 +142,6 @@ fn sync_from_two_peers_with_ancestry_search_works() {
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.restart_peer(0);
net.sync();
assert!(net.peer(0).client.backend().as_in_memory().blockchain()
.canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));
@@ -156,7 +156,6 @@ fn ancestry_search_works_when_backoff_is_one() {
net.peer(1).push_blocks(2, false);
net.peer(2).push_blocks(2, false);
net.restart_peer(0);
net.sync();
assert!(net.peer(0).client.backend().as_in_memory().blockchain()
.canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));
@@ -171,7 +170,6 @@ fn ancestry_search_works_when_ancestor_is_genesis() {
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.restart_peer(0);
net.sync();
assert!(net.peer(0).client.backend().as_in_memory().blockchain()
.canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));