// Review notes (patch preamble: commentary placed before the first diff header; the patch text below is unchanged).
//
// NOTE(review): this copy of the patch looks whitespace-collapsed (all hunks run together on a few long lines)
// and generic parameters look stripped (e.g. `mpsc::Sender,` and `Pin> + Send>>`) — presumably an extraction
// artifact; TODO confirm against the original commit before trying to apply it.
//
// What the patch changes, as far as the hunks below show:
// - polkadot/Cargo.lock and node/core/candidate-selection/Cargo.toml: add the `sp-keystore` dependency
//   (git dependency on the substrate repository, branch `master`).
// - node/core/candidate-selection/src/lib.rs:
//   * `CandidateSelectionJob` gains an `assignment: ParaId` field and `new` takes it as its first argument.
//   * `JobTrait::RunArgs` changes from `()` to `SyncCryptoStorePtr`; the `delegated_subsystem!` invocation is
//     updated to match.
//   * New `try_runtime_api!` macro: on `Err` it logs a warning and returns `Ok(())` from the enclosing function,
//     so a failed runtime API fetch ends the job without reporting an error.
//   * `run` now requests the validator groups and the availability cores for `relay_parent`, constructs a
//     `Validator` from the keystore (returning `Ok(())` on `util::Error::NotAValidator`, `Err(Error::Util(..))`
//     on any other error), then walks the cores in order and takes the `para_id` of the first
//     `CoreState::Scheduled` core whose validator group contains this validator's index. Other core states are
//     skipped. If no such core is found it returns `Ok(())` without starting `run_loop`.
//   * The collation-handling hunk (`@@ -128,6 +197,23 @@`) adds a guard: when `self.assignment != para_id` it
//     logs at info level, calls `forward_invalidity_note` for the collator (logging a warning if that fails),
//     and returns before any seconding logic.
//   * The test hunk initialises the new field with `123.into()`.
// - node/service/src/lib.rs: passes `keystore.clone()` instead of `()` to `CandidateSelectionSubsystem::new`.
//
// NOTE(review): in the mismatch log, the text "outside of our assignment {:?}" is filled with `para_id` (the value
// that was compared against `self.assignment`), not with `self.assignment` itself — confirm which one is intended.
// NOTE(review): `CoreIndex(idx as _)` converts the enumerate index with an inferred `as` cast; if the inner type is
// narrower than `usize` this truncates silently — presumably core counts are small; confirm.
diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 3ee8796871..f732d1e1ff 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4960,6 +4960,7 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-primitives", "sp-core", + "sp-keystore", "thiserror", "tracing", "tracing-futures", diff --git a/polkadot/node/core/candidate-selection/Cargo.toml b/polkadot/node/core/candidate-selection/Cargo.toml index 3d988e21e0..c62cd0b423 100644 --- a/polkadot/node/core/candidate-selection/Cargo.toml +++ b/polkadot/node/core/candidate-selection/Cargo.toml @@ -9,6 +9,9 @@ futures = "0.3.8" tracing = "0.1.22" tracing-futures = "0.2.4" thiserror = "1.0.22" + +sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } + polkadot-primitives = { path = "../../../primitives" } polkadot-node-subsystem = { path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs index 0eb5242979..5812a47f15 100644 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ b/polkadot/node/core/candidate-selection/src/lib.rs @@ -23,22 +23,28 @@ use futures::{ channel::{mpsc, oneshot}, prelude::*, }; +use sp_keystore::SyncCryptoStorePtr; use polkadot_node_subsystem::{ errors::ChainApiError, messages::{ AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage, + RuntimeApiRequest, }, }; use polkadot_node_subsystem_util::{ - self as util, delegated_subsystem, JobTrait, FromJobCommand, metrics::{self, prometheus}, + self as util, request_from_runtime, request_validator_groups, delegated_subsystem, + JobTrait, FromJobCommand, Validator, metrics::{self, prometheus}, +}; +use polkadot_primitives::v1::{ + CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV, }; -use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV}; use 
std::pin::Pin; use thiserror::Error; const LOG_TARGET: &'static str = "candidate_selection"; struct CandidateSelectionJob { + assignment: ParaId, sender: mpsc::Sender, receiver: mpsc::Receiver, metrics: Metrics, @@ -57,30 +63,92 @@ enum Error { ChainApi(#[from] ChainApiError), } +macro_rules! try_runtime_api { + ($x: expr) => { + match $x { + Ok(x) => x, + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + err = ?e, + "Failed to fetch runtime API data for job", + ); + + // We can't do candidate selection work if we don't have the + // requisite runtime API data. But these errors should not take + // down the node. + return Ok(()); + } + } + } +} + impl JobTrait for CandidateSelectionJob { type ToJob = CandidateSelectionMessage; type Error = Error; - type RunArgs = (); + type RunArgs = SyncCryptoStorePtr; type Metrics = Metrics; const NAME: &'static str = "CandidateSelectionJob"; - #[tracing::instrument(skip(_relay_parent, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))] fn run( - _relay_parent: Hash, - _run_args: Self::RunArgs, + relay_parent: Hash, + keystore: Self::RunArgs, metrics: Self::Metrics, receiver: mpsc::Receiver, - sender: mpsc::Sender, + mut sender: mpsc::Sender, ) -> Pin> + Send>> { async move { - CandidateSelectionJob::new(metrics, sender, receiver).run_loop().await + let (groups, cores) = futures::try_join!( + try_runtime_api!(request_validator_groups(relay_parent, &mut sender).await), + try_runtime_api!(request_from_runtime( + relay_parent, + &mut sender, + |tx| RuntimeApiRequest::AvailabilityCores(tx), + ).await), + )?; + + let (validator_groups, group_rotation_info) = try_runtime_api!(groups); + let cores = try_runtime_api!(cores); + + let n_cores = cores.len(); + + let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await { + Ok(validator) => validator, + Err(util::Error::NotAValidator) => 
return Ok(()), + Err(err) => return Err(Error::Util(err)), + }; + + let mut assignment = None; + + for (idx, core) in cores.into_iter().enumerate() { + // Ignore prospective assignments on occupied cores for the time being. + if let CoreState::Scheduled(scheduled) = core { + let core_index = CoreIndex(idx as _); + let group_index = group_rotation_info.group_for_core(core_index, n_cores); + if let Some(g) = validator_groups.get(group_index.0 as usize) { + if g.contains(&validator.index()) { + assignment = Some(scheduled.para_id); + break; + } + } + } + } + + let assignment = match assignment { + Some(assignment) => assignment, + None => return Ok(()), + }; + + CandidateSelectionJob::new(assignment, metrics, sender, receiver).run_loop().await }.boxed() } } impl CandidateSelectionJob { pub fn new( + assignment: ParaId, metrics: Metrics, sender: mpsc::Sender, receiver: mpsc::Receiver, @@ -89,6 +157,7 @@ impl CandidateSelectionJob { sender, receiver, metrics, + assignment, seconded_candidate: None, } } @@ -128,6 +197,23 @@ impl CandidateSelectionJob { ) { let _timer = self.metrics.time_handle_collation(); + if self.assignment != para_id { + tracing::info!( + target: LOG_TARGET, + "Collator {:?} sent a collation outside of our assignment {:?}", + collator_id, + para_id, + ); + if let Err(err) = forward_invalidity_note(&collator_id, &mut self.sender).await { + tracing::warn!( + target: LOG_TARGET, + err = ?err, + "failed to forward invalidity note", + ); + } + return; + } + if self.seconded_candidate.is_none() { let (candidate_receipt, pov) = match get_collation( @@ -342,7 +428,7 @@ impl metrics::Metrics for Metrics { } } -delegated_subsystem!(CandidateSelectionJob((), Metrics) <- CandidateSelectionMessage as CandidateSelectionSubsystem); +delegated_subsystem!(CandidateSelectionJob(SyncCryptoStorePtr, Metrics) <- CandidateSelectionMessage as CandidateSelectionSubsystem); #[cfg(test)] mod tests { @@ -365,6 +451,7 @@ mod tests { let (to_job_tx, to_job_rx) = 
mpsc::channel(0); let (from_job_tx, from_job_rx) = mpsc::channel(0); let mut job = CandidateSelectionJob { + assignment: 123.into(), sender: from_job_tx, receiver: to_job_rx, metrics: Default::default(), diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index e59994cdcc..c9f654ea22 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -367,7 +367,7 @@ where ), candidate_selection: CandidateSelectionSubsystem::new( spawner.clone(), - (), + keystore.clone(), Metrics::register(registry)?, ), candidate_validation: CandidateValidationSubsystem::new(