Use async_trait in sc-consensus-slots (#8461)

* Use `async_trait` in sc-consensus-slots

This makes the code a little bit easier to read and also expresses that
there can always only be one call at a time to `on_slot`.

* slots: remove mutex around BlockImport in SlotWorker

Co-authored-by: André Silva <andrerfosilva@gmail.com>
This commit is contained in:
Bastian Köcher
2021-03-27 22:40:28 +01:00
committed by GitHub
parent ff5765eac3
commit 49e79967c8
6 changed files with 179 additions and 155 deletions
+1 -1
View File
@@ -7313,11 +7313,11 @@ dependencies = [
name = "sc-consensus-slots"
version = "0.9.0"
dependencies = [
"async-trait",
"futures 0.3.13",
"futures-timer 3.0.2",
"log",
"parity-scale-codec 2.0.1",
"parking_lot 0.11.1",
"sc-client-api",
"sc-telemetry",
"sp-api",
+6 -7
View File
@@ -35,7 +35,6 @@ use std::{
};
use futures::prelude::*;
use parking_lot::Mutex;
use log::{debug, trace};
use codec::{Encode, Decode, Codec};
@@ -272,7 +271,7 @@ pub fn build_aura_worker<P, B, C, PF, I, SO, BS, Error>(
{
AuraWorker {
client,
block_import: Arc::new(Mutex::new(block_import)),
block_import,
env: proposer_factory,
keystore,
sync_oracle,
@@ -286,7 +285,7 @@ pub fn build_aura_worker<P, B, C, PF, I, SO, BS, Error>(
struct AuraWorker<C, E, I, P, SO, BS> {
client: Arc<C>,
block_import: Arc<Mutex<I>>,
block_import: I,
env: E,
keystore: SyncCryptoStorePtr,
sync_oracle: SO,
@@ -326,8 +325,8 @@ where
"aura"
}
fn block_import(&self) -> Arc<Mutex<Self::BlockImport>> {
self.block_import.clone()
fn block_import(&mut self) -> &mut Self::BlockImport {
&mut self.block_import
}
fn epoch_data(
@@ -805,7 +804,7 @@ mod tests {
let worker = AuraWorker {
client: client.clone(),
block_import: Arc::new(Mutex::new(client)),
block_import: client,
env: environ,
keystore: keystore.into(),
sync_oracle: DummyOracle.clone(),
@@ -854,7 +853,7 @@ mod tests {
let mut worker = AuraWorker {
client: client.clone(),
block_import: Arc::new(Mutex::new(client.clone())),
block_import: client.clone(),
env: environ,
keystore: keystore.into(),
sync_oracle: DummyOracle.clone(),
+5 -5
View File
@@ -438,7 +438,7 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {
+ Sync + 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
CAW: CanAuthorWith<B> + Send + 'static,
CAW: CanAuthorWith<B> + Send + Sync + 'static,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
{
const HANDLE_BUFFER_SIZE: usize = 1024;
@@ -448,7 +448,7 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {
let worker = BabeSlotWorker {
client: client.clone(),
block_import: Arc::new(Mutex::new(block_import)),
block_import,
env,
sync_oracle: sync_oracle.clone(),
force_authoring,
@@ -605,7 +605,7 @@ type SlotNotificationSinks<B> = Arc<
struct BabeSlotWorker<B: BlockT, C, E, I, SO, BS> {
client: Arc<C>,
block_import: Arc<Mutex<I>>,
block_import: I,
env: E,
sync_oracle: SO,
force_authoring: bool,
@@ -647,8 +647,8 @@ where
"babe"
}
fn block_import(&self) -> Arc<Mutex<Self::BlockImport>> {
self.block_import.clone()
fn block_import(&mut self) -> &mut Self::BlockImport {
&mut self.block_import
}
fn epoch_data(
+1 -1
View File
@@ -31,9 +31,9 @@ sp-inherents = { version = "3.0.0", path = "../../../primitives/inherents" }
sp-timestamp = { version = "3.0.0", path = "../../../primitives/timestamp" }
futures = "0.3.9"
futures-timer = "3.0.1"
parking_lot = "0.11.1"
log = "0.4.11"
thiserror = "1.0.21"
async-trait = "0.1.42"
[dev-dependencies]
substrate-test-runtime-client = { version = "2.0.0", path = "../../../test-utils/runtime/client" }
+154 -120
View File
@@ -32,12 +32,11 @@ pub use slots::SlotInfo;
use slots::Slots;
pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND};
use std::{fmt::Debug, ops::Deref, pin::Pin, sync::Arc, time::Duration};
use std::{fmt::Debug, ops::Deref, time::Duration};
use codec::{Decode, Encode};
use futures::{prelude::*, future::{self, Either}};
use futures::{future::Either, Future, TryFutureExt};
use futures_timer::Delay;
use log::{debug, error, info, warn};
use parking_lot::Mutex;
use sp_api::{ProvideRuntimeApi, ApiRef};
use sp_arithmetic::traits::BaseArithmetic;
use sp_consensus::{BlockImport, Proposer, SyncOracle, SelectChain, CanAuthorWith, SlotData};
@@ -68,21 +67,23 @@ pub struct SlotResult<Block: BlockT, Proof> {
///
/// The implementation should not make any assumptions of the slot being bound to the time or
/// similar. The only valid assumption is that the slot number is always increasing.
#[async_trait::async_trait]
pub trait SlotWorker<B: BlockT, Proof> {
/// Called when a new slot is triggered.
///
/// Returns a future that resolves to a [`SlotResult`] iff a block was successfully built in
/// the slot. Otherwise `None` is returned.
fn on_slot(
async fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
) -> Pin<Box<dyn Future<Output = Option<SlotResult<B, Proof>>> + Send>>;
) -> Option<SlotResult<B, Proof>>;
}
/// A skeleton implementation for `SlotWorker` which tries to claim a slot at
/// its beginning and tries to produce a block if successfully claimed, timing
/// out if block production takes too long.
#[async_trait::async_trait]
pub trait SimpleSlotWorker<B: BlockT> {
/// A handle to a `BlockImport`.
type BlockImport: BlockImport<B, Transaction = <Self::Proposer as Proposer<B>>::Transaction>
@@ -96,7 +97,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
+ Send + Unpin + 'static;
/// The type of proposer to use to build blocks.
type Proposer: Proposer<B>;
type Proposer: Proposer<B> + Send;
/// Data associated with a slot claim.
type Claim: Send + 'static;
@@ -108,7 +109,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
fn logging_target(&self) -> &'static str;
/// A handle to a `BlockImport`.
fn block_import(&self) -> Arc<Mutex<Self::BlockImport>>;
fn block_import(&mut self) -> &mut Self::BlockImport;
/// Returns the epoch data necessary for authoring. For time-dependent epochs,
/// use the provided slot number as a canonical source of time.
@@ -191,36 +192,38 @@ pub trait SimpleSlotWorker<B: BlockT> {
) -> Duration;
/// Implements [`SlotWorker::on_slot`].
fn on_slot(
async fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
) -> Pin<Box<dyn Future<Output = Option<SlotResult<B, <Self::Proposer as Proposer<B>>::Proof>>> + Send>>
where
<Self::Proposer as Proposer<B>>::Proposal: Unpin + Send + 'static,
{
) -> Option<SlotResult<B, <Self::Proposer as Proposer<B>>::Proof>> {
let (timestamp, slot) = (slot_info.timestamp, slot_info.slot);
let telemetry = self.telemetry();
let logging_target = self.logging_target();
let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info);
let proposing_remaining = if proposing_remaining_duration == Duration::default() {
debug!(
target: self.logging_target(),
target: logging_target,
"Skipping proposal slot {} since there's no time left to propose",
slot,
);
return Box::pin(future::ready(None));
return None
} else {
Box::new(Delay::new(proposing_remaining_duration))
as Box<dyn Future<Output = ()> + Unpin + Send>
Delay::new(proposing_remaining_duration)
};
let epoch_data = match self.epoch_data(&chain_head, slot) {
Ok(epoch_data) => epoch_data,
Err(err) => {
warn!("Unable to fetch epoch data at block {:?}: {:?}", chain_head.hash(), err);
warn!(
target: logging_target,
"Unable to fetch epoch data at block {:?}: {:?}",
chain_head.hash(),
err,
);
telemetry!(
telemetry;
@@ -230,7 +233,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
"err" => ?err,
);
return Box::pin(future::ready(None));
return None;
}
};
@@ -242,7 +245,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
self.sync_oracle().is_offline() &&
authorities_len.map(|a| a > 1).unwrap_or(false)
{
debug!(target: self.logging_target(), "Skipping proposal slot. Waiting for the network.");
debug!(target: logging_target, "Skipping proposal slot. Waiting for the network.");
telemetry!(
telemetry;
CONSENSUS_DEBUG;
@@ -250,16 +253,16 @@ pub trait SimpleSlotWorker<B: BlockT> {
"authorities_len" => authorities_len,
);
return Box::pin(future::ready(None));
return None;
}
let claim = match self.claim_slot(&chain_head, slot, &epoch_data) {
None => return Box::pin(future::ready(None)),
None => return None,
Some(claim) => claim,
};
if self.should_backoff(slot, &chain_head) {
return Box::pin(future::ready(None));
return None;
}
debug!(
@@ -277,10 +280,15 @@ pub trait SimpleSlotWorker<B: BlockT> {
"timestamp" => *timestamp,
);
let awaiting_proposer = {
let telemetry = telemetry.clone();
self.proposer(&chain_head).map_err(move |err| {
warn!("Unable to author block in slot {:?}: {:?}", slot, err);
let proposer = match self.proposer(&chain_head).await {
Ok(p) => p,
Err(err) => {
warn!(
target: logging_target,
"Unable to author block in slot {:?}: {:?}",
slot,
err,
);
telemetry!(
telemetry;
@@ -290,8 +298,8 @@ pub trait SimpleSlotWorker<B: BlockT> {
"err" => ?err
);
err
})
return None
}
};
let logs = self.pre_digest_data(slot, &claim);
@@ -299,106 +307,126 @@ pub trait SimpleSlotWorker<B: BlockT> {
// deadline our production to 98% of the total time left for proposing. As we deadline
// the proposing below to the same total time left, the 2% margin should be enough for
// the result to be returned.
let proposing = awaiting_proposer.and_then(move |proposer| proposer.propose(
let proposing = proposer.propose(
slot_info.inherent_data,
sp_runtime::generic::Digest {
logs,
},
proposing_remaining_duration.mul_f32(0.98),
).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e))));
).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)));
let proposal_work = {
let telemetry = telemetry.clone();
futures::future::select(proposing, proposing_remaining).map(move |v| match v {
Either::Left((b, _)) => b.map(|b| (b, claim)),
Either::Right(_) => {
info!(
"⌛️ Discarding proposal for slot {}; block production took too long",
slot,
);
// If the node was compiled with debug, tell the user to use release optimizations.
#[cfg(build_type="debug")]
info!("👉 Recompile your node in `--release` mode to mitigate this problem.");
telemetry!(
telemetry;
CONSENSUS_INFO;
"slots.discarding_proposal_took_too_long";
"slot" => *slot,
);
let proposal = match futures::future::select(proposing, proposing_remaining).await {
Either::Left((Ok(p), _)) => p,
Either::Left((Err(err), _)) => {
warn!(
target: logging_target,
"Proposing failed: {:?}",
err,
);
Err(sp_consensus::Error::ClientImport("Timeout in the Slots proposer".into()))
},
})
return None
},
Either::Right(_) => {
info!(
target: logging_target,
"⌛️ Discarding proposal for slot {}; block production took too long",
slot,
);
// If the node was compiled with debug, tell the user to use release optimizations.
#[cfg(build_type="debug")]
info!(
target: logging_target,
"👉 Recompile your node in `--release` mode to mitigate this problem.",
);
telemetry!(
telemetry;
CONSENSUS_INFO;
"slots.discarding_proposal_took_too_long";
"slot" => *slot,
);
return None
},
};
let block_import_params_maker = self.block_import_params();
let block_import = self.block_import();
let logging_target = self.logging_target();
proposal_work.and_then(move |(proposal, claim)| async move {
let (block, storage_proof) = (proposal.block, proposal.proof);
let (header, body) = block.deconstruct();
let header_num = *header.number();
let header_hash = header.hash();
let parent_hash = *header.parent_hash();
let (block, storage_proof) = (proposal.block, proposal.proof);
let (header, body) = block.deconstruct();
let header_num = *header.number();
let header_hash = header.hash();
let parent_hash = *header.parent_hash();
let block_import_params = block_import_params_maker(
header,
&header_hash,
body.clone(),
proposal.storage_changes,
claim,
epoch_data,
)?;
let block_import_params = match block_import_params_maker(
header,
&header_hash,
body.clone(),
proposal.storage_changes,
claim,
epoch_data,
) {
Ok(bi) => bi,
Err(err) => {
warn!(
target: logging_target,
"Failed to create block import params: {:?}",
err,
);
info!(
"🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
header_num,
block_import_params.post_hash(),
header_hash,
return None
}
};
info!(
target: logging_target,
"🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
header_num,
block_import_params.post_hash(),
header_hash,
);
telemetry!(
telemetry;
CONSENSUS_INFO;
"slots.pre_sealed_block";
"header_num" => ?header_num,
"hash_now" => ?block_import_params.post_hash(),
"hash_previously" => ?header_hash,
);
let header = block_import_params.post_header();
if let Err(err) = block_import
.import_block(block_import_params, Default::default())
{
warn!(
target: logging_target,
"Error with block built on {:?}: {:?}",
parent_hash,
err,
);
telemetry!(
telemetry;
CONSENSUS_INFO;
"slots.pre_sealed_block";
"header_num" => ?header_num,
"hash_now" => ?block_import_params.post_hash(),
"hash_previously" => ?header_hash,
CONSENSUS_WARN;
"slots.err_with_block_built_on";
"hash" => ?parent_hash,
"err" => ?err,
);
}
let header = block_import_params.post_header();
if let Err(err) = block_import.lock().import_block(block_import_params, Default::default()) {
warn!(
target: logging_target,
"Error with block built on {:?}: {:?}",
parent_hash,
err,
);
telemetry!(
telemetry;
CONSENSUS_WARN;
"slots.err_with_block_built_on";
"hash" => ?parent_hash,
"err" => ?err,
);
}
Ok(SlotResult { block: B::new(header, body), storage_proof })
}).then(|r| async move {
r.map_err(|e| warn!(target: "slots", "Encountered consensus error: {:?}", e)).ok()
}).boxed()
Some(SlotResult { block: B::new(header, body), storage_proof })
}
}
impl<B: BlockT, T: SimpleSlotWorker<B>> SlotWorker<B, <T::Proposer as Proposer<B>>::Proof> for T {
fn on_slot(
#[async_trait::async_trait]
impl<B: BlockT, T: SimpleSlotWorker<B> + Send> SlotWorker<B, <T::Proposer as Proposer<B>>::Proof> for T {
async fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
) -> Pin<Box<dyn Future<Output = Option<SlotResult<B, <T::Proposer as Proposer<B>>::Proof>>> + Send>> {
SimpleSlotWorker::on_slot(self, chain_head, slot_info)
) -> Option<SlotResult<B, <T::Proposer as Proposer<B>>::Proof>> {
SimpleSlotWorker::on_slot(self, chain_head, slot_info).await
}
}
@@ -436,25 +464,39 @@ where
let SlotDuration(slot_duration) = slot_duration;
// rather than use a timer interval, we schedule our waits ourselves
Slots::<SC>::new(
let mut slots = Slots::<SC>::new(
slot_duration.slot_duration(),
inherent_data_providers,
timestamp_extractor,
).inspect_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e))
.try_for_each(move |slot_info| {
);
async move {
loop {
let slot_info = match slots.next_slot().await {
Ok(slot) => slot,
Err(err) => {
debug!(target: "slots", "Faulty timer: {:?}", err);
return
},
};
// only propose when we are not syncing.
if sync_oracle.is_major_syncing() {
debug!(target: "slots", "Skipping proposal slot due to sync.");
return Either::Right(future::ready(Ok(())));
continue;
}
let slot = slot_info.slot;
let chain_head = match client.best_chain() {
Ok(x) => x,
Err(e) => {
warn!(target: "slots", "Unable to author block in slot {}. \
no best block header: {:?}", slot, e);
return Either::Right(future::ready(Ok(())));
warn!(
target: "slots",
"Unable to author block in slot {}. No best block header: {:?}",
slot,
e,
);
continue;
}
};
@@ -466,19 +508,11 @@ where
slot,
err,
);
Either::Right(future::ready(Ok(())))
} else {
Either::Left(
worker.on_slot(chain_head, slot_info)
.then(|_| future::ready(Ok(())))
)
worker.on_slot(chain_head, slot_info).await;
}
}).then(|res| {
if let Err(err) = res {
warn!(target: "slots", "Slots stream terminated with an error: {:?}", err);
}
future::ready(())
})
}
}
}
/// A header which has been checked
+12 -21
View File
@@ -22,10 +22,9 @@
use super::{SlotCompatible, Slot};
use sp_consensus::Error;
use futures::{prelude::*, task::Context, task::Poll};
use sp_inherents::{InherentData, InherentDataProviders};
use std::{pin::Pin, time::{Duration, Instant}};
use std::time::{Duration, Instant};
use futures_timer::Delay;
/// Returns current duration since unix epoch.
@@ -107,57 +106,49 @@ impl<SC> Slots<SC> {
}
}
impl<SC: SlotCompatible> Stream for Slots<SC> {
type Item = Result<SlotInfo, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
impl<SC: SlotCompatible> Slots<SC> {
/// Returns a future that fires when the next slot starts.
pub async fn next_slot(&mut self) -> Result<SlotInfo, Error> {
loop {
let slot_duration = self.slot_duration;
self.inner_delay = match self.inner_delay.take() {
None => {
// schedule wait.
let wait_dur = time_until_next(duration_now(), slot_duration);
let wait_dur = time_until_next(duration_now(), self.slot_duration);
Some(Delay::new(wait_dur))
}
Some(d) => Some(d),
};
if let Some(ref mut inner_delay) = self.inner_delay {
match Future::poll(Pin::new(inner_delay), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => {}
}
if let Some(inner_delay) = self.inner_delay.take() {
inner_delay.await;
}
// timeout has fired.
let inherent_data = match self.inherent_data_providers.create_inherent_data() {
Ok(id) => id,
Err(err) => return Poll::Ready(Some(Err(sp_consensus::Error::InherentData(err)))),
Err(err) => return Err(sp_consensus::Error::InherentData(err)),
};
let result = self.timestamp_extractor.extract_timestamp_and_slot(&inherent_data);
let (timestamp, slot, offset) = match result {
Ok(v) => v,
Err(err) => return Poll::Ready(Some(Err(err))),
Err(err) => return Err(err),
};
// reschedule delay for next slot.
let ends_in = offset +
time_until_next(timestamp.as_duration(), slot_duration);
time_until_next(timestamp.as_duration(), self.slot_duration);
self.inner_delay = Some(Delay::new(ends_in));
// never yield the same slot twice.
if slot > self.last_slot {
self.last_slot = slot;
break Poll::Ready(Some(Ok(SlotInfo::new(
break Ok(SlotInfo::new(
slot,
timestamp,
inherent_data,
self.slot_duration,
))))
))
}
}
}
}
impl<SC> Unpin for Slots<SC> {}