Refactor Sync status updates into a stream of updates (#1858)

* refactor sync provider

* relative use of interval

* typo

* set propagate timeout to 2500ms

* address comments

* fix instant calc

* update intervals
This commit is contained in:
Gregory Terzian
2019-03-02 21:35:16 +08:00
committed by Gav Wood
parent 828cd9580a
commit a81f7f48a0
9 changed files with 90 additions and 54 deletions
+23 -9
View File
@@ -15,6 +15,8 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crossbeam_channel::{self as channel, Receiver, Sender, select};
use futures::sync::mpsc;
use parking_lot::Mutex;
use network_libp2p::{NodeIndex, PeerId, Severity};
use primitives::storage::StorageKey;
use runtime_primitives::generic::BlockId;
@@ -40,8 +42,12 @@ use client::light::fetcher::ChangesProof;
use crate::{error, util::LruHashSet};
const REQUEST_TIMEOUT_SEC: u64 = 40;
const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1000);
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(5000);
/// Interval at which we perform time based maintenance
const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
/// Interval at which we propagate exstrinsics;
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);
/// Interval at which we send status updates on the SyncProvider status stream.
const STATUS_INTERVAL: time::Duration = time::Duration::from_millis(5000);
/// Current protocol version.
pub(crate) const CURRENT_VERSION: u32 = 2;
@@ -57,6 +63,7 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
network_chan: NetworkChan<B>,
port: Receiver<ProtocolMsg<B, S>>,
from_network_port: Receiver<FromNetworkMsg<B>>,
@@ -210,8 +217,8 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
BlocksProcessed(Vec<B::Hash>, bool),
/// Tell protocol to restart sync.
RestartSync,
/// Ask the protocol for its status.
Status(Sender<ProtocolStatus<B>>),
/// Propagate status updates.
Status,
/// Tell protocol to propagate extrinsics.
PropagateExtrinsics,
/// Tell protocol that a block was imported (sent by the import-queue).
@@ -262,6 +269,7 @@ enum Incoming<B: BlockT, S: NetworkSpecialization<B>> {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Create a new instance.
pub fn new(
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
connected_peers: Arc<RwLock<HashMap<NodeIndex, ConnectedPeer<B>>>>,
@@ -281,6 +289,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
.name("Protocol".into())
.spawn(move || {
let mut protocol = Protocol {
status_sinks,
network_chan,
from_network_port,
port,
@@ -300,7 +309,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
};
let tick_timeout = channel::tick(TICK_TIMEOUT);
let propagate_timeout = channel::tick(PROPAGATE_TIMEOUT);
while protocol.run(&tick_timeout, &propagate_timeout) {
let status_interval = channel::tick(STATUS_INTERVAL);
while protocol.run(&tick_timeout, &propagate_timeout, &status_interval) {
// Running until all senders have been dropped...
}
})
@@ -312,6 +322,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
&mut self,
tick_timeout: &Receiver<time::Instant>,
propagate_timeout: &Receiver<time::Instant>,
status_interval: &Receiver<time::Instant>,
) -> bool {
let msg = select! {
recv(self.port) -> event => {
@@ -338,6 +349,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
recv(propagate_timeout) -> _ => {
Incoming::FromClient(ProtocolMsg::PropagateExtrinsics)
},
recv(status_interval) -> _ => {
Incoming::FromClient(ProtocolMsg::Status)
},
};
self.handle_msg(msg)
}
@@ -351,7 +365,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
fn handle_client_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool {
match msg {
ProtocolMsg::Status(sender) => self.status(sender),
ProtocolMsg::Status => self.on_status(),
ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header),
ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header),
ProtocolMsg::ExecuteWithSpec(task) => {
@@ -428,8 +442,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
None
}
/// Returns protocol status
fn status(&mut self, sender: Sender<ProtocolStatus<B>>) {
/// Propagates protocol statuses.
fn on_status(&mut self) {
let status = ProtocolStatus {
sync: self.sync.status(),
num_peers: self.context_data.peers.values().count(),
@@ -440,7 +454,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
.filter(|p| p.block_request.is_some())
.count(),
};
let _ = sender.send(status);
self.status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok());
}
fn on_custom_message(&mut self, who: NodeIndex, message: Message<B>) {
+25 -10
View File
@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{io, thread};
use log::{warn, debug, error, trace, info};
use futures::{Async, Future, Stream, stream, sync::oneshot};
use futures::{Async, Future, Stream, stream, sync::oneshot, sync::mpsc};
use parking_lot::{Mutex, RwLock};
use network_libp2p::{ProtocolId, NetworkConfiguration, NodeIndex, ErrorKind, Severity};
use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent};
@@ -42,14 +42,17 @@ pub use network_libp2p::PeerId;
/// Type that represents fetch completion future.
pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
/// Sync status
pub trait SyncProvider<B: BlockT>: Send + Sync {
/// Get sync status
fn status(&self) -> ProtocolStatus<B>;
/// Get a stream of sync statuses.
fn status(&self) -> mpsc::UnboundedReceiver<ProtocolStatus<B>>;
/// Get network state.
fn network_state(&self) -> NetworkState;
/// Get currently connected peers
fn peers(&self) -> Vec<(NodeIndex, PeerInfo<B>)>;
/// Are we in the process of downloading the chain?
fn is_major_syncing(&self) -> bool;
}
/// Minimum Requirements for a Hash within Networking
@@ -121,6 +124,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
/// Substrate network service. Handles network IO and manages connectivity.
pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
/// Sinks to propagate status updates.
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
/// Are we connected to any peer?
is_offline: Arc<AtomicBool>,
/// Are we actively catching up with the chain?
@@ -145,11 +150,13 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
import_queue: Box<ImportQueue<B>>,
) -> Result<(Arc<Service<B, S>>, NetworkChan<B>), Error> {
let (network_chan, network_port) = network_channel(protocol_id);
let status_sinks = Arc::new(Mutex::new(Vec::new()));
// Start in off-line mode, since we're not connected to any nodes yet.
let is_offline = Arc::new(AtomicBool::new(true));
let is_major_syncing = Arc::new(AtomicBool::new(false));
let peers: Arc<RwLock<HashMap<NodeIndex, ConnectedPeer<B>>>> = Arc::new(Default::default());
let (protocol_sender, network_to_protocol_sender) = Protocol::new(
status_sinks.clone(),
is_offline.clone(),
is_major_syncing.clone(),
peers.clone(),
@@ -171,6 +178,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
)?;
let service = Arc::new(Service {
status_sinks,
is_offline,
is_major_syncing,
peers,
@@ -260,11 +268,17 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
.protocol_sender
.send(ProtocolMsg::ExecuteWithGossip(Box::new(f)));
}
/// Are we in the process of downloading the chain?
/// Used by both SyncProvider and SyncOracle.
fn is_major_syncing(&self) -> bool {
self.is_major_syncing.load(Ordering::Relaxed)
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ::consensus::SyncOracle for Service<B, S> {
fn is_major_syncing(&self) -> bool {
self.is_major_syncing.load(Ordering::Relaxed)
self.is_major_syncing()
}
fn is_offline(&self) -> bool {
self.is_offline.load(Ordering::Relaxed)
@@ -283,13 +297,14 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Drop for Service<B, S> {
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> SyncProvider<B> for Service<B, S> {
fn is_major_syncing(&self) -> bool {
self.is_major_syncing()
}
/// Get sync status
fn status(&self) -> ProtocolStatus<B> {
let (sender, port) = channel::unbounded();
let _ = self.protocol_sender.send(ProtocolMsg::Status(sender));
port.recv().expect("1. Protocol keeps handling messages until all senders are dropped,
or the ProtocolMsg::Stop message is received,
2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.")
fn status(&self) -> mpsc::UnboundedReceiver<ProtocolStatus<B>> {
let (sink, stream) = mpsc::unbounded();
self.status_sinks.lock().push(sink);
stream
}
fn network_state(&self) -> NetworkState {
+3
View File
@@ -558,11 +558,14 @@ pub trait TestNetFactory: Sized {
let (network_sender, network_port) = network_channel(ProtocolId::default());
let import_queue = Box::new(BasicQueue::new(verifier, block_import, justification_import));
let status_sinks = Arc::new(Mutex::new(Vec::new()));
let is_offline = Arc::new(AtomicBool::new(true));
let is_major_syncing = Arc::new(AtomicBool::new(false));
let specialization = self::SpecializationFactory::create();
let peers: Arc<RwLock<HashMap<NodeIndex, ConnectedPeer<Block>>>> = Arc::new(Default::default());
let (protocol_sender, network_to_protocol_sender) = Protocol::new(
status_sinks,
is_offline.clone(),
is_major_syncing.clone(),
peers.clone(),