Rework inherent data client side (#8526)

* Lol

* Yeah

* Moare

* adaasda

* Convert AURA to new pallet macro

* AURA: Switch to `CurrentSlot` instead of `LastTimestamp`

This switches AURA to use `CurrentSlot` instead of `LastTimestamp`.

* Add missing file

* Update frame/aura/src/migrations.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Remove the runtime side provide inherent code

* Use correct weight

* Add TODO

* Remove the Inherent from AURA

* 🤦

* Remove unused stuff

* Update primitives authorship

* Fix babe inherent data provider

* Fix consensus-uncles

* Fix BABE

* Do some further changes to authorship primitives... :D

* More work

* Make it compile the happy path

* Make it async!

* Take hash

* More stuff

* Hacks

* Revert "Hacks"

This reverts commit cfffad88668cfdebf632a59c4fbfada001ef8251.

* Fix

* Make `execute_block` return the final block header

* Move Aura digest stuff

* Make it possible to disable equivocation checking

* Fix fix fix

* Some refactorings

* Comment

* Fixes fixes fixes

* More cleanups

* Some love

* Better love

* Make slot duration being exposed as `Duration` to the outside

* Some slot info love

* Add `build_aura_worker` utility function

* Copy copy copy

* Some stuff

* Start fixing pow

* Fix pow

* Remove some bounds

* More work

* Make grandpa work

* Make slots use `async_trait`

* Introduce `SharedData`

* Add test and fix bugs

* Switch to `SharedData`

* Make grandpa tests working

* More Babe work

* Make grandpa work

* Introduce `SharedData`

* Add test and fix bugs

* Switch to `SharedData`

* Make grandpa tests working

* More Babe work

* Make it async

* Fix fix

* Use `async_trait` in sc-consensus-slots

This makes the code a little bit easier to read and also expresses that
there can always only be one call at a time to `on_slot`.

* Make grandpa tests compile

* More Babe tests work

* Fix network test

* Start fixing service test

* Finish service-test

* Fix sc-consensus-aura

* Fix fix fix

* More fixes

* Make everything compile *yeah*

* Make manual-seal compile

* More fixes

* Start fixing Aura

* Fix Aura tests

* Fix Babe tests

* Make everything compile

* Move code around and switch to async_trait

* Fix Babe

* Docs docs docs

* Move to FRAME

* Fix fix fix

* Make everything compile

* Last cleanups

* Fix integration test

* Change slot usage of the timestamp

* We really need to switch to `impl-trait-for-tuples`

* Update primitives/inherents/src/lib.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update primitives/inherents/src/lib.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update primitives/inherents/src/lib.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Some extra logging

* Remove dbg!

* Update primitives/consensus/common/src/import_queue/basic_queue.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
Bastian Köcher
2021-05-03 16:39:25 +02:00
committed by GitHub
parent ef07c3be0d
commit 2675741a09
52 changed files with 1506 additions and 1178 deletions
+123 -87
View File
@@ -23,7 +23,7 @@
//! provides generic functionality for slots.
#![forbid(unsafe_code)]
#![deny(missing_docs)]
#![warn(missing_docs)]
mod slots;
mod aux_schema;
@@ -41,12 +41,13 @@ use sp_api::{ProvideRuntimeApi, ApiRef};
use sp_arithmetic::traits::BaseArithmetic;
use sp_consensus::{BlockImport, Proposer, SyncOracle, SelectChain, CanAuthorWith, SlotData};
use sp_consensus_slots::Slot;
use sp_inherents::{InherentData, InherentDataProviders};
use sp_inherents::CreateInherentDataProviders;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header, HashFor, NumberFor}
traits::{Block as BlockT, Header as HeaderT, HashFor, NumberFor}
};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_WARN, CONSENSUS_INFO};
use sp_timestamp::Timestamp;
/// The changes that need to applied to the storage to create the state for a block.
///
@@ -75,8 +76,7 @@ pub trait SlotWorker<B: BlockT, Proof> {
/// the slot. Otherwise `None` is returned.
async fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
slot_info: SlotInfo<B>,
) -> Option<SlotResult<B, Proof>>;
}
@@ -187,21 +187,19 @@ pub trait SimpleSlotWorker<B: BlockT> {
/// Remaining duration for proposing.
fn proposing_remaining_duration(
&self,
head: &B::Header,
slot_info: &SlotInfo,
slot_info: &SlotInfo<B>,
) -> Duration;
/// Implements [`SlotWorker::on_slot`].
async fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
slot_info: SlotInfo<B>,
) -> Option<SlotResult<B, <Self::Proposer as Proposer<B>>::Proof>> {
let (timestamp, slot) = (slot_info.timestamp, slot_info.slot);
let telemetry = self.telemetry();
let logging_target = self.logging_target();
let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info);
let proposing_remaining_duration = self.proposing_remaining_duration(&slot_info);
let proposing_remaining = if proposing_remaining_duration == Duration::default() {
debug!(
@@ -215,13 +213,13 @@ pub trait SimpleSlotWorker<B: BlockT> {
Delay::new(proposing_remaining_duration)
};
let epoch_data = match self.epoch_data(&chain_head, slot) {
let epoch_data = match self.epoch_data(&slot_info.chain_head, slot) {
Ok(epoch_data) => epoch_data,
Err(err) => {
warn!(
target: logging_target,
"Unable to fetch epoch data at block {:?}: {:?}",
chain_head.hash(),
slot_info.chain_head.hash(),
err,
);
@@ -229,7 +227,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
telemetry;
CONSENSUS_WARN;
"slots.unable_fetching_authorities";
"slot" => ?chain_head.hash(),
"slot" => ?slot_info.chain_head.hash(),
"err" => ?err,
);
@@ -237,7 +235,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
}
};
self.notify_slot(&chain_head, slot, &epoch_data);
self.notify_slot(&slot_info.chain_head, slot, &epoch_data);
let authorities_len = self.authorities_len(&epoch_data);
@@ -256,9 +254,9 @@ pub trait SimpleSlotWorker<B: BlockT> {
return None;
}
let claim = self.claim_slot(&chain_head, slot, &epoch_data)?;
let claim = self.claim_slot(&slot_info.chain_head, slot, &epoch_data)?;
if self.should_backoff(slot, &chain_head) {
if self.should_backoff(slot, &slot_info.chain_head) {
return None;
}
@@ -266,7 +264,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
target: self.logging_target(),
"Starting authorship at slot {}; timestamp = {}",
slot,
timestamp,
*timestamp,
);
telemetry!(
@@ -277,7 +275,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
"timestamp" => *timestamp,
);
let proposer = match self.proposer(&chain_head).await {
let proposer = match self.proposer(&slot_info.chain_head).await {
Ok(p) => p,
Err(err) => {
warn!(
@@ -422,94 +420,119 @@ pub trait SimpleSlotWorker<B: BlockT> {
impl<B: BlockT, T: SimpleSlotWorker<B> + Send> SlotWorker<B, <T::Proposer as Proposer<B>>::Proof> for T {
async fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
slot_info: SlotInfo<B>,
) -> Option<SlotResult<B, <T::Proposer as Proposer<B>>::Proof>> {
SimpleSlotWorker::on_slot(self, chain_head, slot_info).await
SimpleSlotWorker::on_slot(self, slot_info).await
}
}
/// Slot compatible inherent data.
pub trait SlotCompatible {
/// Extract timestamp and slot from inherent data.
fn extract_timestamp_and_slot(
&self,
inherent: &InherentData,
) -> Result<(sp_timestamp::Timestamp, Slot, std::time::Duration), sp_consensus::Error>;
/// Slot specific extension that the inherent data provider needs to implement.
pub trait InherentDataProviderExt {
/// The current timestamp that will be found in the [`InherentData`].
fn timestamp(&self) -> Timestamp;
/// The current slot that will be found in the [`InherentData`].
fn slot(&self) -> Slot;
}
impl<T, S, P> InherentDataProviderExt for (T, S, P)
where
T: Deref<Target = Timestamp>,
S: Deref<Target = Slot>,
{
fn timestamp(&self) -> Timestamp {
*self.0.deref()
}
fn slot(&self) -> Slot {
*self.1.deref()
}
}
impl<T, S, P, R> InherentDataProviderExt for (T, S, P, R)
where
T: Deref<Target = Timestamp>,
S: Deref<Target = Slot>,
{
fn timestamp(&self) -> Timestamp {
*self.0.deref()
}
fn slot(&self) -> Slot {
*self.1.deref()
}
}
impl<T, S> InherentDataProviderExt for (T, S)
where
T: Deref<Target = Timestamp>,
S: Deref<Target = Slot>,
{
fn timestamp(&self) -> Timestamp {
*self.0.deref()
}
fn slot(&self) -> Slot {
*self.1.deref()
}
}
/// Start a new slot worker.
///
/// Every time a new slot is triggered, `worker.on_slot` is called and the future it returns is
/// polled until completion, unless we are major syncing.
pub fn start_slot_worker<B, C, W, T, SO, SC, CAW, Proof>(
pub async fn start_slot_worker<B, C, W, T, SO, CAW, CIDP, Proof>(
slot_duration: SlotDuration<T>,
client: C,
mut worker: W,
mut sync_oracle: SO,
inherent_data_providers: InherentDataProviders,
timestamp_extractor: SC,
create_inherent_data_providers: CIDP,
can_author_with: CAW,
) -> impl Future<Output = ()>
)
where
B: BlockT,
C: SelectChain<B>,
W: SlotWorker<B, Proof>,
SO: SyncOracle + Send,
SC: SlotCompatible + Unpin,
T: SlotData + Clone,
CAW: CanAuthorWith<B> + Send,
CIDP: CreateInherentDataProviders<B, ()> + Send,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
{
let SlotDuration(slot_duration) = slot_duration;
// rather than use a timer interval, we schedule our waits ourselves
let mut slots = Slots::<SC>::new(
let mut slots = Slots::new(
slot_duration.slot_duration(),
inherent_data_providers,
timestamp_extractor,
create_inherent_data_providers,
client,
);
async move {
loop {
let slot_info = match slots.next_slot().await {
Ok(slot) => slot,
Err(err) => {
debug!(target: "slots", "Faulty timer: {:?}", err);
return
},
};
// only propose when we are not syncing.
if sync_oracle.is_major_syncing() {
debug!(target: "slots", "Skipping proposal slot due to sync.");
continue;
loop {
let slot_info = match slots.next_slot().await {
Ok(r) => r,
Err(e) => {
warn!(target: "slots", "Error while polling for next slot: {:?}", e);
return;
}
};
let slot = slot_info.slot;
let chain_head = match client.best_chain() {
Ok(x) => x,
Err(e) => {
warn!(
target: "slots",
"Unable to author block in slot {}. No best block header: {:?}",
slot,
e,
);
continue;
}
};
if sync_oracle.is_major_syncing() {
debug!(target: "slots", "Skipping proposal slot due to sync.");
continue;
}
if let Err(err) = can_author_with.can_author_with(&BlockId::Hash(chain_head.hash())) {
warn!(
target: "slots",
"Unable to author block in slot {},. `can_author_with` returned: {} \
Probably a node update is required!",
slot,
err,
);
} else {
worker.on_slot(chain_head, slot_info).await;
}
if let Err(err) = can_author_with
.can_author_with(&BlockId::Hash(slot_info.chain_head.hash()))
{
warn!(
target: "slots",
"Unable to author block in slot {},. `can_author_with` returned: {} \
Probably a node update is required!",
slot_info.slot,
err,
);
} else {
let _ = worker.on_slot(slot_info).await;
}
}
}
@@ -627,7 +650,10 @@ impl SlotProportion {
/// to parent. If the number of skipped slots is greated than 0 this method will apply
/// an exponential backoff of at most `2^7 * slot_duration`, if no slots were skipped
/// this method will return `None.`
pub fn slot_lenience_exponential(parent_slot: Slot, slot_info: &SlotInfo) -> Option<Duration> {
pub fn slot_lenience_exponential<Block: BlockT>(
parent_slot: Slot,
slot_info: &SlotInfo<Block>,
) -> Option<Duration> {
// never give more than 2^this times the lenience.
const BACKOFF_CAP: u64 = 7;
@@ -656,7 +682,10 @@ pub fn slot_lenience_exponential(parent_slot: Slot, slot_info: &SlotInfo) -> Opt
/// to parent. If the number of skipped slots is greated than 0 this method will apply
/// a linear backoff of at most `20 * slot_duration`, if no slots were skipped
/// this method will return `None.`
pub fn slot_lenience_linear(parent_slot: Slot, slot_info: &SlotInfo) -> Option<Duration> {
pub fn slot_lenience_linear<Block: BlockT>(
parent_slot: u64,
slot_info: &SlotInfo<Block>,
) -> Option<Duration> {
// never give more than 20 times more lenience.
const BACKOFF_CAP: u64 = 20;
@@ -777,20 +806,27 @@ impl<N> BackoffAuthoringBlocksStrategy<N> for () {
#[cfg(test)]
mod test {
use super::*;
use std::time::{Duration, Instant};
use crate::{BackoffAuthoringOnFinalizedHeadLagging, BackoffAuthoringBlocksStrategy};
use substrate_test_runtime_client::runtime::Block;
use substrate_test_runtime_client::runtime::{Block, Header};
use sp_api::NumberFor;
const SLOT_DURATION: Duration = Duration::from_millis(6000);
fn slot(slot: u64) -> super::slots::SlotInfo {
fn slot(slot: u64) -> super::slots::SlotInfo<Block> {
super::slots::SlotInfo {
slot: slot.into(),
duration: SLOT_DURATION,
timestamp: Default::default(),
inherent_data: Default::default(),
ends_at: Instant::now(),
chain_head: Header::new(
1,
Default::default(),
Default::default(),
Default::default(),
Default::default(),
),
block_size_limit: None,
}
}
@@ -798,20 +834,20 @@ mod test {
#[test]
fn linear_slot_lenience() {
// if no slots are skipped there should be no lenience
assert_eq!(super::slot_lenience_linear(1.into(), &slot(2)), None);
assert_eq!(super::slot_lenience_linear(1u64.into(), &slot(2)), None);
// otherwise the lenience is incremented linearly with
// the number of skipped slots.
for n in 3..=22 {
assert_eq!(
super::slot_lenience_linear(1.into(), &slot(n)),
super::slot_lenience_linear(1u64.into(), &slot(n)),
Some(SLOT_DURATION * (n - 2) as u32),
);
}
// but we cap it to a maximum of 20 slots
assert_eq!(
super::slot_lenience_linear(1.into(), &slot(23)),
super::slot_lenience_linear(1u64.into(), &slot(23)),
Some(SLOT_DURATION * 20),
);
}
@@ -819,24 +855,24 @@ mod test {
#[test]
fn exponential_slot_lenience() {
// if no slots are skipped there should be no lenience
assert_eq!(super::slot_lenience_exponential(1.into(), &slot(2)), None);
assert_eq!(super::slot_lenience_exponential(1u64.into(), &slot(2)), None);
// otherwise the lenience is incremented exponentially every two slots
for n in 3..=17 {
assert_eq!(
super::slot_lenience_exponential(1.into(), &slot(n)),
super::slot_lenience_exponential(1u64.into(), &slot(n)),
Some(SLOT_DURATION * 2u32.pow((n / 2 - 1) as u32)),
);
}
// but we cap it to a maximum of 14 slots
assert_eq!(
super::slot_lenience_exponential(1.into(), &slot(18)),
super::slot_lenience_exponential(1u64.into(), &slot(18)),
Some(SLOT_DURATION * 2u32.pow(7)),
);
assert_eq!(
super::slot_lenience_exponential(1.into(), &slot(19)),
super::slot_lenience_exponential(1u64.into(), &slot(19)),
Some(SLOT_DURATION * 2u32.pow(7)),
);
}
+67 -29
View File
@@ -20,9 +20,10 @@
//!
//! This is used instead of `futures_timer::Interval` because it was unreliable.
use super::{SlotCompatible, Slot};
use sp_consensus::Error;
use sp_inherents::{InherentData, InherentDataProviders};
use super::{Slot, InherentDataProviderExt};
use sp_consensus::{Error, SelectChain};
use sp_inherents::{InherentData, CreateInherentDataProviders, InherentDataProvider};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use std::time::{Duration, Instant};
use futures_timer::Delay;
@@ -38,19 +39,19 @@ pub fn duration_now() -> Duration {
))
}
/// Returns the duration until the next slot, based on current duration since
pub fn time_until_next(now: Duration, slot_duration: Duration) -> Duration {
/// Returns the duration until the next slot from now.
pub fn time_until_next(slot_duration: Duration) -> Duration {
let remaining_full_millis = slot_duration.as_millis()
- (now.as_millis() % slot_duration.as_millis())
- (duration_now().as_millis() % slot_duration.as_millis())
- 1;
Duration::from_millis(remaining_full_millis as u64)
}
/// Information about a slot.
pub struct SlotInfo {
/// The slot number.
pub struct SlotInfo<B: BlockT> {
/// The slot number as found in the inherent data.
pub slot: Slot,
/// Current timestamp.
/// Current timestamp as found in the inherent data.
pub timestamp: sp_timestamp::Timestamp,
/// The instant at which the slot ends.
pub ends_at: Instant,
@@ -58,13 +59,15 @@ pub struct SlotInfo {
pub inherent_data: InherentData,
/// Slot duration.
pub duration: Duration,
/// The chain header this slot is based on.
pub chain_head: B::Header,
/// Some potential block size limit for the block to be authored at this slot.
///
/// For more information see [`Proposer::propose`](sp_consensus::Proposer::propose).
pub block_size_limit: Option<usize>,
}
impl SlotInfo {
impl<B: BlockT> SlotInfo<B> {
/// Create a new [`SlotInfo`].
///
/// `ends_at` is calculated using `timestamp` and `duration`.
@@ -73,6 +76,7 @@ impl SlotInfo {
timestamp: sp_timestamp::Timestamp,
inherent_data: InherentData,
duration: Duration,
chain_head: B::Header,
block_size_limit: Option<usize>,
) -> Self {
Self {
@@ -80,46 +84,55 @@ impl SlotInfo {
timestamp,
inherent_data,
duration,
chain_head,
block_size_limit,
ends_at: Instant::now() + time_until_next(timestamp.as_duration(), duration),
ends_at: Instant::now() + time_until_next(duration),
}
}
}
/// A stream that returns every time there is a new slot.
pub(crate) struct Slots<SC> {
pub(crate) struct Slots<Block, C, IDP> {
last_slot: Slot,
slot_duration: Duration,
inner_delay: Option<Delay>,
inherent_data_providers: InherentDataProviders,
timestamp_extractor: SC,
create_inherent_data_providers: IDP,
client: C,
_phantom: std::marker::PhantomData<Block>,
}
impl<SC> Slots<SC> {
impl<Block, C, IDP> Slots<Block, C, IDP> {
/// Create a new `Slots` stream.
pub fn new(
slot_duration: Duration,
inherent_data_providers: InherentDataProviders,
timestamp_extractor: SC,
create_inherent_data_providers: IDP,
client: C,
) -> Self {
Slots {
last_slot: 0.into(),
slot_duration,
inner_delay: None,
inherent_data_providers,
timestamp_extractor,
create_inherent_data_providers,
client,
_phantom: Default::default(),
}
}
}
impl<SC: SlotCompatible> Slots<SC> {
impl<Block, C, IDP> Slots<Block, C, IDP>
where
Block: BlockT,
C: SelectChain<Block>,
IDP: CreateInherentDataProviders<Block, ()>,
IDP::InherentDataProviders: crate::InherentDataProviderExt,
{
/// Returns a future that fires when the next slot starts.
pub async fn next_slot(&mut self) -> Result<SlotInfo, Error> {
pub async fn next_slot(&mut self) -> Result<SlotInfo<Block>, Error> {
loop {
self.inner_delay = match self.inner_delay.take() {
None => {
// schedule wait.
let wait_dur = time_until_next(duration_now(), self.slot_duration);
let wait_dur = time_until_next(self.slot_duration);
Some(Delay::new(wait_dur))
}
Some(d) => Some(d),
@@ -130,15 +143,39 @@ impl<SC: SlotCompatible> Slots<SC> {
}
// timeout has fired.
let inherent_data = match self.inherent_data_providers.create_inherent_data() {
Ok(id) => id,
Err(err) => return Err(sp_consensus::Error::InherentData(err)),
let ends_at = Instant::now() + time_until_next(self.slot_duration);
let chain_head = match self.client.best_chain() {
Ok(x) => x,
Err(e) => {
log::warn!(
target: "slots",
"Unable to author block in slot. No best block header: {:?}",
e,
);
// Let's try at the next slot..
self.inner_delay.take();
continue;
}
};
let result = self.timestamp_extractor.extract_timestamp_and_slot(&inherent_data);
let (timestamp, slot, offset) = result?;
let inherent_data_providers = self.create_inherent_data_providers
.create_inherent_data_providers(chain_head.hash(), ())
.await?;
if Instant::now() > ends_at {
log::warn!(
target: "slots",
"Creating inherent data providers took more time than we had left for the slot.",
);
}
let timestamp = inherent_data_providers.timestamp();
let slot = inherent_data_providers.slot();
let inherent_data = inherent_data_providers.create_inherent_data()?;
// reschedule delay for next slot.
let ends_in = offset +
time_until_next(timestamp.as_duration(), self.slot_duration);
let ends_in = time_until_next(self.slot_duration);
self.inner_delay = Some(Delay::new(ends_in));
// never yield the same slot twice.
@@ -150,6 +187,7 @@ impl<SC: SlotCompatible> Slots<SC> {
timestamp,
inherent_data,
self.slot_duration,
chain_head,
None,
))
}