From 05af334bbc18cafd22847322c0d8ea9f6a28a26b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 28 Oct 2020 00:01:06 +0100 Subject: [PATCH] Make consensus `SlotWorker` don't assume a slot is time / duration (#7441) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Make consensus `SlotWorker` don't assume a slot is time / duration This removes the last bit of assumption that a slot is always `time / duration`. This will be required by parachains where a slot will be the relay chain block number. Besides this there are also some other drive by changes. One more notable is that `on_slot` now returns a `SlotResult` that holds the block and a potential storage proof. To simplify the implementation and usage of the `SimpleSlotWorker` the `SlotWorker` trait is now implemented for each type that implements `SimpleSlotWorker`. * Update client/consensus/slots/src/lib.rs Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> * Update client/consensus/slots/src/lib.rs Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> --- substrate/Cargo.lock | 1 + substrate/client/consensus/aura/src/lib.rs | 36 +---- substrate/client/consensus/babe/src/lib.rs | 22 +-- substrate/client/consensus/slots/Cargo.toml | 1 + substrate/client/consensus/slots/src/lib.rs | 144 ++++++++++-------- substrate/client/consensus/slots/src/slots.rs | 31 +--- .../primitives/consensus/common/src/lib.rs | 7 + 7 files changed, 99 insertions(+), 143 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index ae15dab886..c026060ec1 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -6832,6 +6832,7 @@ dependencies = [ "sp-inherents", "sp-runtime", "sp-state-machine", + "sp-trie", "substrate-test-runtime-client", ] diff --git a/substrate/client/consensus/aura/src/lib.rs b/substrate/client/consensus/aura/src/lib.rs index 426a0e873f..5013c1813b 100644 --- a/substrate/client/consensus/aura/src/lib.rs +++ b/substrate/client/consensus/aura/src/lib.rs @@ -72,7 +72,7 @@ use sp_timestamp::{ use sc_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_INFO}; use sc_consensus_slots::{ - CheckedHeader, SlotWorker, SlotInfo, SlotCompatible, StorageChanges, check_equivocation, + CheckedHeader, SlotInfo, SlotCompatible, StorageChanges, check_equivocation, }; use sp_api::ApiExt; @@ -127,7 +127,7 @@ struct AuraSlotCompatible; impl SlotCompatible for AuraSlotCompatible { fn extract_timestamp_and_slot( &self, - data: &InherentData + data: &InherentData, ) -> Result<(TimestampInherent, AuraInherent, std::time::Duration), sp_consensus::Error> { data.timestamp_inherent_data() .and_then(|t| data.aura_inherent_data().map(|a| (t, a))) @@ -198,7 +198,8 @@ struct AuraWorker { _key_type: PhantomData

, } -impl sc_consensus_slots::SimpleSlotWorker for AuraWorker where +impl sc_consensus_slots::SimpleSlotWorker for AuraWorker +where B: BlockT, C: ProvideRuntimeApi + BlockOf + ProvideCache + Sync, C::Api: AuraApi>, @@ -353,26 +354,6 @@ impl sc_consensus_slots::SimpleSlotWorker for AuraW } } -impl SlotWorker for AuraWorker where - B: BlockT, - C: ProvideRuntimeApi + BlockOf + ProvideCache + Sync + Send, - C::Api: AuraApi>, - E: Environment + Send + Sync, - E::Proposer: Proposer>, - I: BlockImport> + Send + Sync + 'static, - P: Pair + Send + Sync, - P::Public: AppPublic + Member + Encode + Decode + Hash, - P::Signature: TryFrom> + Member + Encode + Decode + Hash + Debug, - SO: SyncOracle + Send + Sync + Clone, - Error: std::error::Error + Send + From + 'static, -{ - type OnSlot = Pin> + Send>>; - - fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot { - >::on_slot(self, chain_head, slot_info) - } -} - fn aura_err(error: Error) -> Error { debug!(target: "aura", "{}", error); error @@ -886,19 +867,12 @@ mod tests { use std::task::Poll; use sc_block_builder::BlockBuilderProvider; use sp_runtime::traits::Header as _; - use substrate_test_runtime_client::runtime::{Header, H256}; + use substrate_test_runtime_client::{TestClient, runtime::{Header, H256}}; use sc_keystore::LocalKeystore; use sp_application_crypto::key_types::AURA; type Error = sp_blockchain::Error; - type TestClient = substrate_test_runtime_client::client::Client< - substrate_test_runtime_client::Backend, - substrate_test_runtime_client::Executor, - TestBlock, - substrate_test_runtime_client::runtime::RuntimeApi - >; - struct DummyFactory(Arc); struct DummyProposer(u64, Arc); diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index 6105e9876b..4705381c2b 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -113,7 +113,7 @@ use futures::prelude::*; use log::{debug, info, log, trace, warn}; use prometheus_endpoint::Registry; use sc_consensus_slots::{ - SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation, + SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation, }; use sc_consensus_epochs::{ descendent_query, SharedEpochChanges, EpochChangesFor, Epoch as EpochT, ViableEpochDescriptor, @@ -667,26 +667,6 @@ impl sc_consensus_slots::SimpleSlotWorker for BabeSlot } } -impl SlotWorker for BabeSlotWorker where - B: BlockT, - C: ProvideRuntimeApi + - ProvideCache + - HeaderBackend + - HeaderMetadata + Send + Sync, - C::Api: BabeApi, - E: Environment + Send + Sync, - E::Proposer: Proposer>, - I: BlockImport> + Send + Sync + 'static, - SO: SyncOracle + Send + Sync + Clone, - Error: std::error::Error + Send + From + From + 'static, -{ - type OnSlot = Pin> + Send>>; - - fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot { - >::on_slot(self, chain_head, slot_info) - } -} - /// Extract the BABE pre digest from the given header. Pre-runtime digests are /// mandatory, the function will return `Err` if none is found. fn find_pre_digest(header: &B::Header) -> Result> diff --git a/substrate/client/consensus/slots/Cargo.toml b/substrate/client/consensus/slots/Cargo.toml index 3a636360e7..a13a712fe7 100644 --- a/substrate/client/consensus/slots/Cargo.toml +++ b/substrate/client/consensus/slots/Cargo.toml @@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"] codec = { package = "parity-scale-codec", version = "1.3.4" } sc-client-api = { version = "2.0.0", path = "../../api" } sp-core = { version = "2.0.0", path = "../../../primitives/core" } +sp-trie = { version = "2.0.0", path = "../../../primitives/trie" } sp-application-crypto = { version = "2.0.0", path = "../../../primitives/application-crypto" } sp-blockchain = { version = "2.0.0", path = "../../../primitives/blockchain" } sp-consensus-slots = { version = "0.8.0", path = "../../../primitives/consensus/slots" } diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs index 7d346ffe39..681d4a6273 100644 --- a/substrate/client/consensus/slots/src/lib.rs +++ b/substrate/client/consensus/slots/src/lib.rs @@ -25,7 +25,7 @@ mod slots; mod aux_schema; -pub use slots::{SignedDuration, SlotInfo}; +pub use slots::SlotInfo; use slots::Slots; pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND}; @@ -48,13 +48,29 @@ use parking_lot::Mutex; pub type StorageChanges = sp_state_machine::StorageChanges, NumberFor>; +/// The result of [`SlotWorker::on_slot`]. +#[derive(Debug, Clone)] +pub struct SlotResult { + /// The block that was built. + pub block: Block, + /// The optional storage proof that was calculated while building the block. + /// + /// This needs to be enabled for the proposer to get this storage proof. + pub storage_proof: Option, +} + /// A worker that should be invoked at every new slot. +/// +/// The implementation should not make any assumptions of the slot being bound to the time or +/// similar. The only valid assumption is that the slot number is always increasing. pub trait SlotWorker { - /// The type of the future that will be returned when a new slot is - /// triggered. - type OnSlot: Future>; + /// The type of the future that will be returned when a new slot is triggered. + type OnSlot: Future>>; /// Called when a new slot is triggered. + /// + /// Returns a future that resolves to a [`SlotResult`] iff a block was successfully built in + /// the slot. Otherwise `None` is returned. fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot; } @@ -90,7 +106,11 @@ pub trait SimpleSlotWorker { /// Returns the epoch data necessary for authoring. For time-dependent epochs, /// use the provided slot number as a canonical source of time. - fn epoch_data(&self, header: &B::Header, slot_number: u64) -> Result; + fn epoch_data( + &self, + header: &B::Header, + slot_number: u64, + ) -> Result; /// Returns the number of authorities given the epoch data. /// None indicate that the authorities information is incomplete. @@ -111,7 +131,7 @@ pub trait SimpleSlotWorker { _header: &B::Header, _slot_number: u64, _epoch_data: &Self::EpochData, - ) { } + ) {} /// Return the pre digest data to include in a block authored with the given claim. fn pre_digest_data( @@ -158,32 +178,38 @@ pub trait SimpleSlotWorker { fn proposing_remaining_duration( &self, _head: &B::Header, - slot_info: &SlotInfo + slot_info: &SlotInfo, ) -> Option { Some(self.slot_remaining_duration(slot_info)) } - /// Implements the `on_slot` functionality from `SlotWorker`. - fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) - -> Pin> + Send>> where - Self: Send + Sync, + /// Implements [`SlotWorker::on_slot`]. + fn on_slot( + &mut self, + chain_head: B::Header, + slot_info: SlotInfo, + ) -> Pin>> + Send>> + where >::Proposal: Unpin + Send + 'static, { - let (timestamp, slot_number, slot_duration) = - (slot_info.timestamp, slot_info.number, slot_info.duration); + let (timestamp, slot_number) = (slot_info.timestamp, slot_info.number); - { - let slot_now = SignedDuration::default().slot_now(slot_duration); - if slot_now > slot_number { - // if this is behind, return. - debug!(target: self.logging_target(), - "Skipping proposal slot {} since our current view is {}", - slot_number, slot_now, + let slot_remaining_duration = self.slot_remaining_duration(&slot_info); + let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info); + + let proposing_remaining = match proposing_remaining_duration { + Some(r) if r.as_secs() == 0 && r.as_nanos() == 0 => { + debug!( + target: self.logging_target(), + "Skipping proposal slot {} since there's no time left to propose", + slot_number, ); - return Box::pin(future::ready(Ok(()))); - } - } + return Box::pin(future::ready(None)); + }, + Some(r) => Box::new(Delay::new(r)) as Box + Unpin + Send>, + None => Box::new(future::pending()) as Box<_>, + }; let epoch_data = match self.epoch_data(&chain_head, slot_number) { Ok(epoch_data) => epoch_data, @@ -196,7 +222,7 @@ pub trait SimpleSlotWorker { "err" => ?err, ); - return Box::pin(future::ready(Ok(()))); + return Box::pin(future::ready(None)); } }; @@ -215,16 +241,17 @@ pub trait SimpleSlotWorker { "authorities_len" => authorities_len, ); - return Box::pin(future::ready(Ok(()))); + return Box::pin(future::ready(None)); } let claim = match self.claim_slot(&chain_head, slot_number, &epoch_data) { - None => return Box::pin(future::ready(Ok(()))), + None => return Box::pin(future::ready(None)), Some(claim) => claim, }; debug!( - target: self.logging_target(), "Starting authorship at slot {}; timestamp = {}", + target: self.logging_target(), + "Starting authorship at slot {}; timestamp = {}", slot_number, timestamp, ); @@ -244,8 +271,6 @@ pub trait SimpleSlotWorker { err }); - let slot_remaining_duration = self.slot_remaining_duration(&slot_info); - let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info); let logs = self.pre_digest_data(slot_number, &claim); // deadline our production to approx. the end of the slot @@ -258,15 +283,10 @@ pub trait SimpleSlotWorker { RecordProof::No, ).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)))); - let delay: Box + Unpin + Send> = match proposing_remaining_duration { - Some(r) => Box::new(Delay::new(r)), - None => Box::new(future::pending()), - }; - let proposal_work = - Box::new(futures::future::select(proposing, delay).map(move |v| match v { - futures::future::Either::Left((b, _)) => b.map(|b| (b, claim)), - futures::future::Either::Right(_) => { + futures::future::select(proposing, proposing_remaining).map(move |v| match v { + Either::Left((b, _)) => b.map(|b| (b, claim)), + Either::Right(_) => { info!("⌛️ Discarding proposal for slot {}; block production took too long", slot_number); // If the node was compiled with debug, tell the user to use release optimizations. #[cfg(build_type="debug")] @@ -274,16 +294,18 @@ pub trait SimpleSlotWorker { telemetry!(CONSENSUS_INFO; "slots.discarding_proposal_took_too_long"; "slot" => slot_number, ); + Err(sp_consensus::Error::ClientImport("Timeout in the Slots proposer".into())) }, - })); + }); let block_import_params_maker = self.block_import_params(); let block_import = self.block_import(); let logging_target = self.logging_target(); - Box::pin(proposal_work.and_then(move |(proposal, claim)| { - let (header, body) = proposal.block.deconstruct(); + proposal_work.and_then(move |(proposal, claim)| async move { + let (block, storage_proof) = (proposal.block, proposal.proof); + let (header, body) = block.clone().deconstruct(); let header_num = *header.number(); let header_hash = header.hash(); let parent_hash = *header.parent_hash(); @@ -295,12 +317,7 @@ pub trait SimpleSlotWorker { proposal.storage_changes, claim, epoch_data, - ); - - let block_import_params = match block_import_params { - Ok(params) => params, - Err(e) => return future::err(e), - }; + )?; info!( "🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.", @@ -316,18 +333,32 @@ pub trait SimpleSlotWorker { ); if let Err(err) = block_import.lock().import_block(block_import_params, Default::default()) { - warn!(target: logging_target, + warn!( + target: logging_target, "Error with block built on {:?}: {:?}", parent_hash, err, ); - telemetry!(CONSENSUS_WARN; "slots.err_with_block_built_on"; - "hash" => ?parent_hash, "err" => ?err, + telemetry!( + CONSENSUS_WARN; "slots.err_with_block_built_on"; + "hash" => ?parent_hash, + "err" => ?err, ); } - future::ready(Ok(())) - })) + + Ok(SlotResult { block, storage_proof }) + }).then(|r| async move { + r.map_err(|e| warn!(target: "slots", "Encountered consensus error: {:?}", e)).ok() + }).boxed() + } +} + +impl> SlotWorker for T { + type OnSlot = Pin>> + Send>>; + + fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot { + SimpleSlotWorker::on_slot(self, chain_head, slot_info) } } @@ -338,10 +369,6 @@ pub trait SlotCompatible { &self, inherent: &InherentData, ) -> Result<(u64, u64, std::time::Duration), sp_consensus::Error>; - - /// Get the difference between chain time and local time. Defaults to - /// always returning zero. - fn time_offset() -> SignedDuration { Default::default() } } /// Start a new slot worker. @@ -403,11 +430,7 @@ where Either::Right(future::ready(Ok(()))) } else { Either::Left( - worker.on_slot(chain_head, slot_info) - .map_err(|e| { - warn!(target: "slots", "Encountered consensus error: {:?}", e); - }) - .or_else(|_| future::ready(Ok(()))) + worker.on_slot(chain_head, slot_info).then(|_| future::ready(Ok(()))) ) } }).then(|res| { @@ -569,7 +592,6 @@ mod test { fn slot(n: u64) -> super::slots::SlotInfo { super::slots::SlotInfo { number: n, - last_number: n - 1, duration: SLOT_DURATION.as_millis() as u64, timestamp: Default::default(), inherent_data: Default::default(), diff --git a/substrate/client/consensus/slots/src/slots.rs b/substrate/client/consensus/slots/src/slots.rs index 32316c56c9..e7c84a2c1f 100644 --- a/substrate/client/consensus/slots/src/slots.rs +++ b/substrate/client/consensus/slots/src/slots.rs @@ -37,30 +37,6 @@ pub fn duration_now() -> Duration { )) } - -/// A `Duration` with a sign (before or after). Immutable. -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] -pub struct SignedDuration { - offset: Duration, - is_positive: bool, -} - -impl SignedDuration { - /// Construct a `SignedDuration` - pub fn new(offset: Duration, is_positive: bool) -> Self { - Self { offset, is_positive } - } - - /// Get the slot for now. Panics if `slot_duration` is 0. - pub fn slot_now(&self, slot_duration: u64) -> u64 { - (if self.is_positive { - duration_now() + self.offset - } else { - duration_now() - self.offset - }.as_millis() as u64) / slot_duration - } -} - /// Returns the duration until the next slot, based on current duration since pub fn time_until_next(now: Duration, slot_duration: u64) -> Duration { let remaining_full_millis = slot_duration - (now.as_millis() as u64 % slot_duration) - 1; @@ -71,8 +47,6 @@ pub fn time_until_next(now: Duration, slot_duration: u64) -> Duration { pub struct SlotInfo { /// The slot number. pub number: u64, - /// The last slot number produced. - pub last_number: u64, /// Current timestamp. pub timestamp: u64, /// The instant at which the slot ends. @@ -150,13 +124,11 @@ impl Stream for Slots { // never yield the same slot twice. if slot_num > self.last_slot { - let last_slot = self.last_slot; self.last_slot = slot_num; break Poll::Ready(Some(Ok(SlotInfo { number: slot_num, duration: self.slot_duration, - last_number: last_slot, timestamp, ends_at, inherent_data, @@ -166,5 +138,4 @@ impl Stream for Slots { } } -impl Unpin for Slots { -} +impl Unpin for Slots {} diff --git a/substrate/primitives/consensus/common/src/lib.rs b/substrate/primitives/consensus/common/src/lib.rs index fa4f233c68..47de067411 100644 --- a/substrate/primitives/consensus/common/src/lib.rs +++ b/substrate/primitives/consensus/common/src/lib.rs @@ -123,6 +123,13 @@ impl RecordProof { } } +/// Will return [`RecordProof::No`] as default value. +impl Default for RecordProof { + fn default() -> Self { + Self::No + } +} + impl From for RecordProof { fn from(val: bool) -> Self { if val {