mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 09:21:05 +00:00
elastic scaling: preserve candidate ordering in provisioner (#3778)
https://github.com/paritytech/polkadot-sdk/issues/3742
This commit is contained in:
@@ -14,6 +14,8 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use fatality::Nested;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
|
||||
@@ -24,7 +26,7 @@ use polkadot_node_subsystem::{
|
||||
use polkadot_node_subsystem_util::{runtime, Error as UtilError};
|
||||
use polkadot_primitives::{BackedCandidate, ValidationCodeHash};
|
||||
|
||||
use crate::LOG_TARGET;
|
||||
use crate::{ParaId, LOG_TARGET};
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
pub type FatalResult<T> = std::result::Result<T, FatalError>;
|
||||
@@ -55,7 +57,7 @@ pub enum Error {
|
||||
InvalidSignature,
|
||||
|
||||
#[error("Failed to send candidates {0:?}")]
|
||||
Send(Vec<BackedCandidate>),
|
||||
Send(HashMap<ParaId, Vec<BackedCandidate>>),
|
||||
|
||||
#[error("FetchPoV failed")]
|
||||
FetchPoV,
|
||||
|
||||
@@ -2231,15 +2231,16 @@ async fn handle_statement_message<Context>(
|
||||
|
||||
fn handle_get_backed_candidates_message(
|
||||
state: &State,
|
||||
requested_candidates: Vec<(CandidateHash, Hash)>,
|
||||
tx: oneshot::Sender<Vec<BackedCandidate>>,
|
||||
requested_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
|
||||
tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
|
||||
metrics: &Metrics,
|
||||
) -> Result<(), Error> {
|
||||
let _timer = metrics.time_get_backed_candidates();
|
||||
|
||||
let backed = requested_candidates
|
||||
.into_iter()
|
||||
.filter_map(|(candidate_hash, relay_parent)| {
|
||||
let mut backed = HashMap::with_capacity(requested_candidates.len());
|
||||
|
||||
for (para_id, para_candidates) in requested_candidates {
|
||||
for (candidate_hash, relay_parent) in para_candidates.iter() {
|
||||
let rp_state = match state.per_relay_parent.get(&relay_parent) {
|
||||
Some(rp_state) => rp_state,
|
||||
None => {
|
||||
@@ -2249,13 +2250,13 @@ fn handle_get_backed_candidates_message(
|
||||
?candidate_hash,
|
||||
"Requested candidate's relay parent is out of view",
|
||||
);
|
||||
return None
|
||||
break
|
||||
},
|
||||
};
|
||||
rp_state
|
||||
let maybe_backed_candidate = rp_state
|
||||
.table
|
||||
.attested_candidate(
|
||||
&candidate_hash,
|
||||
candidate_hash,
|
||||
&rp_state.table_context,
|
||||
rp_state.minimum_backing_votes,
|
||||
)
|
||||
@@ -2265,9 +2266,18 @@ fn handle_get_backed_candidates_message(
|
||||
&rp_state.table_context,
|
||||
rp_state.inject_core_index,
|
||||
)
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
});
|
||||
|
||||
if let Some(backed_candidate) = maybe_backed_candidate {
|
||||
backed
|
||||
.entry(para_id)
|
||||
.or_insert_with(|| Vec::with_capacity(para_candidates.len()))
|
||||
.push(backed_candidate);
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tx.send(backed).map_err(|data| Error::Send(data))?;
|
||||
Ok(())
|
||||
|
||||
@@ -663,13 +663,19 @@ fn backing_works(#[case] elastic_scaling_mvp: bool) {
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = CandidateBackingMessage::GetBackedCandidates(
|
||||
vec![(candidate_a_hash, test_state.relay_parent)],
|
||||
std::iter::once((
|
||||
test_state.chain_ids[0],
|
||||
vec![(candidate_a_hash, test_state.relay_parent)],
|
||||
))
|
||||
.collect(),
|
||||
tx,
|
||||
);
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg }).await;
|
||||
|
||||
let candidates = rx.await.unwrap();
|
||||
let mut candidates = rx.await.unwrap();
|
||||
assert_eq!(1, candidates.len());
|
||||
let candidates = candidates.remove(&test_state.chain_ids[0]).unwrap();
|
||||
assert_eq!(1, candidates.len());
|
||||
assert_eq!(candidates[0].validity_votes().len(), 3);
|
||||
|
||||
@@ -695,6 +701,323 @@ fn backing_works(#[case] elastic_scaling_mvp: bool) {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_backed_candidate_preserves_order() {
|
||||
let mut test_state = TestState::default();
|
||||
test_state
|
||||
.node_features
|
||||
.resize((node_features::FeatureIndex::ElasticScalingMVP as u8 + 1) as usize, false);
|
||||
test_state
|
||||
.node_features
|
||||
.set(node_features::FeatureIndex::ElasticScalingMVP as u8 as usize, true);
|
||||
|
||||
// Set a single validator as the first validator group. It simplifies the test.
|
||||
test_state.validator_groups.0[0] = vec![ValidatorIndex(2)];
|
||||
// Add another validator group for the third core.
|
||||
test_state.validator_groups.0.push(vec![ValidatorIndex(3)]);
|
||||
// Assign the second core to the same para as the first one.
|
||||
test_state.availability_cores[1] =
|
||||
CoreState::Scheduled(ScheduledCore { para_id: test_state.chain_ids[0], collator: None });
|
||||
// Add another availability core for paraid 2.
|
||||
test_state.availability_cores.push(CoreState::Scheduled(ScheduledCore {
|
||||
para_id: test_state.chain_ids[1],
|
||||
collator: None,
|
||||
}));
|
||||
|
||||
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
|
||||
test_startup(&mut virtual_overseer, &test_state).await;
|
||||
|
||||
let pov_a = PoV { block_data: BlockData(vec![1, 2, 3]) };
|
||||
let pov_b = PoV { block_data: BlockData(vec![3, 4, 5]) };
|
||||
let pov_c = PoV { block_data: BlockData(vec![5, 6, 7]) };
|
||||
let validation_code_ab = ValidationCode(vec![1, 2, 3]);
|
||||
let validation_code_c = ValidationCode(vec![4, 5, 6]);
|
||||
|
||||
let parent_head_data_a = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
|
||||
let parent_head_data_b = {
|
||||
let mut head = parent_head_data_a.clone();
|
||||
head.0[0] = 98;
|
||||
head
|
||||
};
|
||||
let output_head_data_b = {
|
||||
let mut head = parent_head_data_a.clone();
|
||||
head.0[0] = 99;
|
||||
head
|
||||
};
|
||||
let parent_head_data_c = test_state.head_data.get(&test_state.chain_ids[1]).unwrap();
|
||||
let output_head_data_c = {
|
||||
let mut head = parent_head_data_c.clone();
|
||||
head.0[0] = 97;
|
||||
head
|
||||
};
|
||||
|
||||
let pvd_a = PersistedValidationData {
|
||||
parent_head: parent_head_data_a.clone(),
|
||||
relay_parent_number: 0_u32.into(),
|
||||
max_pov_size: 1024,
|
||||
relay_parent_storage_root: dummy_hash(),
|
||||
};
|
||||
let pvd_b = PersistedValidationData {
|
||||
parent_head: parent_head_data_b.clone(),
|
||||
relay_parent_number: 0_u32.into(),
|
||||
max_pov_size: 1024,
|
||||
relay_parent_storage_root: dummy_hash(),
|
||||
};
|
||||
let pvd_c = PersistedValidationData {
|
||||
parent_head: parent_head_data_c.clone(),
|
||||
relay_parent_number: 0_u32.into(),
|
||||
max_pov_size: 1024,
|
||||
relay_parent_storage_root: dummy_hash(),
|
||||
};
|
||||
|
||||
let candidate_a = TestCandidateBuilder {
|
||||
para_id: test_state.chain_ids[0],
|
||||
relay_parent: test_state.relay_parent,
|
||||
pov_hash: pov_a.hash(),
|
||||
head_data: parent_head_data_b.clone(),
|
||||
erasure_root: make_erasure_root(&test_state, pov_a.clone(), pvd_a.clone()),
|
||||
validation_code: validation_code_ab.0.clone(),
|
||||
persisted_validation_data_hash: pvd_a.hash(),
|
||||
}
|
||||
.build();
|
||||
let candidate_b = TestCandidateBuilder {
|
||||
para_id: test_state.chain_ids[0],
|
||||
relay_parent: test_state.relay_parent,
|
||||
pov_hash: pov_b.hash(),
|
||||
head_data: output_head_data_b.clone(),
|
||||
erasure_root: make_erasure_root(&test_state, pov_b.clone(), pvd_b.clone()),
|
||||
validation_code: validation_code_ab.0.clone(),
|
||||
persisted_validation_data_hash: pvd_b.hash(),
|
||||
}
|
||||
.build();
|
||||
let candidate_c = TestCandidateBuilder {
|
||||
para_id: test_state.chain_ids[1],
|
||||
relay_parent: test_state.relay_parent,
|
||||
pov_hash: pov_c.hash(),
|
||||
head_data: output_head_data_c.clone(),
|
||||
erasure_root: make_erasure_root(&test_state, pov_b.clone(), pvd_c.clone()),
|
||||
validation_code: validation_code_c.0.clone(),
|
||||
persisted_validation_data_hash: pvd_c.hash(),
|
||||
}
|
||||
.build();
|
||||
let candidate_a_hash = candidate_a.hash();
|
||||
let candidate_b_hash = candidate_b.hash();
|
||||
let candidate_c_hash = candidate_c.hash();
|
||||
|
||||
// Back a chain of two candidates for the first paraid. Back one candidate for the second
|
||||
// paraid.
|
||||
for (candidate, pvd, validator_index) in [
|
||||
(candidate_a, pvd_a, ValidatorIndex(2)),
|
||||
(candidate_b, pvd_b, ValidatorIndex(1)),
|
||||
(candidate_c, pvd_c, ValidatorIndex(3)),
|
||||
] {
|
||||
let public = Keystore::sr25519_generate_new(
|
||||
&*test_state.keystore,
|
||||
ValidatorId::ID,
|
||||
Some(&test_state.validators[validator_index.0 as usize].to_seed()),
|
||||
)
|
||||
.expect("Insert key into keystore");
|
||||
|
||||
let signed = SignedFullStatementWithPVD::sign(
|
||||
&test_state.keystore,
|
||||
StatementWithPVD::Seconded(candidate.clone(), pvd.clone()),
|
||||
&test_state.signing_context,
|
||||
validator_index,
|
||||
&public.into(),
|
||||
)
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("should be signed");
|
||||
|
||||
let statement =
|
||||
CandidateBackingMessage::Statement(test_state.relay_parent, signed.clone());
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::Provisioner(
|
||||
ProvisionerMessage::ProvisionableData(
|
||||
_,
|
||||
ProvisionableData::BackedCandidate(candidate_receipt)
|
||||
)
|
||||
) => {
|
||||
assert_eq!(candidate_receipt, candidate.to_plain());
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Happy case, all candidates should be present.
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = CandidateBackingMessage::GetBackedCandidates(
|
||||
[
|
||||
(
|
||||
test_state.chain_ids[0],
|
||||
vec![
|
||||
(candidate_a_hash, test_state.relay_parent),
|
||||
(candidate_b_hash, test_state.relay_parent),
|
||||
],
|
||||
),
|
||||
(test_state.chain_ids[1], vec![(candidate_c_hash, test_state.relay_parent)]),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
tx,
|
||||
);
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg }).await;
|
||||
let mut candidates = rx.await.unwrap();
|
||||
assert_eq!(2, candidates.len());
|
||||
assert_eq!(
|
||||
candidates
|
||||
.remove(&test_state.chain_ids[0])
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|c| c.hash())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![candidate_a_hash, candidate_b_hash]
|
||||
);
|
||||
assert_eq!(
|
||||
candidates
|
||||
.remove(&test_state.chain_ids[1])
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|c| c.hash())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![candidate_c_hash]
|
||||
);
|
||||
|
||||
// The first candidate of the first para is invalid (we supply the wrong relay parent or a
|
||||
// wrong candidate hash). No candidates should be returned for paraid 1. ParaId 2 should be
|
||||
// fine.
|
||||
for candidates in [
|
||||
vec![
|
||||
(candidate_a_hash, Hash::repeat_byte(9)),
|
||||
(candidate_b_hash, test_state.relay_parent),
|
||||
],
|
||||
vec![
|
||||
(CandidateHash(Hash::repeat_byte(9)), test_state.relay_parent),
|
||||
(candidate_b_hash, test_state.relay_parent),
|
||||
],
|
||||
] {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = CandidateBackingMessage::GetBackedCandidates(
|
||||
[
|
||||
(test_state.chain_ids[0], candidates),
|
||||
(test_state.chain_ids[1], vec![(candidate_c_hash, test_state.relay_parent)]),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
tx,
|
||||
);
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg }).await;
|
||||
let mut candidates = rx.await.unwrap();
|
||||
assert_eq!(candidates.len(), 1);
|
||||
|
||||
assert!(candidates.remove(&test_state.chain_ids[0]).is_none());
|
||||
assert_eq!(
|
||||
candidates
|
||||
.remove(&test_state.chain_ids[1])
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|c| c.hash())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![candidate_c_hash]
|
||||
);
|
||||
}
|
||||
|
||||
// The second candidate of the first para is invalid (we supply the wrong relay parent or a
|
||||
// wrong candidate hash). The first candidate of the first para should still be present.
|
||||
// ParaId 2 is fine.
|
||||
for candidates in [
|
||||
vec![
|
||||
(candidate_a_hash, test_state.relay_parent),
|
||||
(candidate_b_hash, Hash::repeat_byte(9)),
|
||||
],
|
||||
vec![
|
||||
(candidate_a_hash, test_state.relay_parent),
|
||||
(CandidateHash(Hash::repeat_byte(9)), test_state.relay_parent),
|
||||
],
|
||||
] {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = CandidateBackingMessage::GetBackedCandidates(
|
||||
[
|
||||
(test_state.chain_ids[0], candidates),
|
||||
(test_state.chain_ids[1], vec![(candidate_c_hash, test_state.relay_parent)]),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
tx,
|
||||
);
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg }).await;
|
||||
let mut candidates = rx.await.unwrap();
|
||||
assert_eq!(2, candidates.len());
|
||||
assert_eq!(
|
||||
candidates
|
||||
.remove(&test_state.chain_ids[0])
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|c| c.hash())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![candidate_a_hash]
|
||||
);
|
||||
assert_eq!(
|
||||
candidates
|
||||
.remove(&test_state.chain_ids[1])
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|c| c.hash())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![candidate_c_hash]
|
||||
);
|
||||
}
|
||||
|
||||
// Both candidates of para id 1 are invalid (we supply the wrong relay parent or a wrong
|
||||
// candidate hash). No candidates should be returned for para id 1. Para Id 2 is fine.
|
||||
for candidates in [
|
||||
vec![
|
||||
(CandidateHash(Hash::repeat_byte(9)), test_state.relay_parent),
|
||||
(CandidateHash(Hash::repeat_byte(10)), test_state.relay_parent),
|
||||
],
|
||||
vec![
|
||||
(candidate_a_hash, Hash::repeat_byte(9)),
|
||||
(candidate_b_hash, Hash::repeat_byte(10)),
|
||||
],
|
||||
] {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = CandidateBackingMessage::GetBackedCandidates(
|
||||
[
|
||||
(test_state.chain_ids[0], candidates),
|
||||
(test_state.chain_ids[1], vec![(candidate_c_hash, test_state.relay_parent)]),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
tx,
|
||||
);
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg }).await;
|
||||
let mut candidates = rx.await.unwrap();
|
||||
assert_eq!(candidates.len(), 1);
|
||||
|
||||
assert!(candidates.remove(&test_state.chain_ids[0]).is_none());
|
||||
assert_eq!(
|
||||
candidates
|
||||
.remove(&test_state.chain_ids[1])
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|c| c.hash())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![candidate_c_hash]
|
||||
);
|
||||
}
|
||||
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(test_state.relay_parent),
|
||||
)))
|
||||
.await;
|
||||
virtual_overseer
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extract_core_index_from_statement_works() {
|
||||
let test_state = TestState::default();
|
||||
@@ -950,13 +1273,19 @@ fn backing_works_while_validation_ongoing() {
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = CandidateBackingMessage::GetBackedCandidates(
|
||||
vec![(candidate_a.hash(), test_state.relay_parent)],
|
||||
std::iter::once((
|
||||
test_state.chain_ids[0],
|
||||
vec![(candidate_a.hash(), test_state.relay_parent)],
|
||||
))
|
||||
.collect(),
|
||||
tx,
|
||||
);
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg }).await;
|
||||
|
||||
let candidates = rx.await.unwrap();
|
||||
let mut candidates = rx.await.unwrap();
|
||||
assert_eq!(candidates.len(), 1);
|
||||
let candidates = candidates.remove(&test_state.chain_ids[0]).unwrap();
|
||||
assert_eq!(1, candidates.len());
|
||||
assert_eq!(candidates[0].validity_votes().len(), 3);
|
||||
|
||||
@@ -1565,7 +1894,11 @@ fn backing_works_after_failed_validation() {
|
||||
// and check that it is still alive.
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = CandidateBackingMessage::GetBackedCandidates(
|
||||
vec![(candidate.hash(), test_state.relay_parent)],
|
||||
std::iter::once((
|
||||
test_state.chain_ids[0],
|
||||
vec![(candidate.hash(), test_state.relay_parent)],
|
||||
))
|
||||
.collect(),
|
||||
tx,
|
||||
);
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@ use polkadot_primitives::{
|
||||
BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreIndex, CoreState, Hash,
|
||||
Id as ParaId, OccupiedCoreAssumption, SessionIndex, SignedAvailabilityBitfield, ValidatorIndex,
|
||||
};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
mod disputes;
|
||||
mod error;
|
||||
@@ -598,13 +598,11 @@ async fn select_candidate_hashes_from_tracked(
|
||||
candidates: &[CandidateReceipt],
|
||||
relay_parent: Hash,
|
||||
sender: &mut impl overseer::ProvisionerSenderTrait,
|
||||
) -> Result<Vec<(CandidateHash, Hash)>, Error> {
|
||||
) -> Result<HashMap<ParaId, Vec<(CandidateHash, Hash)>>, Error> {
|
||||
let block_number = get_block_number_under_construction(relay_parent, sender).await?;
|
||||
|
||||
let mut selected_candidates =
|
||||
Vec::with_capacity(candidates.len().min(availability_cores.len()));
|
||||
let mut selected_parachains =
|
||||
HashSet::with_capacity(candidates.len().min(availability_cores.len()));
|
||||
HashMap::with_capacity(candidates.len().min(availability_cores.len()));
|
||||
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
@@ -638,7 +636,7 @@ async fn select_candidate_hashes_from_tracked(
|
||||
CoreState::Free => continue,
|
||||
};
|
||||
|
||||
if selected_parachains.contains(&scheduled_core.para_id) {
|
||||
if selected_candidates.contains_key(&scheduled_core.para_id) {
|
||||
// We already picked a candidate for this parachain. Elastic scaling only works with
|
||||
// prospective parachains mode.
|
||||
continue
|
||||
@@ -677,8 +675,10 @@ async fn select_candidate_hashes_from_tracked(
|
||||
"Selected candidate receipt",
|
||||
);
|
||||
|
||||
selected_parachains.insert(candidate.descriptor.para_id);
|
||||
selected_candidates.push((candidate_hash, candidate.descriptor.relay_parent));
|
||||
selected_candidates.insert(
|
||||
candidate.descriptor.para_id,
|
||||
vec![(candidate_hash, candidate.descriptor.relay_parent)],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -695,12 +695,12 @@ async fn request_backable_candidates(
|
||||
bitfields: &[SignedAvailabilityBitfield],
|
||||
relay_parent: Hash,
|
||||
sender: &mut impl overseer::ProvisionerSenderTrait,
|
||||
) -> Result<Vec<(CandidateHash, Hash)>, Error> {
|
||||
) -> Result<HashMap<ParaId, Vec<(CandidateHash, Hash)>>, Error> {
|
||||
let block_number = get_block_number_under_construction(relay_parent, sender).await?;
|
||||
|
||||
// Record how many cores are scheduled for each paraid. Use a BTreeMap because
|
||||
// we'll need to iterate through them.
|
||||
let mut scheduled_cores: BTreeMap<ParaId, usize> = BTreeMap::new();
|
||||
let mut scheduled_cores_per_para: BTreeMap<ParaId, usize> = BTreeMap::new();
|
||||
// The on-chain ancestors of a para present in availability-cores.
|
||||
let mut ancestors: HashMap<ParaId, Ancestors> =
|
||||
HashMap::with_capacity(availability_cores.len());
|
||||
@@ -709,7 +709,7 @@ async fn request_backable_candidates(
|
||||
let core_idx = CoreIndex(core_idx as u32);
|
||||
match core {
|
||||
CoreState::Scheduled(scheduled_core) => {
|
||||
*scheduled_cores.entry(scheduled_core.para_id).or_insert(0) += 1;
|
||||
*scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
|
||||
},
|
||||
CoreState::Occupied(occupied_core) => {
|
||||
let is_available = bitfields_indicate_availability(
|
||||
@@ -726,14 +726,14 @@ async fn request_backable_candidates(
|
||||
|
||||
if let Some(ref scheduled_core) = occupied_core.next_up_on_available {
|
||||
// Request a new backable candidate for the newly scheduled para id.
|
||||
*scheduled_cores.entry(scheduled_core.para_id).or_insert(0) += 1;
|
||||
*scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
|
||||
}
|
||||
} else if occupied_core.time_out_at <= block_number {
|
||||
// Timed out before being available.
|
||||
|
||||
if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out {
|
||||
// Candidate's availability timed out, practically same as scheduled.
|
||||
*scheduled_cores.entry(scheduled_core.para_id).or_insert(0) += 1;
|
||||
*scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
|
||||
}
|
||||
} else {
|
||||
// Not timed out and not available.
|
||||
@@ -747,10 +747,10 @@ async fn request_backable_candidates(
|
||||
};
|
||||
}
|
||||
|
||||
let mut selected_candidates: Vec<(CandidateHash, Hash)> =
|
||||
Vec::with_capacity(availability_cores.len());
|
||||
let mut selected_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>> =
|
||||
HashMap::with_capacity(scheduled_cores_per_para.len());
|
||||
|
||||
for (para_id, core_count) in scheduled_cores {
|
||||
for (para_id, core_count) in scheduled_cores_per_para {
|
||||
let para_ancestors = ancestors.remove(¶_id).unwrap_or_default();
|
||||
|
||||
// If elastic scaling MVP is disabled, only allow one candidate per parachain.
|
||||
@@ -777,7 +777,7 @@ async fn request_backable_candidates(
|
||||
continue
|
||||
}
|
||||
|
||||
selected_candidates.extend(response.into_iter().take(core_count));
|
||||
selected_candidates.insert(para_id, response);
|
||||
}
|
||||
|
||||
Ok(selected_candidates)
|
||||
@@ -826,33 +826,38 @@ async fn select_candidates(
|
||||
selected_candidates.clone(),
|
||||
tx,
|
||||
));
|
||||
let mut candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?;
|
||||
let candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?;
|
||||
gum::trace!(target: LOG_TARGET, leaf_hash=?relay_parent,
|
||||
"Got {} backed candidates", candidates.len());
|
||||
|
||||
// keep only one candidate with validation code.
|
||||
let mut with_validation_code = false;
|
||||
candidates.retain(|c| {
|
||||
if c.candidate().commitments.new_validation_code.is_some() {
|
||||
if with_validation_code {
|
||||
return false
|
||||
// merge the candidates into a common collection, preserving the order
|
||||
let mut merged_candidates = Vec::with_capacity(availability_cores.len());
|
||||
|
||||
for para_candidates in candidates.into_values() {
|
||||
for candidate in para_candidates {
|
||||
if candidate.candidate().commitments.new_validation_code.is_some() {
|
||||
if with_validation_code {
|
||||
break
|
||||
} else {
|
||||
with_validation_code = true;
|
||||
}
|
||||
}
|
||||
|
||||
with_validation_code = true;
|
||||
merged_candidates.push(candidate);
|
||||
}
|
||||
|
||||
true
|
||||
});
|
||||
}
|
||||
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
n_candidates = candidates.len(),
|
||||
n_candidates = merged_candidates.len(),
|
||||
n_cores = availability_cores.len(),
|
||||
?relay_parent,
|
||||
"Selected backed candidates",
|
||||
);
|
||||
|
||||
Ok(candidates)
|
||||
Ok(merged_candidates)
|
||||
}
|
||||
|
||||
/// Produces a block number 1 higher than that of the relay parent
|
||||
|
||||
@@ -258,6 +258,8 @@ mod select_candidates {
|
||||
BlockNumber, CandidateCommitments, CommittedCandidateReceipt, PersistedValidationData,
|
||||
};
|
||||
use rstest::rstest;
|
||||
use std::ops::Not;
|
||||
use CoreState::{Free, Scheduled};
|
||||
|
||||
const BLOCK_UNDER_PRODUCTION: BlockNumber = 128;
|
||||
|
||||
@@ -323,9 +325,6 @@ mod select_candidates {
|
||||
// 11: Occupied(next_up_on_available and available, but different successor para_id)
|
||||
// ]
|
||||
fn mock_availability_cores_one_per_para() -> Vec<CoreState> {
|
||||
use std::ops::Not;
|
||||
use CoreState::{Free, Scheduled};
|
||||
|
||||
vec![
|
||||
// 0: Free,
|
||||
Free,
|
||||
@@ -389,9 +388,6 @@ mod select_candidates {
|
||||
// For test purposes with multiple possible cores assigned to a para, we always return this set
|
||||
// of availability cores:
|
||||
fn mock_availability_cores_multiple_per_para() -> Vec<CoreState> {
|
||||
use std::ops::Not;
|
||||
use CoreState::{Free, Scheduled};
|
||||
|
||||
vec![
|
||||
// 0: Free,
|
||||
Free,
|
||||
@@ -562,7 +558,10 @@ mod select_candidates {
|
||||
use ChainApiMessage::BlockNumber;
|
||||
use RuntimeApiMessage::Request;
|
||||
|
||||
let mut backed_iter = expected.clone().into_iter();
|
||||
let mut backed = expected.clone().into_iter().fold(HashMap::new(), |mut acc, candidate| {
|
||||
acc.entry(candidate.descriptor().para_id).or_insert(vec![]).push(candidate);
|
||||
acc
|
||||
});
|
||||
|
||||
expected.sort_by_key(|c| c.candidate().descriptor.para_id);
|
||||
let mut candidates_iter = expected
|
||||
@@ -583,11 +582,30 @@ mod select_candidates {
|
||||
hashes,
|
||||
sender,
|
||||
)) => {
|
||||
let response: Vec<BackedCandidate> =
|
||||
backed_iter.by_ref().take(hashes.len()).collect();
|
||||
let expected_hashes: Vec<(CandidateHash, Hash)> = response
|
||||
let mut response: HashMap<ParaId, Vec<BackedCandidate>> = HashMap::new();
|
||||
for (para_id, requested_candidates) in hashes.clone() {
|
||||
response.insert(
|
||||
para_id,
|
||||
backed
|
||||
.get_mut(¶_id)
|
||||
.unwrap()
|
||||
.drain(0..requested_candidates.len())
|
||||
.collect(),
|
||||
);
|
||||
}
|
||||
let expected_hashes: HashMap<ParaId, Vec<(CandidateHash, Hash)>> = response
|
||||
.iter()
|
||||
.map(|candidate| (candidate.hash(), candidate.descriptor().relay_parent))
|
||||
.map(|(para_id, candidates)| {
|
||||
(
|
||||
*para_id,
|
||||
candidates
|
||||
.iter()
|
||||
.map(|candidate| {
|
||||
(candidate.hash(), candidate.descriptor().relay_parent)
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(expected_hashes, hashes);
|
||||
@@ -768,7 +786,7 @@ mod select_candidates {
|
||||
#[rstest]
|
||||
#[case(ProspectiveParachainsMode::Disabled)]
|
||||
#[case(ProspectiveParachainsMode::Enabled {max_candidate_depth: 0, allowed_ancestry_len: 0})]
|
||||
fn selects_max_one_code_upgrade(
|
||||
fn selects_max_one_code_upgrade_one_core_per_para(
|
||||
#[case] prospective_parachains_mode: ProspectiveParachainsMode,
|
||||
) {
|
||||
let mock_cores = mock_availability_cores_one_per_para();
|
||||
@@ -780,7 +798,12 @@ mod select_candidates {
|
||||
let cores = [1, 4, 7, 8, 10, 12];
|
||||
let cores_with_code = [1, 4, 8];
|
||||
|
||||
let expected_cores = [1, 7, 10, 12];
|
||||
// We can't be sure which one code upgrade the provisioner will pick. We can only assert
|
||||
// that it only picks one. These are the possible cores for which the provisioner will
|
||||
// supply candidates.
|
||||
// There are multiple possibilities depending on which code upgrade it
|
||||
// chooses.
|
||||
let possible_expected_cores = [[1, 7, 10, 12], [4, 7, 10, 12], [7, 8, 10, 12]];
|
||||
|
||||
let committed_receipts: Vec<_> = (0..=mock_cores.len())
|
||||
.map(|i| {
|
||||
@@ -820,8 +843,10 @@ mod select_candidates {
|
||||
// Then, some of them get filtered due to new validation code rule.
|
||||
let expected_backed: Vec<_> =
|
||||
cores.iter().map(|&idx| backed_candidates[idx].clone()).collect();
|
||||
let expected_backed_filtered: Vec<_> =
|
||||
expected_cores.iter().map(|&idx| candidates[idx].clone()).collect();
|
||||
let expected_backed_filtered: Vec<Vec<_>> = possible_expected_cores
|
||||
.iter()
|
||||
.map(|indices| indices.iter().map(|&idx| candidates[idx].clone()).collect())
|
||||
.collect();
|
||||
|
||||
let mock_cores_clone = mock_cores.clone();
|
||||
|
||||
@@ -850,13 +875,120 @@ mod select_candidates {
|
||||
|
||||
assert_eq!(result.len(), 4);
|
||||
|
||||
result.into_iter().for_each(|c| {
|
||||
assert!(
|
||||
expected_backed_filtered.iter().any(|c2| c.candidate().corresponds_to(c2)),
|
||||
"Failed to find candidate: {:?}",
|
||||
c,
|
||||
)
|
||||
});
|
||||
assert!(expected_backed_filtered.iter().any(|expected_backed_filtered| {
|
||||
result.clone().into_iter().all(|c| {
|
||||
expected_backed_filtered.iter().any(|c2| c.candidate().corresponds_to(c2))
|
||||
})
|
||||
}));
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn selects_max_one_code_upgrade_multiple_cores_per_para() {
|
||||
let prospective_parachains_mode =
|
||||
ProspectiveParachainsMode::Enabled { max_candidate_depth: 0, allowed_ancestry_len: 0 };
|
||||
let mock_cores = vec![
|
||||
// 0: Scheduled(default),
|
||||
Scheduled(scheduled_core(1)),
|
||||
// 1: Scheduled(default),
|
||||
Scheduled(scheduled_core(2)),
|
||||
// 2: Scheduled(default),
|
||||
Scheduled(scheduled_core(2)),
|
||||
// 3: Scheduled(default),
|
||||
Scheduled(scheduled_core(2)),
|
||||
// 4: Scheduled(default),
|
||||
Scheduled(scheduled_core(3)),
|
||||
// 5: Scheduled(default),
|
||||
Scheduled(scheduled_core(3)),
|
||||
// 6: Scheduled(default),
|
||||
Scheduled(scheduled_core(3)),
|
||||
];
|
||||
|
||||
let empty_hash = PersistedValidationData::<Hash, BlockNumber>::default().hash();
|
||||
let cores_with_code = [0, 2, 4, 5];
|
||||
|
||||
// We can't be sure which one code upgrade the provisioner will pick. We can only assert
|
||||
// that it only picks one.
|
||||
// These are the possible cores for which the provisioner will
|
||||
// supply candidates. There are multiple possibilities depending on which code upgrade it
|
||||
// chooses.
|
||||
let possible_expected_cores = [vec![0, 1], vec![1, 2, 3], vec![4, 1]];
|
||||
|
||||
let committed_receipts: Vec<_> = (0..mock_cores.len())
|
||||
.map(|i| {
|
||||
let mut descriptor = dummy_candidate_descriptor(dummy_hash());
|
||||
descriptor.para_id = mock_cores[i].para_id().unwrap();
|
||||
descriptor.persisted_validation_data_hash = empty_hash;
|
||||
descriptor.pov_hash = Hash::from_low_u64_be(i as u64);
|
||||
CommittedCandidateReceipt {
|
||||
descriptor,
|
||||
commitments: CandidateCommitments {
|
||||
new_validation_code: if cores_with_code.contains(&i) {
|
||||
Some(vec![].into())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Input to select_candidates
|
||||
let candidates: Vec<_> = committed_receipts.iter().map(|r| r.to_plain()).collect();
|
||||
// Build possible outputs from select_candidates
|
||||
let backed_candidates: Vec<_> = committed_receipts
|
||||
.iter()
|
||||
.map(|committed_receipt| {
|
||||
BackedCandidate::new(
|
||||
committed_receipt.clone(),
|
||||
Vec::new(),
|
||||
default_bitvec(MOCK_GROUP_SIZE),
|
||||
None,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
// First, provisioner will request backable candidates for each scheduled core.
|
||||
// Then, some of them get filtered due to new validation code rule.
|
||||
let expected_backed: Vec<_> =
|
||||
(0..mock_cores.len()).map(|idx| backed_candidates[idx].clone()).collect();
|
||||
let expected_backed_filtered: Vec<Vec<_>> = possible_expected_cores
|
||||
.iter()
|
||||
.map(|indices| indices.iter().map(|&idx| candidates[idx].clone()).collect())
|
||||
.collect();
|
||||
|
||||
let mock_cores_clone = mock_cores.clone();
|
||||
|
||||
test_harness(
|
||||
|r| {
|
||||
mock_overseer(
|
||||
r,
|
||||
mock_cores_clone,
|
||||
expected_backed,
|
||||
HashMap::new(),
|
||||
prospective_parachains_mode,
|
||||
)
|
||||
},
|
||||
|mut tx: TestSubsystemSender| async move {
|
||||
let result = select_candidates(
|
||||
&mock_cores,
|
||||
&[],
|
||||
&candidates,
|
||||
prospective_parachains_mode,
|
||||
true,
|
||||
Default::default(),
|
||||
&mut tx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(expected_backed_filtered.iter().any(|expected_backed_filtered| {
|
||||
result.clone().into_iter().all(|c| {
|
||||
expected_backed_filtered.iter().any(|c2| c.candidate().corresponds_to(c2))
|
||||
}) && (expected_backed_filtered.len() == result.len())
|
||||
}));
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -811,7 +811,7 @@ fn test_candidate_validation_msg() -> CandidateValidationMessage {
|
||||
|
||||
fn test_candidate_backing_msg() -> CandidateBackingMessage {
|
||||
let (sender, _) = oneshot::channel();
|
||||
CandidateBackingMessage::GetBackedCandidates(Vec::new(), sender)
|
||||
CandidateBackingMessage::GetBackedCandidates(Default::default(), sender)
|
||||
}
|
||||
|
||||
fn test_chain_api_msg() -> ChainApiMessage {
|
||||
|
||||
@@ -82,8 +82,15 @@ pub struct CanSecondRequest {
|
||||
pub enum CandidateBackingMessage {
|
||||
/// Requests a set of backable candidates attested by the subsystem.
|
||||
///
|
||||
/// Each pair is (candidate_hash, candidate_relay_parent).
|
||||
GetBackedCandidates(Vec<(CandidateHash, Hash)>, oneshot::Sender<Vec<BackedCandidate>>),
|
||||
/// The order of candidates of the same para must be preserved in the response.
|
||||
/// If a backed candidate of a para cannot be retrieved, the response should not contain any
|
||||
/// candidates of the same para that follow it in the input vector. In other words, assuming
|
||||
/// candidates are supplied in dependency order, we must ensure that this dependency order is
|
||||
/// preserved.
|
||||
GetBackedCandidates(
|
||||
HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
|
||||
oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
|
||||
),
|
||||
/// Request the subsystem to check whether it's allowed to second given candidate.
|
||||
/// The rule is to only fetch collations that are either built on top of the root
|
||||
/// of some fragment tree or have a parent node which represents backed candidate.
|
||||
|
||||
@@ -340,9 +340,15 @@ enum BitfieldSigningMessage { }
|
||||
```rust
|
||||
enum CandidateBackingMessage {
|
||||
/// Requests a set of backable candidates attested by the subsystem.
|
||||
///
|
||||
/// Each pair is (candidate_hash, candidate_relay_parent).
|
||||
GetBackedCandidates(Vec<(CandidateHash, Hash)>, oneshot::Sender<Vec<BackedCandidate>>),
|
||||
/// The order of candidates of the same para must be preserved in the response.
|
||||
/// If a backed candidate of a para cannot be retrieved, the response should not contain any
|
||||
/// candidates of the same para that follow it in the input vector. In other words, assuming
|
||||
/// candidates are supplied in dependency order, we must ensure that this dependency order is
|
||||
/// preserved.
|
||||
GetBackedCandidates(
|
||||
HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
|
||||
oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
|
||||
),
|
||||
/// Note that the Candidate Backing subsystem should second the given candidate in the context of the
|
||||
/// given relay-parent (ref. by hash). This candidate must be validated using the provided PoV.
|
||||
/// The PoV is expected to match the `pov_hash` in the descriptor.
|
||||
|
||||
Reference in New Issue
Block a user