mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Cadidate selection check assignment (#2042)
* Cadidate selection check assignment * Apply suggestions from code review Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com> * Review fixes * Punish collator for wrong announcements * Update node/core/candidate-selection/src/lib.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
This commit is contained in:
Generated
+1
@@ -4960,6 +4960,7 @@ dependencies = [
|
||||
"polkadot-node-subsystem-util",
|
||||
"polkadot-primitives",
|
||||
"sp-core",
|
||||
"sp-keystore",
|
||||
"thiserror",
|
||||
"tracing",
|
||||
"tracing-futures",
|
||||
|
||||
@@ -9,6 +9,9 @@ futures = "0.3.8"
|
||||
tracing = "0.1.22"
|
||||
tracing-futures = "0.2.4"
|
||||
thiserror = "1.0.22"
|
||||
|
||||
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
polkadot-primitives = { path = "../../../primitives" }
|
||||
polkadot-node-subsystem = { path = "../../subsystem" }
|
||||
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
|
||||
|
||||
@@ -23,22 +23,28 @@ use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
prelude::*,
|
||||
};
|
||||
use sp_keystore::SyncCryptoStorePtr;
|
||||
use polkadot_node_subsystem::{
|
||||
errors::ChainApiError,
|
||||
messages::{
|
||||
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
|
||||
RuntimeApiRequest,
|
||||
},
|
||||
};
|
||||
use polkadot_node_subsystem_util::{
|
||||
self as util, delegated_subsystem, JobTrait, FromJobCommand, metrics::{self, prometheus},
|
||||
self as util, request_from_runtime, request_validator_groups, delegated_subsystem,
|
||||
JobTrait, FromJobCommand, Validator, metrics::{self, prometheus},
|
||||
};
|
||||
use polkadot_primitives::v1::{
|
||||
CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV,
|
||||
};
|
||||
use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV};
|
||||
use std::pin::Pin;
|
||||
use thiserror::Error;
|
||||
|
||||
const LOG_TARGET: &'static str = "candidate_selection";
|
||||
|
||||
struct CandidateSelectionJob {
|
||||
assignment: ParaId,
|
||||
sender: mpsc::Sender<FromJobCommand>,
|
||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||
metrics: Metrics,
|
||||
@@ -57,30 +63,92 @@ enum Error {
|
||||
ChainApi(#[from] ChainApiError),
|
||||
}
|
||||
|
||||
macro_rules! try_runtime_api {
|
||||
($x: expr) => {
|
||||
match $x {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
err = ?e,
|
||||
"Failed to fetch runtime API data for job",
|
||||
);
|
||||
|
||||
// We can't do candidate selection work if we don't have the
|
||||
// requisite runtime API data. But these errors should not take
|
||||
// down the node.
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl JobTrait for CandidateSelectionJob {
|
||||
type ToJob = CandidateSelectionMessage;
|
||||
type Error = Error;
|
||||
type RunArgs = ();
|
||||
type RunArgs = SyncCryptoStorePtr;
|
||||
type Metrics = Metrics;
|
||||
|
||||
const NAME: &'static str = "CandidateSelectionJob";
|
||||
|
||||
#[tracing::instrument(skip(_relay_parent, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
|
||||
fn run(
|
||||
_relay_parent: Hash,
|
||||
_run_args: Self::RunArgs,
|
||||
relay_parent: Hash,
|
||||
keystore: Self::RunArgs,
|
||||
metrics: Self::Metrics,
|
||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||
sender: mpsc::Sender<FromJobCommand>,
|
||||
mut sender: mpsc::Sender<FromJobCommand>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||
async move {
|
||||
CandidateSelectionJob::new(metrics, sender, receiver).run_loop().await
|
||||
let (groups, cores) = futures::try_join!(
|
||||
try_runtime_api!(request_validator_groups(relay_parent, &mut sender).await),
|
||||
try_runtime_api!(request_from_runtime(
|
||||
relay_parent,
|
||||
&mut sender,
|
||||
|tx| RuntimeApiRequest::AvailabilityCores(tx),
|
||||
).await),
|
||||
)?;
|
||||
|
||||
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
|
||||
let cores = try_runtime_api!(cores);
|
||||
|
||||
let n_cores = cores.len();
|
||||
|
||||
let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await {
|
||||
Ok(validator) => validator,
|
||||
Err(util::Error::NotAValidator) => return Ok(()),
|
||||
Err(err) => return Err(Error::Util(err)),
|
||||
};
|
||||
|
||||
let mut assignment = None;
|
||||
|
||||
for (idx, core) in cores.into_iter().enumerate() {
|
||||
// Ignore prospective assignments on occupied cores for the time being.
|
||||
if let CoreState::Scheduled(scheduled) = core {
|
||||
let core_index = CoreIndex(idx as _);
|
||||
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
|
||||
if let Some(g) = validator_groups.get(group_index.0 as usize) {
|
||||
if g.contains(&validator.index()) {
|
||||
assignment = Some(scheduled.para_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let assignment = match assignment {
|
||||
Some(assignment) => assignment,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
CandidateSelectionJob::new(assignment, metrics, sender, receiver).run_loop().await
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
impl CandidateSelectionJob {
|
||||
pub fn new(
|
||||
assignment: ParaId,
|
||||
metrics: Metrics,
|
||||
sender: mpsc::Sender<FromJobCommand>,
|
||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||
@@ -89,6 +157,7 @@ impl CandidateSelectionJob {
|
||||
sender,
|
||||
receiver,
|
||||
metrics,
|
||||
assignment,
|
||||
seconded_candidate: None,
|
||||
}
|
||||
}
|
||||
@@ -128,6 +197,23 @@ impl CandidateSelectionJob {
|
||||
) {
|
||||
let _timer = self.metrics.time_handle_collation();
|
||||
|
||||
if self.assignment != para_id {
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
"Collator {:?} sent a collation outside of our assignment {:?}",
|
||||
collator_id,
|
||||
para_id,
|
||||
);
|
||||
if let Err(err) = forward_invalidity_note(&collator_id, &mut self.sender).await {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
err = ?err,
|
||||
"failed to forward invalidity note",
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if self.seconded_candidate.is_none() {
|
||||
let (candidate_receipt, pov) =
|
||||
match get_collation(
|
||||
@@ -342,7 +428,7 @@ impl metrics::Metrics for Metrics {
|
||||
}
|
||||
}
|
||||
|
||||
delegated_subsystem!(CandidateSelectionJob((), Metrics) <- CandidateSelectionMessage as CandidateSelectionSubsystem);
|
||||
delegated_subsystem!(CandidateSelectionJob(SyncCryptoStorePtr, Metrics) <- CandidateSelectionMessage as CandidateSelectionSubsystem);
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
@@ -365,6 +451,7 @@ mod tests {
|
||||
let (to_job_tx, to_job_rx) = mpsc::channel(0);
|
||||
let (from_job_tx, from_job_rx) = mpsc::channel(0);
|
||||
let mut job = CandidateSelectionJob {
|
||||
assignment: 123.into(),
|
||||
sender: from_job_tx,
|
||||
receiver: to_job_rx,
|
||||
metrics: Default::default(),
|
||||
|
||||
@@ -367,7 +367,7 @@ where
|
||||
),
|
||||
candidate_selection: CandidateSelectionSubsystem::new(
|
||||
spawner.clone(),
|
||||
(),
|
||||
keystore.clone(),
|
||||
Metrics::register(registry)?,
|
||||
),
|
||||
candidate_validation: CandidateValidationSubsystem::new(
|
||||
|
||||
Reference in New Issue
Block a user