skip passed slots and wake up service correctly (#4212)

This commit is contained in:
Robert Habermeier
2019-11-26 18:38:03 +01:00
committed by Gavin Wood
parent 5010faa9bd
commit 52b5dee4f9
3 changed files with 33 additions and 10 deletions
@@ -122,6 +122,19 @@ pub trait SimpleSlotWorker<B: BlockT> {
let (timestamp, slot_number, slot_duration) = let (timestamp, slot_number, slot_duration) =
(slot_info.timestamp, slot_info.number, slot_info.duration); (slot_info.timestamp, slot_info.number, slot_info.duration);
{
let slot_now = SignedDuration::default().slot_now(slot_duration);
if slot_now > slot_number {
// if this is behind, return.
debug!(target: self.logging_target(),
"Skipping proposal slot {} since our current view is {}",
slot_number, slot_now,
);
return Box::pin(future::ready(Ok(())));
}
}
let epoch_data = match self.epoch_data(&chain_head, slot_number) { let epoch_data = match self.epoch_data(&chain_head, slot_number) {
Ok(epoch_data) => epoch_data, Ok(epoch_data) => epoch_data,
Err(err) => { Err(err) => {
+7 -3
View File
@@ -49,7 +49,7 @@ use sr_primitives::traits::{
use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch};
use std::{ use std::{
io::{Read, Write, Seek}, io::{Read, Write, Seek},
marker::PhantomData, sync::Arc, sync::atomic::AtomicBool, time::SystemTime marker::PhantomData, sync::Arc, time::SystemTime
}; };
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use tel::{telemetry, SUBSTRATE_INFO}; use tel::{telemetry, SUBSTRATE_INFO};
@@ -823,6 +823,9 @@ ServiceBuilder<
let (to_spawn_tx, to_spawn_rx) = let (to_spawn_tx, to_spawn_rx) =
mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send>>(); mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send>>();
// A side-channel for essential tasks to communicate shutdown.
let (essential_failed_tx, essential_failed_rx) = mpsc::unbounded();
let import_queue = Box::new(import_queue); let import_queue = Box::new(import_queue);
let chain_info = client.info().chain; let chain_info = client.info().chain;
@@ -1163,7 +1166,7 @@ ServiceBuilder<
let _ = to_spawn_tx.unbounded_send(Box::new(future)); let _ = to_spawn_tx.unbounded_send(Box::new(future));
} }
// Instrumentation // Instrumentation
if let Some(tracing_targets) = config.tracing_targets.as_ref() { if let Some(tracing_targets) = config.tracing_targets.as_ref() {
let subscriber = substrate_tracing::ProfilingSubscriber::new( let subscriber = substrate_tracing::ProfilingSubscriber::new(
@@ -1183,7 +1186,8 @@ ServiceBuilder<
transaction_pool, transaction_pool,
exit, exit,
signal: Some(signal), signal: Some(signal),
essential_failed: Arc::new(AtomicBool::new(false)), essential_failed_tx,
essential_failed_rx,
to_spawn_tx, to_spawn_tx,
to_spawn_rx, to_spawn_rx,
to_poll: Vec::new(), to_poll: Vec::new(),
+13 -7
View File
@@ -31,7 +31,6 @@ use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use futures::sync::mpsc; use futures::sync::mpsc;
use parking_lot::Mutex; use parking_lot::Mutex;
@@ -85,9 +84,11 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
exit: exit_future::Exit, exit: exit_future::Exit,
/// A signal that makes the exit future above resolve, fired on service drop. /// A signal that makes the exit future above resolve, fired on service drop.
signal: Option<Signal>, signal: Option<Signal>,
/// Set to `true` when a spawned essential task has failed. The next time /// Send a signal when a spawned essential task has concluded. The next time
/// the service future is polled it should complete with an error. /// the service future is polled it should complete with an error.
essential_failed: Arc<AtomicBool>, essential_failed_tx: mpsc::UnboundedSender<()>,
/// A receiver for spawned essential-tasks concluding.
essential_failed_rx: mpsc::UnboundedReceiver<()>,
/// Sender for futures that must be spawned as background tasks. /// Sender for futures that must be spawned as background tasks.
to_spawn_tx: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>, to_spawn_tx: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>,
/// Receiver for futures that must be spawned as background tasks. /// Receiver for futures that must be spawned as background tasks.
@@ -239,12 +240,12 @@ where
} }
fn spawn_essential_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) { fn spawn_essential_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
let essential_failed = self.essential_failed.clone(); let essential_failed = self.essential_failed_tx.clone();
let essential_task = std::panic::AssertUnwindSafe(task) let essential_task = std::panic::AssertUnwindSafe(task)
.catch_unwind() .catch_unwind()
.then(move |_| { .then(move |_| {
error!("Essential task failed. Shutting down service."); error!("Essential task failed. Shutting down service.");
essential_failed.store(true, Ordering::Relaxed); let _ = essential_failed.send(());
Ok(()) Ok(())
}); });
let task = essential_task.select(self.on_exit()).then(|_| Ok(())); let task = essential_task.select(self.on_exit()).then(|_| Ok(()));
@@ -297,8 +298,13 @@ impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Future for
type Error = Error; type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.essential_failed.load(Ordering::Relaxed) { match self.essential_failed_rx.poll() {
return Err(Error::Other("Essential task failed.".into())); Ok(Async::NotReady) => {},
Ok(Async::Ready(_)) | Err(_) => {
// Ready(None) should not be possible since we hold a live
// sender.
return Err(Error::Other("Essential task failed.".into()));
}
} }
while let Ok(Async::Ready(Some(task_to_spawn))) = self.to_spawn_rx.poll() { while let Ok(Async::Ready(Some(task_to_spawn))) = self.to_spawn_rx.poll() {