mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 07:01:05 +00:00
Aura and Slots refactoring (#8386)
* Make slot duration being exposed as `Duration` to the outside * Some slot info love * Add `build_aura_worker` utility function * Copy copy copy
This commit is contained in:
@@ -30,7 +30,7 @@ use log::{debug, info, trace};
|
||||
use prometheus_endpoint::Registry;
|
||||
use codec::{Encode, Decode, Codec};
|
||||
use sp_consensus::{
|
||||
BlockImport, CanAuthorWith, ForkChoiceStrategy, BlockImportParams,
|
||||
BlockImport, CanAuthorWith, ForkChoiceStrategy, BlockImportParams, SlotData,
|
||||
BlockOrigin, Error as ConsensusError, BlockCheckParams, ImportResult,
|
||||
import_queue::{
|
||||
Verifier, BasicQueue, DefaultImportQueue, BoxJustificationImport,
|
||||
@@ -284,7 +284,7 @@ impl<B: BlockT, C, P, CAW> Verifier<B> for AuraVerifier<C, P, CAW> where
|
||||
block.clone(),
|
||||
BlockId::Hash(parent_hash),
|
||||
inherent_data,
|
||||
timestamp_now,
|
||||
*timestamp_now,
|
||||
).map_err(|e| e.to_string())?;
|
||||
}
|
||||
|
||||
@@ -541,7 +541,7 @@ pub fn import_queue<'a, P, Block, I, C, S, CAW>(
|
||||
S: sp_core::traits::SpawnEssentialNamed,
|
||||
CAW: CanAuthorWith<Block> + Send + Sync + 'static,
|
||||
{
|
||||
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?;
|
||||
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.slot_duration())?;
|
||||
initialize_authorities_cache(&*client)?;
|
||||
|
||||
let verifier = AuraVerifier::<_, P, _>::new(
|
||||
|
||||
@@ -75,7 +75,7 @@ pub use sc_consensus_slots::SlotProportion;
|
||||
type AuthorityId<P> = <P as Pair>::Public;
|
||||
|
||||
/// Slot duration type for Aura.
|
||||
pub type SlotDuration = sc_consensus_slots::SlotDuration<u64>;
|
||||
pub type SlotDuration = sc_consensus_slots::SlotDuration<sp_consensus_aura::SlotDuration>;
|
||||
|
||||
/// Get type of `SlotDuration` for Aura.
|
||||
pub fn slot_duration<A, B, C>(client: &C) -> CResult<SlotDuration> where
|
||||
@@ -111,12 +111,12 @@ impl SlotCompatible for AuraSlotCompatible {
|
||||
fn extract_timestamp_and_slot(
|
||||
&self,
|
||||
data: &InherentData,
|
||||
) -> Result<(u64, AuraInherent, std::time::Duration), sp_consensus::Error> {
|
||||
) -> Result<(sp_timestamp::Timestamp, AuraInherent, std::time::Duration), sp_consensus::Error> {
|
||||
data.timestamp_inherent_data()
|
||||
.and_then(|t| data.aura_inherent_data().map(|a| (t, a)))
|
||||
.map_err(Into::into)
|
||||
.map_err(sp_consensus::Error::InherentData)
|
||||
.map(|(x, y)| (*x, y, Default::default()))
|
||||
.map(|(x, y)| (x, y, Default::default()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,7 +161,7 @@ pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error>(
|
||||
client,
|
||||
select_chain,
|
||||
block_import,
|
||||
proposer_factory: env,
|
||||
proposer_factory,
|
||||
sync_oracle,
|
||||
inherent_data_providers,
|
||||
force_authoring,
|
||||
@@ -187,22 +187,23 @@ pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error>(
|
||||
CAW: CanAuthorWith<B> + Send,
|
||||
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
|
||||
{
|
||||
let worker = AuraWorker {
|
||||
let worker = build_aura_worker::<P, _, _, _, _, _, _, _>(BuildAuraWorkerParams {
|
||||
client: client.clone(),
|
||||
block_import: Arc::new(Mutex::new(block_import)),
|
||||
env,
|
||||
block_import,
|
||||
proposer_factory,
|
||||
keystore,
|
||||
sync_oracle: sync_oracle.clone(),
|
||||
force_authoring,
|
||||
backoff_authoring_blocks,
|
||||
telemetry,
|
||||
_key_type: PhantomData::<P>,
|
||||
block_proposal_slot_portion,
|
||||
};
|
||||
});
|
||||
|
||||
register_aura_inherent_data_provider(
|
||||
&inherent_data_providers,
|
||||
slot_duration.slot_duration()
|
||||
)?;
|
||||
|
||||
Ok(sc_consensus_slots::start_slot_worker::<_, _, _, _, _, AuraSlotCompatible, _, _>(
|
||||
slot_duration,
|
||||
select_chain,
|
||||
@@ -214,6 +215,75 @@ pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error>(
|
||||
))
|
||||
}
|
||||
|
||||
/// Parameters of [`build_aura_worker`].
|
||||
pub struct BuildAuraWorkerParams<C, I, PF, SO, BS> {
|
||||
/// The client to interact with the chain.
|
||||
pub client: Arc<C>,
|
||||
/// The block import.
|
||||
pub block_import: I,
|
||||
/// The proposer factory to build proposer instances.
|
||||
pub proposer_factory: PF,
|
||||
/// The sync oracle that can give us the current sync status.
|
||||
pub sync_oracle: SO,
|
||||
/// Should we force the authoring of blocks?
|
||||
pub force_authoring: bool,
|
||||
/// The backoff strategy when we miss slots.
|
||||
pub backoff_authoring_blocks: Option<BS>,
|
||||
/// The keystore used by the node.
|
||||
pub keystore: SyncCryptoStorePtr,
|
||||
/// The proportion of the slot dedicated to proposing.
|
||||
///
|
||||
/// The block proposing will be limited to this proportion of the slot from the starting of the
|
||||
/// slot. However, the proposing can still take longer when there is some lenience factor applied,
|
||||
/// because there were no blocks produced for some slots.
|
||||
pub block_proposal_slot_portion: SlotProportion,
|
||||
/// Telemetry instance used to report telemetry metrics.
|
||||
pub telemetry: Option<TelemetryHandle>,
|
||||
}
|
||||
|
||||
/// Build the aura worker.
|
||||
///
|
||||
/// The caller is responsible for running this worker, otherwise it will do nothing.
|
||||
pub fn build_aura_worker<P, B, C, PF, I, SO, BS, Error>(
|
||||
BuildAuraWorkerParams {
|
||||
client,
|
||||
block_import,
|
||||
proposer_factory,
|
||||
sync_oracle,
|
||||
backoff_authoring_blocks,
|
||||
keystore,
|
||||
block_proposal_slot_portion,
|
||||
telemetry,
|
||||
force_authoring,
|
||||
}: BuildAuraWorkerParams<C, I, PF, SO, BS>,
|
||||
) -> impl sc_consensus_slots::SlotWorker<B, <PF::Proposer as Proposer<B>>::Proof> where
|
||||
B: BlockT,
|
||||
C: ProvideRuntimeApi<B> + BlockOf + ProvideCache<B> + AuxStore + HeaderBackend<B> + Send + Sync,
|
||||
C::Api: AuraApi<B, AuthorityId<P>>,
|
||||
PF: Environment<B, Error = Error> + Send + Sync + 'static,
|
||||
PF::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
|
||||
P: Pair + Send + Sync,
|
||||
P::Public: AppPublic + Hash + Member + Encode + Decode,
|
||||
P::Signature: TryFrom<Vec<u8>> + Hash + Member + Encode + Decode,
|
||||
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync + 'static,
|
||||
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
|
||||
SO: SyncOracle + Send + Sync + Clone,
|
||||
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
|
||||
{
|
||||
AuraWorker {
|
||||
client,
|
||||
block_import: Arc::new(Mutex::new(block_import)),
|
||||
env: proposer_factory,
|
||||
keystore,
|
||||
sync_oracle,
|
||||
force_authoring,
|
||||
backoff_authoring_blocks,
|
||||
telemetry,
|
||||
_key_type: PhantomData::<P>,
|
||||
block_proposal_slot_portion,
|
||||
}
|
||||
}
|
||||
|
||||
struct AuraWorker<C, E, I, P, SO, BS> {
|
||||
client: Arc<C>,
|
||||
block_import: Arc<Mutex<I>>,
|
||||
@@ -477,7 +547,7 @@ fn find_pre_digest<B: BlockT, Signature: Codec>(header: &B::Header) -> Result<Sl
|
||||
/// Register the aura inherent data provider, if not registered already.
|
||||
fn register_aura_inherent_data_provider(
|
||||
inherent_data_providers: &InherentDataProviders,
|
||||
slot_duration: u64,
|
||||
slot_duration: std::time::Duration,
|
||||
) -> Result<(), sp_consensus::Error> {
|
||||
if !inherent_data_providers.has_provider(&INHERENT_IDENTIFIER) {
|
||||
inherent_data_providers
|
||||
@@ -596,10 +666,10 @@ mod tests {
|
||||
let inherent_data_providers = InherentDataProviders::new();
|
||||
register_aura_inherent_data_provider(
|
||||
&inherent_data_providers,
|
||||
slot_duration.get()
|
||||
slot_duration.slot_duration()
|
||||
).expect("Registers aura inherent data provider");
|
||||
|
||||
assert_eq!(slot_duration.get(), SLOT_DURATION);
|
||||
assert_eq!(slot_duration.slot_duration().as_millis() as u64, SLOT_DURATION);
|
||||
import_queue::AuraVerifier::new(
|
||||
client,
|
||||
inherent_data_providers,
|
||||
@@ -665,7 +735,7 @@ mod tests {
|
||||
|
||||
let inherent_data_providers = InherentDataProviders::new();
|
||||
register_aura_inherent_data_provider(
|
||||
&inherent_data_providers, slot_duration.get()
|
||||
&inherent_data_providers, slot_duration.slot_duration()
|
||||
).expect("Registers aura inherent data provider");
|
||||
|
||||
aura_futures.push(start_aura::<AuthorityPair, _, _, _, _, _, _, _, _, _>(StartAuraParams {
|
||||
@@ -801,7 +871,7 @@ mod tests {
|
||||
head,
|
||||
SlotInfo {
|
||||
slot: 0.into(),
|
||||
timestamp: 0,
|
||||
timestamp: 0.into(),
|
||||
ends_at: Instant::now() + Duration::from_secs(100),
|
||||
inherent_data: InherentData::new(),
|
||||
duration: Duration::from_millis(1000),
|
||||
|
||||
@@ -345,8 +345,8 @@ impl Config {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the inner slot duration, in milliseconds.
|
||||
pub fn slot_duration(&self) -> u64 {
|
||||
/// Get the inner slot duration
|
||||
pub fn slot_duration(&self) -> Duration {
|
||||
self.0.slot_duration()
|
||||
}
|
||||
}
|
||||
@@ -919,13 +919,13 @@ impl SlotCompatible for TimeSource {
|
||||
fn extract_timestamp_and_slot(
|
||||
&self,
|
||||
data: &InherentData,
|
||||
) -> Result<(u64, Slot, std::time::Duration), sp_consensus::Error> {
|
||||
) -> Result<(sp_timestamp::Timestamp, Slot, std::time::Duration), sp_consensus::Error> {
|
||||
trace!(target: "babe", "extract timestamp");
|
||||
data.timestamp_inherent_data()
|
||||
.and_then(|t| data.babe_inherent_data().map(|a| (t, a)))
|
||||
.map_err(Into::into)
|
||||
.map_err(sp_consensus::Error::InherentData)
|
||||
.map(|(x, y)| (*x, y, self.0.lock().0.take().unwrap_or_default()))
|
||||
.map(|(x, y)| (x, y, self.0.lock().0.take().unwrap_or_default()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1220,7 +1220,7 @@ where
|
||||
/// Register the babe inherent data provider, if not registered already.
|
||||
pub fn register_babe_inherent_data_provider(
|
||||
inherent_data_providers: &InherentDataProviders,
|
||||
slot_duration: u64,
|
||||
slot_duration: Duration,
|
||||
) -> Result<(), sp_consensus::Error> {
|
||||
debug!(target: "babe", "Registering");
|
||||
if !inherent_data_providers.has_provider(&sp_consensus_babe::inherents::INHERENT_IDENTIFIER) {
|
||||
@@ -1626,7 +1626,7 @@ pub fn import_queue<Block: BlockT, Client, SelectChain, Inner, CAW>(
|
||||
SelectChain: sp_consensus::SelectChain<Block> + 'static,
|
||||
CAW: CanAuthorWith<Block> + Send + Sync + 'static,
|
||||
{
|
||||
register_babe_inherent_data_provider(&inherent_data_providers, babe_link.config.slot_duration)?;
|
||||
register_babe_inherent_data_provider(&inherent_data_providers, babe_link.config.slot_duration())?;
|
||||
|
||||
let verifier = BabeVerifier {
|
||||
select_chain,
|
||||
|
||||
@@ -90,7 +90,7 @@ impl<B, C> BabeConsensusDataProvider<B, C>
|
||||
let timestamp_provider = SlotTimestampProvider::new(client.clone())?;
|
||||
|
||||
provider.register_provider(timestamp_provider)?;
|
||||
register_babe_inherent_data_provider(provider, config.slot_duration)?;
|
||||
register_babe_inherent_data_provider(provider, config.slot_duration())?;
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
|
||||
@@ -28,6 +28,7 @@ sp-api = { version = "3.0.0", path = "../../../primitives/api" }
|
||||
sc-telemetry = { version = "3.0.0", path = "../../telemetry" }
|
||||
sp-consensus = { version = "0.9.0", path = "../../../primitives/consensus/common" }
|
||||
sp-inherents = { version = "3.0.0", path = "../../../primitives/inherents" }
|
||||
sp-timestamp = { version = "3.0.0", path = "../../../primitives/timestamp" }
|
||||
futures = "0.3.9"
|
||||
futures-timer = "3.0.1"
|
||||
parking_lot = "0.11.1"
|
||||
|
||||
@@ -274,7 +274,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
|
||||
CONSENSUS_DEBUG;
|
||||
"slots.starting_authorship";
|
||||
"slot_num" => *slot,
|
||||
"timestamp" => timestamp,
|
||||
"timestamp" => *timestamp,
|
||||
);
|
||||
|
||||
let awaiting_proposer = {
|
||||
@@ -408,7 +408,7 @@ pub trait SlotCompatible {
|
||||
fn extract_timestamp_and_slot(
|
||||
&self,
|
||||
inherent: &InherentData,
|
||||
) -> Result<(u64, Slot, std::time::Duration), sp_consensus::Error>;
|
||||
) -> Result<(sp_timestamp::Timestamp, Slot, std::time::Duration), sp_consensus::Error>;
|
||||
}
|
||||
|
||||
/// Start a new slot worker.
|
||||
@@ -514,10 +514,7 @@ impl<T> Deref for SlotDuration<T> {
|
||||
}
|
||||
|
||||
impl<T: SlotData> SlotData for SlotDuration<T> {
|
||||
/// Get the slot duration in milliseconds.
|
||||
fn slot_duration(&self) -> u64
|
||||
where T: SlotData,
|
||||
{
|
||||
fn slot_duration(&self) -> std::time::Duration {
|
||||
self.0.slot_duration()
|
||||
}
|
||||
|
||||
@@ -562,7 +559,7 @@ impl<T: Clone + Send + Sync + 'static> SlotDuration<T> {
|
||||
}
|
||||
}?;
|
||||
|
||||
if slot_duration.slot_duration() == 0u64 {
|
||||
if slot_duration.slot_duration() == Default::default() {
|
||||
return Err(sp_blockchain::Error::Application(Box::new(Error::SlotDurationInvalid(slot_duration))))
|
||||
}
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ pub struct SlotInfo {
|
||||
/// The slot number.
|
||||
pub slot: Slot,
|
||||
/// Current timestamp.
|
||||
pub timestamp: u64,
|
||||
pub timestamp: sp_timestamp::Timestamp,
|
||||
/// The instant at which the slot ends.
|
||||
pub ends_at: Instant,
|
||||
/// The inherent data.
|
||||
@@ -61,6 +61,26 @@ pub struct SlotInfo {
|
||||
pub duration: Duration,
|
||||
}
|
||||
|
||||
impl SlotInfo {
|
||||
/// Create a new [`SlotInfo`].
|
||||
///
|
||||
/// `ends_at` is calculated using `timestamp` and `duration`.
|
||||
pub fn new(
|
||||
slot: Slot,
|
||||
timestamp: sp_timestamp::Timestamp,
|
||||
inherent_data: InherentData,
|
||||
duration: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
slot,
|
||||
timestamp,
|
||||
inherent_data,
|
||||
duration,
|
||||
ends_at: Instant::now() + time_until_next(timestamp.as_duration(), duration),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream that returns every time there is a new slot.
|
||||
pub(crate) struct Slots<SC> {
|
||||
last_slot: Slot,
|
||||
@@ -73,13 +93,13 @@ pub(crate) struct Slots<SC> {
|
||||
impl<SC> Slots<SC> {
|
||||
/// Create a new `Slots` stream.
|
||||
pub fn new(
|
||||
slot_duration: u64,
|
||||
slot_duration: Duration,
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
timestamp_extractor: SC,
|
||||
) -> Self {
|
||||
Slots {
|
||||
last_slot: 0.into(),
|
||||
slot_duration: Duration::from_millis(slot_duration),
|
||||
slot_duration,
|
||||
inner_delay: None,
|
||||
inherent_data_providers,
|
||||
timestamp_extractor,
|
||||
@@ -122,21 +142,19 @@ impl<SC: SlotCompatible> Stream for Slots<SC> {
|
||||
};
|
||||
// reschedule delay for next slot.
|
||||
let ends_in = offset +
|
||||
time_until_next(Duration::from_millis(timestamp), slot_duration);
|
||||
let ends_at = Instant::now() + ends_in;
|
||||
time_until_next(timestamp.as_duration(), slot_duration);
|
||||
self.inner_delay = Some(Delay::new(ends_in));
|
||||
|
||||
// never yield the same slot twice.
|
||||
if slot > self.last_slot {
|
||||
self.last_slot = slot;
|
||||
|
||||
break Poll::Ready(Some(Ok(SlotInfo {
|
||||
break Poll::Ready(Some(Ok(SlotInfo::new(
|
||||
slot,
|
||||
duration: self.slot_duration,
|
||||
timestamp,
|
||||
ends_at,
|
||||
inherent_data,
|
||||
})))
|
||||
self.slot_duration,
|
||||
))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user