collation-generation: Avoid using para_backing_state if runtime is ancient (#4070)

fixes https://github.com/paritytech/polkadot-sdk/issues/4067

Also add an early bail out for look ahead collator such that we don't
waste time if a CollatorFn is not set.

TODO:
- [x] add test.
- [x] Polkadot System Parachain burn-in.

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
Andrei Sandu
2024-04-11 13:36:30 +03:00
committed by GitHub
parent 69cc7f2090
commit 9ede4152ef
3 changed files with 135 additions and 38 deletions
+46 -25
View File
@@ -206,9 +206,12 @@ async fn handle_new_activations<Context>(
// follow the procedure from the guide: // follow the procedure from the guide:
// https://paritytech.github.io/polkadot-sdk/book/node/collators/collation-generation.html // https://paritytech.github.io/polkadot-sdk/book/node/collators/collation-generation.html
// If there is no collation function provided, bail out early.
// Important: Lookahead collator and slot based collator do not use `CollatorFn`.
if config.collator.is_none() { if config.collator.is_none() {
return Ok(()) return Ok(())
} }
let para_id = config.para_id; let para_id = config.para_id;
let _overall_timer = metrics.time_new_activations(); let _overall_timer = metrics.time_new_activations();
@@ -232,9 +235,14 @@ async fn handle_new_activations<Context>(
// The loop bellow will fill in cores that the para is allowed to build on. // The loop bellow will fill in cores that the para is allowed to build on.
let mut cores_to_build_on = Vec::new(); let mut cores_to_build_on = Vec::new();
// This assumption refers to all cores of the parachain, taking elastic scaling
// into account.
let mut para_assumption = None;
for (core_idx, core) in availability_cores.into_iter().enumerate() { for (core_idx, core) in availability_cores.into_iter().enumerate() {
let scheduled_core = match core { // This nested assumption refers only to the core being iterated.
CoreState::Scheduled(scheduled_core) => scheduled_core, let (core_assumption, scheduled_core) = match core {
CoreState::Scheduled(scheduled_core) =>
(OccupiedCoreAssumption::Free, scheduled_core),
CoreState::Occupied(occupied_core) => match async_backing_params { CoreState::Occupied(occupied_core) => match async_backing_params {
Some(params) if params.max_candidate_depth >= 1 => { Some(params) if params.max_candidate_depth >= 1 => {
// maximum candidate depth when building on top of a block // maximum candidate depth when building on top of a block
@@ -257,7 +265,7 @@ async fn handle_new_activations<Context>(
}; };
match res { match res {
Some(res) => res, Some(res) => (OccupiedCoreAssumption::Included, res),
None => continue, None => continue,
} }
}, },
@@ -291,6 +299,10 @@ async fn handle_new_activations<Context>(
"core is not assigned to our para. Keep going.", "core is not assigned to our para. Keep going.",
); );
} else { } else {
// This does not work for elastic scaling, but it should be enough for single
// core parachains. If async backing runtime is available we later override
// the assumption based on the `para_backing_state` API response.
para_assumption = Some(core_assumption);
// Accumulate cores for building collation(s) outside the loop. // Accumulate cores for building collation(s) outside the loop.
cores_to_build_on.push(CoreIndex(core_idx as u32)); cores_to_build_on.push(CoreIndex(core_idx as u32));
} }
@@ -301,34 +313,43 @@ async fn handle_new_activations<Context>(
continue continue
} }
let para_backing_state = // If at least one core is assigned to us, `para_assumption` is `Some`.
request_para_backing_state(relay_parent, config.para_id, ctx.sender()) let Some(mut para_assumption) = para_assumption else { continue };
.await
.await??
.ok_or(crate::error::Error::MissingParaBackingState)?;
// We are being very optimistic here, but one of the cores could pend availability some more // If it is none it means that neither async backing or elastic scaling (which
// block, ore even time out. // depends on it) are supported. We'll use the `para_assumption` we got from
// For timeout assumption the collator can't really know because it doesn't receive bitfield // iterating cores.
// gossip. if async_backing_params.is_some() {
let assumption = if para_backing_state.pending_availability.is_empty() { // We are being very optimistic here, but one of the cores could pend availability some
OccupiedCoreAssumption::Free // more block, ore even time out.
} else { // For timeout assumption the collator can't really know because it doesn't receive
OccupiedCoreAssumption::Included // bitfield gossip.
}; let para_backing_state =
request_para_backing_state(relay_parent, config.para_id, ctx.sender())
.await
.await??
.ok_or(crate::error::Error::MissingParaBackingState)?;
// Override the assumption about the para's assigned cores.
para_assumption = if para_backing_state.pending_availability.is_empty() {
OccupiedCoreAssumption::Free
} else {
OccupiedCoreAssumption::Included
}
}
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
relay_parent = ?relay_parent, relay_parent = ?relay_parent,
our_para = %config.para_id, our_para = %para_id,
?assumption, ?para_assumption,
"Occupied core(s) assumption", "Occupied core(s) assumption",
); );
let mut validation_data = match request_persisted_validation_data( let mut validation_data = match request_persisted_validation_data(
relay_parent, relay_parent,
config.para_id, para_id,
assumption, para_assumption,
ctx.sender(), ctx.sender(),
) )
.await .await
@@ -339,7 +360,7 @@ async fn handle_new_activations<Context>(
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
relay_parent = ?relay_parent, relay_parent = ?relay_parent,
our_para = %config.para_id, our_para = %para_id,
"validation data is not available", "validation data is not available",
); );
continue continue
@@ -348,8 +369,8 @@ async fn handle_new_activations<Context>(
let validation_code_hash = match obtain_validation_code_hash_with_assumption( let validation_code_hash = match obtain_validation_code_hash_with_assumption(
relay_parent, relay_parent,
config.para_id, para_id,
assumption, para_assumption,
ctx.sender(), ctx.sender(),
) )
.await? .await?
@@ -359,7 +380,7 @@ async fn handle_new_activations<Context>(
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
relay_parent = ?relay_parent, relay_parent = ?relay_parent,
our_para = %config.para_id, our_para = %para_id,
"validation code hash is not found.", "validation code hash is not found.",
); );
continue continue
+77 -13
View File
@@ -807,6 +807,56 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
OccupiedCoreAssumption::Included, OccupiedCoreAssumption::Included,
1, 1,
pending_availability, pending_availability,
runtime_version,
)
.await;
virtual_overseer
});
}
#[test]
fn distribute_collation_for_occupied_core_pre_async_backing() {
let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5);
let total_cores = 3;
// Use runtime version before async backing
let runtime_version = RuntimeApiRequest::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT - 1;
let cores = (0..total_cores)
.into_iter()
.map(|_idx| CoreState::Scheduled(ScheduledCore { para_id, collator: None }))
.collect::<Vec<_>>();
let claim_queue = cores
.iter()
.enumerate()
.map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id])))
.collect::<BTreeMap<_, _>>();
test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
helpers::handle_runtime_calls_on_new_head_activation(
&mut virtual_overseer,
activated_hash,
AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
cores,
runtime_version,
claim_queue,
)
.await;
helpers::handle_cores_processing_for_a_leaf(
&mut virtual_overseer,
activated_hash,
para_id,
// `CoreState` is `Free` => `OccupiedCoreAssumption` is `Free`
OccupiedCoreAssumption::Free,
total_cores,
vec![],
runtime_version,
) )
.await; .await;
@@ -826,6 +876,8 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti
) { ) {
let activated_hash: Hash = [1; 32].into(); let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5); let para_id = ParaId::from(5);
// Using latest runtime with the fancy claim queue exposed.
let runtime_version = RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT;
let cores = (0..3) let cores = (0..3)
.into_iter() .into_iter()
@@ -863,8 +915,7 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti
activated_hash, activated_hash,
AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 }, AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
cores, cores,
// Using latest runtime with the fancy claim queue exposed. runtime_version,
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
claim_queue, claim_queue,
) )
.await; .await;
@@ -882,6 +933,7 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti
}, },
total_cores, total_cores,
pending_availability, pending_availability,
runtime_version,
) )
.await; .await;
@@ -901,6 +953,8 @@ fn distribute_collation_for_free_cores_with_async_backing_enabled_and_elastic_sc
) { ) {
let activated_hash: Hash = [1; 32].into(); let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5); let para_id = ParaId::from(5);
// Using latest runtime with the fancy claim queue exposed.
let runtime_version = RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT;
let cores = (0..total_cores) let cores = (0..total_cores)
.into_iter() .into_iter()
@@ -921,8 +975,7 @@ fn distribute_collation_for_free_cores_with_async_backing_enabled_and_elastic_sc
activated_hash, activated_hash,
AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 }, AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
cores, cores,
// Using latest runtime with the fancy claim queue exposed. runtime_version,
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
claim_queue, claim_queue,
) )
.await; .await;
@@ -935,6 +988,7 @@ fn distribute_collation_for_free_cores_with_async_backing_enabled_and_elastic_sc
OccupiedCoreAssumption::Free, OccupiedCoreAssumption::Free,
total_cores, total_cores,
vec![], vec![],
runtime_version,
) )
.await; .await;
@@ -1074,6 +1128,13 @@ mod helpers {
} }
); );
let async_backing_response =
if runtime_version >= RuntimeApiRequest::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT {
Ok(async_backing_params)
} else {
Err(RuntimeApiError::NotSupported { runtime_api_name: "async_backing_params" })
};
assert_matches!( assert_matches!(
overseer_recv(virtual_overseer).await, overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request( AllMessages::RuntimeApi(RuntimeApiMessage::Request(
@@ -1083,7 +1144,7 @@ mod helpers {
), ),
)) => { )) => {
assert_eq!(hash, activated_hash); assert_eq!(hash, activated_hash);
let _ = tx.send(Ok(async_backing_params)); let _ = tx.send(async_backing_response);
} }
); );
@@ -1121,6 +1182,7 @@ mod helpers {
expected_occupied_core_assumption: OccupiedCoreAssumption, expected_occupied_core_assumption: OccupiedCoreAssumption,
cores_assigned: usize, cores_assigned: usize,
pending_availability: Vec<CandidatePendingAvailability>, pending_availability: Vec<CandidatePendingAvailability>,
runtime_version: u32,
) { ) {
// Expect no messages if no cores is assigned to the para // Expect no messages if no cores is assigned to the para
if cores_assigned == 0 { if cores_assigned == 0 {
@@ -1138,14 +1200,16 @@ mod helpers {
max_pov_size: 1024, max_pov_size: 1024,
}; };
assert_matches!( if runtime_version >= RuntimeApiRequest::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT {
overseer_recv(virtual_overseer).await, assert_matches!(
AllMessages::RuntimeApi( overseer_recv(virtual_overseer).await,
RuntimeApiMessage::Request(parent, RuntimeApiRequest::ParaBackingState(p_id, tx)) AllMessages::RuntimeApi(
) if parent == activated_hash && p_id == para_id => { RuntimeApiMessage::Request(parent, RuntimeApiRequest::ParaBackingState(p_id, tx))
tx.send(Ok(Some(dummy_backing_state(pending_availability)))).unwrap(); ) if parent == activated_hash && p_id == para_id => {
} tx.send(Ok(Some(dummy_backing_state(pending_availability)))).unwrap();
); }
);
}
assert_matches!( assert_matches!(
overseer_recv(virtual_overseer).await, overseer_recv(virtual_overseer).await,
+12
View File
@@ -0,0 +1,12 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
title: Avoid using `para_backing_state` if runtime doesn't support async backing
doc:
- audience: Node Operator
description: |
Fixes https://github.com/paritytech/polkadot-sdk/issues/4067 which prevents collators to
upgrade to latest release (v1.10.0)
crates: [ ]