mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-28 09:47:56 +00:00
Small slots refactor (#2780)
* Deprecate SlotWorker::on_start * start_slot_worker no longer needs an Arc * start_slot_worker now always succeeds * Removed on_exit parameter from start_*_worker * Minor doc * Fix node-template
This commit is contained in:
committed by
DemiMarie-parity
parent
eaa0ab014a
commit
67bdfc7d8e
@@ -41,7 +41,7 @@ use runtime_primitives::generic::BlockId;
|
||||
use runtime_primitives::traits::{ApiRef, Block, ProvideRuntimeApi};
|
||||
use std::fmt::Debug;
|
||||
use std::ops::Deref;
|
||||
use std::sync::{mpsc, Arc};
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
|
||||
/// A worker that should be invoked at every new slot.
|
||||
@@ -51,7 +51,8 @@ pub trait SlotWorker<B: Block> {
|
||||
type OnSlot: IntoFuture<Item = (), Error = consensus_common::Error>;
|
||||
|
||||
/// Called when the proposer starts.
|
||||
fn on_start(&self, slot_duration: u64) -> Result<(), consensus_common::Error>;
|
||||
#[deprecated(note = "Not called. Please perform any initialization before calling start_slot_worker.")]
|
||||
fn on_start(&self, _slot_duration: u64) -> Result<(), consensus_common::Error> { Ok(()) }
|
||||
|
||||
/// Called when a new slot is triggered.
|
||||
fn on_slot(&self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot;
|
||||
@@ -70,7 +71,7 @@ pub trait SlotCompatible {
|
||||
pub fn start_slot_worker_thread<B, C, W, SO, SC, T, OnExit>(
|
||||
slot_duration: SlotDuration<T>,
|
||||
select_chain: C,
|
||||
worker: Arc<W>,
|
||||
worker: W,
|
||||
sync_oracle: SO,
|
||||
on_exit: OnExit,
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
@@ -97,29 +98,19 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
let slot_worker_future = match start_slot_worker::<_, _, _, T, _, SC, _>(
|
||||
let slot_worker_future = start_slot_worker::<_, _, _, T, _, SC>(
|
||||
slot_duration.clone(),
|
||||
select_chain,
|
||||
worker,
|
||||
sync_oracle,
|
||||
on_exit,
|
||||
inherent_data_providers,
|
||||
) {
|
||||
Ok(slot_worker_future) => {
|
||||
result_sender
|
||||
.send(Ok(()))
|
||||
.expect("Receive is not dropped before receiving a result; qed");
|
||||
slot_worker_future
|
||||
}
|
||||
Err(e) => {
|
||||
result_sender
|
||||
.send(Err(e))
|
||||
.expect("Receive is not dropped before receiving a result; qed");
|
||||
return;
|
||||
}
|
||||
};
|
||||
);
|
||||
|
||||
let _ = runtime.block_on(slot_worker_future);
|
||||
result_sender
|
||||
.send(Ok(()))
|
||||
.expect("Receive is not dropped before receiving a result; qed");
|
||||
|
||||
let _ = runtime.block_on(slot_worker_future.select(on_exit).map(|_| ()));
|
||||
});
|
||||
|
||||
result_recv
|
||||
@@ -128,67 +119,58 @@ where
|
||||
}
|
||||
|
||||
/// Start a new slot worker.
|
||||
pub fn start_slot_worker<B, C, W, T, SO, SC, OnExit>(
|
||||
///
|
||||
/// Every time a new slot is triggered, `worker.on_slot` is called and the future it returns is
|
||||
/// polled until completion, unless we are major syncing.
|
||||
pub fn start_slot_worker<B, C, W, T, SO, SC>(
|
||||
slot_duration: SlotDuration<T>,
|
||||
client: C,
|
||||
worker: Arc<W>,
|
||||
worker: W,
|
||||
sync_oracle: SO,
|
||||
on_exit: OnExit,
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
) -> Result<impl Future<Item = (), Error = ()>, consensus_common::Error>
|
||||
) -> impl Future<Item = (), Error = ()>
|
||||
where
|
||||
B: Block,
|
||||
C: SelectChain<B> + Clone,
|
||||
W: SlotWorker<B>,
|
||||
SO: SyncOracle + Send + Clone,
|
||||
SC: SlotCompatible,
|
||||
OnExit: Future<Item = (), Error = ()>,
|
||||
T: SlotData + Clone,
|
||||
{
|
||||
worker.on_start(slot_duration.slot_duration())?;
|
||||
let SlotDuration(slot_duration) = slot_duration;
|
||||
|
||||
let make_authorship = move || {
|
||||
let client = client.clone();
|
||||
let worker = worker.clone();
|
||||
let sync_oracle = sync_oracle.clone();
|
||||
let SlotDuration(slot_duration) = slot_duration.clone();
|
||||
let inherent_data_providers = inherent_data_providers.clone();
|
||||
// rather than use a timer interval, we schedule our waits ourselves
|
||||
let mut authorship = Slots::<SC>::new(slot_duration.slot_duration(), inherent_data_providers)
|
||||
.map_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e))
|
||||
.for_each(move |slot_info| {
|
||||
// only propose when we are not syncing.
|
||||
if sync_oracle.is_major_syncing() {
|
||||
debug!(target: "slots", "Skipping proposal slot due to sync.");
|
||||
return Either::B(future::ok(()));
|
||||
}
|
||||
|
||||
// rather than use a timer interval, we schedule our waits ourselves
|
||||
Slots::<SC>::new(slot_duration.slot_duration(), inherent_data_providers)
|
||||
.map_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e))
|
||||
.for_each(move |slot_info| {
|
||||
let client = client.clone();
|
||||
let worker = worker.clone();
|
||||
let sync_oracle = sync_oracle.clone();
|
||||
|
||||
// only propose when we are not syncing.
|
||||
if sync_oracle.is_major_syncing() {
|
||||
debug!(target: "slots", "Skipping proposal slot due to sync.");
|
||||
let slot_num = slot_info.number;
|
||||
let chain_head = match client.best_chain() {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
warn!(target: "slots", "Unable to author block in slot {}. \
|
||||
no best block header: {:?}", slot_num, e);
|
||||
return Either::B(future::ok(()));
|
||||
}
|
||||
};
|
||||
|
||||
let slot_num = slot_info.number;
|
||||
let chain_head = match client.best_chain() {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
warn!(target: "slots", "Unable to author block in slot {}. \
|
||||
no best block header: {:?}", slot_num, e);
|
||||
return Either::B(future::ok(()));
|
||||
}
|
||||
};
|
||||
Either::A(worker.on_slot(chain_head, slot_info).into_future().map_err(
|
||||
|e| warn!(target: "slots", "Encountered consensus error: {:?}", e),
|
||||
))
|
||||
});
|
||||
|
||||
Either::A(worker.on_slot(chain_head, slot_info).into_future().map_err(
|
||||
|e| warn!(target: "slots", "Encountered consensus error: {:?}", e),
|
||||
))
|
||||
})
|
||||
};
|
||||
|
||||
let work = future::loop_fn((), move |()| {
|
||||
let authorship_task = ::std::panic::AssertUnwindSafe(make_authorship());
|
||||
authorship_task.catch_unwind().then(|res| {
|
||||
match res {
|
||||
Ok(Ok(())) => (),
|
||||
future::poll_fn(move ||
|
||||
loop {
|
||||
let mut authorship = std::panic::AssertUnwindSafe(&mut authorship);
|
||||
match std::panic::catch_unwind(move || authorship.poll()) {
|
||||
Ok(Ok(Async::Ready(()))) =>
|
||||
warn!(target: "slots", "Slots stream has terminated unexpectedly."),
|
||||
Ok(Ok(Async::NotReady)) => break Ok(Async::NotReady),
|
||||
Ok(Err(())) => warn!(target: "slots", "Authorship task terminated unexpectedly. Restarting"),
|
||||
Err(e) => {
|
||||
if let Some(s) = e.downcast_ref::<&'static str>() {
|
||||
@@ -198,12 +180,8 @@ where
|
||||
warn!(target: "slots", "Restarting authorship task");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(future::Loop::Continue(()))
|
||||
})
|
||||
});
|
||||
|
||||
Ok(work.select(on_exit).then(|_| Ok(())))
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
/// A header which has been checked
|
||||
|
||||
Reference in New Issue
Block a user