Refactor im-online and print more debug info. (#4771)

* Initial version.

* Fix tests.

* Refactor using StorageValueRef.

* Add tests and apply review suggestions.

* Bump runtime.

Co-authored-by: Gavin Wood <github@gavwood.com>
This commit is contained in:
Tomasz Drwięga
2020-01-31 19:59:49 +01:00
committed by GitHub
parent a36a170c15
commit 2f9315cc02
5 changed files with 353 additions and 157 deletions
+2 -2
View File
@@ -80,8 +80,8 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
// and set impl_version to 0. If only runtime
// implementation changes and behavior does not, then leave spec_version as
// is and increment impl_version.
spec_version: 210,
impl_version: 1,
spec_version: 211,
impl_version: 0,
apis: RUNTIME_API_VERSIONS,
};
+183 -153
View File
@@ -72,13 +72,14 @@ mod tests;
use sp_application_crypto::RuntimeAppPublic;
use codec::{Encode, Decode};
use sp_core::offchain::{OpaqueNetworkState, StorageKind};
use sp_core::offchain::OpaqueNetworkState;
use sp_std::prelude::*;
use sp_std::convert::TryInto;
use pallet_session::historical::IdentificationTuple;
use sp_runtime::{
offchain::storage::StorageValueRef,
RuntimeDebug,
traits::{Convert, Member, Printable, Saturating}, Perbill,
traits::{Convert, Member, Saturating, SimpleArithmetic}, Perbill,
transaction_validity::{
TransactionValidity, ValidTransaction, InvalidTransaction,
TransactionPriority,
@@ -89,7 +90,7 @@ use sp_staking::{
offence::{ReportOffence, Offence, Kind},
};
use frame_support::{
decl_module, decl_event, decl_storage, print, Parameter, debug, decl_error,
decl_module, decl_event, decl_storage, Parameter, debug, decl_error,
traits::Get,
};
use frame_system::{self as system, ensure_none};
@@ -129,37 +130,70 @@ pub mod ed25519 {
pub type AuthorityId = app_ed25519::Public;
}
/// The local storage database key under which the worker progress status
/// is tracked.
const DB_KEY: &[u8] = b"parity/im-online-worker-status";
const DB_PREFIX: &[u8] = b"parity/im-online-heartbeat/";
/// How many blocks do we wait for heartbeat transaction to be included
/// before sending another one.
const INCLUDE_THRESHOLD: u32 = 3;
/// It's important to persist the worker state, since e.g. the
/// server could be restarted while starting the gossip process, but before
/// finishing it. With every execution of the off-chain worker we check
/// if we need to recover and resume gossipping or if there is already
/// another off-chain worker in the process of gossipping.
/// Status of the offchain worker code.
///
/// This stores the block number at which heartbeat was requested and when the worker
/// has actually managed to produce it.
/// Note we store such status for every `authority_index` separately.
#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
struct WorkerStatus<BlockNumber> {
done: bool,
gossipping_at: BlockNumber,
struct HeartbeatStatus<BlockNumber> {
/// An index of the session that we are supposed to send heartbeat for.
pub session_index: SessionIndex,
/// A block number at which the heartbeat for that session has been actually sent.
///
/// It may be 0 in case the sending failed. In such case we should just retry
/// as soon as possible (i.e. in a worker running for the next block).
pub sent_at: BlockNumber,
}
impl<BlockNumber: PartialEq + SimpleArithmetic + Copy> HeartbeatStatus<BlockNumber> {
/// Returns true if heartbeat has been recently sent.
///
/// Parameters:
/// `session_index` - index of current session.
/// `now` - block at which the offchain worker is running.
///
/// This function will return `true` iff:
/// 1. the session index is the same (we don't care if it went up or down)
/// 2. the heartbeat has been sent recently (within the threshold)
///
/// The reasoning for 1. is that it's better to send an extra heartbeat than
/// to stall or not send one in case of a bug.
fn is_recent(&self, session_index: SessionIndex, now: BlockNumber) -> bool {
self.session_index == session_index && self.sent_at + INCLUDE_THRESHOLD.into() > now
}
}
/// Error which may occur while executing the off-chain code.
#[derive(RuntimeDebug)]
enum OffchainErr {
DecodeWorkerStatus,
#[cfg_attr(test, derive(PartialEq))]
enum OffchainErr<BlockNumber> {
TooEarly(BlockNumber),
WaitingForInclusion(BlockNumber),
AlreadyOnline(u32),
FailedSigning,
FailedToAcquireLock,
NetworkState,
SubmitTransaction,
}
impl Printable for OffchainErr {
fn print(&self) {
match self {
OffchainErr::DecodeWorkerStatus => print("Offchain error: decoding WorkerStatus failed!"),
OffchainErr::FailedSigning => print("Offchain error: signing failed!"),
OffchainErr::NetworkState => print("Offchain error: fetching network state failed!"),
OffchainErr::SubmitTransaction => print("Offchain error: submitting transaction failed!"),
impl<BlockNumber: sp_std::fmt::Debug> sp_std::fmt::Debug for OffchainErr<BlockNumber> {
fn fmt(&self, fmt: &mut sp_std::fmt::Formatter) -> sp_std::fmt::Result {
match *self {
OffchainErr::TooEarly(ref block) =>
write!(fmt, "Too early to send heartbeat, next expected at {:?}", block),
OffchainErr::WaitingForInclusion(ref block) =>
write!(fmt, "Heartbeat already sent at {:?}. Waiting for inclusion.", block),
OffchainErr::AlreadyOnline(auth_idx) =>
write!(fmt, "Authority {} is already online", auth_idx),
OffchainErr::FailedSigning => write!(fmt, "Failed to sign heartbeat"),
OffchainErr::FailedToAcquireLock => write!(fmt, "Failed to acquire lock"),
OffchainErr::NetworkState => write!(fmt, "Failed to fetch network state"),
OffchainErr::SubmitTransaction => write!(fmt, "Failed to submit transaction"),
}
}
}
@@ -197,7 +231,9 @@ pub trait Trait: frame_system::Trait + pallet_session::historical::Trait {
/// An expected duration of the session.
///
/// This parameter is used to determine the longevity of `heartbeat` transaction
/// and a rough time when the heartbeat should be sent.
/// and a rough time when we should start considering sending hearbeats,
/// since the workers avoids sending them at the very beginning of the session, assuming
/// there is a chance the authority will produce a block and they won't be necessary.
type SessionDuration: Get<Self::BlockNumber>;
/// A type that gives us the ability to submit unresponsiveness offence reports.
@@ -225,8 +261,13 @@ decl_event!(
decl_storage! {
trait Store for Module<T: Trait> as ImOnline {
/// The block number when we should gossip.
GossipAt get(fn gossip_at): T::BlockNumber;
/// The block number after which it's ok to send heartbeats in current session.
///
/// At the beginning of each session we set this to a value that should
/// fall roughly in the middle of the session duration.
/// The idea is to first wait for the validators to produce a block
/// in the current session, so that the heartbeat later on will not be necessary.
HeartbeatAfter get(fn heartbeat_after): T::BlockNumber;
/// The current set of keys that may issue a heartbeat.
Keys get(fn keys): Vec<T::AuthorityId>;
@@ -302,12 +343,29 @@ decl_module! {
// Only send messages if we are a potential validator.
if sp_io::offchain::is_validator() {
Self::offchain(now);
for res in Self::send_heartbeats(now).into_iter().flatten() {
if let Err(e) = res {
debug::debug!(
target: "imonline",
"Skipping heartbeat at {:?}: {:?}",
now,
e,
)
}
}
} else {
debug::trace!(
target: "imonline",
"Skipping heartbeat at {:?}. Not a validator.",
now,
)
}
}
}
}
type OffchainResult<T, A> = Result<A, OffchainErr<<T as frame_system::Trait>::BlockNumber>>;
/// Keep track of number of authored blocks per authority, uncles are counted as
/// well since they're a valid proof of onlineness.
impl<T: Trait + pallet_authorship::Trait> pallet_authorship::EventHandler<T::ValidatorId, T::BlockNumber> for Module<T> {
@@ -365,156 +423,128 @@ impl<T: Trait> Module<T> {
);
}
pub(crate) fn offchain(now: T::BlockNumber) {
let next_gossip = <GossipAt<T>>::get();
let check = Self::check_not_yet_gossipped(now, next_gossip);
let (curr_worker_status, not_yet_gossipped) = match check {
Ok((s, v)) => (s, v),
Err(err) => {
print(err);
return;
},
};
if next_gossip < now && not_yet_gossipped {
let value_set = Self::compare_and_set_worker_status(now, false, curr_worker_status);
if !value_set {
// value could not be set in local storage, since the value was
// different from `curr_worker_status`. this indicates that
// another worker was running in parallel.
return;
}
match Self::do_gossip_at(now) {
Ok(_) => {},
Err(err) => print(err),
}
} else {
debug::native::debug!(
target: "imonline",
"Skipping gossip at: {:?} >= {:?} || {:?}",
next_gossip,
now,
if not_yet_gossipped { "not gossipped" } else { "gossipped" }
);
pub(crate) fn send_heartbeats(block_number: T::BlockNumber)
-> OffchainResult<T, impl Iterator<Item=OffchainResult<T, ()>>>
{
let heartbeat_after = <HeartbeatAfter<T>>::get();
if block_number < heartbeat_after {
return Err(OffchainErr::TooEarly(heartbeat_after))
}
let session_index = <pallet_session::Module<T>>::current_index();
Ok(Self::local_authority_keys()
.map(move |(authority_index, key)|
Self::send_single_heartbeat(authority_index, key, session_index, block_number)
))
}
fn do_gossip_at(block_number: T::BlockNumber) -> Result<(), OffchainErr> {
// we run only when a local authority key is configured
let authorities = Keys::<T>::get();
let mut results = Vec::new();
let mut local_keys = T::AuthorityId::all();
local_keys.sort();
for (authority_index, key) in authorities.into_iter()
.enumerate()
.filter_map(|(index, authority)| {
local_keys.binary_search(&authority)
.ok()
.map(|location| (index as u32, &local_keys[location]))
})
{
if Self::is_online(authority_index) {
debug::native::info!(
target: "imonline",
"[index: {:?}] Skipping sending heartbeat at block: {:?}. Already online.",
authority_index,
block_number
);
continue;
}
fn send_single_heartbeat(
authority_index: u32,
key: T::AuthorityId,
session_index: SessionIndex,
block_number: T::BlockNumber
) -> OffchainResult<T, ()> {
// A helper function to prepare heartbeat call.
let prepare_heartbeat = || -> OffchainResult<T, Call<T>> {
let network_state = sp_io::offchain::network_state()
.map_err(|_| OffchainErr::NetworkState)?;
let heartbeat_data = Heartbeat {
block_number,
network_state,
session_index: <pallet_session::Module<T>>::current_index(),
session_index,
authority_index,
};
let signature = key.sign(&heartbeat_data.encode()).ok_or(OffchainErr::FailedSigning)?;
let call = Call::heartbeat(heartbeat_data, signature);
Ok(Call::heartbeat(heartbeat_data, signature))
};
debug::info!(
target: "imonline",
"[index: {:?}] Reporting im-online at block: {:?}",
authority_index,
block_number
);
results.push(
T::SubmitTransaction::submit_unsigned(call)
.map_err(|_| OffchainErr::SubmitTransaction)
);
if Self::is_online(authority_index) {
return Err(OffchainErr::AlreadyOnline(authority_index));
}
// fail only after trying all keys.
results.into_iter().collect::<Result<Vec<_>, OffchainErr>>()?;
// acquire lock for that authority at current heartbeat to make sure we don't
// send concurrent heartbeats.
Self::with_heartbeat_lock(
authority_index,
session_index,
block_number,
|| {
let call = prepare_heartbeat()?;
debug::info!(
target: "imonline",
"[index: {:?}] Reporting im-online at block: {:?} (session: {:?}): {:?}",
authority_index,
block_number,
session_index,
call,
);
// once finished we set the worker status without comparing
// if the existing value changed in the meantime. this is
// because at this point the heartbeat was definitely submitted.
Self::set_worker_status(block_number, true);
T::SubmitTransaction::submit_unsigned(call)
.map_err(|_| OffchainErr::SubmitTransaction)?;
Ok(())
}
fn compare_and_set_worker_status(
gossipping_at: T::BlockNumber,
done: bool,
curr_worker_status: Option<Vec<u8>>,
) -> bool {
let enc = WorkerStatus {
done,
gossipping_at,
};
sp_io::offchain::local_storage_compare_and_set(
StorageKind::PERSISTENT,
DB_KEY,
curr_worker_status,
&enc.encode()
Ok(())
},
)
}
fn set_worker_status(
gossipping_at: T::BlockNumber,
done: bool,
) {
let enc = WorkerStatus {
done,
gossipping_at,
};
sp_io::offchain::local_storage_set(StorageKind::PERSISTENT, DB_KEY, &enc.encode());
fn local_authority_keys() -> impl Iterator<Item=(u32, T::AuthorityId)> {
// we run only when a local authority key is configured
let authorities = Keys::<T>::get();
let mut local_keys = T::AuthorityId::all();
local_keys.sort();
authorities.into_iter()
.enumerate()
.filter_map(move |(index, authority)| {
local_keys.binary_search(&authority)
.ok()
.map(|location| (index as u32, local_keys[location].clone()))
})
}
// Checks if a heartbeat gossip already occurred at this block number.
// Returns a tuple of `(current worker status, bool)`, whereby the bool
// is true if not yet gossipped.
fn check_not_yet_gossipped(
fn with_heartbeat_lock<R>(
authority_index: u32,
session_index: SessionIndex,
now: T::BlockNumber,
next_gossip: T::BlockNumber,
) -> Result<(Option<Vec<u8>>, bool), OffchainErr> {
let last_gossip = sp_io::offchain::local_storage_get(StorageKind::PERSISTENT, DB_KEY);
match last_gossip {
Some(last) => {
let worker_status: WorkerStatus<T::BlockNumber> = Decode::decode(&mut &last[..])
.map_err(|_| OffchainErr::DecodeWorkerStatus)?;
f: impl FnOnce() -> OffchainResult<T, R>,
) -> OffchainResult<T, R> {
let key = {
let mut key = DB_PREFIX.to_vec();
key.extend(authority_index.encode());
key
};
let storage = StorageValueRef::persistent(&key);
let res = storage.mutate(|status: Option<Option<HeartbeatStatus<T::BlockNumber>>>| {
// Check if there is already a lock for that particular block.
// This means that the heartbeat has already been sent, and we are just waiting
// for it to be included. However if it doesn't get included for INCLUDE_THRESHOLD
// we will re-send it.
match status {
// we are still waiting for inclusion.
Some(Some(status)) if status.is_recent(session_index, now) => {
Err(OffchainErr::WaitingForInclusion(status.sent_at))
},
// attempt to set new status
_ => Ok(HeartbeatStatus {
session_index,
sent_at: now,
}),
}
})?;
let was_aborted = !worker_status.done && worker_status.gossipping_at < now;
let mut new_status = res.map_err(|_| OffchainErr::FailedToAcquireLock)?;
// another off-chain worker is currently in the process of submitting
let already_submitting =
!worker_status.done && worker_status.gossipping_at == now;
// we got the lock, let's try to send the heartbeat.
let res = f();
let not_yet_gossipped =
worker_status.done && worker_status.gossipping_at < next_gossip;
let ret = (was_aborted && !already_submitting) || not_yet_gossipped;
Ok((Some(last), ret))
},
None => Ok((None, true)),
// clear the lock in case we have failed to send transaction.
if res.is_err() {
new_status.sent_at = 0.into();
storage.set(&new_status);
}
res
}
fn initialize_keys(keys: &[T::AuthorityId]) {
@@ -544,10 +574,10 @@ impl<T: Trait> pallet_session::OneSessionHandler<T::AccountId> for Module<T> {
{
// Tell the offchain worker to start making the next session's heartbeats.
// Since we consider producing blocks as being online,
// the hearbeat is defered a bit to prevent spaming.
// the heartbeat is defered a bit to prevent spaming.
let block_number = <frame_system::Module<T>>::block_number();
let half_session = T::SessionDuration::get() / 2.into();
<GossipAt<T>>::put(block_number + half_session);
<HeartbeatAfter<T>>::put(block_number + half_session);
// Remember who the authorities are for the new session.
Keys::<T>::put(validators.map(|x| x.1).collect::<Vec<_>>());
+11 -2
View File
@@ -210,7 +210,11 @@ fn should_generate_heartbeats() {
// when
UintAuthorityId::set_all_keys(vec![0, 1, 2]);
ImOnline::offchain(2);
ImOnline::send_heartbeats(2)
.unwrap()
// make sure to consume the iterator and check there are no errors.
.collect::<Result<Vec<_>, _>>().unwrap();
// then
let transaction = state.write().transactions.pop().unwrap();
@@ -315,7 +319,12 @@ fn should_not_send_a_report_if_already_online() {
// when
UintAuthorityId::set_all_keys(vec![0]); // all authorities use pallet_session key 0
ImOnline::offchain(4);
// we expect error, since the authority is already online.
let mut res = ImOnline::send_heartbeats(4).unwrap();
assert_eq!(res.next().unwrap().unwrap(), ());
assert_eq!(res.next().unwrap().unwrap_err(), OffchainErr::AlreadyOnline(1));
assert_eq!(res.next().unwrap().unwrap_err(), OffchainErr::AlreadyOnline(2));
assert_eq!(res.next(), None);
// then
let transaction = pool_state.write().transactions.pop().unwrap();
@@ -17,3 +17,4 @@
//! A collection of higher lever helpers for offchain calls.
pub mod http;
pub mod storage;
@@ -0,0 +1,156 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! A set of storage helpers for offchain workers.
use sp_core::offchain::StorageKind;
/// A storage value with a static key.
pub type StorageValue = StorageValueRef<'static>;
/// An abstraction over local storage value.
pub struct StorageValueRef<'a> {
key: &'a [u8],
kind: StorageKind,
}
impl<'a> StorageValueRef<'a> {
/// Create a new reference to a value in the persistent local storage.
pub fn persistent(key: &'a [u8]) -> Self {
Self { key, kind: StorageKind::PERSISTENT }
}
/// Create a new reference to a value in the fork-aware local storage.
pub fn local(key: &'a [u8]) -> Self {
Self { key, kind: StorageKind::LOCAL }
}
/// Set the value of the storage to encoding of given parameter.
///
/// Note that the storage may be accessed by workers running concurrently,
/// if you happen to write a `get-check-set` pattern you should most likely
/// be using `mutate` instead.
pub fn set(&self, value: &impl codec::Encode) {
value.using_encoded(|val| {
sp_io::offchain::local_storage_set(self.kind, self.key, val)
})
}
/// Retrieve & decode the value from storage.
///
/// Note that if you want to do some checks based on the value
/// and write changes after that you should rather be using `mutate`.
///
/// The function returns `None` if the value was not found in storage,
/// otherwise a decoding of the value to requested type.
pub fn get<T: codec::Decode>(&self) -> Option<Option<T>> {
sp_io::offchain::local_storage_get(self.kind, self.key)
.map(|val| T::decode(&mut &*val).ok())
}
/// Retrieve & decode the value and set it to a new one atomicaly.
///
/// Function `f` should return a new value that we should attempt to write to storage.
/// This function returns:
/// 1. `Ok(Ok(T))` in case the value has been succesfuly set.
/// 2. `Ok(Err(T))` in case the value was returned, but it couldn't have been set.
/// 3. `Err(_)` in case `f` returns an error.
pub fn mutate<T, E, F>(&self, f: F) -> Result<Result<T, T>, E> where
T: codec::Codec,
F: FnOnce(Option<Option<T>>) -> Result<T, E>
{
let value = sp_io::offchain::local_storage_get(self.kind, self.key);
let decoded = value.as_deref().map(|mut v| T::decode(&mut v).ok());
let val = f(decoded)?;
let set = val.using_encoded(|new_val| {
sp_io::offchain::local_storage_compare_and_set(
self.kind,
self.key,
value,
new_val,
)
});
if set {
Ok(Ok(val))
} else {
Ok(Err(val))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_io::TestExternalities;
use sp_core::offchain::{
OffchainExt,
OffchainStorage,
testing,
};
#[test]
fn should_set_and_get() {
let (offchain, state) = testing::TestOffchainExt::new();
let mut t = TestExternalities::default();
t.register_extension(OffchainExt::new(offchain));
t.execute_with(|| {
let val = StorageValue::persistent(b"testval");
assert_eq!(val.get::<u32>(), None);
val.set(&15_u32);
assert_eq!(val.get::<u32>(), Some(Some(15_u32)));
assert_eq!(val.get::<Vec<u8>>(), Some(None));
assert_eq!(
state.read().persistent_storage.get(b"", b"testval"),
Some(vec![15_u8, 0, 0, 0])
);
})
}
#[test]
fn should_mutate() {
let (offchain, state) = testing::TestOffchainExt::new();
let mut t = TestExternalities::default();
t.register_extension(OffchainExt::new(offchain));
t.execute_with(|| {
let val = StorageValue::persistent(b"testval");
let result = val.mutate::<u32, (), _>(|val| {
assert_eq!(val, None);
Ok(16_u32)
});
assert_eq!(result, Ok(Ok(16_u32)));
assert_eq!(val.get::<u32>(), Some(Some(16_u32)));
assert_eq!(
state.read().persistent_storage.get(b"", b"testval"),
Some(vec![16_u8, 0, 0, 0])
);
// mutate again, but this time early-exit.
let res = val.mutate::<u32, (), _>(|val| {
assert_eq!(val, Some(Some(16_u32)));
Err(())
});
assert_eq!(res, Err(()));
})
}
}