Remove request multiplexer (#3624)

* WIP: Get rid of request multiplexer.

* WIP

* Receiver for handling of incoming requests.

* Get rid of useless `Fault` abstraction.

The things the type system let us do are not worth getting abstracted in
its own type. Instead error handling is going to be merely a pattern.

* Make most things compile again.

* Port availability distribution away from request multiplexer.

* Formatting.

* Port dispute distribution over.

* Fixup statement distribution.

* Handle request directly in collator protocol.

+ Only allow fatal errors at top level.

* Use direct request channel for availability recovery.

* Finally get rid of request multiplexer

Fixes #2842 and paves the way for more back pressure possibilities.

* Fix overseer and statement distribution tests.

* Fix collator protocol and network bridge tests.

* Fix tests in availability recovery.

* Fix availability distribution tests.

* Fix dispute distribution tests.

* Add missing dependency

* Typos.

* Review remarks.

* More remarks.
This commit is contained in:
Robert Klotzner
2021-08-12 13:11:36 +02:00
committed by GitHub
parent ecf71233c3
commit 55154a8d37
51 changed files with 1509 additions and 1746 deletions
@@ -17,35 +17,31 @@
//! Error handling related code and Error/Result definitions.
use polkadot_node_network_protocol::request_response::request::RequestError;
use polkadot_node_network_protocol::request_response::outgoing::RequestError;
use thiserror::Error;
use futures::channel::oneshot;
use polkadot_node_subsystem_util::{runtime, unwrap_non_fatal, Fault};
use polkadot_node_subsystem_util::runtime;
use polkadot_subsystem::SubsystemError;
use crate::LOG_TARGET;
#[derive(Debug, Error)]
#[derive(Debug, Error, derive_more::From)]
#[error(transparent)]
pub struct Error(pub Fault<NonFatal, Fatal>);
impl From<NonFatal> for Error {
fn from(e: NonFatal) -> Self {
Self(Fault::from_non_fatal(e))
}
}
impl From<Fatal> for Error {
fn from(f: Fatal) -> Self {
Self(Fault::from_fatal(f))
}
pub enum Error {
/// All fatal errors.
Fatal(Fatal),
/// All nonfatal/potentially recoverable errors.
NonFatal(NonFatal),
}
impl From<runtime::Error> for Error {
fn from(o: runtime::Error) -> Self {
Self(Fault::from_other(o))
match o {
runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)),
runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)),
}
}
}
@@ -107,15 +103,23 @@ pub enum NonFatal {
Runtime(#[from] runtime::NonFatal),
}
/// General result type for fatal/nonfatal errors.
pub type Result<T> = std::result::Result<T, Error>;
/// Results which are never fatal.
pub type NonFatalResult<T> = std::result::Result<T, NonFatal>;
/// Utility for eating top level errors and log them.
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them
pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), Fatal> {
if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
match result {
Err(Error::Fatal(f)) => Err(f),
Err(Error::NonFatal(error)) => {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
Ok(())
},
Ok(()) => Ok(()),
}
Ok(())
}
@@ -18,6 +18,7 @@ use futures::{future::Either, FutureExt, StreamExt, TryFutureExt};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_network_protocol::request_response::{v1, IncomingRequestReceiver};
use polkadot_subsystem::{
messages::AvailabilityDistributionMessage, overseer, FromOverseer, OverseerSignal,
SpawnedSubsystem, SubsystemContext, SubsystemError,
@@ -38,7 +39,7 @@ mod pov_requester;
/// Responding to erasure chunk requests:
mod responder;
use responder::{answer_chunk_request_log, answer_pov_request_log};
use responder::{run_chunk_receiver, run_pov_receiver};
mod metrics;
/// Prometheus `Metrics` for availability distribution.
@@ -53,10 +54,20 @@ const LOG_TARGET: &'static str = "parachain::availability-distribution";
pub struct AvailabilityDistributionSubsystem {
/// Easy and efficient runtime access for this subsystem.
runtime: RuntimeInfo,
/// Receivers to receive messages from.
recvs: IncomingRequestReceivers,
/// Prometheus metrics.
metrics: Metrics,
}
/// Receivers to be passed into availability distribution.
pub struct IncomingRequestReceivers {
/// Receiver for incoming PoV requests.
pub pov_req_receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
/// Receiver for incoming availability chunk requests.
pub chunk_req_receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
}
impl<Context> overseer::Subsystem<Context, SubsystemError> for AvailabilityDistributionSubsystem
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -74,18 +85,41 @@ where
impl AvailabilityDistributionSubsystem {
/// Create a new instance of the availability distribution.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
pub fn new(
keystore: SyncCryptoStorePtr,
recvs: IncomingRequestReceivers,
metrics: Metrics,
) -> Self {
let runtime = RuntimeInfo::new(Some(keystore));
Self { runtime, metrics }
Self { runtime, recvs, metrics }
}
/// Start processing work as passed on from the Overseer.
async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), Fatal>
async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), Fatal>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
Context: overseer::SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let mut requester = Requester::new(self.metrics.clone()).fuse();
let Self { mut runtime, recvs, metrics } = self;
let IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver } = recvs;
let mut requester = Requester::new(metrics.clone()).fuse();
{
let sender = ctx.sender().clone();
ctx.spawn(
"pov-receiver",
run_pov_receiver(sender.clone(), pov_req_receiver, metrics.clone()).boxed(),
)
.map_err(Fatal::SpawnTask)?;
ctx.spawn(
"chunk-receiver",
run_chunk_receiver(sender, chunk_req_receiver, metrics.clone()).boxed(),
)
.map_err(Fatal::SpawnTask)?;
}
loop {
let action = {
let mut subsystem_next = ctx.recv().fuse();
@@ -110,19 +144,13 @@ impl AvailabilityDistributionSubsystem {
log_error(
requester
.get_mut()
.update_fetching_heads(&mut ctx, &mut self.runtime, update)
.update_fetching_heads(&mut ctx, &mut runtime, update)
.await,
"Error in Requester::update_fetching_heads",
)?;
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::ChunkFetchingRequest(req),
} => answer_chunk_request_log(&mut ctx, req, &self.metrics).await,
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::PoVFetchingRequest(req),
} => answer_pov_request_log(&mut ctx, req, &self.metrics).await,
FromOverseer::Communication {
msg:
AvailabilityDistributionMessage::FetchPoV {
@@ -136,7 +164,7 @@ impl AvailabilityDistributionSubsystem {
log_error(
pov_requester::fetch_pov(
&mut ctx,
&mut self.runtime,
&mut runtime,
relay_parent,
from_validator,
candidate_hash,
@@ -19,7 +19,7 @@
use futures::{channel::oneshot, future::BoxFuture, FutureExt};
use polkadot_node_network_protocol::request_response::{
request::{RequestError, Requests},
outgoing::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse},
OutgoingRequest, Recipient,
};
@@ -24,7 +24,7 @@ use futures::{
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::request_response::{
request::{OutgoingRequest, Recipient, RequestError, Requests},
outgoing::{OutgoingRequest, Recipient, RequestError, Requests},
v1::{ChunkFetchingRequest, ChunkFetchingResponse},
};
use polkadot_node_primitives::ErasureChunk;
@@ -20,28 +20,93 @@ use std::sync::Arc;
use futures::channel::oneshot;
use polkadot_node_network_protocol::request_response::{request::IncomingRequest, v1};
use polkadot_node_network_protocol::{
request_response::{incoming, v1, IncomingRequest, IncomingRequestReceiver},
UnifiedReputationChange as Rep,
};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_primitives::v1::{CandidateHash, ValidatorIndex};
use polkadot_subsystem::{jaeger, messages::AvailabilityStoreMessage, SubsystemContext};
use polkadot_subsystem::{jaeger, messages::AvailabilityStoreMessage, SubsystemSender};
use crate::{
error::{NonFatal, Result},
error::{NonFatal, NonFatalResult, Result},
metrics::{Metrics, FAILED, NOT_FOUND, SUCCEEDED},
LOG_TARGET,
};
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
/// Receiver task to be forked as a separate task to handle PoV requests.
pub async fn run_pov_receiver<Sender>(
mut sender: Sender,
mut receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
metrics: Metrics,
) where
Sender: SubsystemSender,
{
loop {
match receiver.recv(|| vec![COST_INVALID_REQUEST]).await {
Ok(msg) => {
answer_pov_request_log(&mut sender, msg, &metrics).await;
},
Err(incoming::Error::Fatal(f)) => {
tracing::debug!(
target: LOG_TARGET,
error = ?f,
"Shutting down POV receiver."
);
return
},
Err(incoming::Error::NonFatal(error)) => {
tracing::debug!(target: LOG_TARGET, ?error, "Error decoding incoming PoV request.");
},
}
}
}
/// Receiver task to be forked as a separate task to handle chunk requests.
pub async fn run_chunk_receiver<Sender>(
mut sender: Sender,
mut receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
metrics: Metrics,
) where
Sender: SubsystemSender,
{
loop {
match receiver.recv(|| vec![COST_INVALID_REQUEST]).await {
Ok(msg) => {
answer_chunk_request_log(&mut sender, msg, &metrics).await;
},
Err(incoming::Error::Fatal(f)) => {
tracing::debug!(
target: LOG_TARGET,
error = ?f,
"Shutting down chunk receiver."
);
return
},
Err(incoming::Error::NonFatal(error)) => {
tracing::debug!(
target: LOG_TARGET,
?error,
"Error decoding incoming chunk request."
);
},
}
}
}
/// Variant of `answer_pov_request` that does Prometheus metric and logging on errors.
///
/// Any errors of `answer_pov_request` will simply be logged.
pub async fn answer_pov_request_log<Context>(
ctx: &mut Context,
pub async fn answer_pov_request_log<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::PoVFetchingRequest>,
metrics: &Metrics,
) where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let res = answer_pov_request(ctx, req).await;
let res = answer_pov_request(sender, req).await;
match res {
Ok(result) => metrics.on_served_pov(if result { SUCCEEDED } else { NOT_FOUND }),
Err(err) => {
@@ -58,15 +123,15 @@ pub async fn answer_pov_request_log<Context>(
/// Variant of `answer_chunk_request` that does Prometheus metric and logging on errors.
///
/// Any errors of `answer_request` will simply be logged.
pub async fn answer_chunk_request_log<Context>(
ctx: &mut Context,
pub async fn answer_chunk_request_log<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::ChunkFetchingRequest>,
metrics: &Metrics,
) -> ()
where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let res = answer_chunk_request(ctx, req).await;
let res = answer_chunk_request(sender, req).await;
match res {
Ok(result) => metrics.on_served_chunk(if result { SUCCEEDED } else { NOT_FOUND }),
Err(err) => {
@@ -83,16 +148,16 @@ where
/// Answer an incoming PoV fetch request by querying the av store.
///
/// Returns: `Ok(true)` if chunk was found and served.
pub async fn answer_pov_request<Context>(
ctx: &mut Context,
pub async fn answer_pov_request<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::PoVFetchingRequest>,
) -> Result<bool>
where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let _span = jaeger::Span::new(req.payload.candidate_hash, "answer-pov-request");
let av_data = query_available_data(ctx, req.payload.candidate_hash).await?;
let av_data = query_available_data(sender, req.payload.candidate_hash).await?;
let result = av_data.is_some();
@@ -111,18 +176,18 @@ where
/// Answer an incoming chunk request by querying the av store.
///
/// Returns: `Ok(true)` if chunk was found and served.
pub async fn answer_chunk_request<Context>(
ctx: &mut Context,
pub async fn answer_chunk_request<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::ChunkFetchingRequest>,
) -> Result<bool>
where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let span = jaeger::Span::new(req.payload.candidate_hash, "answer-chunk-request");
let _child_span = span.child("answer-chunk-request").with_chunk_index(req.payload.index.0);
let chunk = query_chunk(ctx, req.payload.candidate_hash, req.payload.index).await?;
let chunk = query_chunk(sender, req.payload.candidate_hash, req.payload.index).await?;
let result = chunk.is_some();
@@ -145,16 +210,19 @@ where
}
/// Query chunk from the availability store.
async fn query_chunk<Context>(
ctx: &mut Context,
async fn query_chunk<Sender>(
sender: &mut Sender,
candidate_hash: CandidateHash,
validator_index: ValidatorIndex,
) -> Result<Option<ErasureChunk>>
) -> NonFatalResult<Option<ErasureChunk>>
where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx))
sender
.send_message(
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx).into(),
)
.await;
let result = rx.await.map_err(|e| {
@@ -171,15 +239,16 @@ where
}
/// Query PoV from the availability store.
async fn query_available_data<Context>(
ctx: &mut Context,
async fn query_available_data<Sender>(
sender: &mut Sender,
candidate_hash: CandidateHash,
) -> Result<Option<AvailableData>>
) -> NonFatalResult<Option<AvailableData>>
where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx))
sender
.send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx).into())
.await;
let result = rx.await.map_err(|e| NonFatal::QueryAvailableDataResponseChannel(e))?;
@@ -18,6 +18,7 @@ use std::collections::HashSet;
use futures::{executor, future, Future};
use polkadot_node_network_protocol::request_response::IncomingRequest;
use polkadot_primitives::v1::CoreState;
use sp_keystore::SyncCryptoStorePtr;
@@ -41,17 +42,21 @@ fn test_harness<T: Future<Output = ()>>(
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default());
{
let subsystem = subsystem.run(context);
let (pov_req_receiver, pov_req_cfg) = IncomingRequest::get_config_receiver();
let (chunk_req_receiver, chunk_req_cfg) = IncomingRequest::get_config_receiver();
let subsystem = AvailabilityDistributionSubsystem::new(
keystore,
IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
Default::default(),
);
let subsystem = subsystem.run(context);
let test_fut = test_fx(TestHarness { virtual_overseer, pool });
let test_fut = test_fx(TestHarness { virtual_overseer, pov_req_cfg, chunk_req_cfg, pool });
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::join(test_fut, subsystem)).1.unwrap();
}
executor::block_on(future::join(test_fut, subsystem)).1.unwrap();
}
/// Simple basic check, whether the subsystem works as expected.
@@ -30,7 +30,7 @@ use futures::{
use futures_timer::Delay;
use sc_network as network;
use sc_network::{config as netconfig, IfDisconnected};
use sc_network::{config as netconfig, config::RequestResponseConfig, IfDisconnected};
use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
use sp_keystore::SyncCryptoStorePtr;
@@ -59,6 +59,8 @@ use crate::LOG_TARGET;
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>;
pub struct TestHarness {
pub virtual_overseer: VirtualOverseer,
pub pov_req_cfg: RequestResponseConfig,
pub chunk_req_cfg: RequestResponseConfig,
pub pool: TaskExecutor,
}
@@ -152,9 +154,7 @@ impl TestState {
/// Run, but fail after some timeout.
pub async fn run(self, harness: TestHarness) {
// Make sure test won't run forever.
let f = self
.run_inner(harness.pool, harness.virtual_overseer)
.timeout(Duration::from_secs(10));
let f = self.run_inner(harness).timeout(Duration::from_secs(10));
assert!(f.await.is_some(), "Test ran into timeout");
}
@@ -166,7 +166,7 @@ impl TestState {
///
/// We try to be as agnostic about details as possible, how the subsystem achieves those goals
/// should not be a matter to this test suite.
async fn run_inner(mut self, executor: TaskExecutor, virtual_overseer: VirtualOverseer) {
async fn run_inner(mut self, mut harness: TestHarness) {
// We skip genesis here (in reality ActiveLeavesUpdate can also skip a block:
let updates = {
let mut advanced = self.relay_chain.iter();
@@ -191,12 +191,12 @@ impl TestState {
// Test will fail if this does not happen until timeout.
let mut remaining_stores = self.valid_chunks.len();
let TestSubsystemContextHandle { tx, mut rx } = virtual_overseer;
let TestSubsystemContextHandle { tx, mut rx } = harness.virtual_overseer;
// Spawning necessary as incoming queue can only hold a single item, we don't want to dead
// lock ;-)
let update_tx = tx.clone();
executor.spawn(
harness.pool.spawn(
"Sending active leaves updates",
async move {
for update in updates {
@@ -219,16 +219,15 @@ impl TestState {
)) => {
for req in reqs {
// Forward requests:
let in_req = to_incoming_req(&executor, req);
executor.spawn(
"Request forwarding",
overseer_send(
tx.clone(),
AvailabilityDistributionMessage::ChunkFetchingRequest(in_req),
)
.boxed(),
);
let in_req = to_incoming_req(&harness.pool, req);
harness
.chunk_req_cfg
.inbound_queue
.as_mut()
.unwrap()
.send(in_req.into_raw())
.await
.unwrap();
}
},
AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryChunk(
@@ -295,18 +294,6 @@ async fn overseer_signal(
tx.send(FromOverseer::Signal(msg)).await.expect("Test subsystem no longer live");
}
async fn overseer_send(
mut tx: SingleItemSink<FromOverseer<AvailabilityDistributionMessage>>,
msg: impl Into<AvailabilityDistributionMessage>,
) {
let msg = msg.into();
tracing::trace!(target: LOG_TARGET, msg = ?msg, "sending message");
tx.send(FromOverseer::Communication { msg })
.await
.expect("Test subsystem no longer live");
tracing::trace!(target: LOG_TARGET, "sent message");
}
async fn overseer_recv(rx: &mut mpsc::UnboundedReceiver<AllMessages>) -> AllMessages {
tracing::trace!(target: LOG_TARGET, "waiting for message ...");
rx.next().await.expect("Test subsystem no longer live")