Make consensus SlotWorker don't assume a slot is time / duration (#7441)

* Make consensus `SlotWorker` don't assume a slot is time / duration

This removes the last bit of assumption that a slot is always `time /
duration`. This will be required by parachains where a slot will be the
relay chain block number. Besides this there are also some other drive
by changes. One more notable is that `on_slot` now returns a
`SlotResult` that holds the block and a potential storage proof.

To simplify the implementation and usage of the `SimpleSlotWorker` the
`SlotWorker` trait is now implemented for each type that implements
`SimpleSlotWorker`.

* Update client/consensus/slots/src/lib.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/consensus/slots/src/lib.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
Bastian Köcher
2020-10-28 00:01:06 +01:00
committed by GitHub
parent e4cfb2556d
commit 05af334bbc
7 changed files with 99 additions and 143 deletions
+1
View File
@@ -6832,6 +6832,7 @@ dependencies = [
"sp-inherents",
"sp-runtime",
"sp-state-machine",
"sp-trie",
"substrate-test-runtime-client",
]
+5 -31
View File
@@ -72,7 +72,7 @@ use sp_timestamp::{
use sc_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_INFO};
use sc_consensus_slots::{
CheckedHeader, SlotWorker, SlotInfo, SlotCompatible, StorageChanges, check_equivocation,
CheckedHeader, SlotInfo, SlotCompatible, StorageChanges, check_equivocation,
};
use sp_api::ApiExt;
@@ -127,7 +127,7 @@ struct AuraSlotCompatible;
impl SlotCompatible for AuraSlotCompatible {
fn extract_timestamp_and_slot(
&self,
data: &InherentData
data: &InherentData,
) -> Result<(TimestampInherent, AuraInherent, std::time::Duration), sp_consensus::Error> {
data.timestamp_inherent_data()
.and_then(|t| data.aura_inherent_data().map(|a| (t, a)))
@@ -198,7 +198,8 @@ struct AuraWorker<C, E, I, P, SO> {
_key_type: PhantomData<P>,
}
impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraWorker<C, E, I, P, SO> where
impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraWorker<C, E, I, P, SO>
where
B: BlockT,
C: ProvideRuntimeApi<B> + BlockOf + ProvideCache<B> + Sync,
C::Api: AuraApi<B, AuthorityId<P>>,
@@ -353,26 +354,6 @@ impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraW
}
}
impl<B: BlockT, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> where
B: BlockT,
C: ProvideRuntimeApi<B> + BlockOf + ProvideCache<B> + Sync + Send,
C::Api: AuraApi<B, AuthorityId<P>>,
E: Environment<B, Error = Error> + Send + Sync,
E::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync + 'static,
P: Pair + Send + Sync,
P::Public: AppPublic + Member + Encode + Decode + Hash,
P::Signature: TryFrom<Vec<u8>> + Member + Encode + Decode + Hash + Debug,
SO: SyncOracle + Send + Sync + Clone,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
{
type OnSlot = Pin<Box<dyn Future<Output = Result<(), sp_consensus::Error>> + Send>>;
fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot {
<Self as sc_consensus_slots::SimpleSlotWorker<B>>::on_slot(self, chain_head, slot_info)
}
}
fn aura_err<B: BlockT>(error: Error<B>) -> Error<B> {
debug!(target: "aura", "{}", error);
error
@@ -886,19 +867,12 @@ mod tests {
use std::task::Poll;
use sc_block_builder::BlockBuilderProvider;
use sp_runtime::traits::Header as _;
use substrate_test_runtime_client::runtime::{Header, H256};
use substrate_test_runtime_client::{TestClient, runtime::{Header, H256}};
use sc_keystore::LocalKeystore;
use sp_application_crypto::key_types::AURA;
type Error = sp_blockchain::Error;
type TestClient = substrate_test_runtime_client::client::Client<
substrate_test_runtime_client::Backend,
substrate_test_runtime_client::Executor,
TestBlock,
substrate_test_runtime_client::runtime::RuntimeApi
>;
struct DummyFactory(Arc<TestClient>);
struct DummyProposer(u64, Arc<TestClient>);
+1 -21
View File
@@ -113,7 +113,7 @@ use futures::prelude::*;
use log::{debug, info, log, trace, warn};
use prometheus_endpoint::Registry;
use sc_consensus_slots::{
SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation,
SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation,
};
use sc_consensus_epochs::{
descendent_query, SharedEpochChanges, EpochChangesFor, Epoch as EpochT, ViableEpochDescriptor,
@@ -667,26 +667,6 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeSlot
}
}
impl<B, C, E, I, Error, SO> SlotWorker<B> for BabeSlotWorker<B, C, E, I, SO> where
B: BlockT,
C: ProvideRuntimeApi<B> +
ProvideCache<B> +
HeaderBackend<B> +
HeaderMetadata<B, Error = ClientError> + Send + Sync,
C::Api: BabeApi<B>,
E: Environment<B, Error = Error> + Send + Sync,
E::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync + 'static,
SO: SyncOracle + Send + Sync + Clone,
Error: std::error::Error + Send + From<sp_consensus::Error> + From<I::Error> + 'static,
{
type OnSlot = Pin<Box<dyn Future<Output = Result<(), sp_consensus::Error>> + Send>>;
fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot {
<Self as sc_consensus_slots::SimpleSlotWorker<B>>::on_slot(self, chain_head, slot_info)
}
}
/// Extract the BABE pre digest from the given header. Pre-runtime digests are
/// mandatory, the function will return `Err` if none is found.
fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>>
@@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"]
codec = { package = "parity-scale-codec", version = "1.3.4" }
sc-client-api = { version = "2.0.0", path = "../../api" }
sp-core = { version = "2.0.0", path = "../../../primitives/core" }
sp-trie = { version = "2.0.0", path = "../../../primitives/trie" }
sp-application-crypto = { version = "2.0.0", path = "../../../primitives/application-crypto" }
sp-blockchain = { version = "2.0.0", path = "../../../primitives/blockchain" }
sp-consensus-slots = { version = "0.8.0", path = "../../../primitives/consensus/slots" }
+83 -61
View File
@@ -25,7 +25,7 @@
mod slots;
mod aux_schema;
pub use slots::{SignedDuration, SlotInfo};
pub use slots::SlotInfo;
use slots::Slots;
pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND};
@@ -48,13 +48,29 @@ use parking_lot::Mutex;
pub type StorageChanges<Transaction, Block> =
sp_state_machine::StorageChanges<Transaction, HashFor<Block>, NumberFor<Block>>;
/// The result of [`SlotWorker::on_slot`].
#[derive(Debug, Clone)]
pub struct SlotResult<Block: BlockT> {
/// The block that was built.
pub block: Block,
/// The optional storage proof that was calculated while building the block.
///
/// This needs to be enabled for the proposer to get this storage proof.
pub storage_proof: Option<sp_trie::StorageProof>,
}
/// A worker that should be invoked at every new slot.
///
/// The implementation should not make any assumptions of the slot being bound to the time or
/// similar. The only valid assumption is that the slot number is always increasing.
pub trait SlotWorker<B: BlockT> {
/// The type of the future that will be returned when a new slot is
/// triggered.
type OnSlot: Future<Output = Result<(), sp_consensus::Error>>;
/// The type of the future that will be returned when a new slot is triggered.
type OnSlot: Future<Output = Option<SlotResult<B>>>;
/// Called when a new slot is triggered.
///
/// Returns a future that resolves to a [`SlotResult`] iff a block was successfully built in
/// the slot. Otherwise `None` is returned.
fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot;
}
@@ -90,7 +106,11 @@ pub trait SimpleSlotWorker<B: BlockT> {
/// Returns the epoch data necessary for authoring. For time-dependent epochs,
/// use the provided slot number as a canonical source of time.
fn epoch_data(&self, header: &B::Header, slot_number: u64) -> Result<Self::EpochData, sp_consensus::Error>;
fn epoch_data(
&self,
header: &B::Header,
slot_number: u64,
) -> Result<Self::EpochData, sp_consensus::Error>;
/// Returns the number of authorities given the epoch data.
/// None indicate that the authorities information is incomplete.
@@ -111,7 +131,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
_header: &B::Header,
_slot_number: u64,
_epoch_data: &Self::EpochData,
) { }
) {}
/// Return the pre digest data to include in a block authored with the given claim.
fn pre_digest_data(
@@ -158,32 +178,38 @@ pub trait SimpleSlotWorker<B: BlockT> {
fn proposing_remaining_duration(
&self,
_head: &B::Header,
slot_info: &SlotInfo
slot_info: &SlotInfo,
) -> Option<Duration> {
Some(self.slot_remaining_duration(slot_info))
}
/// Implements the `on_slot` functionality from `SlotWorker`.
fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo)
-> Pin<Box<dyn Future<Output = Result<(), sp_consensus::Error>> + Send>> where
Self: Send + Sync,
/// Implements [`SlotWorker::on_slot`].
fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
) -> Pin<Box<dyn Future<Output = Option<SlotResult<B>>> + Send>>
where
<Self::Proposer as Proposer<B>>::Proposal: Unpin + Send + 'static,
{
let (timestamp, slot_number, slot_duration) =
(slot_info.timestamp, slot_info.number, slot_info.duration);
let (timestamp, slot_number) = (slot_info.timestamp, slot_info.number);
{
let slot_now = SignedDuration::default().slot_now(slot_duration);
if slot_now > slot_number {
// if this is behind, return.
debug!(target: self.logging_target(),
"Skipping proposal slot {} since our current view is {}",
slot_number, slot_now,
let slot_remaining_duration = self.slot_remaining_duration(&slot_info);
let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info);
let proposing_remaining = match proposing_remaining_duration {
Some(r) if r.as_secs() == 0 && r.as_nanos() == 0 => {
debug!(
target: self.logging_target(),
"Skipping proposal slot {} since there's no time left to propose",
slot_number,
);
return Box::pin(future::ready(Ok(())));
}
}
return Box::pin(future::ready(None));
},
Some(r) => Box::new(Delay::new(r)) as Box<dyn Future<Output = ()> + Unpin + Send>,
None => Box::new(future::pending()) as Box<_>,
};
let epoch_data = match self.epoch_data(&chain_head, slot_number) {
Ok(epoch_data) => epoch_data,
@@ -196,7 +222,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
"err" => ?err,
);
return Box::pin(future::ready(Ok(())));
return Box::pin(future::ready(None));
}
};
@@ -215,16 +241,17 @@ pub trait SimpleSlotWorker<B: BlockT> {
"authorities_len" => authorities_len,
);
return Box::pin(future::ready(Ok(())));
return Box::pin(future::ready(None));
}
let claim = match self.claim_slot(&chain_head, slot_number, &epoch_data) {
None => return Box::pin(future::ready(Ok(()))),
None => return Box::pin(future::ready(None)),
Some(claim) => claim,
};
debug!(
target: self.logging_target(), "Starting authorship at slot {}; timestamp = {}",
target: self.logging_target(),
"Starting authorship at slot {}; timestamp = {}",
slot_number,
timestamp,
);
@@ -244,8 +271,6 @@ pub trait SimpleSlotWorker<B: BlockT> {
err
});
let slot_remaining_duration = self.slot_remaining_duration(&slot_info);
let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info);
let logs = self.pre_digest_data(slot_number, &claim);
// deadline our production to approx. the end of the slot
@@ -258,15 +283,10 @@ pub trait SimpleSlotWorker<B: BlockT> {
RecordProof::No,
).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e))));
let delay: Box<dyn Future<Output=()> + Unpin + Send> = match proposing_remaining_duration {
Some(r) => Box::new(Delay::new(r)),
None => Box::new(future::pending()),
};
let proposal_work =
Box::new(futures::future::select(proposing, delay).map(move |v| match v {
futures::future::Either::Left((b, _)) => b.map(|b| (b, claim)),
futures::future::Either::Right(_) => {
futures::future::select(proposing, proposing_remaining).map(move |v| match v {
Either::Left((b, _)) => b.map(|b| (b, claim)),
Either::Right(_) => {
info!("⌛️ Discarding proposal for slot {}; block production took too long", slot_number);
// If the node was compiled with debug, tell the user to use release optimizations.
#[cfg(build_type="debug")]
@@ -274,16 +294,18 @@ pub trait SimpleSlotWorker<B: BlockT> {
telemetry!(CONSENSUS_INFO; "slots.discarding_proposal_took_too_long";
"slot" => slot_number,
);
Err(sp_consensus::Error::ClientImport("Timeout in the Slots proposer".into()))
},
}));
});
let block_import_params_maker = self.block_import_params();
let block_import = self.block_import();
let logging_target = self.logging_target();
Box::pin(proposal_work.and_then(move |(proposal, claim)| {
let (header, body) = proposal.block.deconstruct();
proposal_work.and_then(move |(proposal, claim)| async move {
let (block, storage_proof) = (proposal.block, proposal.proof);
let (header, body) = block.clone().deconstruct();
let header_num = *header.number();
let header_hash = header.hash();
let parent_hash = *header.parent_hash();
@@ -295,12 +317,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
proposal.storage_changes,
claim,
epoch_data,
);
let block_import_params = match block_import_params {
Ok(params) => params,
Err(e) => return future::err(e),
};
)?;
info!(
"🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
@@ -316,18 +333,32 @@ pub trait SimpleSlotWorker<B: BlockT> {
);
if let Err(err) = block_import.lock().import_block(block_import_params, Default::default()) {
warn!(target: logging_target,
warn!(
target: logging_target,
"Error with block built on {:?}: {:?}",
parent_hash,
err,
);
telemetry!(CONSENSUS_WARN; "slots.err_with_block_built_on";
"hash" => ?parent_hash, "err" => ?err,
telemetry!(
CONSENSUS_WARN; "slots.err_with_block_built_on";
"hash" => ?parent_hash,
"err" => ?err,
);
}
future::ready(Ok(()))
}))
Ok(SlotResult { block, storage_proof })
}).then(|r| async move {
r.map_err(|e| warn!(target: "slots", "Encountered consensus error: {:?}", e)).ok()
}).boxed()
}
}
impl<B: BlockT, T: SimpleSlotWorker<B>> SlotWorker<B> for T {
type OnSlot = Pin<Box<dyn Future<Output = Option<SlotResult<B>>> + Send>>;
fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot {
SimpleSlotWorker::on_slot(self, chain_head, slot_info)
}
}
@@ -338,10 +369,6 @@ pub trait SlotCompatible {
&self,
inherent: &InherentData,
) -> Result<(u64, u64, std::time::Duration), sp_consensus::Error>;
/// Get the difference between chain time and local time. Defaults to
/// always returning zero.
fn time_offset() -> SignedDuration { Default::default() }
}
/// Start a new slot worker.
@@ -403,11 +430,7 @@ where
Either::Right(future::ready(Ok(())))
} else {
Either::Left(
worker.on_slot(chain_head, slot_info)
.map_err(|e| {
warn!(target: "slots", "Encountered consensus error: {:?}", e);
})
.or_else(|_| future::ready(Ok(())))
worker.on_slot(chain_head, slot_info).then(|_| future::ready(Ok(())))
)
}
}).then(|res| {
@@ -569,7 +592,6 @@ mod test {
fn slot(n: u64) -> super::slots::SlotInfo {
super::slots::SlotInfo {
number: n,
last_number: n - 1,
duration: SLOT_DURATION.as_millis() as u64,
timestamp: Default::default(),
inherent_data: Default::default(),
+1 -30
View File
@@ -37,30 +37,6 @@ pub fn duration_now() -> Duration {
))
}
/// A `Duration` with a sign (before or after). Immutable.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct SignedDuration {
offset: Duration,
is_positive: bool,
}
impl SignedDuration {
/// Construct a `SignedDuration`
pub fn new(offset: Duration, is_positive: bool) -> Self {
Self { offset, is_positive }
}
/// Get the slot for now. Panics if `slot_duration` is 0.
pub fn slot_now(&self, slot_duration: u64) -> u64 {
(if self.is_positive {
duration_now() + self.offset
} else {
duration_now() - self.offset
}.as_millis() as u64) / slot_duration
}
}
/// Returns the duration until the next slot, based on current duration since
pub fn time_until_next(now: Duration, slot_duration: u64) -> Duration {
let remaining_full_millis = slot_duration - (now.as_millis() as u64 % slot_duration) - 1;
@@ -71,8 +47,6 @@ pub fn time_until_next(now: Duration, slot_duration: u64) -> Duration {
pub struct SlotInfo {
/// The slot number.
pub number: u64,
/// The last slot number produced.
pub last_number: u64,
/// Current timestamp.
pub timestamp: u64,
/// The instant at which the slot ends.
@@ -150,13 +124,11 @@ impl<SC: SlotCompatible> Stream for Slots<SC> {
// never yield the same slot twice.
if slot_num > self.last_slot {
let last_slot = self.last_slot;
self.last_slot = slot_num;
break Poll::Ready(Some(Ok(SlotInfo {
number: slot_num,
duration: self.slot_duration,
last_number: last_slot,
timestamp,
ends_at,
inherent_data,
@@ -166,5 +138,4 @@ impl<SC: SlotCompatible> Stream for Slots<SC> {
}
}
impl<SC> Unpin for Slots<SC> {
}
impl<SC> Unpin for Slots<SC> {}
@@ -123,6 +123,13 @@ impl RecordProof {
}
}
/// Will return [`RecordProof::No`] as default value.
impl Default for RecordProof {
fn default() -> Self {
Self::No
}
}
impl From<bool> for RecordProof {
fn from(val: bool) -> Self {
if val {