Migrate provisioner subsystem (#5568)

This commit is contained in:
Chris Sosnin
2022-05-23 22:43:58 +03:00
committed by GitHub
parent 1ccd68dd15
commit d0453cf24d
4 changed files with 225 additions and 183 deletions
+30 -4
View File
@@ -15,16 +15,18 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
///! Error types for provisioner module
use fatality;
use fatality::Nested;
use futures::channel::{mpsc, oneshot};
use polkadot_node_subsystem::errors::{ChainApiError, RuntimeApiError};
use polkadot_node_subsystem::errors::{ChainApiError, RuntimeApiError, SubsystemError};
use polkadot_node_subsystem_util as util;
use polkadot_primitives::v2::Hash;
use thiserror::Error;
pub type FatalResult<T> = std::result::Result<T, FatalError>;
pub type Result<T> = std::result::Result<T, Error>;
/// Errors in the provisioner.
#[derive(Debug, Error)]
#[allow(missing_docs)]
#[fatality::fatality(splitable)]
pub enum Error {
#[error(transparent)]
Util(#[from] util::Error),
@@ -63,6 +65,13 @@ pub enum Error {
"backed candidate does not correspond to selected candidate; check logic in provisioner"
)]
BackedCandidateOrderingProblem,
#[fatal]
#[error("Failed to spawn background task")]
FailedToSpawnBackgroundTask,
#[error(transparent)]
SubsystemError(#[from] SubsystemError),
}
/// Used by `get_onchain_disputes` to represent errors related to fetching on-chain disputes from the Runtime
@@ -81,3 +90,20 @@ pub enum GetOnchainDisputesError {
)]
NotSupported(#[source] RuntimeApiError, Hash),
}
pub fn log_error(result: Result<()>) -> std::result::Result<(), FatalError> {
match result.into_nested()? {
Ok(()) => Ok(()),
Err(jfyi) => {
jfyi.log();
Ok(())
},
}
}
impl JfyiError {
/// Log a `JfyiError`.
pub fn log(self) {
gum::debug!(target: super::LOG_TARGET, error = ?self);
}
}
+188 -164
View File
@@ -21,10 +21,10 @@
use bitvec::vec::BitVec;
use futures::{
channel::{mpsc, oneshot},
prelude::*,
channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered, FutureExt,
};
use futures_timer::Delay;
use polkadot_node_primitives::CandidateVotes;
use polkadot_node_subsystem::{
jaeger,
@@ -32,28 +32,23 @@ use polkadot_node_subsystem::{
CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData,
ProvisionerInherentData, ProvisionerMessage,
},
overseer, ActivatedLeaf, LeafStatus, PerLeafSpan,
};
use polkadot_node_subsystem_util::{
request_availability_cores, request_persisted_validation_data, JobSender, JobSubsystem,
JobTrait,
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal,
PerLeafSpan, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::{request_availability_cores, request_persisted_validation_data};
use polkadot_primitives::v2::{
BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeState,
DisputeStatement, DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption,
SessionIndex, SignedAvailabilityBitfield, ValidatorIndex,
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
pin::Pin,
};
use std::collections::{BTreeMap, HashMap, HashSet};
mod error;
mod metrics;
mod onchain_disputes;
pub use self::metrics::*;
use error::Error;
use error::{Error, FatalResult};
#[cfg(test)]
mod tests;
@@ -63,198 +58,230 @@ const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_mill
const LOG_TARGET: &str = "parachain::provisioner";
enum InherentAfter {
Ready,
Wait(Delay),
/// The provisioner subsystem.
pub struct ProvisionerSubsystem {
metrics: Metrics,
}
impl InherentAfter {
fn new_from_now() -> Self {
InherentAfter::Wait(Delay::new(PRE_PROPOSE_TIMEOUT))
}
fn is_ready(&self) -> bool {
match *self {
InherentAfter::Ready => true,
InherentAfter::Wait(_) => false,
}
}
async fn ready(&mut self) {
match *self {
InherentAfter::Ready => {
// Make sure we never end the returned future.
// This is required because the `select!` that calls this future will end in a busy loop.
futures::pending!()
},
InherentAfter::Wait(ref mut d) => {
d.await;
*self = InherentAfter::Ready;
},
}
impl ProvisionerSubsystem {
/// Create a new instance of the `ProvisionerSubsystem`.
pub fn new(metrics: Metrics) -> Self {
Self { metrics }
}
}
/// Provisioner run arguments.
#[derive(Debug, Clone, Copy)]
pub struct ProvisionerConfig;
/// A per-relay-parent job for the provisioning subsystem.
pub struct ProvisionerJob<Sender> {
/// A per-relay-parent state for the provisioning subsystem.
pub struct PerRelayParent {
leaf: ActivatedLeaf,
receiver: mpsc::Receiver<ProvisionerMessage>,
backed_candidates: Vec<CandidateReceipt>,
signed_bitfields: Vec<SignedAvailabilityBitfield>,
metrics: Metrics,
inherent_after: InherentAfter,
is_inherent_ready: bool,
awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>,
_phantom: std::marker::PhantomData<Sender>,
span: PerLeafSpan,
}
impl<Sender> JobTrait for ProvisionerJob<Sender>
where
Sender: overseer::ProvisionerSenderTrait + std::marker::Unpin,
{
type ToJob = ProvisionerMessage;
type OutgoingMessages = overseer::ProvisionerOutgoingMessages;
type Sender = Sender;
type Error = Error;
type RunArgs = ProvisionerConfig;
type Metrics = Metrics;
impl PerRelayParent {
fn new(leaf: ActivatedLeaf) -> Self {
let span = PerLeafSpan::new(leaf.span.clone(), "provisioner");
const NAME: &'static str = "provisioner-job";
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
fn run(
leaf: ActivatedLeaf,
_: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<ProvisionerMessage>,
mut sender: JobSender<Sender>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let span = leaf.span.clone();
async move {
let job = ProvisionerJob::new(leaf, metrics, receiver);
job.run_loop(sender.subsystem_sender(), PerLeafSpan::new(span, "provisioner"))
.await
}
.boxed()
}
}
impl<Sender> ProvisionerJob<Sender>
where
Sender: overseer::ProvisionerSenderTrait,
{
fn new(
leaf: ActivatedLeaf,
metrics: Metrics,
receiver: mpsc::Receiver<ProvisionerMessage>,
) -> Self {
Self {
leaf,
receiver,
backed_candidates: Vec::new(),
signed_bitfields: Vec::new(),
metrics,
inherent_after: InherentAfter::new_from_now(),
is_inherent_ready: false,
awaiting_inherent: Vec::new(),
_phantom: std::marker::PhantomData::<Sender>::default(),
span,
}
}
}
type InherentDelays = FuturesUnordered<BoxFuture<'static, Hash>>;
#[overseer::subsystem(Provisioner, error=SubsystemError, prefix=self::overseer)]
impl<Context> ProvisionerSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = async move {
run(ctx, self.metrics)
.await
.map_err(|e| SubsystemError::with_origin("provisioner", e))
}
.boxed();
SpawnedSubsystem { name: "provisioner-subsystem", future }
}
}
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
async fn run<Context>(mut ctx: Context, metrics: Metrics) -> FatalResult<()> {
let mut inherent_delays = InherentDelays::new();
let mut per_relay_parent = HashMap::new();
loop {
let result =
run_iteration(&mut ctx, &mut per_relay_parent, &mut inherent_delays, &metrics).await;
match result {
Ok(()) => break,
err => crate::error::log_error(err)?,
}
}
async fn run_loop(mut self, sender: &mut Sender, span: PerLeafSpan) -> Result<(), Error> {
loop {
futures::select! {
msg = self.receiver.next() => match msg {
Some(ProvisionerMessage::RequestInherentData(_, return_sender)) => {
let _span = span.child("req-inherent-data");
let _timer = self.metrics.time_request_inherent_data();
Ok(())
}
if self.inherent_after.is_ready() {
self.send_inherent_data(sender, vec![return_sender]).await;
} else {
self.awaiting_inherent.push(return_sender);
}
}
Some(ProvisionerMessage::ProvisionableData(_, data)) => {
let span = span.child("provisionable-data");
let _timer = self.metrics.time_provisionable_data();
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
async fn run_iteration<Context>(
ctx: &mut Context,
per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
inherent_delays: &mut InherentDelays,
metrics: &Metrics,
) -> Result<(), Error> {
loop {
futures::select! {
from_overseer = ctx.recv().fuse() => {
match from_overseer? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) =>
handle_active_leaves_update(update, per_relay_parent, inherent_delays),
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { msg } => {
handle_communication(ctx, per_relay_parent, msg, metrics).await?;
},
}
},
hash = inherent_delays.select_next_some() => {
if let Some(state) = per_relay_parent.get_mut(&hash) {
state.is_inherent_ready = true;
self.note_provisionable_data(&span, data);
}
None => break,
},
_ = self.inherent_after.ready().fuse() => {
let _span = span.child("send-inherent-data");
let return_senders = std::mem::take(&mut self.awaiting_inherent);
let return_senders = std::mem::take(&mut state.awaiting_inherent);
if !return_senders.is_empty() {
self.send_inherent_data(sender, return_senders).await;
send_inherent_data_bg(ctx, &state, return_senders, metrics.clone()).await?;
}
}
}
}
}
}
Ok(())
fn handle_active_leaves_update(
update: ActiveLeavesUpdate,
per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
inherent_delays: &mut InherentDelays,
) {
for deactivated in &update.deactivated {
per_relay_parent.remove(deactivated);
}
async fn send_inherent_data(
&mut self,
sender: &mut Sender,
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
) {
for leaf in update.activated {
let delay_fut = Delay::new(PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed();
per_relay_parent.insert(leaf.hash, PerRelayParent::new(leaf));
inherent_delays.push(delay_fut);
}
}
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
async fn handle_communication<Context>(
ctx: &mut Context,
per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
message: ProvisionerMessage,
metrics: &Metrics,
) -> Result<(), Error> {
match message {
ProvisionerMessage::RequestInherentData(relay_parent, return_sender) => {
if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
if state.is_inherent_ready {
send_inherent_data_bg(ctx, &state, vec![return_sender], metrics.clone())
.await?;
} else {
state.awaiting_inherent.push(return_sender);
}
}
},
ProvisionerMessage::ProvisionableData(relay_parent, data) => {
if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
let span = state.span.child("provisionable-data");
let _timer = metrics.time_provisionable_data();
note_provisionable_data(state, &span, data);
}
},
}
Ok(())
}
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
async fn send_inherent_data_bg<Context>(
ctx: &mut Context,
per_relay_parent: &PerRelayParent,
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
metrics: Metrics,
) -> Result<(), Error> {
let leaf = per_relay_parent.leaf.clone();
let signed_bitfields = per_relay_parent.signed_bitfields.clone();
let backed_candidates = per_relay_parent.backed_candidates.clone();
let span = per_relay_parent.span.child("req-inherent-data");
let mut sender = ctx.sender().clone();
let bg = async move {
let _span = span;
let _timer = metrics.time_request_inherent_data();
if let Err(err) = send_inherent_data(
&self.leaf,
&self.signed_bitfields,
&self.backed_candidates,
&leaf,
&signed_bitfields,
&backed_candidates,
return_senders,
sender,
&self.metrics,
&mut sender,
&metrics,
)
.await
{
gum::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
self.metrics.on_inherent_data_request(Err(()));
metrics.on_inherent_data_request(Err(()));
} else {
self.metrics.on_inherent_data_request(Ok(()));
metrics.on_inherent_data_request(Ok(()));
gum::debug!(
target: LOG_TARGET,
signed_bitfield_count = self.signed_bitfields.len(),
backed_candidates_count = self.backed_candidates.len(),
leaf_hash = ?self.leaf.hash,
signed_bitfield_count = signed_bitfields.len(),
backed_candidates_count = backed_candidates.len(),
leaf_hash = ?leaf.hash,
"inherent data sent successfully"
);
self.metrics.observe_inherent_data_bitfields_count(self.signed_bitfields.len());
metrics.observe_inherent_data_bitfields_count(signed_bitfields.len());
}
}
};
fn note_provisionable_data(
&mut self,
span: &jaeger::Span,
provisionable_data: ProvisionableData,
) {
match provisionable_data {
ProvisionableData::Bitfield(_, signed_bitfield) =>
self.signed_bitfields.push(signed_bitfield),
ProvisionableData::BackedCandidate(backed_candidate) => {
let candidate_hash = backed_candidate.hash();
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
para = ?backed_candidate.descriptor().para_id,
"noted backed candidate",
);
let _span = span
.child("provisionable-backed")
.with_candidate(candidate_hash)
.with_para_id(backed_candidate.descriptor().para_id);
self.backed_candidates.push(backed_candidate)
},
_ => {},
}
ctx.spawn("send-inherent-data", bg.boxed())
.map_err(|_| Error::FailedToSpawnBackgroundTask)?;
Ok(())
}
fn note_provisionable_data(
per_relay_parent: &mut PerRelayParent,
span: &jaeger::Span,
provisionable_data: ProvisionableData,
) {
match provisionable_data {
ProvisionableData::Bitfield(_, signed_bitfield) =>
per_relay_parent.signed_bitfields.push(signed_bitfield),
ProvisionableData::BackedCandidate(backed_candidate) => {
let candidate_hash = backed_candidate.hash();
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
para = ?backed_candidate.descriptor().para_id,
"noted backed candidate",
);
let _span = span
.child("provisionable-backed")
.with_candidate(candidate_hash)
.with_para_id(backed_candidate.descriptor().para_id);
per_relay_parent.backed_candidates.push(backed_candidate)
},
_ => {},
}
}
@@ -807,6 +834,3 @@ async fn select_disputes(
})
.collect())
}
/// The provisioner subsystem.
pub type ProvisionerSubsystem<Spawner, Sender> = JobSubsystem<ProvisionerJob<Sender>, Spawner>;
+3 -1
View File
@@ -197,6 +197,7 @@ mod select_availability_bitfields {
mod common {
use super::super::*;
use futures::channel::mpsc;
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem_test_helpers::TestSubsystemSender;
@@ -225,6 +226,7 @@ mod select_candidates {
scheduled_core,
};
use ::test_helpers::{dummy_candidate_descriptor, dummy_hash};
use futures::channel::mpsc;
use polkadot_node_subsystem::messages::{
AllMessages, RuntimeApiMessage,
RuntimeApiRequest::{
@@ -497,8 +499,8 @@ mod select_candidates {
}
mod select_disputes {
use super::{super::*, common::test_harness};
use futures::channel::mpsc;
use polkadot_node_subsystem::{
messages::{AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest},
RuntimeApiError,
+4 -14
View File
@@ -24,18 +24,15 @@ use polkadot_node_core_av_store::Config as AvailabilityConfig;
use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig;
use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
use polkadot_node_core_provisioner::ProvisionerConfig;
use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver};
use polkadot_node_subsystem_types::messages::ProvisionerMessage;
#[cfg(any(feature = "malus", test))]
pub use polkadot_overseer::{
dummy::{dummy_overseer_builder, DummySubsystem},
HeadSupportsParachains,
};
use polkadot_overseer::{
gen::SubsystemContext, metrics::Metrics as OverseerMetrics, BlockInfo,
InitializedOverseerBuilder, MetricsTrait, Overseer, OverseerConnector, OverseerHandle,
OverseerSubsystemContext, SpawnGlue,
metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MetricsTrait,
Overseer, OverseerConnector, OverseerHandle, SpawnGlue,
};
use polkadot_primitives::runtime_api::ParachainHost;
@@ -158,10 +155,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
AvailabilityRecoverySubsystem,
BitfieldSigningSubsystem,
BitfieldDistributionSubsystem,
ProvisionerSubsystem<
SpawnGlue<Spawner>,
<OverseerSubsystemContext<ProvisionerMessage> as SubsystemContext>::Sender,
>,
ProvisionerSubsystem,
RuntimeApiSubsystem<RuntimeClient>,
AvailabilityStoreSubsystem,
NetworkBridgeSubsystem<
@@ -249,11 +243,7 @@ where
Box::new(network_service.clone()),
Metrics::register(registry)?,
))
.provisioner(ProvisionerSubsystem::new(
spawner.clone(),
ProvisionerConfig,
Metrics::register(registry)?,
))
.provisioner(ProvisionerSubsystem::new(Metrics::register(registry)?))
.runtime_api(RuntimeApiSubsystem::new(
runtime_client.clone(),
Metrics::register(registry)?,