mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-10 05:17:59 +00:00
more resilient subsystems: av-store (#1888)
* utils: remove unused error * av-store: do not exit early on errors * av-store: revert logging change on master * av-store: add a test
This commit is contained in:
@@ -27,7 +27,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};
|
||||
|
||||
use codec::{Encode, Decode};
|
||||
use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt, TryFutureExt};
|
||||
use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt};
|
||||
use futures_timer::Delay;
|
||||
use kvdb_rocksdb::{Database, DatabaseConfig};
|
||||
use kvdb::{KeyValueDB, DBTransaction};
|
||||
@@ -57,34 +57,31 @@ mod columns {
|
||||
#[derive(Debug, Error)]
|
||||
enum Error {
|
||||
#[error(transparent)]
|
||||
ChainAPI(#[from] ChainApiError),
|
||||
RuntimeApi(#[from] RuntimeApiError),
|
||||
#[error(transparent)]
|
||||
ChainApi(#[from] ChainApiError),
|
||||
#[error(transparent)]
|
||||
Erasure(#[from] erasure::Error),
|
||||
#[error(transparent)]
|
||||
Io(#[from] io::Error),
|
||||
#[error(transparent)]
|
||||
ChainApiChannelIsClosed(#[from] oneshot::Canceled),
|
||||
Oneshot(#[from] oneshot::Canceled),
|
||||
#[error(transparent)]
|
||||
Subsystem(#[from] SubsystemError),
|
||||
#[error(transparent)]
|
||||
Time(#[from] SystemTimeError),
|
||||
}
|
||||
|
||||
/// Class of errors which we should handle more gracefully.
|
||||
/// An occurrence of this error should not bring down the subsystem.
|
||||
#[derive(Debug, Error)]
|
||||
enum NonFatalError {
|
||||
/// A Runtime API error occurred.
|
||||
#[error(transparent)]
|
||||
RuntimeApi(#[from] RuntimeApiError),
|
||||
|
||||
/// The receiver's end of the channel is closed.
|
||||
#[error(transparent)]
|
||||
Oneshot(#[from] oneshot::Canceled),
|
||||
|
||||
/// Overseer channel's buffer is full.
|
||||
#[error(transparent)]
|
||||
OverseerOutOfCapacity(#[from] SubsystemError),
|
||||
impl Error {
|
||||
fn severity(&self) -> log::Level {
|
||||
match self {
|
||||
// don't spam the log with spurious errors
|
||||
Self::RuntimeApi(_) |
|
||||
Self::Oneshot(_) => log::Level::Debug,
|
||||
// it's worth reporting otherwise
|
||||
_ => log::Level::Warn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper type for delays.
|
||||
@@ -108,48 +105,48 @@ impl PruningDelay {
|
||||
|
||||
fn as_duration(&self) -> Option<Duration> {
|
||||
match self {
|
||||
PruningDelay::In(d) => Some(*d),
|
||||
PruningDelay::Indefinite => None,
|
||||
PruningDelay::In(d) => Some(*d),
|
||||
PruningDelay::Indefinite => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Duration> for PruningDelay {
|
||||
fn from(d: Duration) -> Self {
|
||||
fn from(d: Duration) -> Self {
|
||||
Self::In(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for PruningDelay {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(PruningDelay::In(this), PruningDelay::In(that)) => {this == that},
|
||||
(PruningDelay::Indefinite, PruningDelay::Indefinite) => true,
|
||||
(PruningDelay::In(this), PruningDelay::In(that)) => {this == that},
|
||||
(PruningDelay::Indefinite, PruningDelay::Indefinite) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for PruningDelay {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
match (self, other) {
|
||||
(PruningDelay::In(this), PruningDelay::In(that)) => this.partial_cmp(that),
|
||||
(PruningDelay::In(_), PruningDelay::Indefinite) => Some(Ordering::Less),
|
||||
(PruningDelay::Indefinite, PruningDelay::In(_)) => Some(Ordering::Greater),
|
||||
(PruningDelay::Indefinite, PruningDelay::Indefinite) => Some(Ordering::Equal),
|
||||
(PruningDelay::In(this), PruningDelay::In(that)) => this.partial_cmp(that),
|
||||
(PruningDelay::In(_), PruningDelay::Indefinite) => Some(Ordering::Less),
|
||||
(PruningDelay::Indefinite, PruningDelay::In(_)) => Some(Ordering::Greater),
|
||||
(PruningDelay::Indefinite, PruningDelay::Indefinite) => Some(Ordering::Equal),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for PruningDelay {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
match (self, other) {
|
||||
(PruningDelay::In(this), PruningDelay::In(that)) => this.cmp(that),
|
||||
(PruningDelay::In(_), PruningDelay::Indefinite) => Ordering::Less,
|
||||
(PruningDelay::Indefinite, PruningDelay::In(_)) => Ordering::Greater,
|
||||
(PruningDelay::Indefinite, PruningDelay::Indefinite) => Ordering::Equal,
|
||||
(PruningDelay::In(this), PruningDelay::In(that)) => this.cmp(that),
|
||||
(PruningDelay::In(_), PruningDelay::Indefinite) => Ordering::Less,
|
||||
(PruningDelay::Indefinite, PruningDelay::In(_)) => Ordering::Greater,
|
||||
(PruningDelay::Indefinite, PruningDelay::Indefinite) => Ordering::Equal,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A key for chunk pruning records.
|
||||
@@ -227,13 +224,13 @@ struct PruningConfig {
|
||||
}
|
||||
|
||||
impl Default for PruningConfig {
|
||||
fn default() -> Self {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
keep_stored_block_for: KEEP_STORED_BLOCK_FOR,
|
||||
keep_finalized_block_for: KEEP_FINALIZED_BLOCK_FOR,
|
||||
keep_finalized_chunk_for: KEEP_FINALIZED_CHUNK_FOR,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Decode, Encode, Eq, PartialEq)]
|
||||
@@ -268,9 +265,9 @@ impl Ord for PoVPruningRecord {
|
||||
}
|
||||
|
||||
impl PartialOrd for PoVPruningRecord {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Decode, Encode, Eq)]
|
||||
@@ -477,56 +474,69 @@ fn get_next_chunk_pruning_time(db: &Arc<dyn KeyValueDB>) -> Option<NextChunkPrun
|
||||
}
|
||||
|
||||
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context)
|
||||
-> Result<(), Error>
|
||||
where
|
||||
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
|
||||
{
|
||||
loop {
|
||||
// Every time the following two methods are called a read from DB is performed.
|
||||
// But given that these are very small values which are essentially a newtype
|
||||
// wrappers around `Duration` (`NextChunkPruning` and `NextPoVPruning`) and also the
|
||||
// fact of the frequent reads itself we assume these to end up cached in the memory
|
||||
// anyway and thus these db reads to be reasonably fast.
|
||||
let pov_pruning_time = subsystem.maybe_prune_povs()?;
|
||||
let chunk_pruning_time = subsystem.maybe_prune_chunks()?;
|
||||
|
||||
let mut pov_pruning_time = pov_pruning_time.fuse();
|
||||
let mut chunk_pruning_time = chunk_pruning_time.fuse();
|
||||
|
||||
select! {
|
||||
incoming = ctx.recv().fuse() => {
|
||||
match incoming {
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => break,
|
||||
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate { activated, .. })
|
||||
)) => {
|
||||
for activated in activated.into_iter() {
|
||||
process_block_activated(&mut ctx, &subsystem.inner, activated).await?;
|
||||
}
|
||||
}
|
||||
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(hash))) => {
|
||||
process_block_finalized(&subsystem, &mut ctx, &subsystem.inner, hash).await?;
|
||||
}
|
||||
Ok(FromOverseer::Communication { msg }) => {
|
||||
process_message(&mut subsystem, &mut ctx, msg).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("AvailabilityStoreSubsystem err: {:#?}", e);
|
||||
break
|
||||
},
|
||||
}
|
||||
let res = run_iteration(&mut subsystem, &mut ctx).await;
|
||||
match res {
|
||||
Err(e) => {
|
||||
log::log!(target: LOG_TARGET, e.severity(), "{}", e);
|
||||
}
|
||||
pov_pruning_time = pov_pruning_time => {
|
||||
subsystem.prune_povs()?;
|
||||
}
|
||||
chunk_pruning_time = chunk_pruning_time => {
|
||||
subsystem.prune_chunks()?;
|
||||
}
|
||||
complete => break,
|
||||
Ok(true) => {
|
||||
log::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
|
||||
break;
|
||||
},
|
||||
Ok(false) => continue,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
async fn run_iteration<Context>(subsystem: &mut AvailabilityStoreSubsystem, ctx: &mut Context)
|
||||
-> Result<bool, Error>
|
||||
where
|
||||
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
|
||||
{
|
||||
// Every time the following two methods are called a read from DB is performed.
|
||||
// But given that these are very small values which are essentially a newtype
|
||||
// wrappers around `Duration` (`NextChunkPruning` and `NextPoVPruning`) and also the
|
||||
// fact of the frequent reads itself we assume these to end up cached in the memory
|
||||
// anyway and thus these db reads to be reasonably fast.
|
||||
let pov_pruning_time = subsystem.maybe_prune_povs()?;
|
||||
let chunk_pruning_time = subsystem.maybe_prune_chunks()?;
|
||||
|
||||
let mut pov_pruning_time = pov_pruning_time.fuse();
|
||||
let mut chunk_pruning_time = chunk_pruning_time.fuse();
|
||||
|
||||
select! {
|
||||
incoming = ctx.recv().fuse() => {
|
||||
match incoming? {
|
||||
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(true),
|
||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate { activated, .. })
|
||||
) => {
|
||||
for activated in activated.into_iter() {
|
||||
process_block_activated(ctx, &subsystem.inner, activated).await?;
|
||||
}
|
||||
}
|
||||
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => {
|
||||
process_block_finalized(subsystem, ctx, &subsystem.inner, hash).await?;
|
||||
}
|
||||
FromOverseer::Communication { msg } => {
|
||||
process_message(subsystem, ctx, msg).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
pov_pruning_time = pov_pruning_time => {
|
||||
subsystem.prune_povs()?;
|
||||
}
|
||||
chunk_pruning_time = chunk_pruning_time => {
|
||||
subsystem.prune_chunks()?;
|
||||
}
|
||||
complete => return Ok(true),
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
/// As soon as certain block is finalized its pruning records and records of all
|
||||
@@ -647,7 +657,7 @@ where
|
||||
async fn request_candidate_events<Context>(
|
||||
ctx: &mut Context,
|
||||
hash: Hash,
|
||||
) -> Result<Vec<CandidateEvent>, NonFatalError>
|
||||
) -> Result<Vec<CandidateEvent>, Error>
|
||||
where
|
||||
Context: SubsystemContext<Message=AvailabilityStoreMessage>
|
||||
{
|
||||
@@ -673,48 +683,44 @@ where
|
||||
{
|
||||
use AvailabilityStoreMessage::*;
|
||||
|
||||
fn log_send_error(request: &'static str) {
|
||||
log::debug!(target: LOG_TARGET, "error sending a response to {}", request);
|
||||
}
|
||||
|
||||
match msg {
|
||||
QueryAvailableData(hash, tx) => {
|
||||
tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data))
|
||||
.unwrap_or_else(|_| log_send_error("QueryAvailableData"));
|
||||
.map_err(|_| oneshot::Canceled)?;
|
||||
}
|
||||
QueryDataAvailability(hash, tx) => {
|
||||
tx.send(available_data(&subsystem.inner, &hash).is_some())
|
||||
.unwrap_or_else(|_| log_send_error("QueryDataAvailability"));
|
||||
.map_err(|_| oneshot::Canceled)?;
|
||||
}
|
||||
QueryChunk(hash, id, tx) => {
|
||||
tx.send(get_chunk(subsystem, &hash, id)?)
|
||||
.unwrap_or_else(|_| log_send_error("QueryChunk"));
|
||||
.map_err(|_| oneshot::Canceled)?;
|
||||
}
|
||||
QueryChunkAvailability(hash, id, tx) => {
|
||||
tx.send(get_chunk(subsystem, &hash, id)?.is_some())
|
||||
.unwrap_or_else(|_| log_send_error("QueryChunkAvailability"));
|
||||
.map_err(|_| oneshot::Canceled)?;
|
||||
}
|
||||
StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => {
|
||||
// Current block number is relay_parent block number + 1.
|
||||
let block_number = get_block_number(ctx, relay_parent).await? + 1;
|
||||
match store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number) {
|
||||
Err(e) => {
|
||||
tx.send(Err(())).unwrap_or_else(|_| log_send_error("StoreChunk (Err)"));
|
||||
tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
|
||||
return Err(e);
|
||||
}
|
||||
Ok(()) => {
|
||||
tx.send(Ok(())).unwrap_or_else(|_| log_send_error("StoreChunk (Ok)"));
|
||||
tx.send(Ok(())).map_err(|_| oneshot::Canceled)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
StoreAvailableData(hash, id, n_validators, av_data, tx) => {
|
||||
match store_available_data(subsystem, &hash, id, n_validators, av_data) {
|
||||
Err(e) => {
|
||||
tx.send(Err(())).unwrap_or_else(|_| log_send_error("StoreAvailableData (Err)"));
|
||||
tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
|
||||
return Err(e);
|
||||
}
|
||||
Ok(()) => {
|
||||
tx.send(Ok(())).unwrap_or_else(|_| log_send_error("StoreAvailableData (Ok)"));
|
||||
tx.send(Ok(())).map_err(|_| oneshot::Canceled)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -995,7 +1001,7 @@ where
|
||||
{
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
let future = run(self, ctx)
|
||||
.map_err(|e| SubsystemError::with_origin("availability-store", e))
|
||||
.map(|_| Ok(()))
|
||||
.boxed();
|
||||
|
||||
SpawnedSubsystem {
|
||||
|
||||
@@ -30,7 +30,9 @@ use polkadot_primitives::v1::{
|
||||
PersistedValidationData, PoV, Id as ParaId,
|
||||
};
|
||||
use polkadot_node_subsystem_util::TimeoutExt;
|
||||
use polkadot_subsystem::ActiveLeavesUpdate;
|
||||
use polkadot_subsystem::{
|
||||
ActiveLeavesUpdate, errors::RuntimeApiError,
|
||||
};
|
||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||
|
||||
struct TestHarness {
|
||||
@@ -167,6 +169,51 @@ async fn overseer_signal(
|
||||
.expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn runtime_api_error_does_not_stop_the_subsystem() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
|
||||
test_harness(PruningConfig::default(), store, |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let new_leaf = Hash::repeat_byte(0x01);
|
||||
|
||||
overseer_signal(
|
||||
&mut virtual_overseer,
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: smallvec![new_leaf.clone()],
|
||||
deactivated: smallvec![],
|
||||
}),
|
||||
).await;
|
||||
|
||||
// runtime api call fails
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::CandidateEvents(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Err(RuntimeApiError::from("oh no".to_string()))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// but that's fine, we're still alive
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let candidate_hash = Hash::repeat_byte(33);
|
||||
let validator_index = 5;
|
||||
let query_chunk = AvailabilityStoreMessage::QueryChunk(
|
||||
candidate_hash,
|
||||
validator_index,
|
||||
tx,
|
||||
);
|
||||
|
||||
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
|
||||
|
||||
assert!(rx.await.unwrap().is_none());
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn store_chunk_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
|
||||
Reference in New Issue
Block a user