im-online: use EstimateNextSessionRotation to get better estimates of session progress (#8242)

* frame-support: add method to estimate current session progress

* im-online: use EstimateNextSessionRotation trait to delay heartbeats

* node: fix im-online pallet instantiation

* frame-support: fix docs

* frame: fix tests

* pallet-session: last block of periodic session means 100% session progress

* pallet-session: add test for periodic session progress

* pallet-babe: fix epoch progress and add test

* frame-support: return weight with session estimates

* pallet-im-online: add test for session progress logic
This commit is contained in:
André Silva
2021-03-12 11:50:07 +00:00
committed by GitHub
parent a4e8875897
commit 5182209788
12 changed files with 403 additions and 142 deletions
+70 -45
View File
@@ -81,20 +81,24 @@ use sp_std::prelude::*;
use sp_std::convert::TryInto;
use sp_runtime::{
offchain::storage::StorageValueRef,
RuntimeDebug,
traits::{Convert, Member, Saturating, AtLeast32BitUnsigned}, Perbill,
traits::{AtLeast32BitUnsigned, Convert, Member, Saturating},
transaction_validity::{
TransactionValidity, ValidTransaction, InvalidTransaction, TransactionSource,
TransactionPriority,
InvalidTransaction, TransactionPriority, TransactionSource, TransactionValidity,
ValidTransaction,
},
Perbill, Percent, RuntimeDebug,
};
use sp_staking::{
SessionIndex,
offence::{ReportOffence, Offence, Kind},
};
use frame_support::{
decl_module, decl_event, decl_storage, Parameter, decl_error,
traits::{Get, ValidatorSet, ValidatorSetWithIdentification, OneSessionHandler},
decl_error, decl_event, decl_module, decl_storage,
traits::{
EstimateNextSessionRotation, Get, OneSessionHandler, ValidatorSet,
ValidatorSetWithIdentification,
},
Parameter,
};
use frame_system::ensure_none;
use frame_system::offchain::{
@@ -181,7 +185,7 @@ impl<BlockNumber: PartialEq + AtLeast32BitUnsigned + Copy> HeartbeatStatus<Block
/// Error which may occur while executing the off-chain code.
#[cfg_attr(test, derive(PartialEq))]
enum OffchainErr<BlockNumber> {
TooEarly(BlockNumber),
TooEarly,
WaitingForInclusion(BlockNumber),
AlreadyOnline(u32),
FailedSigning,
@@ -193,8 +197,8 @@ enum OffchainErr<BlockNumber> {
impl<BlockNumber: sp_std::fmt::Debug> sp_std::fmt::Debug for OffchainErr<BlockNumber> {
fn fmt(&self, fmt: &mut sp_std::fmt::Formatter) -> sp_std::fmt::Result {
match *self {
OffchainErr::TooEarly(ref block) =>
write!(fmt, "Too early to send heartbeat, next expected at {:?}", block),
OffchainErr::TooEarly =>
write!(fmt, "Too early to send heartbeat."),
OffchainErr::WaitingForInclusion(ref block) =>
write!(fmt, "Heartbeat already sent at {:?}. Waiting for inclusion.", block),
OffchainErr::AlreadyOnline(auth_idx) =>
@@ -245,24 +249,24 @@ pub trait Config: SendTransactionTypes<Call<Self>> + frame_system::Config {
/// The overarching event type.
type Event: From<Event<Self>> + Into<<Self as frame_system::Config>::Event>;
/// An expected duration of the session.
///
/// This parameter is used to determine the longevity of `heartbeat` transaction
/// and a rough time when we should start considering sending heartbeats,
/// since the workers avoids sending them at the very beginning of the session, assuming
/// there is a chance the authority will produce a block and they won't be necessary.
type SessionDuration: Get<Self::BlockNumber>;
/// A type for retrieving the validators supposed to be online in a session.
type ValidatorSet: ValidatorSetWithIdentification<Self::AccountId>;
/// A trait that allows us to estimate the current session progress and also the
/// average session length.
///
/// This parameter is used to determine the longevity of `heartbeat` transaction and a
/// rough time when we should start considering sending heartbeats, since the workers
/// avoids sending them at the very beginning of the session, assuming there is a
/// chance the authority will produce a block and they won't be necessary.
type NextSessionRotation: EstimateNextSessionRotation<Self::BlockNumber>;
/// A type that gives us the ability to submit unresponsiveness offence reports.
type ReportUnresponsiveness:
ReportOffence<
Self::AccountId,
IdentificationTuple<Self>,
UnresponsivenessOffence<IdentificationTuple<Self>>,
>;
type ReportUnresponsiveness: ReportOffence<
Self::AccountId,
IdentificationTuple<Self>,
UnresponsivenessOffence<IdentificationTuple<Self>>,
>;
/// A configuration for base priority of unsigned transactions.
///
@@ -290,12 +294,17 @@ decl_event!(
decl_storage! {
trait Store for Module<T: Config> as ImOnline {
/// The block number after which it's ok to send heartbeats in current session.
/// The block number after which it's ok to send heartbeats in the current
/// session.
///
/// At the beginning of each session we set this to a value that should
/// fall roughly in the middle of the session duration.
/// The idea is to first wait for the validators to produce a block
/// in the current session, so that the heartbeat later on will not be necessary.
/// At the beginning of each session we set this to a value that should fall
/// roughly in the middle of the session duration. The idea is to first wait for
/// the validators to produce a block in the current session, so that the
/// heartbeat later on will not be necessary.
///
/// This value will only be used as a fallback if we fail to get a proper session
/// progress estimate from `NextSessionRotation`, as those estimates should be
/// more accurate then the value we calculate for `HeartbeatAfter`.
HeartbeatAfter get(fn heartbeat_after): T::BlockNumber;
/// The current set of keys that may issue a heartbeat.
@@ -469,19 +478,34 @@ impl<T: Config> Module<T> {
);
}
pub(crate) fn send_heartbeats(block_number: T::BlockNumber)
-> OffchainResult<T, impl Iterator<Item=OffchainResult<T, ()>>>
{
let heartbeat_after = <HeartbeatAfter<T>>::get();
if block_number < heartbeat_after {
return Err(OffchainErr::TooEarly(heartbeat_after))
pub(crate) fn send_heartbeats(
block_number: T::BlockNumber,
) -> OffchainResult<T, impl Iterator<Item = OffchainResult<T, ()>>> {
const HALF_SESSION: Percent = Percent::from_percent(50);
let too_early = if let (Some(progress), _) =
T::NextSessionRotation::estimate_current_session_progress(block_number)
{
// we try to get an estimate of the current session progress first since it
// should provide more accurate results and send the heartbeat if we're halfway
// through the session.
progress < HALF_SESSION
} else {
// otherwise we fallback to using the block number calculated at the beginning
// of the session that should roughly correspond to the middle of the session
let heartbeat_after = <HeartbeatAfter<T>>::get();
block_number < heartbeat_after
};
if too_early {
return Err(OffchainErr::TooEarly);
}
let session_index = T::ValidatorSet::session_index();
let validators_len = Keys::<T>::decode_len().unwrap_or_default() as u32;
Ok(Self::local_authority_keys()
.map(move |(authority_index, key)|
Ok(
Self::local_authority_keys().map(move |(authority_index, key)| {
Self::send_single_heartbeat(
authority_index,
key,
@@ -489,7 +513,8 @@ impl<T: Config> Module<T> {
block_number,
validators_len,
)
))
}),
)
}
@@ -648,7 +673,7 @@ impl<T: Config> OneSessionHandler<T::AccountId> for Module<T> {
// Since we consider producing blocks as being online,
// the heartbeat is deferred a bit to prevent spamming.
let block_number = <frame_system::Module<T>>::block_number();
let half_session = T::SessionDuration::get() / 2u32.into();
let half_session = T::NextSessionRotation::average_session_length() / 2u32.into();
<HeartbeatAfter<T>>::put(block_number + half_session);
// Remember who the authorities are for the new session.
@@ -699,10 +724,7 @@ const INVALID_VALIDATORS_LEN: u8 = 10;
impl<T: Config> frame_support::unsigned::ValidateUnsigned for Module<T> {
type Call = Call<T>;
fn validate_unsigned(
_source: TransactionSource,
call: &Self::Call,
) -> TransactionValidity {
fn validate_unsigned(_source: TransactionSource, call: &Self::Call) -> TransactionValidity {
if let Call::heartbeat(heartbeat, signature) = call {
if <Module<T>>::is_online(heartbeat.authority_index) {
// we already received a heartbeat for this authority
@@ -737,9 +759,12 @@ impl<T: Config> frame_support::unsigned::ValidateUnsigned for Module<T> {
ValidTransaction::with_tag_prefix("ImOnline")
.priority(T::UnsignedPriority::get())
.and_provides((current_session, authority_id))
.longevity(TryInto::<u64>::try_into(
T::SessionDuration::get() / 2u32.into()
).unwrap_or(64_u64))
.longevity(
TryInto::<u64>::try_into(
T::NextSessionRotation::average_session_length() / 2u32.into(),
)
.unwrap_or(64_u64),
)
.propagate(true)
.build()
} else {
+49 -10
View File
@@ -21,15 +21,19 @@
use std::cell::RefCell;
use crate::Config;
use sp_runtime::Perbill;
use sp_staking::{SessionIndex, offence::{ReportOffence, OffenceError}};
use sp_runtime::testing::{Header, UintAuthorityId, TestXt};
use sp_runtime::traits::{IdentityLookup, BlakeTwo256, ConvertInto};
use sp_core::H256;
use frame_support::parameter_types;
use crate as imonline;
use frame_support::{parameter_types, weights::Weight};
use pallet_session::historical as pallet_session_historical;
use sp_core::H256;
use sp_runtime::testing::{Header, TestXt, UintAuthorityId};
use sp_runtime::traits::{BlakeTwo256, ConvertInto, IdentityLookup};
use sp_runtime::{Perbill, Percent};
use sp_staking::{
offence::{OffenceError, ReportOffence},
SessionIndex,
};
use crate as imonline;
use crate::Config;
type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic<Runtime>;
type Block = frame_system::mocking::MockBlock<Runtime>;
@@ -176,6 +180,41 @@ impl pallet_authorship::Config for Runtime {
type EventHandler = ImOnline;
}
thread_local! {
pub static MOCK_CURRENT_SESSION_PROGRESS: RefCell<Option<Option<Percent>>> = RefCell::new(None);
}
thread_local! {
pub static MOCK_AVERAGE_SESSION_LENGTH: RefCell<Option<u64>> = RefCell::new(None);
}
pub struct TestNextSessionRotation;
impl frame_support::traits::EstimateNextSessionRotation<u64> for TestNextSessionRotation {
fn average_session_length() -> u64 {
// take the mock result if any and return it
let mock = MOCK_AVERAGE_SESSION_LENGTH.with(|p| p.borrow_mut().take());
mock.unwrap_or(pallet_session::PeriodicSessions::<Period, Offset>::average_session_length())
}
fn estimate_current_session_progress(now: u64) -> (Option<Percent>, Weight) {
let (estimate, weight) =
pallet_session::PeriodicSessions::<Period, Offset>::estimate_current_session_progress(
now,
);
// take the mock result if any and return it
let mock = MOCK_CURRENT_SESSION_PROGRESS.with(|p| p.borrow_mut().take());
(mock.unwrap_or(estimate), weight)
}
fn estimate_next_session_rotation(now: u64) -> (Option<u64>, Weight) {
pallet_session::PeriodicSessions::<Period, Offset>::estimate_next_session_rotation(now)
}
}
parameter_types! {
pub const UnsignedPriority: u64 = 1 << 20;
}
@@ -183,9 +222,9 @@ parameter_types! {
impl Config for Runtime {
type AuthorityId = UintAuthorityId;
type Event = Event;
type ReportUnresponsiveness = OffenceHandler;
type ValidatorSet = Historical;
type SessionDuration = Period;
type NextSessionRotation = TestNextSessionRotation;
type ReportUnresponsiveness = OffenceHandler;
type UnsignedPriority = UnsignedPriority;
type WeightInfo = ();
}
+83
View File
@@ -357,3 +357,86 @@ fn should_not_send_a_report_if_already_online() {
});
});
}
#[test]
fn should_handle_missing_progress_estimates() {
use frame_support::traits::OffchainWorker;
let mut ext = new_test_ext();
let (offchain, _state) = TestOffchainExt::new();
let (pool, state) = TestTransactionPoolExt::new();
ext.register_extension(OffchainDbExt::new(offchain.clone()));
ext.register_extension(OffchainWorkerExt::new(offchain));
ext.register_extension(TransactionPoolExt::new(pool));
ext.execute_with(|| {
let block = 1;
System::set_block_number(block);
UintAuthorityId::set_all_keys(vec![0, 1, 2]);
// buffer new validators
Session::rotate_session();
// enact the change and buffer another one
VALIDATORS.with(|l| *l.borrow_mut() = Some(vec![0, 1, 2]));
Session::rotate_session();
// we will return `None` on the next call to `estimate_current_session_progress`
// and the offchain worker should fallback to checking `HeartbeatAfter`
MOCK_CURRENT_SESSION_PROGRESS.with(|p| *p.borrow_mut() = Some(None));
ImOnline::offchain_worker(block);
assert_eq!(state.read().transactions.len(), 3);
});
}
#[test]
fn should_handle_non_linear_session_progress() {
// NOTE: this is the reason why we started using `EstimateNextSessionRotation` to figure out if
// we should send a heartbeat, it's possible that between successive blocks we progress through
// the session more than just one block increment (in BABE session length is defined in slots,
// not block numbers).
let mut ext = new_test_ext();
let (offchain, _state) = TestOffchainExt::new();
let (pool, _) = TestTransactionPoolExt::new();
ext.register_extension(OffchainDbExt::new(offchain.clone()));
ext.register_extension(OffchainWorkerExt::new(offchain));
ext.register_extension(TransactionPoolExt::new(pool));
ext.execute_with(|| {
UintAuthorityId::set_all_keys(vec![0, 1, 2]);
// buffer new validator
Session::rotate_session();
// mock the session length as being 10 blocks long,
// enact the change and buffer another one
VALIDATORS.with(|l| *l.borrow_mut() = Some(vec![0, 1, 2]));
// mock the session length has being 10 which should make us assume the fallback for half
// session will be reached by block 5.
MOCK_AVERAGE_SESSION_LENGTH.with(|p| *p.borrow_mut() = Some(10));
Session::rotate_session();
// if we don't have valid results for the current session progres then
// we'll fallback to `HeartbeatAfter` and only heartbeat on block 5.
MOCK_CURRENT_SESSION_PROGRESS.with(|p| *p.borrow_mut() = Some(None));
assert_eq!(
ImOnline::send_heartbeats(2).err(),
Some(OffchainErr::TooEarly),
);
MOCK_CURRENT_SESSION_PROGRESS.with(|p| *p.borrow_mut() = Some(None));
assert!(ImOnline::send_heartbeats(5).ok().is_some());
// if we have a valid current session progress then we'll heartbeat as soon
// as we're past 50% of the session regardless of the block number
MOCK_CURRENT_SESSION_PROGRESS
.with(|p| *p.borrow_mut() = Some(Some(Percent::from_percent(51))));
assert!(ImOnline::send_heartbeats(2).ok().is_some());
});
}