Chain Selection: Follow-ups (#3328)

* DB skeleton

* key formats

* lexicographic test

* custom types for DB

* implement backend for db-v1

* remove VoidBackend and integrate with real DbBackend

* detect stagnant blocks on in interval

* fix tests

* add tests for stagnant

* send ChainSelectionMessage::Approved

* tests for DB backend

* unused import

* upgrade kvdb-memorydb

Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
Robert Habermeier
2021-07-06 16:00:52 -05:00
committed by GitHub
parent 5ba0de035e
commit f69c175119
12 changed files with 1353 additions and 144 deletions
+184 -98
View File
@@ -25,15 +25,18 @@ use polkadot_subsystem::{
errors::ChainApiError,
};
use kvdb::KeyValueDB;
use parity_scale_codec::Error as CodecError;
use futures::channel::oneshot;
use futures::prelude::*;
use std::time::{UNIX_EPOCH, SystemTime};
use std::time::{UNIX_EPOCH, Duration,SystemTime};
use std::sync::Arc;
use crate::backend::{Backend, OverlayedBackend, BackendWriteOp};
mod backend;
mod db_backend;
mod tree;
#[cfg(test)]
@@ -43,6 +46,10 @@ const LOG_TARGET: &str = "parachain::chain-selection";
/// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots.
type Timestamp = u64;
// If a block isn't approved in 120 seconds, nodes will abandon it
// and begin building on another chain.
const STAGNANT_TIMEOUT: Timestamp = 120;
#[derive(Debug, Clone)]
enum Approval {
// Approved
@@ -202,96 +209,143 @@ impl Error {
}
}
fn timestamp_now() -> Timestamp {
// `SystemTime` is notoriously non-monotonic, so our timers might not work
// exactly as expected. Regardless, stagnation is detected on the order of minutes,
// and slippage of a few seconds in either direction won't cause any major harm.
//
// The exact time that a block becomes stagnant in the local node is always expected
// to differ from other nodes due to network asynchrony and delays in block propagation.
// Non-monotonicity exarcerbates that somewhat, but not meaningfully.
/// A clock used for fetching the current timestamp.
pub trait Clock {
/// Get the current timestamp.
fn timestamp_now(&self) -> Timestamp;
}
match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(d) => d.as_secs(),
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Current time is before unix epoch. Validation will not work correctly."
);
struct SystemClock;
0
impl Clock for SystemClock {
fn timestamp_now(&self) -> Timestamp {
// `SystemTime` is notoriously non-monotonic, so our timers might not work
// exactly as expected. Regardless, stagnation is detected on the order of minutes,
// and slippage of a few seconds in either direction won't cause any major harm.
//
// The exact time that a block becomes stagnant in the local node is always expected
// to differ from other nodes due to network asynchrony and delays in block propagation.
// Non-monotonicity exarcerbates that somewhat, but not meaningfully.
match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(d) => d.as_secs(),
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Current time is before unix epoch. Validation will not work correctly."
);
0
}
}
}
}
fn stagnant_timeout_from_now() -> Timestamp {
// If a block isn't approved in 120 seconds, nodes will abandon it
// and begin building on another chain.
const STAGNANT_TIMEOUT: Timestamp = 120;
/// The interval, in seconds to check for stagnant blocks.
#[derive(Debug, Clone)]
pub struct StagnantCheckInterval(Duration);
timestamp_now() + STAGNANT_TIMEOUT
impl Default for StagnantCheckInterval {
fn default() -> Self {
// 5 seconds is a reasonable balance between avoiding DB reads and
// ensuring validators are generally in agreement on stagnant blocks.
//
// Assuming a network delay of D, the longest difference in view possible
// between 2 validators is D + 5s.
const DEFAULT_STAGNANT_CHECK_INTERVAL: Duration = Duration::from_secs(5);
StagnantCheckInterval(DEFAULT_STAGNANT_CHECK_INTERVAL)
}
}
// TODO https://github.com/paritytech/polkadot/issues/3293:
//
// This is used just so we can have a public function that calls
// `run` and eliminates all the unused errors.
//
// Should be removed when the real implementation is done.
struct VoidBackend;
impl Backend for VoidBackend {
fn load_block_entry(&self, _: &Hash) -> Result<Option<BlockEntry>, Error> {
Ok(None)
}
fn load_leaves(&self) -> Result<LeafEntrySet, Error> {
Ok(LeafEntrySet::default())
}
fn load_stagnant_at(&self, _: Timestamp) -> Result<Vec<Hash>, Error> {
Ok(Vec::new())
}
fn load_stagnant_at_up_to(&self, _: Timestamp)
-> Result<Vec<(Timestamp, Vec<Hash>)>, Error>
{
Ok(Vec::new())
}
fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error> {
Ok(None)
}
fn load_blocks_by_number(&self, _: BlockNumber) -> Result<Vec<Hash>, Error> {
Ok(Vec::new())
impl StagnantCheckInterval {
/// Create a new stagnant-check interval wrapping the given duration.
pub fn new(interval: Duration) -> Self {
StagnantCheckInterval(interval)
}
fn write<I>(&mut self, _: I) -> Result<(), Error>
where I: IntoIterator<Item = BackendWriteOp>
{
Ok(())
fn timeout_stream(&self) -> impl Stream<Item = ()> {
let interval = self.0;
let mut delay = futures_timer::Delay::new(interval);
futures::stream::poll_fn(move |cx| {
let poll = delay.poll_unpin(cx);
if poll.is_ready() {
delay.reset(interval)
}
poll.map(Some)
})
}
}
/// Configuration for the chain selection subsystem.
#[derive(Debug, Clone)]
pub struct Config {
/// The column in the database that the storage should use.
pub col_data: u32,
/// How often to check for stagnant blocks.
pub stagnant_check_interval: StagnantCheckInterval,
}
/// The chain selection subsystem.
pub struct ChainSelectionSubsystem;
pub struct ChainSelectionSubsystem {
config: Config,
db: Arc<dyn KeyValueDB>,
}
impl ChainSelectionSubsystem {
/// Create a new instance of the subsystem with the given config
/// and key-value store.
pub fn new(config: Config, db: Arc<dyn KeyValueDB>) -> Self {
ChainSelectionSubsystem {
config,
db,
}
}
}
impl<Context> Subsystem<Context> for ChainSelectionSubsystem
where Context: SubsystemContext<Message = ChainSelectionMessage>
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let backend = VoidBackend;
let backend = crate::db_backend::v1::DbBackend::new(
self.db,
crate::db_backend::v1::Config { col_data: self.config.col_data },
);
SpawnedSubsystem {
future: run(ctx, backend).map(|()| Ok(())).boxed(),
future: run(
ctx,
backend,
self.config.stagnant_check_interval,
Box::new(SystemClock),
)
.map(Ok)
.boxed(),
name: "chain-selection-subsystem",
}
}
}
async fn run<Context, B>(mut ctx: Context, mut backend: B)
async fn run<Context, B>(
mut ctx: Context,
mut backend: B,
stagnant_check_interval: StagnantCheckInterval,
clock: Box<dyn Clock + Send + Sync>,
)
where
Context: SubsystemContext<Message = ChainSelectionMessage>,
B: Backend,
{
loop {
let res = run_iteration(&mut ctx, &mut backend).await;
let res = run_iteration(
&mut ctx,
&mut backend,
&stagnant_check_interval,
&*clock,
).await;
match res {
Err(e) => {
e.trace();
@@ -313,55 +367,69 @@ async fn run<Context, B>(mut ctx: Context, mut backend: B)
//
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function.
async fn run_iteration<Context, B>(ctx: &mut Context, backend: &mut B)
async fn run_iteration<Context, B>(
ctx: &mut Context,
backend: &mut B,
stagnant_check_interval: &StagnantCheckInterval,
clock: &(dyn Clock + Sync),
)
-> Result<(), Error>
where
Context: SubsystemContext<Message = ChainSelectionMessage>,
B: Backend,
{
// TODO https://github.com/paritytech/polkadot/issues/3293: Add stagnant checking timer loop.
let mut stagnant_check_stream = stagnant_check_interval.timeout_stream();
loop {
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::Conclude) => {
return Ok(())
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
for leaf in update.activated {
let write_ops = handle_active_leaf(
ctx,
&*backend,
leaf.hash,
).await?;
futures::select! {
msg = ctx.recv().fuse() => {
let msg = msg?;
match msg {
FromOverseer::Signal(OverseerSignal::Conclude) => {
return Ok(())
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
for leaf in update.activated {
let write_ops = handle_active_leaf(
ctx,
&*backend,
clock.timestamp_now() + STAGNANT_TIMEOUT,
leaf.hash,
).await?;
backend.write(write_ops)?;
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(h, n)) => {
handle_finalized_block(backend, h, n)?
}
FromOverseer::Communication { msg } => match msg {
ChainSelectionMessage::Approved(hash) => {
handle_approved_block(backend, hash)?
}
ChainSelectionMessage::Leaves(tx) => {
let leaves = load_leaves(ctx, &*backend).await?;
let _ = tx.send(leaves);
}
ChainSelectionMessage::BestLeafContaining(required, tx) => {
let best_containing = crate::backend::find_best_leaf_containing(
&*backend,
required,
)?;
backend.write(write_ops)?;
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(h, n)) => {
handle_finalized_block(backend, h, n)?
}
FromOverseer::Communication { msg } => match msg {
ChainSelectionMessage::Approved(hash) => {
handle_approved_block(backend, hash)?
}
ChainSelectionMessage::Leaves(tx) => {
let leaves = load_leaves(ctx, &*backend).await?;
let _ = tx.send(leaves);
}
ChainSelectionMessage::BestLeafContaining(required, tx) => {
let best_containing = crate::backend::find_best_leaf_containing(
&*backend,
required,
)?;
// note - this may be none if the finalized block is
// a leaf. this is fine according to the expected usage of the
// function. `None` responses should just `unwrap_or(required)`,
// so if the required block is the finalized block, then voilá.
// note - this may be none if the finalized block is
// a leaf. this is fine according to the expected usage of the
// function. `None` responses should just `unwrap_or(required)`,
// so if the required block is the finalized block, then voilá.
let _ = tx.send(best_containing);
let _ = tx.send(best_containing);
}
}
}
}
};
_ = stagnant_check_stream.next().fuse() => {
detect_stagnant(backend, clock.timestamp_now())?;
}
}
}
}
@@ -415,6 +483,7 @@ async fn fetch_block_weight(
async fn handle_active_leaf(
ctx: &mut impl SubsystemContext,
backend: &impl Backend,
stagnant_at: Timestamp,
hash: Hash,
) -> Result<Vec<BackendWriteOp>, Error> {
let lower_bound = match backend.load_first_block_number()? {
@@ -475,6 +544,7 @@ async fn handle_active_leaf(
header.parent_hash,
reversion_logs,
weight,
stagnant_at,
)?;
}
@@ -556,6 +626,22 @@ fn handle_approved_block(
backend.write(ops)
}
fn detect_stagnant(
backend: &mut impl Backend,
now: Timestamp,
) -> Result<(), Error> {
let ops = {
let overlay = crate::tree::detect_stagnant(
&*backend,
now,
)?;
overlay.into_write_ops()
};
backend.write(ops)
}
// Load the leaves from the backend. If there are no leaves, then return
// the finalized block.
async fn load_leaves(