diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs
index 36561645f0..e3fffad33f 100644
--- a/substrate/core/consensus/aura/src/lib.rs
+++ b/substrate/core/consensus/aura/src/lib.rs
@@ -65,7 +65,7 @@ use srml_aura::{
};
use substrate_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_WARN, CONSENSUS_INFO};
-use slots::{CheckedHeader, SlotWorker, SlotInfo, SlotCompatible, slot_now, check_equivocation};
+use slots::{CheckedHeader, SlotData, SlotWorker, SlotInfo, SlotCompatible, slot_now, check_equivocation};
pub use aura_primitives::*;
pub use consensus_common::{SyncOracle, ExtraVerification};
@@ -125,7 +125,7 @@ impl SlotCompatible for AuraSlotCompatible {
}
/// Start the aura worker. The returned future should be run in a tokio runtime.
-pub fn start_aura(
+pub fn start_aura(
slot_duration: SlotDuration,
local_key: Arc
,
client: Arc,
@@ -133,7 +133,6 @@ pub fn start_aura(
block_import: Arc,
env: Arc,
sync_oracle: SO,
- on_exit: OnExit,
inherent_data_providers: InherentDataProviders,
force_authoring: bool,
) -> Result, consensus_common::Error> where
@@ -156,25 +155,26 @@ pub fn start_aura(
I: BlockImport + Send + Sync + 'static,
Error: ::std::error::Error + Send + From<::consensus_common::Error> + From + 'static,
SO: SyncOracle + Send + Sync + Clone,
- OnExit: Future- ,
{
let worker = AuraWorker {
client: client.clone(),
block_import,
env,
local_key,
- inherent_data_providers: inherent_data_providers.clone(),
sync_oracle: sync_oracle.clone(),
force_authoring,
};
- slots::start_slot_worker::<_, _, _, _, _, AuraSlotCompatible, _>(
+ register_aura_inherent_data_provider(
+ &inherent_data_providers,
+ slot_duration.0.slot_duration()
+ )?;
+ Ok(slots::start_slot_worker::<_, _, _, _, _, AuraSlotCompatible>(
slot_duration.0,
select_chain,
- Arc::new(worker),
+ worker,
sync_oracle,
- on_exit,
inherent_data_providers
- )
+ ))
}
struct AuraWorker {
@@ -183,7 +183,6 @@ struct AuraWorker {
env: Arc,
local_key: Arc
,
sync_oracle: SO,
- inherent_data_providers: InherentDataProviders,
force_authoring: bool,
}
@@ -208,13 +207,6 @@ impl SlotWorker for AuraWorker w
{
type OnSlot = Box + Send>;
- fn on_start(
- &self,
- slot_duration: u64
- ) -> Result<(), consensus_common::Error> {
- register_aura_inherent_data_provider(&self.inherent_data_providers, slot_duration)
- }
-
fn on_slot(
&self,
chain_head: B::Header,
@@ -902,7 +894,7 @@ mod tests {
&inherent_data_providers, slot_duration.get()
).expect("Registers aura inherent data provider");
- let aura = start_aura::<_, _, _, _, _, sr25519::Pair, _, _, _, _>(
+ let aura = start_aura::<_, _, _, _, _, sr25519::Pair, _, _, _>(
slot_duration,
Arc::new(key.clone().into()),
client.clone(),
@@ -910,7 +902,6 @@ mod tests {
client,
environ.clone(),
DummyOracle,
- futures::empty(),
inherent_data_providers,
false,
).expect("Starts aura");
diff --git a/substrate/core/consensus/babe/src/lib.rs b/substrate/core/consensus/babe/src/lib.rs
index 0cb95c02df..c66eb68137 100644
--- a/substrate/core/consensus/babe/src/lib.rs
+++ b/substrate/core/consensus/babe/src/lib.rs
@@ -82,7 +82,7 @@ use futures::{Future, IntoFuture, future};
use tokio::timer::Timeout;
use log::{error, warn, debug, info, trace};
-use slots::{SlotWorker, SlotInfo, SlotCompatible, slot_now};
+use slots::{SlotWorker, SlotData, SlotInfo, SlotCompatible, slot_now};
/// A slot duration. Create with `get_or_compute`.
@@ -134,7 +134,7 @@ impl SlotCompatible for BabeSlotCompatible {
}
/// Parameters for BABE.
-pub struct BabeParams {
+pub struct BabeParams {
/// The configuration for BABE. Includes the slot duration, threshold, and
/// other parameters.
@@ -158,9 +158,6 @@ pub struct BabeParams {
/// A sync oracle
pub sync_oracle: SO,
- /// Exit callback.
- pub on_exit: OnExit,
-
/// Providers for inherent data.
pub inherent_data_providers: InherentDataProviders,
@@ -169,7 +166,7 @@ pub struct BabeParams {
}
/// Start the babe worker. The returned future should be run in a tokio runtime.
-pub fn start_babe(BabeParams {
+pub fn start_babe(BabeParams {
config,
local_key,
client,
@@ -177,10 +174,9 @@ pub fn start_babe(BabeParams {
block_import,
env,
sync_oracle,
- on_exit,
inherent_data_providers,
force_authoring,
-}: BabeParams) -> Result<
+}: BabeParams) -> Result<
impl Future- ,
consensus_common::Error,
> where
@@ -200,26 +196,24 @@ pub fn start_babe(BabeParams {
I: BlockImport + Send + Sync + 'static,
Error: std::error::Error + Send + From<::consensus_common::Error> + From + 'static,
SO: SyncOracle + Send + Sync + Clone,
- OnExit: Future
- ,
{
let worker = BabeWorker {
client: client.clone(),
block_import,
env,
local_key,
- inherent_data_providers: inherent_data_providers.clone(),
sync_oracle: sync_oracle.clone(),
force_authoring,
threshold: config.threshold(),
};
- slots::start_slot_worker::<_, _, _, _, _, BabeSlotCompatible, _>(
+ register_babe_inherent_data_provider(&inherent_data_providers, config.0.slot_duration())?;
+ Ok(slots::start_slot_worker::<_, _, _, _, _, BabeSlotCompatible>(
config.0,
select_chain,
- Arc::new(worker),
+ worker,
sync_oracle,
- on_exit,
inherent_data_providers
- )
+ ))
}
struct BabeWorker {
@@ -228,7 +222,6 @@ struct BabeWorker {
env: Arc,
local_key: Arc,
sync_oracle: SO,
- inherent_data_providers: InherentDataProviders,
force_authoring: bool,
threshold: u64,
}
@@ -253,13 +246,6 @@ impl SlotWorker for BabeWorker w
{
type OnSlot = Box + Send>;
- fn on_start(
- &self,
- slot_duration: u64
- ) -> Result<(), consensus_common::Error> {
- register_babe_inherent_data_provider(&self.inherent_data_providers, slot_duration)
- }
-
fn on_slot(
&self,
chain_head: B::Header,
@@ -985,7 +971,6 @@ mod tests {
client,
env: environ.clone(),
sync_oracle: DummyOracle,
- on_exit: futures::empty(),
inherent_data_providers,
force_authoring: false,
}).expect("Starts babe");
diff --git a/substrate/core/consensus/slots/src/lib.rs b/substrate/core/consensus/slots/src/lib.rs
index 783cb018b9..aee398a9ce 100644
--- a/substrate/core/consensus/slots/src/lib.rs
+++ b/substrate/core/consensus/slots/src/lib.rs
@@ -41,7 +41,7 @@ use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{ApiRef, Block, ProvideRuntimeApi};
use std::fmt::Debug;
use std::ops::Deref;
-use std::sync::{mpsc, Arc};
+use std::sync::mpsc;
use std::thread;
/// A worker that should be invoked at every new slot.
@@ -51,7 +51,8 @@ pub trait SlotWorker {
type OnSlot: IntoFuture
- ;
/// Called when the proposer starts.
- fn on_start(&self, slot_duration: u64) -> Result<(), consensus_common::Error>;
+ #[deprecated(note = "Not called. Please perform any initialization before calling start_slot_worker.")]
+ fn on_start(&self, _slot_duration: u64) -> Result<(), consensus_common::Error> { Ok(()) }
/// Called when a new slot is triggered.
fn on_slot(&self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot;
@@ -70,7 +71,7 @@ pub trait SlotCompatible {
pub fn start_slot_worker_thread(
slot_duration: SlotDuration,
select_chain: C,
- worker: Arc,
+ worker: W,
sync_oracle: SO,
on_exit: OnExit,
inherent_data_providers: InherentDataProviders,
@@ -97,29 +98,19 @@ where
}
};
- let slot_worker_future = match start_slot_worker::<_, _, _, T, _, SC, _>(
+ let slot_worker_future = start_slot_worker::<_, _, _, T, _, SC>(
slot_duration.clone(),
select_chain,
worker,
sync_oracle,
- on_exit,
inherent_data_providers,
- ) {
- Ok(slot_worker_future) => {
- result_sender
- .send(Ok(()))
- .expect("Receive is not dropped before receiving a result; qed");
- slot_worker_future
- }
- Err(e) => {
- result_sender
- .send(Err(e))
- .expect("Receive is not dropped before receiving a result; qed");
- return;
- }
- };
+ );
- let _ = runtime.block_on(slot_worker_future);
+ result_sender
+ .send(Ok(()))
+ .expect("Receive is not dropped before receiving a result; qed");
+
+ let _ = runtime.block_on(slot_worker_future.select(on_exit).map(|_| ()));
});
result_recv
@@ -128,67 +119,58 @@ where
}
/// Start a new slot worker.
-pub fn start_slot_worker(
+///
+/// Every time a new slot is triggered, `worker.on_slot` is called and the future it returns is
+/// polled until completion, unless we are major syncing.
+pub fn start_slot_worker(
slot_duration: SlotDuration,
client: C,
- worker: Arc,
+ worker: W,
sync_oracle: SO,
- on_exit: OnExit,
inherent_data_providers: InherentDataProviders,
-) -> Result, consensus_common::Error>
+) -> impl Future
-
where
B: Block,
C: SelectChain + Clone,
W: SlotWorker,
SO: SyncOracle + Send + Clone,
SC: SlotCompatible,
- OnExit: Future
- ,
T: SlotData + Clone,
{
- worker.on_start(slot_duration.slot_duration())?;
+ let SlotDuration(slot_duration) = slot_duration;
- let make_authorship = move || {
- let client = client.clone();
- let worker = worker.clone();
- let sync_oracle = sync_oracle.clone();
- let SlotDuration(slot_duration) = slot_duration.clone();
- let inherent_data_providers = inherent_data_providers.clone();
+ // rather than use a timer interval, we schedule our waits ourselves
+ let mut authorship = Slots::::new(slot_duration.slot_duration(), inherent_data_providers)
+ .map_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e))
+ .for_each(move |slot_info| {
+ // only propose when we are not syncing.
+ if sync_oracle.is_major_syncing() {
+ debug!(target: "slots", "Skipping proposal slot due to sync.");
+ return Either::B(future::ok(()));
+ }
- // rather than use a timer interval, we schedule our waits ourselves
- Slots::::new(slot_duration.slot_duration(), inherent_data_providers)
- .map_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e))
- .for_each(move |slot_info| {
- let client = client.clone();
- let worker = worker.clone();
- let sync_oracle = sync_oracle.clone();
-
- // only propose when we are not syncing.
- if sync_oracle.is_major_syncing() {
- debug!(target: "slots", "Skipping proposal slot due to sync.");
+ let slot_num = slot_info.number;
+ let chain_head = match client.best_chain() {
+ Ok(x) => x,
+ Err(e) => {
+ warn!(target: "slots", "Unable to author block in slot {}. \
+ no best block header: {:?}", slot_num, e);
return Either::B(future::ok(()));
}
+ };
- let slot_num = slot_info.number;
- let chain_head = match client.best_chain() {
- Ok(x) => x,
- Err(e) => {
- warn!(target: "slots", "Unable to author block in slot {}. \
- no best block header: {:?}", slot_num, e);
- return Either::B(future::ok(()));
- }
- };
+ Either::A(worker.on_slot(chain_head, slot_info).into_future().map_err(
+ |e| warn!(target: "slots", "Encountered consensus error: {:?}", e),
+ ))
+ });
- Either::A(worker.on_slot(chain_head, slot_info).into_future().map_err(
- |e| warn!(target: "slots", "Encountered consensus error: {:?}", e),
- ))
- })
- };
-
- let work = future::loop_fn((), move |()| {
- let authorship_task = ::std::panic::AssertUnwindSafe(make_authorship());
- authorship_task.catch_unwind().then(|res| {
- match res {
- Ok(Ok(())) => (),
+ future::poll_fn(move ||
+ loop {
+ let mut authorship = std::panic::AssertUnwindSafe(&mut authorship);
+ match std::panic::catch_unwind(move || authorship.poll()) {
+ Ok(Ok(Async::Ready(()))) =>
+ warn!(target: "slots", "Slots stream has terminated unexpectedly."),
+ Ok(Ok(Async::NotReady)) => break Ok(Async::NotReady),
Ok(Err(())) => warn!(target: "slots", "Authorship task terminated unexpectedly. Restarting"),
Err(e) => {
if let Some(s) = e.downcast_ref::<&'static str>() {
@@ -198,12 +180,8 @@ where
warn!(target: "slots", "Restarting authorship task");
}
}
-
- Ok(future::Loop::Continue(()))
- })
- });
-
- Ok(work.select(on_exit).then(|_| Ok(())))
+ }
+ )
}
/// A header which has been checked
diff --git a/substrate/node-template/src/service.rs b/substrate/node-template/src/service.rs
index c8f5dfc0a1..25e7db8dec 100644
--- a/substrate/node-template/src/service.rs
+++ b/substrate/node-template/src/service.rs
@@ -14,6 +14,7 @@ use substrate_service::{
};
use basic_authorship::ProposerFactory;
use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, NothingExtra};
+use futures::prelude::*;
use substrate_client::{self as client, LongestChain};
use primitives::{ed25519::Pair, Pair as PairT};
use inherents::InherentDataProviders;
@@ -75,7 +76,7 @@ construct_service_factory! {
let client = service.client();
let select_chain = service.select_chain()
.ok_or_else(|| ServiceError::SelectChainRequired)?;
- executor.spawn(start_aura(
+ let aura = start_aura(
SlotDuration::get_or_compute(&*client)?,
key.clone(),
client.clone(),
@@ -83,10 +84,10 @@ construct_service_factory! {
client,
proposer,
service.network(),
- service.on_exit(),
service.config.custom.inherent_data_providers.clone(),
service.config.force_authoring,
- )?);
+ )?;
+ executor.spawn(aura.select(service.on_exit()).then(|_| Ok(())));
}
Ok(service)
diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs
index 415ddbe71b..1458392b0f 100644
--- a/substrate/node/cli/src/service.rs
+++ b/substrate/node/cli/src/service.rs
@@ -26,6 +26,7 @@ use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, Nothing
use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use node_executor;
use primitives::{Pair as PairT, ed25519};
+use futures::prelude::*;
use node_primitives::Block;
use node_runtime::{GenesisConfig, RuntimeApi};
use substrate_service::{
@@ -92,7 +93,7 @@ construct_service_factory! {
let client = service.client();
let select_chain = service.select_chain()
.ok_or(ServiceError::SelectChainRequired)?;
- executor.spawn(start_aura(
+ let aura = start_aura(
SlotDuration::get_or_compute(&*client)?,
key.clone(),
client,
@@ -100,10 +101,10 @@ construct_service_factory! {
block_import.clone(),
proposer,
service.network(),
- service.on_exit(),
service.config.custom.inherent_data_providers.clone(),
service.config.force_authoring,
- )?);
+ )?;
+ executor.spawn(aura.select(service.on_exit()).then(|_| Ok(())));
info!("Running Grandpa session as Authority {}", key.public());
}