collation-generation + collator-protocol: collate on multiple assigned cores (#3795)

This works only for collators that implement the `collator_fn` allowing
`collation-generation` subsystem to pull collations triggered on new
heads.

Also enables
`request_v2::CollationFetchingResponse::CollationWithParentHeadData` for
test adder/undying collators.

TODO:
- [x] fix tests
- [x] new tests
- [x] PR doc

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
Andrei Sandu
2024-03-27 16:44:10 +02:00
committed by GitHub
parent 25af0adf78
commit 417c54c61c
15 changed files with 556 additions and 140 deletions
@@ -28,6 +28,8 @@ pub enum Error {
Util(#[from] polkadot_node_subsystem_util::Error),
#[error(transparent)]
Erasure(#[from] polkadot_erasure_coding::Error),
#[error("Parachain backing state not available in runtime.")]
MissingParaBackingState,
}
pub type Result<T> = std::result::Result<T, Error>;
+108 -73
View File
@@ -44,8 +44,8 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_util::{
has_required_runtime, request_async_backing_params, request_availability_cores,
request_claim_queue, request_persisted_validation_data, request_validation_code,
request_validation_code_hash, request_validators,
request_claim_queue, request_para_backing_state, request_persisted_validation_data,
request_validation_code, request_validation_code_hash, request_validators,
};
use polkadot_primitives::{
collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
@@ -212,6 +212,7 @@ async fn handle_new_activations<Context>(
if config.collator.is_none() {
return Ok(())
}
let para_id = config.para_id;
let _overall_timer = metrics.time_new_activations();
@@ -225,25 +226,23 @@ async fn handle_new_activations<Context>(
);
let availability_cores = availability_cores??;
let n_validators = validators??.len();
let async_backing_params = async_backing_params?.ok();
let n_validators = validators??.len();
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;
for (core_idx, core) in availability_cores.into_iter().enumerate() {
let _availability_core_timer = metrics.time_new_activations_availability_core();
// The loop bellow will fill in cores that the para is allowed to build on.
let mut cores_to_build_on = Vec::new();
let (scheduled_core, assumption) = match core {
CoreState::Scheduled(scheduled_core) =>
(scheduled_core, OccupiedCoreAssumption::Free),
for (core_idx, core) in availability_cores.into_iter().enumerate() {
let scheduled_core = match core {
CoreState::Scheduled(scheduled_core) => scheduled_core,
CoreState::Occupied(occupied_core) => match async_backing_params {
Some(params) if params.max_candidate_depth >= 1 => {
// maximum candidate depth when building on top of a block
// pending availability is necessarily 1 - the depth of the
// pending block is 0 so the child has depth 1.
// TODO [now]: this assumes that next up == current.
// in practice we should only set `OccupiedCoreAssumption::Included`
// when the candidate occupying the core is also of the same para.
// Use claim queue if available, or fallback to `next_up_on_available`
let res = match maybe_claim_queue {
Some(ref claim_queue) => {
// read what's in the claim queue for this core
@@ -257,8 +256,7 @@ async fn handle_new_activations<Context>(
// `next_up_on_available`
occupied_core.next_up_on_available
},
}
.map(|scheduled| (scheduled, OccupiedCoreAssumption::Included));
};
match res {
Some(res) => res,
@@ -279,7 +277,7 @@ async fn handle_new_activations<Context>(
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
"core is free. Keep going.",
"core is not assigned to any para. Keep going.",
);
continue
},
@@ -294,64 +292,90 @@ async fn handle_new_activations<Context>(
their_para = %scheduled_core.para_id,
"core is not assigned to our para. Keep going.",
);
continue
} else {
// Accumulate cores for building collation(s) outside the loop.
cores_to_build_on.push(CoreIndex(core_idx as u32));
}
}
// we get validation data and validation code synchronously for each core instead of
// within the subtask loop, because we have only a single mutable handle to the
// context, so the work can't really be distributed
// Skip to next relay parent if there is no core assigned to us.
if cores_to_build_on.is_empty() {
continue
}
let validation_data = match request_persisted_validation_data(
relay_parent,
scheduled_core.para_id,
assumption,
ctx.sender(),
)
.await
.await??
{
Some(v) => v,
None => {
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
relay_parent = ?relay_parent,
our_para = %config.para_id,
their_para = %scheduled_core.para_id,
"validation data is not available",
);
continue
},
};
let para_backing_state =
request_para_backing_state(relay_parent, config.para_id, ctx.sender())
.await
.await??
.ok_or(crate::error::Error::MissingParaBackingState)?;
let validation_code_hash = match obtain_validation_code_hash_with_assumption(
relay_parent,
scheduled_core.para_id,
assumption,
ctx.sender(),
)
.await?
{
Some(v) => v,
None => {
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
relay_parent = ?relay_parent,
our_para = %config.para_id,
their_para = %scheduled_core.para_id,
"validation code hash is not found.",
);
continue
},
};
// We are being very optimistic here, but one of the cores could pend availability some more
// block, ore even time out.
// For timeout assumption the collator can't really know because it doesn't receive bitfield
// gossip.
let assumption = if para_backing_state.pending_availability.is_empty() {
OccupiedCoreAssumption::Free
} else {
OccupiedCoreAssumption::Included
};
let task_config = config.clone();
let metrics = metrics.clone();
let mut task_sender = ctx.sender().clone();
ctx.spawn(
"collation-builder",
Box::pin(async move {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
our_para = %config.para_id,
?assumption,
"Occupied core(s) assumption",
);
let mut validation_data = match request_persisted_validation_data(
relay_parent,
config.para_id,
assumption,
ctx.sender(),
)
.await
.await??
{
Some(v) => v,
None => {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
our_para = %config.para_id,
"validation data is not available",
);
continue
},
};
let validation_code_hash = match obtain_validation_code_hash_with_assumption(
relay_parent,
config.para_id,
assumption,
ctx.sender(),
)
.await?
{
Some(v) => v,
None => {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
our_para = %config.para_id,
"validation code hash is not found.",
);
continue
},
};
let task_config = config.clone();
let metrics = metrics.clone();
let mut task_sender = ctx.sender().clone();
ctx.spawn(
"chained-collation-builder",
Box::pin(async move {
for core_index in cores_to_build_on {
let collator_fn = match task_config.collator.as_ref() {
Some(x) => x,
None => return,
@@ -363,21 +387,23 @@ async fn handle_new_activations<Context>(
None => {
gum::debug!(
target: LOG_TARGET,
para_id = %scheduled_core.para_id,
?para_id,
"collator returned no collation on collate",
);
return
},
};
let parent_head = collation.head_data.clone();
construct_and_distribute_receipt(
PreparedCollation {
collation,
para_id: scheduled_core.para_id,
para_id,
relay_parent,
validation_data,
validation_data: validation_data.clone(),
validation_code_hash,
n_validators,
core_index,
},
task_config.key.clone(),
&mut task_sender,
@@ -385,9 +411,13 @@ async fn handle_new_activations<Context>(
&metrics,
)
.await;
}),
)?;
}
// Chain the collations. All else stays the same as we build the chained
// collation on same relay parent.
validation_data.parent_head = parent_head;
}
}),
)?;
}
Ok(())
@@ -408,6 +438,7 @@ async fn handle_submit_collation<Context>(
parent_head,
validation_code_hash,
result_sender,
core_index,
} = params;
let validators = request_validators(relay_parent, ctx.sender()).await.await??;
@@ -444,6 +475,7 @@ async fn handle_submit_collation<Context>(
validation_data,
validation_code_hash,
n_validators,
core_index,
};
construct_and_distribute_receipt(
@@ -465,6 +497,7 @@ struct PreparedCollation {
validation_data: PersistedValidationData,
validation_code_hash: ValidationCodeHash,
n_validators: usize,
core_index: CoreIndex,
}
/// Takes a prepared collation, along with its context, and produces a candidate receipt
@@ -483,6 +516,7 @@ async fn construct_and_distribute_receipt(
validation_data,
validation_code_hash,
n_validators,
core_index,
} = collation;
let persisted_validation_data_hash = validation_data.hash();
@@ -578,6 +612,7 @@ async fn construct_and_distribute_receipt(
pov,
parent_head_data,
result_sender,
core_index,
})
.await;
}
+256 -5
View File
@@ -30,13 +30,16 @@ use polkadot_node_subsystem::{
use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::{
AsyncBackingParams, CollatorPair, HeadData, Id as ParaId, Id, PersistedValidationData,
async_backing::{BackingState, CandidatePendingAvailability},
AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData,
ScheduledCore, ValidationCode,
};
use rstest::rstest;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use std::pin::Pin;
use test_helpers::{dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator};
use test_helpers::{
dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator, make_candidate,
};
type VirtualOverseer = TestSubsystemContextHandle<CollationGenerationMessage>;
@@ -105,9 +108,9 @@ impl Future for TestCollator {
impl Unpin for TestCollator {}
async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages {
const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages {
overseer
.recv()
.timeout(TIMEOUT)
@@ -135,6 +138,41 @@ fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
ScheduledCore { para_id: para_id.into(), collator: None }
}
fn dummy_candidate_pending_availability(
para_id: ParaId,
candidate_relay_parent: Hash,
relay_parent_number: BlockNumber,
) -> CandidatePendingAvailability {
let (candidate, _pvd) = make_candidate(
candidate_relay_parent,
relay_parent_number,
para_id,
dummy_head_data(),
HeadData(vec![1]),
ValidationCode(vec![1, 2, 3]).hash(),
);
let candidate_hash = candidate.hash();
CandidatePendingAvailability {
candidate_hash,
descriptor: candidate.descriptor,
commitments: candidate.commitments,
relay_parent_number,
max_pov_size: 5 * 1024 * 1024,
}
}
fn dummy_backing_state(pending_availability: Vec<CandidatePendingAvailability>) -> BackingState {
let constraints = helpers::dummy_constraints(
0,
vec![0],
dummy_head_data(),
ValidationCodeHash::from(Hash::repeat_byte(42)),
);
BackingState { constraints, pending_availability }
}
#[rstest]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
@@ -176,6 +214,12 @@ fn requests_availability_per_relay_parent(#[case] runtime_version: u32) {
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
tx.send(Ok(BTreeMap::new())).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::ParaBackingState(_para_id, tx),
))) => {
tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
},
Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
}
}
@@ -273,6 +317,12 @@ fn requests_validation_data_for_scheduled_matches(#[case] runtime_version: u32)
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
tx.send(Ok(BTreeMap::new())).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::ParaBackingState(_para_id, tx),
))) => {
tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
},
Some(msg) => {
panic!("didn't expect any other overseer requests; got {:?}", msg)
},
@@ -384,6 +434,12 @@ fn sends_distribute_collation_message(#[case] runtime_version: u32) {
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
tx.send(Ok(BTreeMap::new())).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::ParaBackingState(_para_id, tx),
))) => {
tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
},
Some(msg @ AllMessages::CollatorProtocol(_)) => {
inner_to_collator_protocol.lock().await.push(msg);
},
@@ -564,6 +620,12 @@ fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) {
let res = BTreeMap::<CoreIndex, VecDeque<ParaId>>::new();
tx.send(Ok(res)).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::ParaBackingState(_para_id, tx),
))) => {
tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
},
Some(msg) => {
panic!("didn't expect any other overseer requests; got {:?}", msg)
},
@@ -611,6 +673,7 @@ fn submit_collation_is_no_op_before_initialization() {
parent_head: vec![1, 2, 3].into(),
validation_code_hash: Hash::repeat_byte(1).into(),
result_sender: None,
core_index: CoreIndex(0),
}),
})
.await;
@@ -647,6 +710,7 @@ fn submit_collation_leads_to_distribution() {
parent_head: vec![1, 2, 3].into(),
validation_code_hash,
result_sender: None,
core_index: CoreIndex(0),
}),
})
.await;
@@ -721,6 +785,9 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
let pending_availability =
vec![dummy_candidate_pending_availability(para_id, activated_hash, 1)];
helpers::handle_runtime_calls_on_new_head_activation(
&mut virtual_overseer,
activated_hash,
@@ -728,6 +795,7 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
cores,
runtime_version,
claim_queue,
pending_availability,
)
.await;
helpers::handle_core_processing_for_a_leaf(
@@ -736,6 +804,131 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
para_id,
// `CoreState` is `Occupied` => `OccupiedCoreAssumption` is `Included`
OccupiedCoreAssumption::Included,
1,
)
.await;
virtual_overseer
});
}
// There are variable number of cores of cores in `Occupied` state and async backing is enabled.
// On new head activation `CollationGeneration` should produce and distribute a new collation
// with proper assumption about the para candidate chain availability at next block.
#[rstest]
#[case(0)]
#[case(1)]
#[case(2)]
fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elastic_scaling(
#[case] candidates_pending_avail: u32,
) {
let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5);
let cores = (0..candidates_pending_avail)
.into_iter()
.map(|idx| {
CoreState::Occupied(polkadot_primitives::OccupiedCore {
next_up_on_available: Some(ScheduledCore { para_id, collator: None }),
occupied_since: 0,
time_out_at: 10,
next_up_on_time_out: Some(ScheduledCore { para_id, collator: None }),
availability: Default::default(), // doesn't matter
group_responsible: polkadot_primitives::GroupIndex(idx as u32),
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})
})
.collect::<Vec<_>>();
let pending_availability = (0..candidates_pending_avail)
.into_iter()
.map(|_idx| dummy_candidate_pending_availability(para_id, activated_hash, 0))
.collect::<Vec<_>>();
let claim_queue = cores
.iter()
.enumerate()
.map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id])))
.collect::<BTreeMap<_, _>>();
let total_cores = cores.len();
test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
helpers::handle_runtime_calls_on_new_head_activation(
&mut virtual_overseer,
activated_hash,
AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
cores,
// Using latest runtime with the fancy claim queue exposed.
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
claim_queue,
pending_availability,
)
.await;
helpers::handle_core_processing_for_a_leaf(
&mut virtual_overseer,
activated_hash,
para_id,
// `CoreState` is `Occupied` => `OccupiedCoreAssumption` is `Included`
OccupiedCoreAssumption::Included,
total_cores,
)
.await;
virtual_overseer
});
}
// There are variable number of cores of cores in `Free` state and async backing is enabled.
// On new head activation `CollationGeneration` should produce and distribute a new collation
// with proper assumption about the para candidate chain availability at next block.
#[rstest]
#[case(0)]
#[case(1)]
#[case(2)]
fn distribute_collation_for_free_cores_with_async_backing_enabled_and_elastic_scaling(
#[case] candidates_pending_avail: u32,
) {
let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5);
let cores = (0..candidates_pending_avail)
.into_iter()
.map(|_idx| CoreState::Scheduled(ScheduledCore { para_id, collator: None }))
.collect::<Vec<_>>();
let claim_queue = cores
.iter()
.enumerate()
.map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id])))
.collect::<BTreeMap<_, _>>();
let total_cores = cores.len();
test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
helpers::handle_runtime_calls_on_new_head_activation(
&mut virtual_overseer,
activated_hash,
AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
cores,
// Using latest runtime with the fancy claim queue exposed.
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
claim_queue,
vec![],
)
.await;
helpers::handle_core_processing_for_a_leaf(
&mut virtual_overseer,
activated_hash,
para_id,
// `CoreState` is `Free` => `OccupiedCoreAssumption` is `Free`
OccupiedCoreAssumption::Free,
total_cores,
)
.await;
@@ -777,6 +970,7 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
cores,
runtime_version,
claim_queue,
vec![],
)
.await;
@@ -785,8 +979,38 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
}
mod helpers {
use polkadot_primitives::{
async_backing::{Constraints, InboundHrmpLimitations},
BlockNumber,
};
use super::*;
// A set for dummy constraints for `ParaBackingState``
pub(crate) fn dummy_constraints(
min_relay_parent_number: BlockNumber,
valid_watermarks: Vec<BlockNumber>,
required_parent: HeadData,
validation_code_hash: ValidationCodeHash,
) -> Constraints {
Constraints {
min_relay_parent_number,
max_pov_size: 5 * 1024 * 1024,
max_code_size: 1_000_000,
ump_remaining: 10,
ump_remaining_bytes: 1_000,
max_ump_num_per_candidate: 10,
dmp_remaining_messages: vec![],
hrmp_inbound: InboundHrmpLimitations { valid_watermarks },
hrmp_channels_out: vec![],
max_hrmp_num_per_candidate: 0,
required_parent,
validation_code_hash,
upgrade_restriction: None,
future_validation_code: None,
}
}
// Sends `Initialize` with a collator config
pub async fn initialize_collator(virtual_overseer: &mut VirtualOverseer, para_id: ParaId) {
virtual_overseer
@@ -822,7 +1046,8 @@ mod helpers {
async_backing_params: AsyncBackingParams,
cores: Vec<CoreState>,
runtime_version: u32,
claim_queue: BTreeMap<CoreIndex, VecDeque<Id>>,
claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
pending_availability: Vec<CandidatePendingAvailability>,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
@@ -857,6 +1082,25 @@ mod helpers {
}
);
// Process the `ParaBackingState` message, and return some dummy state.
let message = overseer_recv(virtual_overseer).await;
let para_id = match message {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_,
RuntimeApiRequest::ParaBackingState(p_id, _),
)) => p_id,
_ => panic!("received unexpected message {:?}", message),
};
assert_matches!(
message,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::ParaBackingState(p_id, tx))
) if parent == activated_hash && p_id == para_id => {
tx.send(Ok(Some(dummy_backing_state(pending_availability)))).unwrap();
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
@@ -889,7 +1133,14 @@ mod helpers {
activated_hash: Hash,
para_id: ParaId,
expected_occupied_core_assumption: OccupiedCoreAssumption,
cores_assigned: usize,
) {
// Expect no messages if no cores is assigned to the para
if cores_assigned == 0 {
assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
return
}
// Some hardcoded data - if needed, extract to parameters
let validation_code_hash = ValidationCodeHash::from(Hash::repeat_byte(42));
let parent_head = HeadData::from(vec![1, 2, 3]);
@@ -203,20 +203,40 @@ struct PeerData {
version: CollationVersion,
}
/// A type wrapping a collation and it's designated core index.
struct CollationWithCoreIndex(Collation, CoreIndex);
impl CollationWithCoreIndex {
/// Returns inner collation ref.
pub fn collation(&self) -> &Collation {
&self.0
}
/// Returns inner collation mut ref.
pub fn collation_mut(&mut self) -> &mut Collation {
&mut self.0
}
/// Returns inner core index.
pub fn core_index(&self) -> &CoreIndex {
&self.1
}
}
struct PerRelayParent {
prospective_parachains_mode: ProspectiveParachainsMode,
/// Validators group responsible for backing candidates built
/// Per core index validators group responsible for backing candidates built
/// on top of this relay parent.
validator_group: ValidatorGroup,
validator_group: HashMap<CoreIndex, ValidatorGroup>,
/// Distributed collations.
collations: HashMap<CandidateHash, Collation>,
collations: HashMap<CandidateHash, CollationWithCoreIndex>,
}
impl PerRelayParent {
fn new(mode: ProspectiveParachainsMode) -> Self {
Self {
prospective_parachains_mode: mode,
validator_group: ValidatorGroup::default(),
validator_group: HashMap::default(),
collations: HashMap::new(),
}
}
@@ -350,6 +370,7 @@ async fn distribute_collation<Context>(
pov: PoV,
parent_head_data: HeadData,
result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
core_index: CoreIndex,
) -> Result<()> {
let candidate_relay_parent = receipt.descriptor.relay_parent;
let candidate_hash = receipt.hash();
@@ -422,7 +443,22 @@ async fn distribute_collation<Context>(
);
}
let our_core = our_cores[0];
// Double check that the specified `core_index` is among the ones our para has assignments for.
if !our_cores.iter().any(|assigned_core| assigned_core == &core_index) {
gum::warn!(
target: LOG_TARGET,
para_id = %id,
relay_parent = ?candidate_relay_parent,
cores = ?our_cores,
?core_index,
"Attempting to distribute collation for a core we are not assigned to ",
);
return Ok(())
}
let our_core = core_index;
// Determine the group on that core.
//
// When prospective parachains are disabled, candidate relay parent here is
@@ -464,10 +500,12 @@ async fn distribute_collation<Context>(
"Accepted collation, connecting to validators."
);
let validators_at_relay_parent = &mut per_relay_parent.validator_group.validators;
if validators_at_relay_parent.is_empty() {
*validators_at_relay_parent = validators;
}
// Insert validator group for the `core_index` at relay parent.
per_relay_parent.validator_group.entry(core_index).or_insert_with(|| {
let mut group = ValidatorGroup::default();
group.validators = validators;
group
});
// Update a set of connected validators if necessary.
connect_to_validators(ctx, &state.validator_groups_buf).await;
@@ -484,7 +522,10 @@ async fn distribute_collation<Context>(
per_relay_parent.collations.insert(
candidate_hash,
Collation { receipt, pov, parent_head_data, status: CollationStatus::Created },
CollationWithCoreIndex(
Collation { receipt, pov, parent_head_data, status: CollationStatus::Created },
core_index,
),
);
// If prospective parachains are disabled, a leaf should be known to peer.
@@ -690,7 +731,10 @@ async fn advertise_collation<Context>(
advertisement_timeouts: &mut FuturesUnordered<ResetInterestTimeout>,
metrics: &Metrics,
) {
for (candidate_hash, collation) in per_relay_parent.collations.iter_mut() {
for (candidate_hash, collation_and_core) in per_relay_parent.collations.iter_mut() {
let core_index = *collation_and_core.core_index();
let collation = collation_and_core.collation_mut();
// Check that peer will be able to request the collation.
if let CollationVersion::V1 = protocol_version {
if per_relay_parent.prospective_parachains_mode.is_enabled() {
@@ -704,11 +748,17 @@ async fn advertise_collation<Context>(
}
}
let should_advertise =
per_relay_parent
.validator_group
.should_advertise_to(candidate_hash, peer_ids, &peer);
let Some(validator_group) = per_relay_parent.validator_group.get_mut(&core_index) else {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?core_index,
"Skipping advertising to validator, validator group for core not found",
);
return
};
let should_advertise = validator_group.should_advertise_to(candidate_hash, peer_ids, &peer);
match should_advertise {
ShouldAdvertiseTo::Yes => {},
ShouldAdvertiseTo::NotAuthority | ShouldAdvertiseTo::AlreadyAdvertised => {
@@ -756,9 +806,7 @@ async fn advertise_collation<Context>(
))
.await;
per_relay_parent
.validator_group
.advertised_to_peer(candidate_hash, &peer_ids, peer);
validator_group.advertised_to_peer(candidate_hash, &peer_ids, peer);
advertisement_timeouts.push(ResetInterestTimeout::new(
*candidate_hash,
@@ -790,6 +838,7 @@ async fn process_msg<Context>(
pov,
parent_head_data,
result_sender,
core_index,
} => {
let _span1 = state
.span_per_relay_parent
@@ -820,6 +869,7 @@ async fn process_msg<Context>(
pov,
parent_head_data,
result_sender,
core_index,
)
.await?;
},
@@ -1053,7 +1103,7 @@ async fn handle_incoming_request<Context>(
};
let mode = per_relay_parent.prospective_parachains_mode;
let collation = match &req {
let collation_with_core = match &req {
VersionedCollationRequest::V1(_) if !mode.is_enabled() =>
per_relay_parent.collations.values_mut().next(),
VersionedCollationRequest::V2(req) =>
@@ -1070,22 +1120,24 @@ async fn handle_incoming_request<Context>(
return Ok(())
},
};
let (receipt, pov, parent_head_data) = if let Some(collation) = collation {
collation.status.advance_to_requested();
(
collation.receipt.clone(),
collation.pov.clone(),
collation.parent_head_data.clone(),
)
} else {
gum::warn!(
target: LOG_TARGET,
relay_parent = %relay_parent,
"received a `RequestCollation` for a relay parent we don't have collation stored.",
);
let (receipt, pov, parent_head_data) =
if let Some(collation_with_core) = collation_with_core {
let collation = collation_with_core.collation_mut();
collation.status.advance_to_requested();
(
collation.receipt.clone(),
collation.pov.clone(),
collation.parent_head_data.clone(),
)
} else {
gum::warn!(
target: LOG_TARGET,
relay_parent = %relay_parent,
"received a `RequestCollation` for a relay parent we don't have collation stored.",
);
return Ok(())
};
return Ok(())
};
state.metrics.on_collation_sent_requested();
@@ -1340,7 +1392,9 @@ where
.remove(removed)
.map(|per_relay_parent| per_relay_parent.collations)
.unwrap_or_default();
for collation in collations.into_values() {
for collation_with_core in collations.into_values() {
let collation = collation_with_core.collation();
let candidate_hash = collation.receipt.hash();
state.collation_result_senders.remove(&candidate_hash);
state.validator_groups_buf.remove_candidate(&candidate_hash);
@@ -1477,7 +1531,7 @@ async fn run_inner<Context>(
continue
};
let next_collation = {
let next_collation_with_core = {
let per_relay_parent = match state.per_relay_parent.get(&relay_parent) {
Some(per_relay_parent) => per_relay_parent,
None => continue,
@@ -1497,7 +1551,8 @@ async fn run_inner<Context>(
}
};
if let Some(collation) = next_collation {
if let Some(collation_with_core) = next_collation_with_core {
let collation = collation_with_core.collation();
let receipt = collation.receipt.clone();
let pov = collation.pov.clone();
let parent_head_data = collation.parent_head_data.clone();
@@ -377,6 +377,7 @@ async fn distribute_collation_with_receipt(
pov: pov.clone(),
parent_head_data: HeadData(vec![1, 2, 3]),
result_sender: None,
core_index: CoreIndex(0),
},
)
.await;
@@ -277,6 +277,7 @@ fn distribute_collation_from_implicit_view() {
pov: pov.clone(),
parent_head_data: HeadData(vec![1, 2, 3]),
result_sender: None,
core_index: CoreIndex(0),
},
)
.await;
@@ -358,6 +359,7 @@ fn distribute_collation_up_to_limit() {
pov: pov.clone(),
parent_head_data: HeadData(vec![1, 2, 3]),
result_sender: None,
core_index: CoreIndex(0),
},
)
.await;
@@ -45,14 +45,22 @@ use futures::FutureExt;
use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, GroupIndex, SessionIndex};
/// Elastic scaling: how many candidates per relay chain block the collator supports building.
pub const MAX_CHAINED_CANDIDATES_PER_RCB: NonZeroUsize = match NonZeroUsize::new(3) {
Some(cap) => cap,
None => panic!("max candidates per rcb cannot be zero"),
};
/// The ring buffer stores at most this many unique validator groups.
///
/// This value should be chosen in way that all groups assigned to our para
/// in the view can fit into the buffer.
pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) {
Some(cap) => cap,
None => panic!("buffer capacity must be non-zero"),
};
/// in the view can fit into the buffer multiplied by amount of candidates we support per relay
/// chain block in the case of elastic scaling.
pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize =
match NonZeroUsize::new(3 * MAX_CHAINED_CANDIDATES_PER_RCB.get()) {
Some(cap) => cap,
None => panic!("buffer capacity must be non-zero"),
};
/// Unique identifier of a validators group.
#[derive(Debug)]
+4 -2
View File
@@ -31,8 +31,8 @@ use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use polkadot_primitives::{
BlakeTwo256, BlockNumber, CandidateCommitments, CandidateHash, CollatorPair,
CommittedCandidateReceipt, CompactStatement, EncodeAs, Hash, HashT, HeadData, Id as ParaId,
PersistedValidationData, SessionIndex, Signed, UncheckedSigned, ValidationCode,
CommittedCandidateReceipt, CompactStatement, CoreIndex, EncodeAs, Hash, HashT, HeadData,
Id as ParaId, PersistedValidationData, SessionIndex, Signed, UncheckedSigned, ValidationCode,
ValidationCodeHash, ValidatorIndex, MAX_CODE_SIZE, MAX_POV_SIZE,
};
pub use sp_consensus_babe::{
@@ -524,6 +524,8 @@ pub struct SubmitCollationParams {
/// okay to just drop it. However, if it is called, it should be called with the signed
/// statement of a parachain validator seconding the collation.
pub result_sender: Option<futures::channel::oneshot::Sender<CollationSecondedSignal>>,
/// The core index on which the resulting candidate should be backed
pub core_index: CoreIndex,
}
/// This is the data we keep available for each candidate included in the relay chain.
@@ -228,6 +228,8 @@ pub enum CollatorProtocolMessage {
/// The result sender should be informed when at least one parachain validator seconded the
/// collation. It is also completely okay to just drop the sender.
result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
/// The core index where the candidate should be backed.
core_index: CoreIndex,
},
/// Report a collator as having provided an invalid collation. This should lead to disconnect
/// and blacklist of the collator.
+2 -1
View File
@@ -30,7 +30,7 @@ use polkadot_node_subsystem::{
messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
overseer, SubsystemSender,
};
use polkadot_primitives::{slashing, CoreIndex, ExecutorParams};
use polkadot_primitives::{async_backing::BackingState, slashing, CoreIndex, ExecutorParams};
pub use overseer::{
gen::{OrchestraError as OverseerError, Timeout},
@@ -308,6 +308,7 @@ specialize_requests! {
fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
}
/// Requests executor parameters from the runtime effective at given relay-parent. First obtains