remove provisioner checks (#4254)

* chore/provisioner: move metrics to a separate module

* avoid the duplicate names

* reduce all checks

* fixup tests

* Update node/core/provisioner/src/lib.rs

Co-authored-by: Zeke Mostov <z.mostov@gmail.com>

* chore: fmt

* chore: spellcheck

* doc

* remove the enum anti-pattern

* guide update - remove all the responsibilities

* add another trivial check

* Update node/core/provisioner/src/metrics.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* Update roadmap/implementers-guide/src/node/utility/provisioner.md

Co-authored-by: Andronik Ordian <write@reusable.software>

* Update node/core/provisioner/src/metrics.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

Co-authored-by: Zeke Mostov <z.mostov@gmail.com>
Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
Bernhard Schuster
2021-11-19 19:15:59 +01:00
committed by GitHub
parent eee4bb2577
commit d5d916a915
8 changed files with 321 additions and 886 deletions
+60 -346
View File
@@ -14,12 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The provisioner is responsible for assembling a relay chain block
//! from a set of available parachain candidates of its choice.
//! The provisioner is responsible for assembling a set of items, from which the
//! runtime will pick a subset and create a relay chain block.
#![deny(missing_docs, unused_crate_dependencies)]
use bitvec::vec::BitVec;
use futures::{
channel::{mpsc, oneshot},
prelude::*,
@@ -29,25 +28,24 @@ use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
jaeger,
messages::{
CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData,
CandidateBackingMessage, DisputeCoordinatorMessage, ProvisionableData,
ProvisionerInherentData, ProvisionerMessage,
},
PerLeafSpan, SubsystemSender,
};
use polkadot_node_subsystem_util::{
self as util,
metrics::{self, prometheus},
request_availability_cores, request_persisted_validation_data, JobSender, JobSubsystem,
JobTrait,
};
use polkadot_node_subsystem_util::{self as util, JobSender, JobSubsystem, JobTrait};
use polkadot_primitives::v1::{
BackedCandidate, BlockNumber, CandidateReceipt, CoreState, DisputeStatement,
DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption,
SignedAvailabilityBitfield, ValidatorIndex,
BackedCandidate, CandidateHash, CandidateReceipt, DisputeStatement, DisputeStatementSet, Hash,
Id as ParaId, MultiDisputeStatementSet, SignedAvailabilityBitfield,
SignedAvailabilityBitfields,
};
use std::{collections::BTreeMap, pin::Pin, sync::Arc};
use std::{collections::HashSet, pin::Pin, sync::Arc};
use thiserror::Error;
mod metrics;
pub use self::metrics::*;
#[cfg(test)]
mod tests;
@@ -106,40 +104,17 @@ pub enum Error {
#[error(transparent)]
Util(#[from] util::Error),
#[error("failed to get availability cores")]
CanceledAvailabilityCores(#[source] oneshot::Canceled),
#[error("failed to get persisted validation data")]
CanceledPersistedValidationData(#[source] oneshot::Canceled),
#[error("failed to get block number")]
CanceledBlockNumber(#[source] oneshot::Canceled),
#[error("failed to get backed candidates")]
CanceledBackedCandidates(#[source] oneshot::Canceled),
#[error("failed to get votes on dispute")]
CanceledCandidateVotes(#[source] oneshot::Canceled),
#[error(transparent)]
ChainApi(#[from] ChainApiError),
#[error(transparent)]
Runtime(#[from] RuntimeApiError),
#[error("failed to send message to ChainAPI")]
ChainApiMessageSend(#[source] mpsc::SendError),
#[error("failed to send message to CandidateBacking to get backed candidates")]
GetBackedCandidatesSend(#[source] mpsc::SendError),
#[error("failed to send return message with Inherents")]
InherentDataReturnChannel,
#[error(
"backed candidate does not correspond to selected candidate; check logic in provisioner"
)]
BackedCandidateOrderingProblem,
}
impl JobTrait for ProvisioningJob {
@@ -193,11 +168,10 @@ impl ProvisioningJob {
sender: &mut impl SubsystemSender,
span: PerLeafSpan,
) -> Result<(), Error> {
use ProvisionerMessage::{ProvisionableData, RequestInherentData};
loop {
futures::select! {
msg = self.receiver.next() => match msg {
Some(RequestInherentData(_, return_sender)) => {
Some(ProvisionerMessage::RequestInherentData(_, return_sender)) => {
let _span = span.child("req-inherent-data");
let _timer = self.metrics.time_request_inherent_data();
@@ -207,7 +181,7 @@ impl ProvisioningJob {
self.awaiting_inherent.push(return_sender);
}
}
Some(ProvisionableData(_, data)) => {
Some(ProvisionerMessage::ProvisionableData(_, data)) => {
let span = span.child("provisionable-data");
let _timer = self.metrics.time_provisionable_data();
@@ -235,8 +209,8 @@ impl ProvisioningJob {
) {
if let Err(err) = send_inherent_data(
self.relay_parent,
&self.signed_bitfields,
&self.backed_candidates,
self.signed_bitfields.clone(),
self.backed_candidates.clone(),
return_senders,
sender,
)
@@ -268,46 +242,25 @@ impl ProvisioningJob {
}
}
type CoreAvailability = BitVec<bitvec::order::Lsb0, u8>;
/// The provisioner is the subsystem best suited to choosing which specific
/// backed candidates and availability bitfields should be assembled into the
/// block. To engage this functionality, a
/// `ProvisionerMessage::RequestInherentData` is sent; the response is a set of
/// non-conflicting candidates and the appropriate bitfields. Non-conflicting
/// means that there are never two distinct parachain candidates included for
/// the same parachain and that new parachain candidates cannot be included
/// until the previous one either gets declared available or expired.
///
/// The main complication here is going to be around handling
/// occupied-core-assumptions. We might have candidates that are only
/// includable when some bitfields are included. And we might have candidates
/// that are not includable when certain bitfields are included.
///
/// When we're choosing bitfields to include, the rule should be simple:
/// maximize availability. So basically, include all bitfields. And then
/// choose a coherent set of candidates along with that.
/// The provisioner is the subsystem best suited on the node side,
/// yet it lacks sufficient information to do weight based inherents limiting.
/// This does the minimalistic checks and forwards a most likely
/// too large set of bitfields, candidates, and dispute votes to
/// the runtime. The `fn create_inherent` in the runtime is responsible
/// to use a subset of these.
async fn send_inherent_data(
relay_parent: Hash,
bitfields: &[SignedAvailabilityBitfield],
candidates: &[CandidateReceipt],
bitfields: SignedAvailabilityBitfields,
candidate_receipts: Vec<CandidateReceipt>,
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
from_job: &mut impl SubsystemSender,
) -> Result<(), Error> {
let availability_cores = request_availability_cores(relay_parent, from_job)
.await
.await
.map_err(|err| Error::CanceledAvailabilityCores(err))??;
let backed_candidates =
collect_backed_candidates(candidate_receipts, relay_parent, from_job).await?;
let bitfields = select_availability_bitfields(&availability_cores, bitfields);
let candidates =
select_candidates(&availability_cores, &bitfields, candidates, relay_parent, from_job)
.await?;
let disputes = collect_disputes(from_job).await?;
let disputes = select_disputes(from_job).await?;
let inherent_data =
ProvisionerInherentData { bitfields, backed_candidates: candidates, disputes };
let inherent_data = ProvisionerInherentData { bitfields, backed_candidates, disputes };
for return_sender in return_senders {
return_sender
@@ -318,120 +271,33 @@ async fn send_inherent_data(
Ok(())
}
/// In general, we want to pick all the bitfields. However, we have the following constraints:
///
/// - not more than one per validator
/// - each 1 bit must correspond to an occupied core
///
/// If we have too many, an arbitrary selection policy is fine. For purposes of maximizing availability,
/// we pick the one with the greatest number of 1 bits.
///
/// Note: This does not enforce any sorting precondition on the output; the ordering there will be unrelated
/// to the sorting of the input.
fn select_availability_bitfields(
cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
) -> Vec<SignedAvailabilityBitfield> {
let mut selected: BTreeMap<ValidatorIndex, SignedAvailabilityBitfield> = BTreeMap::new();
'a: for bitfield in bitfields.iter().cloned() {
if bitfield.payload().0.len() != cores.len() {
continue
}
let is_better = selected
.get(&bitfield.validator_index())
.map_or(true, |b| b.payload().0.count_ones() < bitfield.payload().0.count_ones());
if !is_better {
continue
}
for (idx, _) in cores.iter().enumerate().filter(|v| !v.1.is_occupied()) {
// Bit is set for an unoccupied core - invalid
if *bitfield.payload().0.get(idx).as_deref().unwrap_or(&false) {
continue 'a
}
}
let _ = selected.insert(bitfield.validator_index(), bitfield);
}
selected.into_iter().map(|(_, b)| b).collect()
}
/// Determine which cores are free, and then to the degree possible, pick a candidate appropriate to each free core.
async fn select_candidates(
availability_cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
candidates: &[CandidateReceipt],
/// Collect backed candidates with a matching `relay_parent`.
async fn collect_backed_candidates(
candidate_receipts: Vec<CandidateReceipt>,
relay_parent: Hash,
sender: &mut impl SubsystemSender,
) -> Result<Vec<BackedCandidate>, Error> {
let block_number = get_block_number_under_construction(relay_parent, sender).await?;
let mut selected_candidates =
Vec::with_capacity(candidates.len().min(availability_cores.len()));
for (core_idx, core) in availability_cores.iter().enumerate() {
let (scheduled_core, assumption) = match core {
CoreState::Scheduled(scheduled_core) => (scheduled_core, OccupiedCoreAssumption::Free),
CoreState::Occupied(occupied_core) => {
if bitfields_indicate_availability(core_idx, bitfields, &occupied_core.availability)
{
if let Some(ref scheduled_core) = occupied_core.next_up_on_available {
(scheduled_core, OccupiedCoreAssumption::Included)
} else {
continue
}
} else {
if occupied_core.time_out_at != block_number {
continue
}
if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out {
(scheduled_core, OccupiedCoreAssumption::TimedOut)
} else {
continue
}
}
},
CoreState::Free => continue,
};
let validation_data = match request_persisted_validation_data(
relay_parent,
scheduled_core.para_id,
assumption,
sender,
)
.await
.await
.map_err(|err| Error::CanceledPersistedValidationData(err))??
{
Some(v) => v,
None => continue,
};
let computed_validation_data_hash = validation_data.hash();
// we arbitrarily pick the first of the backed candidates which match the appropriate selection criteria
if let Some(candidate) = candidates.iter().find(|backed_candidate| {
let descriptor = &backed_candidate.descriptor;
descriptor.para_id == scheduled_core.para_id &&
descriptor.persisted_validation_data_hash == computed_validation_data_hash
}) {
let candidate_hash = candidate.hash();
tracing::trace!(
target: LOG_TARGET,
"Selecting candidate {}. para_id={} core={}",
candidate_hash,
candidate.descriptor.para_id,
core_idx,
);
selected_candidates.push(candidate_hash);
}
}
let max_one_candidate_per_para = HashSet::<ParaId>::with_capacity(candidate_receipts.len());
let selected_candidates = candidate_receipts
.into_iter()
.filter(|candidate_receipt| {
// assure the follow up query `GetBackedCandidate` succeeds
candidate_receipt.descriptor().relay_parent == relay_parent
})
.scan(max_one_candidate_per_para, |unique, candidate_receipt| {
let para_id = candidate_receipt.descriptor().para_id;
if unique.insert(para_id) {
Some(candidate_receipt.hash())
} else {
tracing::debug!(
target: LOG_TARGET,
?para_id,
"Duplicate candidate detected for para, only submitting one",
);
None
}
})
.collect::<Vec<CandidateHash>>();
// now get the backed candidates corresponding to these candidate receipts
let (tx, rx) = oneshot::channel();
@@ -445,106 +311,18 @@ async fn select_candidates(
.into(),
)
.await;
let mut candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?;
// `selected_candidates` is generated in ascending order by core index, and `GetBackedCandidates`
// _should_ preserve that property, but let's just make sure.
//
// We can't easily map from `BackedCandidate` to `core_idx`, but we know that every selected candidate
// maps to either 0 or 1 backed candidate, and the hashes correspond. Therefore, by checking them
// in order, we can ensure that the backed candidates are also in order.
let mut backed_idx = 0;
for selected in selected_candidates {
if selected ==
candidates.get(backed_idx).ok_or(Error::BackedCandidateOrderingProblem)?.hash()
{
backed_idx += 1;
}
}
if candidates.len() != backed_idx {
Err(Error::BackedCandidateOrderingProblem)?;
}
// keep only one candidate with validation code.
let mut with_validation_code = false;
candidates.retain(|c| {
if c.candidate.commitments.new_validation_code.is_some() {
if with_validation_code {
return false
}
with_validation_code = true;
}
true
});
let backed_candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?;
tracing::debug!(
target: LOG_TARGET,
"Selected {} candidates for {} cores",
candidates.len(),
availability_cores.len(),
"Selected {} backed candidates ready to be sanitized by the runtime",
backed_candidates.len(),
);
Ok(candidates)
Ok(backed_candidates)
}
/// Produces a block number 1 higher than that of the relay parent
/// in the event of an invalid `relay_parent`, returns `Ok(0)`
async fn get_block_number_under_construction(
relay_parent: Hash,
sender: &mut impl SubsystemSender,
) -> Result<BlockNumber, Error> {
let (tx, rx) = oneshot::channel();
sender.send_message(ChainApiMessage::BlockNumber(relay_parent, tx).into()).await;
match rx.await.map_err(|err| Error::CanceledBlockNumber(err))? {
Ok(Some(n)) => Ok(n + 1),
Ok(None) => Ok(0),
Err(err) => Err(err.into()),
}
}
/// The availability bitfield for a given core is the transpose
/// of a set of signed availability bitfields. It goes like this:
///
/// - construct a transverse slice along `core_idx`
/// - bitwise-or it with the availability slice
/// - count the 1 bits, compare to the total length; true on 2/3+
fn bitfields_indicate_availability(
core_idx: usize,
bitfields: &[SignedAvailabilityBitfield],
availability: &CoreAvailability,
) -> bool {
let mut availability = availability.clone();
let availability_len = availability.len();
for bitfield in bitfields {
let validator_idx = bitfield.validator_index().0 as usize;
match availability.get_mut(validator_idx) {
None => {
// in principle, this function might return a `Result<bool, Error>` so that we can more clearly express this error condition
// however, in practice, that would just push off an error-handling routine which would look a whole lot like this one.
// simpler to just handle the error internally here.
tracing::warn!(
target: LOG_TARGET,
validator_idx = %validator_idx,
availability_len = %availability_len,
"attempted to set a transverse bit at idx {} which is greater than bitfield size {}",
validator_idx,
availability_len,
);
return false
},
Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx],
}
}
3 * availability.count_ones() >= 2 * availability.len()
}
async fn select_disputes(
async fn collect_disputes(
sender: &mut impl SubsystemSender,
) -> Result<MultiDisputeStatementSet, Error> {
let (tx, rx) = oneshot::channel();
@@ -560,6 +338,8 @@ async fn select_disputes(
// 2. Disputes are expected to be rare because they come with heavy slashing.
sender.send_message(DisputeCoordinatorMessage::RecentDisputes(tx).into()).await;
// TODO: scrape concluded disputes from on-chain to limit the number of disputes
// TODO: <https://github.com/paritytech/polkadot/issues/4329>
let recent_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
@@ -614,71 +394,5 @@ async fn select_disputes(
.collect())
}
#[derive(Clone)]
struct MetricsInner {
inherent_data_requests: prometheus::CounterVec<prometheus::U64>,
request_inherent_data: prometheus::Histogram,
provisionable_data: prometheus::Histogram,
}
/// Provisioner metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_inherent_data_request(&self, response: Result<(), ()>) {
if let Some(metrics) = &self.0 {
match response {
Ok(()) => metrics.inherent_data_requests.with_label_values(&["succeeded"]).inc(),
Err(()) => metrics.inherent_data_requests.with_label_values(&["failed"]).inc(),
}
}
}
/// Provide a timer for `request_inherent_data` which observes on drop.
fn time_request_inherent_data(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.request_inherent_data.start_timer())
}
/// Provide a timer for `provisionable_data` which observes on drop.
fn time_provisionable_data(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.provisionable_data.start_timer())
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
inherent_data_requests: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_inherent_data_requests_total",
"Number of InherentData requests served by provisioner.",
),
&["success"],
)?,
registry,
)?,
request_inherent_data: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_provisioner_request_inherent_data",
"Time spent within `provisioner::request_inherent_data`",
))?,
registry,
)?,
provisionable_data: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_provisioner_provisionable_data",
"Time spent within `provisioner::provisionable_data`",
))?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
/// The provisioning subsystem.
pub type ProvisioningSubsystem<Spawner> = JobSubsystem<ProvisioningJob, Spawner>;
pub type ProvisionerSubsystem<Spawner> = JobSubsystem<ProvisioningJob, Spawner>;