From bcec9f9a996e1a19875e9a69813aa69a47fc17a2 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Mon, 2 Nov 2020 19:55:32 +0100 Subject: [PATCH] more resilient subsystems: av-store (#1888) * utils: remove unused error * av-store: do not exit early on errors * av-store: revert logging change on master * av-store: add a test --- polkadot/node/core/av-store/src/lib.rs | 202 ++++++++++++----------- polkadot/node/core/av-store/src/tests.rs | 49 +++++- 2 files changed, 152 insertions(+), 99 deletions(-) diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 22b8c29107..0f248ba347 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}; use codec::{Encode, Decode}; -use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt, TryFutureExt}; +use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt}; use futures_timer::Delay; use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb::{KeyValueDB, DBTransaction}; @@ -57,34 +57,31 @@ mod columns { #[derive(Debug, Error)] enum Error { #[error(transparent)] - ChainAPI(#[from] ChainApiError), + RuntimeApi(#[from] RuntimeApiError), + #[error(transparent)] + ChainApi(#[from] ChainApiError), #[error(transparent)] Erasure(#[from] erasure::Error), #[error(transparent)] Io(#[from] io::Error), #[error(transparent)] - ChainApiChannelIsClosed(#[from] oneshot::Canceled), + Oneshot(#[from] oneshot::Canceled), #[error(transparent)] Subsystem(#[from] SubsystemError), #[error(transparent)] Time(#[from] SystemTimeError), } -/// Class of errors which we should handle more gracefully. -/// An occurrence of this error should not bring down the subsystem. -#[derive(Debug, Error)] -enum NonFatalError { - /// A Runtime API error occurred. - #[error(transparent)] - RuntimeApi(#[from] RuntimeApiError), - - /// The receiver's end of the channel is closed. - #[error(transparent)] - Oneshot(#[from] oneshot::Canceled), - - /// Overseer channel's buffer is full. - #[error(transparent)] - OverseerOutOfCapacity(#[from] SubsystemError), +impl Error { + fn severity(&self) -> log::Level { + match self { + // don't spam the log with spurious errors + Self::RuntimeApi(_) | + Self::Oneshot(_) => log::Level::Debug, + // it's worth reporting otherwise + _ => log::Level::Warn, + } + } } /// A wrapper type for delays. @@ -108,48 +105,48 @@ impl PruningDelay { fn as_duration(&self) -> Option { match self { - PruningDelay::In(d) => Some(*d), - PruningDelay::Indefinite => None, + PruningDelay::In(d) => Some(*d), + PruningDelay::Indefinite => None, } } } impl From for PruningDelay { - fn from(d: Duration) -> Self { + fn from(d: Duration) -> Self { Self::In(d) - } + } } impl PartialEq for PruningDelay { - fn eq(&self, other: &Self) -> bool { + fn eq(&self, other: &Self) -> bool { match (self, other) { - (PruningDelay::In(this), PruningDelay::In(that)) => {this == that}, - (PruningDelay::Indefinite, PruningDelay::Indefinite) => true, + (PruningDelay::In(this), PruningDelay::In(that)) => {this == that}, + (PruningDelay::Indefinite, PruningDelay::Indefinite) => true, _ => false, } - } + } } impl PartialOrd for PruningDelay { - fn partial_cmp(&self, other: &Self) -> Option { + fn partial_cmp(&self, other: &Self) -> Option { match (self, other) { - (PruningDelay::In(this), PruningDelay::In(that)) => this.partial_cmp(that), - (PruningDelay::In(_), PruningDelay::Indefinite) => Some(Ordering::Less), - (PruningDelay::Indefinite, PruningDelay::In(_)) => Some(Ordering::Greater), - (PruningDelay::Indefinite, PruningDelay::Indefinite) => Some(Ordering::Equal), + (PruningDelay::In(this), PruningDelay::In(that)) => this.partial_cmp(that), + (PruningDelay::In(_), PruningDelay::Indefinite) => Some(Ordering::Less), + (PruningDelay::Indefinite, PruningDelay::In(_)) => Some(Ordering::Greater), + (PruningDelay::Indefinite, PruningDelay::Indefinite) => Some(Ordering::Equal), } - } + } } impl Ord for PruningDelay { - fn cmp(&self, other: &Self) -> Ordering { + fn cmp(&self, other: &Self) -> Ordering { match (self, other) { - (PruningDelay::In(this), PruningDelay::In(that)) => this.cmp(that), - (PruningDelay::In(_), PruningDelay::Indefinite) => Ordering::Less, - (PruningDelay::Indefinite, PruningDelay::In(_)) => Ordering::Greater, - (PruningDelay::Indefinite, PruningDelay::Indefinite) => Ordering::Equal, + (PruningDelay::In(this), PruningDelay::In(that)) => this.cmp(that), + (PruningDelay::In(_), PruningDelay::Indefinite) => Ordering::Less, + (PruningDelay::Indefinite, PruningDelay::In(_)) => Ordering::Greater, + (PruningDelay::Indefinite, PruningDelay::Indefinite) => Ordering::Equal, } - } + } } /// A key for chunk pruning records. @@ -227,13 +224,13 @@ struct PruningConfig { } impl Default for PruningConfig { - fn default() -> Self { + fn default() -> Self { Self { keep_stored_block_for: KEEP_STORED_BLOCK_FOR, keep_finalized_block_for: KEEP_FINALIZED_BLOCK_FOR, keep_finalized_chunk_for: KEEP_FINALIZED_CHUNK_FOR, } - } + } } #[derive(Debug, Decode, Encode, Eq, PartialEq)] @@ -268,9 +265,9 @@ impl Ord for PoVPruningRecord { } impl PartialOrd for PoVPruningRecord { - fn partial_cmp(&self, other: &Self) -> Option { + fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) - } + } } #[derive(Debug, Decode, Encode, Eq)] @@ -477,56 +474,69 @@ fn get_next_chunk_pruning_time(db: &Arc) -> Option(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) - -> Result<(), Error> where Context: SubsystemContext, { loop { - // Every time the following two methods are called a read from DB is performed. - // But given that these are very small values which are essentially a newtype - // wrappers around `Duration` (`NextChunkPruning` and `NextPoVPruning`) and also the - // fact of the frequent reads itself we assume these to end up cached in the memory - // anyway and thus these db reads to be reasonably fast. - let pov_pruning_time = subsystem.maybe_prune_povs()?; - let chunk_pruning_time = subsystem.maybe_prune_chunks()?; - - let mut pov_pruning_time = pov_pruning_time.fuse(); - let mut chunk_pruning_time = chunk_pruning_time.fuse(); - - select! { - incoming = ctx.recv().fuse() => { - match incoming { - Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => break, - Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate { activated, .. }) - )) => { - for activated in activated.into_iter() { - process_block_activated(&mut ctx, &subsystem.inner, activated).await?; - } - } - Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(hash))) => { - process_block_finalized(&subsystem, &mut ctx, &subsystem.inner, hash).await?; - } - Ok(FromOverseer::Communication { msg }) => { - process_message(&mut subsystem, &mut ctx, msg).await?; - } - Err(e) => { - log::error!("AvailabilityStoreSubsystem err: {:#?}", e); - break - }, - } + let res = run_iteration(&mut subsystem, &mut ctx).await; + match res { + Err(e) => { + log::log!(target: LOG_TARGET, e.severity(), "{}", e); } - pov_pruning_time = pov_pruning_time => { - subsystem.prune_povs()?; - } - chunk_pruning_time = chunk_pruning_time => { - subsystem.prune_chunks()?; - } - complete => break, + Ok(true) => { + log::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); + break; + }, + Ok(false) => continue, } } +} - Ok(()) +async fn run_iteration(subsystem: &mut AvailabilityStoreSubsystem, ctx: &mut Context) + -> Result +where + Context: SubsystemContext, +{ + // Every time the following two methods are called a read from DB is performed. + // But given that these are very small values which are essentially a newtype + // wrappers around `Duration` (`NextChunkPruning` and `NextPoVPruning`) and also the + // fact of the frequent reads itself we assume these to end up cached in the memory + // anyway and thus these db reads to be reasonably fast. + let pov_pruning_time = subsystem.maybe_prune_povs()?; + let chunk_pruning_time = subsystem.maybe_prune_chunks()?; + + let mut pov_pruning_time = pov_pruning_time.fuse(); + let mut chunk_pruning_time = chunk_pruning_time.fuse(); + + select! { + incoming = ctx.recv().fuse() => { + match incoming? { + FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(true), + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate { activated, .. }) + ) => { + for activated in activated.into_iter() { + process_block_activated(ctx, &subsystem.inner, activated).await?; + } + } + FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => { + process_block_finalized(subsystem, ctx, &subsystem.inner, hash).await?; + } + FromOverseer::Communication { msg } => { + process_message(subsystem, ctx, msg).await?; + } + } + } + pov_pruning_time = pov_pruning_time => { + subsystem.prune_povs()?; + } + chunk_pruning_time = chunk_pruning_time => { + subsystem.prune_chunks()?; + } + complete => return Ok(true), + } + + Ok(false) } /// As soon as certain block is finalized its pruning records and records of all @@ -647,7 +657,7 @@ where async fn request_candidate_events( ctx: &mut Context, hash: Hash, -) -> Result, NonFatalError> +) -> Result, Error> where Context: SubsystemContext { @@ -673,48 +683,44 @@ where { use AvailabilityStoreMessage::*; - fn log_send_error(request: &'static str) { - log::debug!(target: LOG_TARGET, "error sending a response to {}", request); - } - match msg { QueryAvailableData(hash, tx) => { tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data)) - .unwrap_or_else(|_| log_send_error("QueryAvailableData")); + .map_err(|_| oneshot::Canceled)?; } QueryDataAvailability(hash, tx) => { tx.send(available_data(&subsystem.inner, &hash).is_some()) - .unwrap_or_else(|_| log_send_error("QueryDataAvailability")); + .map_err(|_| oneshot::Canceled)?; } QueryChunk(hash, id, tx) => { tx.send(get_chunk(subsystem, &hash, id)?) - .unwrap_or_else(|_| log_send_error("QueryChunk")); + .map_err(|_| oneshot::Canceled)?; } QueryChunkAvailability(hash, id, tx) => { tx.send(get_chunk(subsystem, &hash, id)?.is_some()) - .unwrap_or_else(|_| log_send_error("QueryChunkAvailability")); + .map_err(|_| oneshot::Canceled)?; } StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => { // Current block number is relay_parent block number + 1. let block_number = get_block_number(ctx, relay_parent).await? + 1; match store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number) { Err(e) => { - tx.send(Err(())).unwrap_or_else(|_| log_send_error("StoreChunk (Err)")); + tx.send(Err(())).map_err(|_| oneshot::Canceled)?; return Err(e); } Ok(()) => { - tx.send(Ok(())).unwrap_or_else(|_| log_send_error("StoreChunk (Ok)")); + tx.send(Ok(())).map_err(|_| oneshot::Canceled)?; } } } StoreAvailableData(hash, id, n_validators, av_data, tx) => { match store_available_data(subsystem, &hash, id, n_validators, av_data) { Err(e) => { - tx.send(Err(())).unwrap_or_else(|_| log_send_error("StoreAvailableData (Err)")); + tx.send(Err(())).map_err(|_| oneshot::Canceled)?; return Err(e); } Ok(()) => { - tx.send(Ok(())).unwrap_or_else(|_| log_send_error("StoreAvailableData (Ok)")); + tx.send(Ok(())).map_err(|_| oneshot::Canceled)?; } } } @@ -995,7 +1001,7 @@ where { fn start(self, ctx: Context) -> SpawnedSubsystem { let future = run(self, ctx) - .map_err(|e| SubsystemError::with_origin("availability-store", e)) + .map(|_| Ok(())) .boxed(); SpawnedSubsystem { diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs index 6c1a950f06..c2d6c9550a 100644 --- a/polkadot/node/core/av-store/src/tests.rs +++ b/polkadot/node/core/av-store/src/tests.rs @@ -30,7 +30,9 @@ use polkadot_primitives::v1::{ PersistedValidationData, PoV, Id as ParaId, }; use polkadot_node_subsystem_util::TimeoutExt; -use polkadot_subsystem::ActiveLeavesUpdate; +use polkadot_subsystem::{ + ActiveLeavesUpdate, errors::RuntimeApiError, +}; use polkadot_node_subsystem_test_helpers as test_helpers; struct TestHarness { @@ -167,6 +169,51 @@ async fn overseer_signal( .expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT)); } +#[test] +fn runtime_api_error_does_not_stop_the_subsystem() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + + test_harness(PruningConfig::default(), store, |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let new_leaf = Hash::repeat_byte(0x01); + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![new_leaf.clone()], + deactivated: smallvec![], + }), + ).await; + + // runtime api call fails + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::CandidateEvents(tx), + )) => { + assert_eq!(relay_parent, new_leaf); + tx.send(Err(RuntimeApiError::from("oh no".to_string()))).unwrap(); + } + ); + + // but that's fine, we're still alive + let (tx, rx) = oneshot::channel(); + let candidate_hash = Hash::repeat_byte(33); + let validator_index = 5; + let query_chunk = AvailabilityStoreMessage::QueryChunk( + candidate_hash, + validator_index, + tx, + ); + + overseer_send(&mut virtual_overseer, query_chunk.into()).await; + + assert!(rx.await.unwrap().is_none()); + + }); +} + #[test] fn store_chunk_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));