Expose ClaimQueue via a runtime api and use it in collation-generation (#3580)

The PR adds two things:
1. Runtime API exposing the whole claim queue
2. Consumes the API in `collation-generation` to fetch the next
scheduled `ParaEntry` for an occupied core.

Related to https://github.com/paritytech/polkadot-sdk/issues/1797
This commit is contained in:
Tsvetomir Dimitrov
2024-03-20 08:55:58 +02:00
committed by GitHub
parent e659c4b3f7
commit e58e854a32
15 changed files with 532 additions and 52 deletions
Generated
+1
View File
@@ -12396,6 +12396,7 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-primitives",
"polkadot-primitives-test-helpers",
"rstest",
"sp-core",
"sp-keyring",
"sp-maybe-compressed-blob",
@@ -14,7 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use std::pin::Pin;
use std::{
collections::{BTreeMap, VecDeque},
pin::Pin,
};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use cumulus_relay_chain_rpc_interface::RelayChainRpcClient;
@@ -25,6 +28,7 @@ use polkadot_primitives::{
async_backing::{AsyncBackingParams, BackingState},
slashing,
vstaging::{ApprovalVotingParams, NodeFeatures},
CoreIndex,
};
use sc_authority_discovery::{AuthorityDiscovery, Error as AuthorityDiscoveryError};
use sc_client_api::AuxStore;
@@ -442,6 +446,13 @@ impl RuntimeApiSubsystemClient for BlockChainRpcClient {
async fn node_features(&self, at: Hash) -> Result<NodeFeatures, ApiError> {
Ok(self.rpc_client.parachain_host_node_features(at).await?)
}
async fn claim_queue(
&self,
at: Hash,
) -> Result<BTreeMap<CoreIndex, VecDeque<cumulus_primitives_core::ParaId>>, ApiError> {
Ok(self.rpc_client.parachain_host_claim_queue(at).await?)
}
}
#[async_trait::async_trait]
@@ -24,6 +24,7 @@ use jsonrpsee::{
};
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
use std::collections::VecDeque;
use tokio::sync::mpsc::Sender as TokioSender;
use parity_scale_codec::{Decode, Encode};
@@ -34,10 +35,10 @@ use cumulus_primitives_core::{
slashing,
vstaging::{ApprovalVotingParams, NodeFeatures},
BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash,
CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo,
Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage, OccupiedCoreAssumption,
PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode,
ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams,
GroupRotationInfo, Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage,
OccupiedCoreAssumption, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo,
ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
@@ -647,6 +648,14 @@ impl RelayChainRpcClient {
.await
}
pub async fn parachain_host_claim_queue(
&self,
at: RelayHash,
) -> Result<BTreeMap<CoreIndex, VecDeque<ParaId>>, RelayChainError> {
self.call_remote_runtime_function("ParachainHost_claim_queue", at, None::<()>)
.await
}
pub async fn validation_code_hash(
&self,
at: RelayHash,
@@ -26,4 +26,5 @@ parity-scale-codec = { version = "3.6.1", default-features = false, features = [
polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../primitives/test-helpers" }
assert_matches = "1.4.0"
rstest = "0.18.2"
sp-keyring = { path = "../../../substrate/primitives/keyring" }
+64 -10
View File
@@ -38,21 +38,25 @@ use polkadot_node_primitives::{
SubmitCollationParams,
};
use polkadot_node_subsystem::{
messages::{CollationGenerationMessage, CollatorProtocolMessage},
messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiRequest},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
SubsystemContext, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
request_async_backing_params, request_availability_cores, request_persisted_validation_data,
request_validation_code, request_validation_code_hash, request_validators,
has_required_runtime, request_async_backing_params, request_availability_cores,
request_claim_queue, request_persisted_validation_data, request_validation_code,
request_validation_code_hash, request_validators,
};
use polkadot_primitives::{
collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
CollatorPair, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption, PersistedValidationData,
ValidationCodeHash,
CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, ScheduledCore, ValidationCodeHash,
};
use sp_core::crypto::Pair;
use std::sync::Arc;
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
mod error;
@@ -223,6 +227,7 @@ async fn handle_new_activations<Context>(
let availability_cores = availability_cores??;
let n_validators = validators??.len();
let async_backing_params = async_backing_params?.ok();
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;
for (core_idx, core) in availability_cores.into_iter().enumerate() {
let _availability_core_timer = metrics.time_new_activations_availability_core();
@@ -239,10 +244,25 @@ async fn handle_new_activations<Context>(
// TODO [now]: this assumes that next up == current.
// in practice we should only set `OccupiedCoreAssumption::Included`
// when the candidate occupying the core is also of the same para.
if let Some(scheduled) = occupied_core.next_up_on_available {
(scheduled, OccupiedCoreAssumption::Included)
} else {
continue
let res = match maybe_claim_queue {
Some(ref claim_queue) => {
// read what's in the claim queue for this core
fetch_next_scheduled_on_core(
claim_queue,
CoreIndex(core_idx as u32),
)
},
None => {
// Runtime doesn't support claim queue runtime api. Fallback to
// `next_up_on_available`
occupied_core.next_up_on_available
},
}
.map(|scheduled| (scheduled, OccupiedCoreAssumption::Included));
match res {
Some(res) => res,
None => continue,
}
},
_ => {
@@ -600,3 +620,37 @@ fn erasure_root(
let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
Ok(polkadot_erasure_coding::branches(&chunks).root())
}
// Checks if the runtime supports `request_claim_queue` and executes it. Returns `Ok(None)`
// otherwise. Any [`RuntimeApiError`]s are bubbled up to the caller.
async fn fetch_claim_queue(
sender: &mut impl overseer::CollationGenerationSenderTrait,
relay_parent: Hash,
) -> crate::error::Result<Option<BTreeMap<CoreIndex, VecDeque<ParaId>>>> {
if has_required_runtime(
sender,
relay_parent,
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
)
.await
{
let res = request_claim_queue(relay_parent, sender).await.await??;
Ok(Some(res))
} else {
gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`");
Ok(None)
}
}
// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`.
// This function is supposed to be used in `handle_new_activations` hence the return type.
fn fetch_next_scheduled_on_core(
claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
core_idx: CoreIndex,
) -> Option<ScheduledCore> {
claim_queue
.get(&core_idx)?
.front()
.cloned()
.map(|para_id| ScheduledCore { para_id, collator: None })
}
+319 -10
View File
@@ -25,15 +25,18 @@ use polkadot_node_primitives::{BlockData, Collation, CollationResult, MaybeCompr
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest},
ActivatedLeaf,
};
use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::{
CollatorPair, HeadData, Id as ParaId, PersistedValidationData, ScheduledCore, ValidationCode,
AsyncBackingParams, CollatorPair, HeadData, Id as ParaId, Id, PersistedValidationData,
ScheduledCore, ValidationCode,
};
use rstest::rstest;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use std::pin::Pin;
use test_helpers::{dummy_hash, dummy_head_data, dummy_validator};
use test_helpers::{dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator};
type VirtualOverseer = TestSubsystemContextHandle<CollationGenerationMessage>;
@@ -132,8 +135,10 @@ fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
ScheduledCore { para_id: para_id.into(), collator: None }
}
#[test]
fn requests_availability_per_relay_parent() {
#[rstest]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
fn requests_availability_per_relay_parent(#[case] runtime_version: u32) {
let activated_hashes: Vec<Hash> =
vec![[1; 32].into(), [4; 32].into(), [9; 32].into(), [16; 32].into()];
@@ -159,6 +164,18 @@ fn requests_availability_per_relay_parent() {
))) => {
tx.send(Err(RuntimeApiError::NotSupported { runtime_api_name: "doesnt_matter" })).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::Version(tx),
))) => {
tx.send(Ok(runtime_version)).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::ClaimQueue(tx),
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
tx.send(Ok(BTreeMap::new())).unwrap();
},
Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
}
}
@@ -184,8 +201,10 @@ fn requests_availability_per_relay_parent() {
assert_eq!(requested_availability_cores, activated_hashes);
}
#[test]
fn requests_validation_data_for_scheduled_matches() {
#[rstest]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
fn requests_validation_data_for_scheduled_matches(#[case] runtime_version: u32) {
let activated_hashes: Vec<Hash> = vec![
Hash::repeat_byte(1),
Hash::repeat_byte(4),
@@ -242,6 +261,18 @@ fn requests_validation_data_for_scheduled_matches() {
}))
.unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::Version(tx),
))) => {
tx.send(Ok(runtime_version)).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::ClaimQueue(tx),
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
tx.send(Ok(BTreeMap::new())).unwrap();
},
Some(msg) => {
panic!("didn't expect any other overseer requests; got {:?}", msg)
},
@@ -271,8 +302,10 @@ fn requests_validation_data_for_scheduled_matches() {
assert_eq!(requested_validation_data, vec![[4; 32].into()]);
}
#[test]
fn sends_distribute_collation_message() {
#[rstest]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
fn sends_distribute_collation_message(#[case] runtime_version: u32) {
let activated_hashes: Vec<Hash> = vec![
Hash::repeat_byte(1),
Hash::repeat_byte(4),
@@ -339,6 +372,18 @@ fn sends_distribute_collation_message() {
}))
.unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::Version(tx),
))) => {
tx.send(Ok(runtime_version)).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::ClaimQueue(tx),
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
tx.send(Ok(BTreeMap::new())).unwrap();
},
Some(msg @ AllMessages::CollatorProtocol(_)) => {
inner_to_collator_protocol.lock().await.push(msg);
},
@@ -423,8 +468,10 @@ fn sends_distribute_collation_message() {
}
}
#[test]
fn fallback_when_no_validation_code_hash_api() {
#[rstest]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) {
// This is a variant of the above test, but with the validation code hash API disabled.
let activated_hashes: Vec<Hash> = vec![
@@ -501,9 +548,22 @@ fn fallback_when_no_validation_code_hash_api() {
}))
.unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::Version(tx),
))) => {
tx.send(Ok(runtime_version)).unwrap();
},
Some(msg @ AllMessages::CollatorProtocol(_)) => {
inner_to_collator_protocol.lock().await.push(msg);
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::ClaimQueue(tx),
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
let res = BTreeMap::<CoreIndex, VecDeque<ParaId>>::new();
tx.send(Ok(res)).unwrap();
},
Some(msg) => {
panic!("didn't expect any other overseer requests; got {:?}", msg)
},
@@ -635,3 +695,252 @@ fn submit_collation_leads_to_distribution() {
virtual_overseer
});
}
// There is one core in `Occupied` state and async backing is enabled. On new head activation
// `CollationGeneration` should produce and distribute a new collation.
#[rstest]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] runtime_version: u32) {
let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5);
// One core, in occupied state. The data in `CoreState` and `ClaimQueue` should match.
let cores: Vec<CoreState> = vec![CoreState::Occupied(polkadot_primitives::OccupiedCore {
next_up_on_available: Some(ScheduledCore { para_id, collator: None }),
occupied_since: 1,
time_out_at: 10,
next_up_on_time_out: Some(ScheduledCore { para_id, collator: None }),
availability: Default::default(), // doesn't matter
group_responsible: polkadot_primitives::GroupIndex(0),
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})];
let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
helpers::handle_runtime_calls_on_new_head_activation(
&mut virtual_overseer,
activated_hash,
AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
cores,
runtime_version,
claim_queue,
)
.await;
helpers::handle_core_processing_for_a_leaf(
&mut virtual_overseer,
activated_hash,
para_id,
// `CoreState` is `Occupied` => `OccupiedCoreAssumption` is `Included`
OccupiedCoreAssumption::Included,
)
.await;
virtual_overseer
});
}
// There is one core in `Occupied` state and async backing is disabled. On new head activation
// no new collation should be generated.
#[rstest]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
#[case] runtime_version: u32,
) {
let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5);
// One core, in occupied state. The data in `CoreState` and `ClaimQueue` should match.
let cores: Vec<CoreState> = vec![CoreState::Occupied(polkadot_primitives::OccupiedCore {
next_up_on_available: Some(ScheduledCore { para_id, collator: None }),
occupied_since: 1,
time_out_at: 10,
next_up_on_time_out: Some(ScheduledCore { para_id, collator: None }),
availability: Default::default(), // doesn't matter
group_responsible: polkadot_primitives::GroupIndex(0),
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})];
let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
helpers::handle_runtime_calls_on_new_head_activation(
&mut virtual_overseer,
activated_hash,
AsyncBackingParams { max_candidate_depth: 0, allowed_ancestry_len: 0 },
cores,
runtime_version,
claim_queue,
)
.await;
virtual_overseer
});
}
mod helpers {
use super::*;
// Sends `Initialize` with a collator config
pub async fn initialize_collator(virtual_overseer: &mut VirtualOverseer, para_id: ParaId) {
virtual_overseer
.send(FromOrchestra::Communication {
msg: CollationGenerationMessage::Initialize(test_config(para_id)),
})
.await;
}
// Sends `ActiveLeaves` for a single leaf with the specified hash. Block number is hardcoded.
pub async fn activate_new_head(virtual_overseer: &mut VirtualOverseer, activated_hash: Hash) {
virtual_overseer
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: activated_hash,
number: 10,
unpin_handle: polkadot_node_subsystem_test_helpers::mock::dummy_unpin_handle(
activated_hash,
),
span: Arc::new(overseer::jaeger::Span::Disabled),
}),
..Default::default()
})))
.await;
}
// Handle all runtime calls performed in `handle_new_activations`. Conditionally expects a
// `CLAIM_QUEUE_RUNTIME_REQUIREMENT` call if the passed `runtime_version` is greater or equal to
// `CLAIM_QUEUE_RUNTIME_REQUIREMENT`
pub async fn handle_runtime_calls_on_new_head_activation(
virtual_overseer: &mut VirtualOverseer,
activated_hash: Hash,
async_backing_params: AsyncBackingParams,
cores: Vec<CoreState>,
runtime_version: u32,
claim_queue: BTreeMap<CoreIndex, VecDeque<Id>>,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx))) => {
assert_eq!(hash, activated_hash);
let _ = tx.send(Ok(cores));
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::Validators(tx))) => {
assert_eq!(hash, activated_hash);
let _ = tx.send(Ok(vec![
Sr25519Keyring::Alice.public().into(),
Sr25519Keyring::Bob.public().into(),
Sr25519Keyring::Charlie.public().into(),
]));
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::AsyncBackingParams(
tx,
),
)) => {
assert_eq!(hash, activated_hash);
let _ = tx.send(Ok(async_backing_params));
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::Version(tx),
)) => {
assert_eq!(hash, activated_hash);
let _ = tx.send(Ok(runtime_version));
}
);
if runtime_version == RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::ClaimQueue(tx),
)) => {
assert_eq!(hash, activated_hash);
let _ = tx.send(Ok(claim_queue));
}
);
}
}
// Handles all runtime requests performed in `handle_new_activations` for the case when a
// collation should be prepared for the new leaf
pub async fn handle_core_processing_for_a_leaf(
virtual_overseer: &mut VirtualOverseer,
activated_hash: Hash,
para_id: ParaId,
expected_occupied_core_assumption: OccupiedCoreAssumption,
) {
// Some hardcoded data - if needed, extract to parameters
let validation_code_hash = ValidationCodeHash::from(Hash::repeat_byte(42));
let parent_head = HeadData::from(vec![1, 2, 3]);
let pvd = PersistedValidationData {
parent_head: parent_head.clone(),
relay_parent_number: 10,
relay_parent_storage_root: Hash::repeat_byte(1),
max_pov_size: 1024,
};
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::PersistedValidationData(id, a, tx))) => {
assert_eq!(hash, activated_hash);
assert_eq!(id, para_id);
assert_eq!(a, expected_occupied_core_assumption);
let _ = tx.send(Ok(Some(pvd.clone())));
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::ValidationCodeHash(
id,
assumption,
tx,
),
)) => {
assert_eq!(hash, activated_hash);
assert_eq!(id, para_id);
assert_eq!(assumption, expected_occupied_core_assumption);
let _ = tx.send(Ok(Some(validation_code_hash)));
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation{
candidate_receipt,
parent_head_data_hash,
..
}) => {
assert_eq!(parent_head_data_hash, parent_head.hash());
assert_eq!(candidate_receipt.descriptor().persisted_validation_data_hash, pvd.hash());
assert_eq!(candidate_receipt.descriptor().para_head, dummy_head_data().hash());
assert_eq!(candidate_receipt.descriptor().validation_code_hash, validation_code_hash);
}
);
}
}
+24 -5
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::btree_map::BTreeMap;
use std::collections::{btree_map::BTreeMap, VecDeque};
use schnellru::{ByLength, LruMap};
use sp_consensus_babe::Epoch;
@@ -23,10 +23,11 @@ use polkadot_primitives::{
async_backing, slashing,
vstaging::{self, ApprovalVotingParams},
AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash,
CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash,
Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption,
PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo,
ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams,
GroupRotationInfo, Hash, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage,
OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes,
SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
ValidatorSignature,
};
/// For consistency we have the same capacity for all caches. We use 128 as we'll only need that
@@ -70,6 +71,7 @@ pub(crate) struct RequestResultCache {
async_backing_params: LruMap<Hash, async_backing::AsyncBackingParams>,
node_features: LruMap<SessionIndex, vstaging::NodeFeatures>,
approval_voting_params: LruMap<SessionIndex, ApprovalVotingParams>,
claim_queue: LruMap<Hash, BTreeMap<CoreIndex, VecDeque<ParaId>>>,
}
impl Default for RequestResultCache {
@@ -105,6 +107,7 @@ impl Default for RequestResultCache {
para_backing_state: LruMap::new(ByLength::new(DEFAULT_CACHE_CAP)),
async_backing_params: LruMap::new(ByLength::new(DEFAULT_CACHE_CAP)),
node_features: LruMap::new(ByLength::new(DEFAULT_CACHE_CAP)),
claim_queue: LruMap::new(ByLength::new(DEFAULT_CACHE_CAP)),
}
}
}
@@ -525,6 +528,21 @@ impl RequestResultCache {
) {
self.approval_voting_params.insert(session_index, value);
}
pub(crate) fn claim_queue(
&mut self,
relay_parent: &Hash,
) -> Option<&BTreeMap<CoreIndex, VecDeque<ParaId>>> {
self.claim_queue.get(relay_parent).map(|v| &*v)
}
pub(crate) fn cache_claim_queue(
&mut self,
relay_parent: Hash,
value: BTreeMap<CoreIndex, VecDeque<ParaId>>,
) {
self.claim_queue.insert(relay_parent, value);
}
}
pub(crate) enum RequestResult {
@@ -577,4 +595,5 @@ pub(crate) enum RequestResult {
ParaBackingState(Hash, ParaId, Option<async_backing::BackingState>),
AsyncBackingParams(Hash, async_backing::AsyncBackingParams),
NodeFeatures(SessionIndex, vstaging::NodeFeatures),
ClaimQueue(Hash, BTreeMap<CoreIndex, VecDeque<ParaId>>),
}
+11
View File
@@ -177,6 +177,9 @@ where
self.requests_cache.cache_async_backing_params(relay_parent, params),
NodeFeatures(session_index, params) =>
self.requests_cache.cache_node_features(session_index, params),
ClaimQueue(relay_parent, sender) => {
self.requests_cache.cache_claim_queue(relay_parent, sender);
},
}
}
@@ -329,6 +332,8 @@ where
Some(Request::NodeFeatures(index, sender))
}
},
Request::ClaimQueue(sender) =>
query!(claim_queue(), sender).map(|sender| Request::ClaimQueue(sender)),
}
}
@@ -626,5 +631,11 @@ where
sender,
result = (index)
),
Request::ClaimQueue(sender) => query!(
ClaimQueue,
claim_queue(),
ver = Request::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
sender
),
}
}
+13 -5
View File
@@ -23,15 +23,16 @@ use polkadot_primitives::{
async_backing, slashing,
vstaging::{ApprovalVotingParams, NodeFeatures},
AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash,
CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo,
Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption,
PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo,
Slot, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams,
GroupRotationInfo, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage,
OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes,
SessionIndex, SessionInfo, Slot, ValidationCode, ValidationCodeHash, ValidatorId,
ValidatorIndex, ValidatorSignature,
};
use sp_api::ApiError;
use sp_core::testing::TaskExecutor;
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, VecDeque},
sync::{Arc, Mutex},
};
use test_helpers::{dummy_committed_candidate_receipt, dummy_validation_code};
@@ -286,6 +287,13 @@ impl RuntimeApiSubsystemClient for MockSubsystemClient {
async fn disabled_validators(&self, _: Hash) -> Result<Vec<ValidatorIndex>, ApiError> {
todo!("Not required for tests")
}
async fn claim_queue(
&self,
_: Hash,
) -> Result<BTreeMap<CoreIndex, VecDeque<ParaId>>, ApiError> {
todo!("Not required for tests")
}
}
#[test]
@@ -45,7 +45,7 @@ use polkadot_primitives::{
async_backing, slashing,
vstaging::{ApprovalVotingParams, NodeFeatures},
AuthorityDiscoveryId, BackedCandidate, BlockNumber, CandidateEvent, CandidateHash,
CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreState,
CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreIndex, CoreState,
DisputeState, ExecutorParams, GroupIndex, GroupRotationInfo, Hash, HeadData,
Header as BlockHeader, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage,
MultiDisputeStatementSet, OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement,
@@ -55,7 +55,7 @@ use polkadot_primitives::{
};
use polkadot_statement_table::v2::Misbehavior;
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashMap, HashSet, VecDeque},
sync::Arc,
};
@@ -729,6 +729,9 @@ pub enum RuntimeApiRequest {
/// Approval voting params
/// `V10`
ApprovalVotingParams(SessionIndex, RuntimeApiSender<ApprovalVotingParams>),
/// Fetch the `ClaimQueue` from scheduler pallet
/// `V11`
ClaimQueue(RuntimeApiSender<BTreeMap<CoreIndex, VecDeque<ParaId>>>),
}
impl RuntimeApiRequest {
@@ -763,6 +766,9 @@ impl RuntimeApiRequest {
/// `approval_voting_params`
pub const APPROVAL_VOTING_PARAMS_REQUIREMENT: u32 = 10;
/// `ClaimQueue`
pub const CLAIM_QUEUE_RUNTIME_REQUIREMENT: u32 = 11;
}
/// A message to the Runtime API subsystem.
@@ -21,10 +21,11 @@ use polkadot_primitives::{
slashing,
vstaging::{self, ApprovalVotingParams},
Block, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash,
CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash,
Header, Id, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption,
PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo,
ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams,
GroupRotationInfo, Hash, Header, Id, InboundDownwardMessage, InboundHrmpMessage,
OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes,
SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
ValidatorSignature,
};
use sc_client_api::{AuxStore, HeaderBackend};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
@@ -33,7 +34,10 @@ use sp_authority_discovery::AuthorityDiscoveryApi;
use sp_blockchain::{BlockStatus, Info};
use sp_consensus_babe::{BabeApi, Epoch};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{collections::BTreeMap, sync::Arc};
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
/// Offers header utilities.
///
@@ -329,6 +333,10 @@ pub trait RuntimeApiSubsystemClient {
at: Hash,
session_index: SessionIndex,
) -> Result<ApprovalVotingParams, ApiError>;
// == v11: Claim queue ==
/// Fetch the `ClaimQueue` from scheduler pallet
async fn claim_queue(&self, at: Hash) -> Result<BTreeMap<CoreIndex, VecDeque<Id>>, ApiError>;
}
/// Default implementation of [`RuntimeApiSubsystemClient`] using the client.
@@ -594,6 +602,10 @@ where
) -> Result<ApprovalVotingParams, ApiError> {
self.client.runtime_api().approval_voting_params(at)
}
async fn claim_queue(&self, at: Hash) -> Result<BTreeMap<CoreIndex, VecDeque<Id>>, ApiError> {
self.client.runtime_api().claim_queue(at)
}
}
impl<Client, Block> HeaderBackend<Block> for DefaultSubsystemClient<Client>
+6 -2
View File
@@ -30,7 +30,7 @@ use polkadot_node_subsystem::{
messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
overseer, SubsystemSender,
};
use polkadot_primitives::{slashing, ExecutorParams};
use polkadot_primitives::{slashing, CoreIndex, ExecutorParams};
pub use overseer::{
gen::{OrchestraError as OverseerError, Timeout},
@@ -53,7 +53,10 @@ pub use rand;
use sp_application_crypto::AppCrypto;
use sp_core::ByteArray;
use sp_keystore::{Error as KeystoreError, KeystorePtr};
use std::time::Duration;
use std::{
collections::{BTreeMap, VecDeque},
time::Duration,
};
use thiserror::Error;
use vstaging::get_disabled_validators_with_fallback;
@@ -304,6 +307,7 @@ specialize_requests! {
fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost;
fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
}
/// Requests executor parameters from the runtime effective at given relay-parent. First obtains
+13 -4
View File
@@ -117,14 +117,18 @@ use crate::{
async_backing, slashing,
vstaging::{self, ApprovalVotingParams},
AsyncBackingParams, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash,
CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash,
OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes,
SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorSignature,
CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams,
GroupRotationInfo, Hash, OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement,
ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex,
ValidatorSignature,
};
use polkadot_core_primitives as pcp;
use polkadot_parachain_primitives::primitives as ppp;
use sp_std::{collections::btree_map::BTreeMap, prelude::*};
use sp_std::{
collections::{btree_map::BTreeMap, vec_deque::VecDeque},
prelude::*,
};
sp_api::decl_runtime_apis! {
/// The API for querying the state of parachains on-chain.
@@ -281,5 +285,10 @@ sp_api::decl_runtime_apis! {
/// Approval voting configuration parameters
#[api_version(10)]
fn approval_voting_params() -> ApprovalVotingParams;
/***** Added in v11 *****/
/// Claim queue
#[api_version(11)]
fn claim_queue() -> BTreeMap<CoreIndex, VecDeque<ppp::Id>>;
}
}
@@ -16,12 +16,15 @@
//! Put implementations of functions from staging APIs here.
use crate::{configuration, initializer, shared};
use crate::{configuration, initializer, scheduler, shared};
use primitives::{
vstaging::{ApprovalVotingParams, NodeFeatures},
ValidatorIndex,
CoreIndex, Id as ParaId, ValidatorIndex,
};
use sp_std::{
collections::{btree_map::BTreeMap, vec_deque::VecDeque},
prelude::Vec,
};
use sp_std::prelude::Vec;
/// Implementation for `DisabledValidators`
// CAVEAT: this should only be called on the node side
@@ -38,8 +41,18 @@ pub fn node_features<T: initializer::Config>() -> NodeFeatures {
<configuration::Pallet<T>>::config().node_features
}
/// Approval voting subsystem configuration parameteres
/// Approval voting subsystem configuration parameters
pub fn approval_voting_params<T: initializer::Config>() -> ApprovalVotingParams {
let config = <configuration::Pallet<T>>::config();
config.approval_voting_params
}
/// Returns the claimqueue from the scheduler
pub fn claim_queue<T: scheduler::Config>() -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
<scheduler::Pallet<T>>::claimqueue()
.into_iter()
.map(|(core_index, entries)| {
(core_index, entries.into_iter().map(|e| e.para_id()).collect())
})
.collect()
}
+13
View File
@@ -0,0 +1,13 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
title: Expose `ClaimQueue` via a runtime api and consume it in `collation-generation`
doc:
- audience: Node Dev
description: |
Creates a new runtime api exposing the `ClaimQueue` from `scheduler` pallet. Consume the api
in collation generation (if available) by getting what's scheduled on a core from the
`ClaimQueue` instead of from `next_up_on_available` (from `AvailabilityCores` runtime api).
crates: [ ]