From 077ed00951899c718345e0651c7eb87ed1eda2f6 Mon Sep 17 00:00:00 2001 From: Gregory Terzian <2792687+gterzian@users.noreply.github.com> Date: Mon, 25 Feb 2019 21:00:13 +0800 Subject: [PATCH] Remove blocking operations in SyncOracle implementation (#1852) * remove blocking operations in SyncOracle implementation * docs * docs --- substrate/core/network/src/protocol.rs | 17 ++---- substrate/core/network/src/service.rs | 28 +++++----- substrate/core/network/src/sync.rs | 71 ++++++++++++++++++++++-- substrate/core/network/src/test/mod.rs | 31 ++++++++--- substrate/core/network/src/test/sync.rs | 73 ++++++++++++++++++++++++- 5 files changed, 178 insertions(+), 42 deletions(-) diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index bc30d32008..a85b2e0cec 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -31,6 +31,7 @@ use crate::config::{ProtocolConfig, Roles}; use rustc_hex::ToHex; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use std::sync::atomic::AtomicBool; use std::{cmp, num::NonZeroUsize, thread, time}; use log::{trace, debug, warn}; use crate::chain::Client; @@ -200,10 +201,6 @@ pub enum ProtocolMsg,> { ExecuteWithGossip(Box + Send + 'static>), /// Incoming gossip consensus message. GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec), - /// Is protocol currently major-syncing? - IsMajorSyncing(Sender), - /// Is protocol currently offline? - IsOffline(Sender), /// Return a list of peers currently known to protocol. Peers(Sender)>>), /// Let protocol know a peer is currenlty clogged. @@ -237,6 +234,8 @@ pub enum ProtocolMsg,> { impl, H: ExHashT> Protocol { /// Create a new instance. 
pub fn new( + is_offline: Arc, + is_major_syncing: Arc, network_chan: NetworkChan, config: ProtocolConfig, chain: Arc>, @@ -247,7 +246,7 @@ impl, H: ExHashT> Protocol { ) -> error::Result>> { let (sender, port) = channel::unbounded(); let info = chain.info()?; - let sync = ChainSync::new(config.roles, &info, import_queue); + let sync = ChainSync::new(is_offline, is_major_syncing, config.roles, &info, import_queue); let _ = thread::Builder::new() .name("Protocol".into()) .spawn(move || { @@ -330,14 +329,6 @@ impl, H: ExHashT> Protocol { ProtocolMsg::GossipConsensusMessage(topic, engine_id, message) => { self.gossip_consensus_message(topic, engine_id, message) } - ProtocolMsg::IsMajorSyncing(sender) => { - let is_syncing = self.sync.status().is_major_syncing(); - let _ = sender.send(is_syncing); - } - ProtocolMsg::IsOffline(sender) => { - let is_offline = self.sync.status().is_offline(); - let _ = sender.send(is_offline); - } ProtocolMsg::MaintainSync => { let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index df5c035ec6..f4011b11ee 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::{io, thread}; use log::{warn, debug, error, trace, info}; use futures::{Async, Future, Stream, stream, sync::oneshot}; @@ -117,6 +118,10 @@ impl> Link for NetworkLink { /// Substrate network service. Handles network IO and manages connectivity. pub struct Service> { + // Are we connected to any peer? + is_offline: Arc, + // Are we actively catching up with the chain? 
+ is_major_syncing: Arc, /// Network service network: Arc>>>, /// Protocol sender @@ -135,7 +140,12 @@ impl> Service { import_queue: Box>, ) -> Result<(Arc>, NetworkChan), Error> { let (network_chan, network_port) = network_channel(protocol_id); + // Start in off-line mode, since we're not connected to any nodes yet. + let is_offline = Arc::new(AtomicBool::new(true)); + let is_major_syncing = Arc::new(AtomicBool::new(false)); let protocol_sender = Protocol::new( + is_offline.clone(), + is_major_syncing.clone(), network_chan.clone(), params.config, params.chain, @@ -154,6 +164,8 @@ impl> Service { )?; let service = Arc::new(Service { + is_offline, + is_major_syncing, network, protocol_sender: protocol_sender.clone(), bg_thread: Some(thread), @@ -244,22 +256,10 @@ impl> Service { impl> ::consensus::SyncOracle for Service { fn is_major_syncing(&self) -> bool { - let (sender, port) = channel::unbounded(); - let _ = self - .protocol_sender - .send(ProtocolMsg::IsMajorSyncing(sender)); - port.recv().expect("1. Protocol keeps handling messages until all senders are dropped, - or the ProtocolMsg::Stop message is received, - 2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.") + self.is_major_syncing.load(Ordering::Relaxed) } fn is_offline(&self) -> bool { - let (sender, port) = channel::unbounded(); - let _ = self - .protocol_sender - .send(ProtocolMsg::IsOffline(sender)); - port.recv().expect("1. 
Protocol keeps handling messages until all senders are dropped, - or the ProtocolMsg::Stop message is received, - 2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.") + self.is_offline.load(Ordering::Relaxed) } } diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index 536cf304e7..1cf639ee65 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -30,6 +30,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberF use runtime_primitives::generic::BlockId; use crate::message::{self, generic::Message as GenericMessage}; use crate::config::Roles; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; // Maximum blocks to request in a single packet. @@ -46,6 +47,7 @@ const ANNOUNCE_HISTORY_SIZE: usize = 64; // TODO: this should take finality into account. See https://github.com/paritytech/substrate/issues/1606 const MAX_UNKNOWN_FORK_DOWNLOAD_LEN: u32 = 32; +#[derive(Debug)] struct PeerSync { pub common_number: NumberFor, pub best_hash: B::Hash, @@ -302,6 +304,8 @@ pub struct ChainSync { justifications: PendingJustifications, import_queue: Box>, is_stopping: AtomicBool, + is_offline: Arc, + is_major_syncing: Arc, } /// Reported sync state. @@ -342,7 +346,13 @@ impl Status { impl ChainSync { /// Create a new instance. 
- pub(crate) fn new(role: Roles, info: &ClientInfo, import_queue: Box>) -> Self { + pub(crate) fn new( + is_offline: Arc, + is_major_syncing: Arc, + role: Roles, + info: &ClientInfo, + import_queue: Box> + ) -> Self { let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION; if role.intersects(Roles::FULL | Roles::AUTHORITY) { required_block_attributes |= message::BlockAttributes::BODY; @@ -358,6 +368,8 @@ impl ChainSync { required_block_attributes, import_queue, is_stopping: Default::default(), + is_offline, + is_major_syncing, } } @@ -370,13 +382,17 @@ impl ChainSync { self.import_queue.clone() } + fn state(&self, best_seen: &Option>) -> SyncState { + match best_seen { + &Some(n) if n > self.best_queued_number && n - self.best_queued_number > As::sa(5) => SyncState::Downloading, + _ => SyncState::Idle, + } + } + /// Returns sync status. pub(crate) fn status(&self) -> Status { let best_seen = self.best_seen_block(); - let state = match &best_seen { - &Some(n) if n > self.best_queued_number && n - self.best_queued_number > As::sa(5) => SyncState::Downloading, - _ => SyncState::Idle, - }; + let state = self.state(&best_seen); Status { state: state, best_seen_block: best_seen, @@ -386,6 +402,13 @@ impl ChainSync { /// Handle new connected peer. pub(crate) fn new_peer(&mut self, protocol: &mut Context, who: NodeIndex) { + // Initialize some variables to determine if + // is_offline or is_major_syncing should be updated + // after processing this new peer. 
+ let previous_len = self.peers.len(); + let previous_best_seen = self.best_seen_block(); + let previous_state = self.state(&previous_best_seen); + if let Some(info) = protocol.peer_info(who) { match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) { (Err(e), _) => { @@ -450,6 +473,22 @@ impl ChainSync { } } } + + let current_best_seen = self.best_seen_block(); + let current_state = self.state(&current_best_seen); + let current_len = self.peers.len(); + if previous_len == 0 && current_len > 0 { + // We were offline, and now we're connected to at least one peer. + self.is_offline.store(false, Ordering::Relaxed); + } + if previous_len < current_len { + // We added a peer, let's see if major_syncing should be updated. + match (previous_state, current_state) { + (SyncState::Idle, SyncState::Downloading) => self.is_major_syncing.store(true, Ordering::Relaxed), + (SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed), + _ => {}, + } + } } /// Handle new block data. @@ -660,10 +699,19 @@ impl ChainSync { } fn block_queued(&mut self, hash: &B::Hash, number: NumberFor) { + let best_seen = self.best_seen_block(); + let previous_state = self.state(&best_seen); if number > self.best_queued_number { self.best_queued_number = number; self.best_queued_hash = *hash; } + let current_state = self.state(&best_seen); + // If the latest queued block changed our state, update is_major_syncing. + match (previous_state, current_state) { + (SyncState::Idle, SyncState::Downloading) => self.is_major_syncing.store(true, Ordering::Relaxed), + (SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed), + _ => {}, + } // Update common blocks for (n, peer) in self.peers.iter_mut() { if let PeerSyncState::AncestorSearch(_) = peer.state { @@ -744,8 +792,21 @@ impl ChainSync { /// Handle disconnected peer. 
pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context, who: NodeIndex) { + let previous_best_seen = self.best_seen_block(); + let previous_state = self.state(&previous_best_seen); self.blocks.clear_peer_download(who); self.peers.remove(&who); + if self.peers.len() == 0 { + // We're not connected to any peer anymore. + self.is_offline.store(true, Ordering::Relaxed); + } + let current_best_seen = self.best_seen_block(); + let current_state = self.state(&current_best_seen); + // We removed a peer, let's see if this put us in idle state and is_major_syncing should be updated. + match (previous_state, current_state) { + (SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed), + _ => {}, + } self.justifications.peer_disconnected(who); self.maintain_sync(protocol); } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index c16b2b8fd5..7245d1f1ab 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -23,6 +23,7 @@ mod sync; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::Duration; @@ -44,7 +45,7 @@ use network_libp2p::{NodeIndex, ProtocolId}; use parity_codec::Encode; use parking_lot::Mutex; use primitives::{H256, Ed25519AuthorityId}; -use crate::protocol::{Context, Protocol, ProtocolMsg, ProtocolStatus}; +use crate::protocol::{Context, Protocol, ProtocolMsg}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor}; use runtime_primitives::Justification; @@ -117,6 +118,8 @@ impl NetworkSpecialization for DummySpecialization { pub type PeersClient = client::Client; pub struct Peer { + pub is_offline: Arc, + pub is_major_syncing: Arc, client: Arc, pub protocol_sender: Sender>, @@ -130,6 +133,8 @@ pub struct Peer { impl Peer { fn new( + is_offline: Arc, + 
is_major_syncing: Arc, client: Arc, import_queue: Box>, protocol_sender: Sender>, @@ -139,6 +144,8 @@ impl Peer { ) -> Self { let network_port = Mutex::new(network_port); Peer { + is_offline, + is_major_syncing, client, protocol_sender, import_queue, @@ -179,6 +186,16 @@ impl Peer { .send(ProtocolMsg::BlockImported(hash, header.clone())); } + // SyncOracle: are we connected to any peer? + fn is_offline(&self) -> bool { + self.is_offline.load(Ordering::Relaxed) + } + + // SyncOracle: are we in the process of catching-up with the chain? + fn is_major_syncing(&self) -> bool { + self.is_major_syncing.load(Ordering::Relaxed) + } + /// Called on connection to other indicated peer. fn on_connect(&self, other: NodeIndex) { let _ = self.protocol_sender.send(ProtocolMsg::PeerConnected(other, String::new())); @@ -266,12 +283,6 @@ impl Peer { let _ = self.protocol_sender.send(ProtocolMsg::Abort); } - pub fn status(&self) -> ProtocolStatus { - let (sender, port) = channel::unbounded(); - let _ = self.protocol_sender.send(ProtocolMsg::Status(sender)); - port.recv().unwrap() - } - /// Push a message into the gossip network and relay to peers. /// `TestNet::sync_step` needs to be called to ensure it's propagated. 
pub fn gossip_message(&self, topic: ::Hash, engine_id: ConsensusEngineId, data: Vec) { @@ -467,7 +478,11 @@ pub trait TestNetFactory: Sized { let import_queue = Box::new(BasicQueue::new(verifier, block_import, justification_import)); let specialization = DummySpecialization {}; + let is_offline = Arc::new(AtomicBool::new(true)); + let is_major_syncing = Arc::new(AtomicBool::new(false)); let protocol_sender = Protocol::new( + is_offline.clone(), + is_major_syncing.clone(), network_sender.clone(), config.clone(), client.clone(), @@ -478,6 +493,8 @@ pub trait TestNetFactory: Sized { ).unwrap(); let peer = Arc::new(Peer::new( + is_offline, + is_major_syncing, client, import_queue, protocol_sender, diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index ecea0494fc..795ac202f5 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -19,10 +19,78 @@ use client::blockchain::HeaderBackend as BlockchainHeaderBackend; use crate::config::Roles; use consensus::BlockOrigin; use network_libp2p::NodeIndex; -use crate::sync::SyncState; use std::collections::HashSet; +use std::thread; +use std::time::Duration; use super::*; +#[test] +fn sync_cycle_from_offline_to_syncing_to_offline() { + let _ = ::env_logger::try_init(); + let mut net = TestNet::new(3); + for peer in 0..3 { + // Offline, and not major syncing. + assert!(net.peer(peer).is_offline()); + assert!(!net.peer(peer).is_major_syncing()); + } + + // Generate blocks. + net.peer(2).push_blocks(100, false); + net.start(); + net.route_fast(); + thread::sleep(Duration::from_millis(100)); + net.route_fast(); + for peer in 0..3 { + // Online + assert!(!net.peer(peer).is_offline()); + if peer < 2 { + // Major syncing. + assert!(net.peer(peer).is_major_syncing()); + } + } + net.sync(); + for peer in 0..3 { + // All done syncing. + assert!(!net.peer(peer).is_major_syncing()); + } + + // Now disconnect them all. 
+ for peer in 0..3 { + for other in 0..3 { + if other != peer { + net.peer(peer).on_disconnect(other); + } + } + thread::sleep(Duration::from_millis(100)); + assert!(net.peer(peer).is_offline()); + assert!(!net.peer(peer).is_major_syncing()); + } +} + +#[test] +fn syncing_node_not_major_syncing_when_disconnected() { + let _ = ::env_logger::try_init(); + let mut net = TestNet::new(3); + + // Generate blocks. + net.peer(2).push_blocks(100, false); + net.start(); + net.route_fast(); + thread::sleep(Duration::from_millis(100)); + net.route_fast(); + + // Peer 1 is major-syncing. + assert!(net.peer(1).is_major_syncing()); + + // Disconnect peer 1 from everyone else. + net.peer(1).on_disconnect(0); + net.peer(1).on_disconnect(2); + thread::sleep(Duration::from_millis(100)); + + // Peer 1 is not major-syncing. + assert!(!net.peer(1).is_major_syncing()); +} + #[test] fn sync_from_two_peers_works() { let _ = ::env_logger::try_init(); @@ -32,8 +100,7 @@ fn sync_from_two_peers_works() { net.sync(); assert!(net.peer(0).client.backend().as_in_memory().blockchain() .equals_to(net.peer(1).client.backend().as_in_memory().blockchain())); - let status = net.peer(0).status(); - assert_eq!(status.sync.state, SyncState::Idle); + assert!(!net.peer(0).is_major_syncing()); } #[test]