From 0f4e849e0ac2de8c9880077c085985c5f656329c Mon Sep 17 00:00:00 2001 From: Andrei Sandu <54316454+sandreim@users.noreply.github.com> Date: Wed, 3 Apr 2024 18:01:34 +0300 Subject: [PATCH] Add ClaimQueue wrapper (#3950) Remove `fetch_next_scheduled_on_core` in favor of new wrapper and methods for accessing it. --------- Signed-off-by: Andrei Sandu --- polkadot/node/collation-generation/src/lib.rs | 14 +++--- .../node/collation-generation/src/tests.rs | 13 +++--- .../statement-distribution/src/v2/mod.rs | 4 +- polkadot/node/subsystem-util/src/vstaging.rs | 44 ++++++++++++------- prdoc/pr_3950.prdoc | 12 +++++ 5 files changed, 55 insertions(+), 32 deletions(-) create mode 100644 prdoc/pr_3950.prdoc diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index fb82871bb1..60ea1cf5ff 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -45,13 +45,12 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_util::{ request_async_backing_params, request_availability_cores, request_para_backing_state, request_persisted_validation_data, request_validation_code, request_validation_code_hash, - request_validators, - vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core}, + request_validators, vstaging::fetch_claim_queue, }; use polkadot_primitives::{ collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt, CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption, - PersistedValidationData, ValidationCodeHash, + PersistedValidationData, ScheduledCore, ValidationCodeHash, }; use sp_core::crypto::Pair; use std::sync::Arc; @@ -245,11 +244,10 @@ async fn handle_new_activations( // Use claim queue if available, or fallback to `next_up_on_available` let res = match maybe_claim_queue { Some(ref claim_queue) => { - // read what's in the claim queue for this core - fetch_next_scheduled_on_core( - claim_queue, - CoreIndex(core_idx as u32), - ) + // read what's in the claim queue for this core at depth 0. + claim_queue + .get_claim_for(CoreIndex(core_idx as u32), 0) + .map(|para_id| ScheduledCore { para_id, collator: None }) }, None => { // Runtime doesn't support claim queue runtime api. Fallback to diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs index 781d27188d..1ec2cccfae 100644 --- a/polkadot/node/collation-generation/src/tests.rs +++ b/polkadot/node/collation-generation/src/tests.rs @@ -28,7 +28,7 @@ use polkadot_node_subsystem::{ ActivatedLeaf, }; use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle}; -use polkadot_node_subsystem_util::{vstaging::ClaimQueueSnapshot, TimeoutExt}; +use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::{ async_backing::{BackingState, CandidatePendingAvailability}, AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData, @@ -620,8 +620,7 @@ fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) { _hash, RuntimeApiRequest::ClaimQueue(tx), ))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => { - let res = ClaimQueueSnapshot::new(); - tx.send(Ok(res)).unwrap(); + tx.send(Ok(Default::default())).unwrap(); }, Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( _hash, @@ -783,7 +782,7 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run candidate_hash: Default::default(), candidate_descriptor: dummy_candidate_descriptor(dummy_hash()), })]; - let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]); + let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]).into(); test_harness(|mut virtual_overseer| async move { helpers::initialize_collator(&mut virtual_overseer, para_id).await; @@ -965,7 +964,7 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled( candidate_hash: Default::default(), candidate_descriptor: dummy_candidate_descriptor(dummy_hash()), })]; - let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]); + let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]).into(); test_harness(|mut virtual_overseer| async move { helpers::initialize_collator(&mut virtual_overseer, para_id).await; @@ -1053,7 +1052,7 @@ mod helpers { async_backing_params: AsyncBackingParams, cores: Vec, runtime_version: u32, - claim_queue: ClaimQueueSnapshot, + claim_queue: BTreeMap>, ) { assert_matches!( overseer_recv(virtual_overseer).await, @@ -1107,7 +1106,7 @@ mod helpers { RuntimeApiRequest::ClaimQueue(tx), )) => { assert_eq!(hash, activated_hash); - let _ = tx.send(Ok(claim_queue)); + let _ = tx.send(Ok(claim_queue.into())); } ); } diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index b9f6f705ed..f5a8ec4a26 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -2163,8 +2163,8 @@ async fn determine_groups_per_para( // pending availability. let para_core_indices: Vec<_> = if let Some(claim_queue) = maybe_claim_queue { claim_queue - .into_iter() - .filter_map(|(core_index, paras)| Some((*paras.front()?, core_index))) + .iter_claims_at_depth(0) + .map(|(core_index, para)| (para, core_index)) .collect() } else { availability_cores diff --git a/polkadot/node/subsystem-util/src/vstaging.rs b/polkadot/node/subsystem-util/src/vstaging.rs index 25ea7ce7c9..b166a54f75 100644 --- a/polkadot/node/subsystem-util/src/vstaging.rs +++ b/polkadot/node/subsystem-util/src/vstaging.rs @@ -23,14 +23,40 @@ use std::collections::{BTreeMap, VecDeque}; use polkadot_node_subsystem_types::messages::{RuntimeApiMessage, RuntimeApiRequest}; use polkadot_overseer::SubsystemSender; -use polkadot_primitives::{CoreIndex, Hash, Id as ParaId, ScheduledCore, ValidatorIndex}; +use polkadot_primitives::{CoreIndex, Hash, Id as ParaId, ValidatorIndex}; use crate::{has_required_runtime, request_claim_queue, request_disabled_validators, runtime}; const LOG_TARGET: &'static str = "parachain::subsystem-util-vstaging"; /// A snapshot of the runtime claim queue at an arbitrary relay chain block. -pub type ClaimQueueSnapshot = BTreeMap>; +#[derive(Default)] +pub struct ClaimQueueSnapshot(BTreeMap>); + +impl From>> for ClaimQueueSnapshot { + fn from(claim_queue_snapshot: BTreeMap>) -> Self { + ClaimQueueSnapshot(claim_queue_snapshot) + } +} + +impl ClaimQueueSnapshot { + /// Returns the `ParaId` that has a claim for `core_index` at the specified `depth` in the + /// claim queue. A depth of `0` means the very next block. + pub fn get_claim_for(&self, core_index: CoreIndex, depth: usize) -> Option { + self.0.get(&core_index)?.get(depth).copied() + } + + /// Returns an iterator over all claimed cores and the claiming `ParaId` at the specified + /// `depth` in the claim queue. + pub fn iter_claims_at_depth( + &self, + depth: usize, + ) -> impl Iterator + '_ { + self.0 + .iter() + .filter_map(move |(core_index, paras)| Some((*core_index, *paras.get(depth)?))) + } +} // TODO: https://github.com/paritytech/polkadot-sdk/issues/1940 /// Returns disabled validators list if the runtime supports it. Otherwise logs a debug messages and @@ -78,21 +104,9 @@ pub async fn fetch_claim_queue( .await .await .map_err(runtime::Error::RuntimeRequestCanceled)??; - Ok(Some(res)) + Ok(Some(res.into())) } else { gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`"); Ok(None) } } - -/// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`. -pub fn fetch_next_scheduled_on_core( - claim_queue: &ClaimQueueSnapshot, - core_idx: CoreIndex, -) -> Option { - claim_queue - .get(&core_idx)? - .front() - .cloned() - .map(|para_id| ScheduledCore { para_id, collator: None }) -} diff --git a/prdoc/pr_3950.prdoc b/prdoc/pr_3950.prdoc new file mode 100644 index 0000000000..a333521898 --- /dev/null +++ b/prdoc/pr_3950.prdoc @@ -0,0 +1,12 @@ +title: Add `ClaimQueue` wrapper + +doc: + - audience: Node Dev + description: | + Intoduces a new wrapper type: `ClaimQueueSnapshot`. It contains a snapshot of the `ClaimQueue` + at an arbitrary relay chain block. Two methods are exposed to allow access to the claims at + specific depths. + +crates: + - name: polkadot-node-subsystem-util + bump: minor