Send statements to own backing group first (#2927)

* Factor out runtime module into utils.

* First fatal error design.

* Better error handling infra.

* Error handling cleanup.

* Send to peers of our group first.

* Finish backing group prioritization.

* Little cleanup.

* More cleanup.

* Forgot to checkin error.rs.

* Notes.

* Runtime -> RuntimeInfo

* qed in debug assert.

* PolkaErr -> Fault.
This commit is contained in:
Robert Klotzner
2021-04-27 21:47:32 +02:00
committed by GitHub
parent 36bd876311
commit c86a774b9d
17 changed files with 1031 additions and 280 deletions
@@ -23,29 +23,66 @@ use thiserror::Error;
use futures::channel::oneshot;
use polkadot_node_subsystem_util::{
runtime,
Error as UtilError,
};
use polkadot_node_subsystem_util::{Fault, Error as UtilError, runtime, unwrap_non_fatal};
use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};
use crate::LOG_TARGET;
/// Errors of this subsystem.
#[derive(Debug, Error)]
pub enum Error {
#[error("Response channel to obtain chunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),
#[error(transparent)]
pub struct Error(pub Fault<NonFatal, Fatal>);
#[error("Response channel to obtain available data failed")]
QueryAvailableDataResponseChannel(#[source] oneshot::Canceled),
impl From<NonFatal> for Error {
fn from(e: NonFatal) -> Self {
Self(Fault::from_non_fatal(e))
}
}
impl From<Fatal> for Error {
fn from(f: Fatal) -> Self {
Self(Fault::from_fatal(f))
}
}
impl From<runtime::Error> for Error {
fn from(o: runtime::Error) -> Self {
Self(Fault::from_other(o))
}
}
/// Fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum Fatal {
/// Spawning a running task failed.
#[error("Spawning subsystem task failed")]
SpawnTask(#[source] SubsystemError),
/// Runtime API subsystem is down, which means we're shutting down.
#[error("Runtime request canceled")]
RuntimeRequestCanceled(oneshot::Canceled),
/// Requester stream exhausted.
#[error("Erasure chunk requester stream exhausted")]
RequesterExhausted,
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
/// Spawning a running task failed.
#[error("Spawning subsystem task failed")]
SpawnTask(#[source] SubsystemError),
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[from] #[source] runtime::Fatal),
}
/// Non fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum NonFatal {
/// av-store will drop the sender on any error that happens.
#[error("Response channel to obtain chunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),
/// av-store will drop the sender on any error that happens.
#[error("Response channel to obtain available data failed")]
QueryAvailableDataResponseChannel(#[source] oneshot::Canceled),
/// We tried accessing a session that was not cached.
#[error("Session is not cached.")]
@@ -55,11 +92,7 @@ pub enum Error {
#[error("Not a validator.")]
NotAValidator,
/// Requester stream exhausted.
#[error("Erasure chunk requester stream exhausted")]
RequesterExhausted,
/// Sending response failed.
/// Sending request response failed (Can happen on timeouts for example).
#[error("Sending a request's response failed.")]
SendResponse,
@@ -68,10 +101,6 @@ pub enum Error {
#[error("Utility request failed")]
UtilRequest(UtilError),
/// Runtime API subsystem is down, which means we're shutting down.
#[error("Runtime request canceled")]
RuntimeRequestCanceled(oneshot::Canceled),
/// Some request to the runtime failed.
/// For example if we prune a block we're requesting info about.
#[error("Runtime API error")]
@@ -98,39 +127,30 @@ pub enum Error {
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[source] runtime::Error),
Runtime(#[from] #[source] runtime::NonFatal),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<runtime::Error> for Error {
fn from(err: runtime::Error) -> Self {
Self::Runtime(err)
}
}
impl From<SubsystemError> for Error {
fn from(err: SubsystemError) -> Self {
Self::IncomingMessageChannel(err)
}
}
/// Receive a response from a runtime request and convert errors.
pub(crate) async fn recv_runtime<V>(
r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
) -> std::result::Result<V, Error> {
r.await
.map_err(Error::RuntimeRequestCanceled)?
.map_err(Error::RuntimeRequest)
}
/// Utility for eating top level errors and log them.
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them
pub fn log_error(result: Result<()>, ctx: &'static str) {
if let Err(error) = result {
pub fn log_error(result: Result<()>, ctx: &'static str)
-> std::result::Result<(), Fatal>
{
if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
}
Ok(())
}
/// Receive a response from a runtime request and convert errors.
pub(crate) async fn recv_runtime<V>(
r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
) -> Result<V> {
let result = r.await
.map_err(Fatal::RuntimeRequestCanceled)?
.map_err(NonFatal::RuntimeRequest)?;
Ok(result)
}
@@ -25,10 +25,10 @@ use polkadot_subsystem::{
/// Error and [`Result`] type for this subsystem.
mod error;
pub use error::Error;
pub use error::{Fatal, NonFatal};
use error::{Result, log_error};
use polkadot_node_subsystem_util::runtime::Runtime;
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
/// `Requester` taking care of requesting chunks for candidates pending availability.
mod requester;
@@ -59,7 +59,7 @@ pub struct AvailabilityDistributionSubsystem {
/// Pointer to a keystore, which is required for determining this nodes validator index.
keystore: SyncCryptoStorePtr,
/// Easy and efficient runtime access for this subsystem.
runtime: Runtime,
runtime: RuntimeInfo,
/// Prometheus metrics.
metrics: Metrics,
}
@@ -85,12 +85,12 @@ impl AvailabilityDistributionSubsystem {
/// Create a new instance of the availability distribution.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
let runtime = Runtime::new(keystore.clone());
let runtime = RuntimeInfo::new(keystore.clone());
Self { keystore, runtime, metrics }
}
/// Start processing work as passed on from the Overseer.
async fn run<Context>(mut self, mut ctx: Context) -> Result<()>
async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), Fatal>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
@@ -108,10 +108,10 @@ impl AvailabilityDistributionSubsystem {
// Handle task messages sending:
let message = match action {
Either::Left(subsystem_msg) => {
subsystem_msg.map_err(|e| Error::IncomingMessageChannel(e))?
subsystem_msg.map_err(|e| Fatal::IncomingMessageChannel(e))?
}
Either::Right(from_task) => {
let from_task = from_task.ok_or(Error::RequesterExhausted)?;
let from_task = from_task.ok_or(Fatal::RequesterExhausted)?;
ctx.send_message(from_task).await;
continue;
}
@@ -133,7 +133,7 @@ impl AvailabilityDistributionSubsystem {
log_error(
requester.get_mut().update_fetching_heads(&mut ctx, update).await,
"Error in Requester::update_fetching_heads"
);
)?;
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
FromOverseer::Signal(OverseerSignal::Conclude) => {
@@ -169,7 +169,7 @@ impl AvailabilityDistributionSubsystem {
tx,
).await,
"PoVRequester::fetch_pov"
);
)?;
}
}
}
@@ -33,9 +33,10 @@ use polkadot_subsystem::{
ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
messages::{AllMessages, NetworkBridgeMessage, IfDisconnected}
};
use polkadot_node_subsystem_util::runtime::{Runtime, ValidatorInfo};
use polkadot_node_subsystem_util::runtime::{RuntimeInfo, ValidatorInfo};
use crate::error::{Error, log_error};
use crate::error::{Fatal, NonFatal};
use crate::LOG_TARGET;
/// Number of sessions we want to keep in the LRU.
const NUM_SESSIONS: usize = 2;
@@ -63,7 +64,7 @@ impl PoVRequester {
pub async fn update_connected_validators<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut Runtime,
runtime: &mut RuntimeInfo,
update: &ActiveLeavesUpdate,
) -> super::Result<()>
where
@@ -87,7 +88,7 @@ impl PoVRequester {
pub async fn fetch_pov<Context>(
&self,
ctx: &mut Context,
runtime: &mut Runtime,
runtime: &mut RuntimeInfo,
parent: Hash,
from_validator: ValidatorIndex,
candidate_hash: CandidateHash,
@@ -99,7 +100,7 @@ impl PoVRequester {
{
let info = &runtime.get_session_info(ctx, parent).await?.session_info;
let authority_id = info.discovery_keys.get(from_validator.0 as usize)
.ok_or(Error::InvalidValidatorIndex)?
.ok_or(NonFatal::InvalidValidatorIndex)?
.clone();
let (req, pending_response) = OutgoingRequest::new(
Recipient::Authority(authority_id),
@@ -125,7 +126,8 @@ impl PoVRequester {
.with_relay_parent(parent);
ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed())
.await
.map_err(|e| Error::SpawnTask(e))
.map_err(|e| Fatal::SpawnTask(e))?;
Ok(())
}
}
@@ -136,10 +138,13 @@ async fn fetch_pov_job(
span: jaeger::Span,
tx: oneshot::Sender<PoV>,
) {
log_error(
do_fetch_pov(pov_hash, pending_response, span, tx).await,
"fetch_pov_job",
)
if let Err(err) = do_fetch_pov(pov_hash, pending_response, span, tx).await {
tracing::warn!(
target: LOG_TARGET,
?err,
"fetch_pov_job"
);
}
}
/// Do the actual work of waiting for the response.
@@ -149,24 +154,24 @@ async fn do_fetch_pov(
_span: jaeger::Span,
tx: oneshot::Sender<PoV>,
)
-> super::Result<()>
-> std::result::Result<(), NonFatal>
{
let response = pending_response.await.map_err(Error::FetchPoV)?;
let response = pending_response.await.map_err(NonFatal::FetchPoV)?;
let pov = match response {
PoVFetchingResponse::PoV(pov) => pov,
PoVFetchingResponse::NoSuchPoV => {
return Err(Error::NoSuchPoV)
return Err(NonFatal::NoSuchPoV)
}
};
if pov.hash() == pov_hash {
tx.send(pov).map_err(|_| Error::SendResponse)
tx.send(pov).map_err(|_| NonFatal::SendResponse)
} else {
Err(Error::UnexpectedPoV)
Err(NonFatal::UnexpectedPoV)
}
}
/// Get the session indeces for the given relay chain parents.
async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut Runtime, new_heads: impl Iterator<Item = &Hash>)
async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut RuntimeInfo, new_heads: impl Iterator<Item = &Hash>)
-> super::Result<impl Iterator<Item = (Hash, SessionIndex)>>
where
Context: SubsystemContext,
@@ -181,7 +186,7 @@ where
/// Connect to validators of our validator group.
async fn connect_to_relevant_validators<Context>(
ctx: &mut Context,
runtime: &mut Runtime,
runtime: &mut RuntimeInfo,
parent: Hash,
session: SessionIndex
)
@@ -206,7 +211,7 @@ where
/// Return: `None` if not a validator.
async fn determine_relevant_validators<Context>(
ctx: &mut Context,
runtime: &mut Runtime,
runtime: &mut RuntimeInfo,
parent: Hash,
session: SessionIndex,
)
@@ -275,7 +280,7 @@ mod tests {
let (mut context, mut virtual_overseer) =
test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, TaskExecutor>(pool.clone());
let keystore = make_ferdie_keystore();
let mut runtime = polkadot_node_subsystem_util::runtime::Runtime::new(keystore);
let mut runtime = polkadot_node_subsystem_util::runtime::RuntimeInfo::new(keystore);
let (tx, rx) = oneshot::channel();
let testee = async {
@@ -34,7 +34,7 @@ use polkadot_subsystem::messages::{
use polkadot_subsystem::{SubsystemContext, jaeger};
use crate::{
error::{Error, Result},
error::{Fatal, Result},
session_cache::{BadValidators, SessionInfo},
LOG_TARGET,
metrics::{Metrics, SUCCEEDED, FAILED},
@@ -191,7 +191,7 @@ impl FetchTask {
ctx.spawn("chunk-fetcher", running.run(kill).boxed())
.await
.map_err(|e| Error::SpawnTask(e))?;
.map_err(|e| Fatal::SpawnTask(e))?;
Ok(FetchTask {
live_in,
@@ -28,7 +28,7 @@ use polkadot_subsystem::{
SubsystemContext, jaeger,
};
use crate::error::{Error, Result};
use crate::error::{NonFatal, Result};
use crate::{LOG_TARGET, metrics::{Metrics, SUCCEEDED, FAILED, NOT_FOUND}};
/// Variant of `answer_pov_request` that does Prometheus metric and logging on errors.
@@ -107,7 +107,7 @@ where
}
};
req.send_response(response).map_err(|_| Error::SendResponse)?;
req.send_response(response).map_err(|_| NonFatal::SendResponse)?;
Ok(result)
}
@@ -144,7 +144,7 @@ where
Some(chunk) => v1::ChunkFetchingResponse::Chunk(chunk.into()),
};
req.send_response(response).map_err(|_| Error::SendResponse)?;
req.send_response(response).map_err(|_| NonFatal::SendResponse)?;
Ok(result)
}
@@ -164,7 +164,7 @@ where
))
.await;
rx.await.map_err(|e| {
let result = rx.await.map_err(|e| {
tracing::trace!(
target: LOG_TARGET,
?validator_index,
@@ -172,8 +172,9 @@ where
error = ?e,
"Error retrieving chunk",
);
Error::QueryChunkResponseChannel(e)
})
NonFatal::QueryChunkResponseChannel(e)
})?;
Ok(result)
}
/// Query PoV from the availability store.
@@ -191,5 +192,6 @@ where
))
.await;
rx.await.map_err(|e| Error::QueryAvailableDataResponseChannel(e))
let result = rx.await.map_err(|e| NonFatal::QueryAvailableDataResponseChannel(e))?;
Ok(result)
}
@@ -33,7 +33,7 @@ use polkadot_primitives::v1::{
use polkadot_subsystem::SubsystemContext;
use super::{
error::{recv_runtime, Error},
error::{recv_runtime, Error, NonFatal},
LOG_TARGET,
};
@@ -189,9 +189,9 @@ impl SessionCache {
let session = self
.session_info_cache
.get_mut(&report.session_index)
.ok_or(Error::NoSuchCachedSession)?
.ok_or(NonFatal::NoSuchCachedSession)?
.as_mut()
.ok_or(Error::NotAValidator)?;
.ok_or(NonFatal::NotAValidator)?;
let group = session
.validator_groups
.get_mut(report.group_index.0 as usize)
@@ -231,7 +231,7 @@ impl SessionCache {
..
} = recv_runtime(request_session_info(parent, session_index, ctx.sender()).await)
.await?
.ok_or(Error::NoSuchSession(session_index))?;
.ok_or(NonFatal::NoSuchSession(session_index))?;
if let Some(our_index) = self.get_our_index(validators).await {
// Get our group index: