Elastic scaling: use an assumed CoreIndex in candidate-backing (#3229)

First step in implementing
https://github.com/paritytech/polkadot-sdk/issues/3144

### Summary of changes
- switch statement `Table` candidate mapping from `ParaId` to
`CoreIndex`
- introduce experimental `InjectCoreIndex`  node feature.
- determine and assume a `CoreIndex` for a candidate based on statement
validator index. If the signature is valid it means validator controls
the validator that index and we can easily map it to a validator
group/core.
- introduce a temporary provisioner fix until we fully enable elastic
scaling in the subystem. The fix ensures we don't fetch the same
backable candidate when calling `get_backable_candidate` for each core.

TODO:
- [x] fix backing tests
- [x] fix statement table tests
- [x] add new test

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
Signed-off-by: alindima <alin@parity.io>
Co-authored-by: alindima <alin@parity.io>
This commit is contained in:
Andrei Sandu
2024-02-22 15:22:31 +07:00
committed by GitHub
parent e76b244853
commit 60e537b95f
13 changed files with 462 additions and 96 deletions
Generated
+3
View File
@@ -12490,6 +12490,7 @@ dependencies = [
"polkadot-primitives-test-helpers",
"polkadot-statement-table",
"sc-keystore",
"schnellru",
"sp-application-crypto",
"sp-core",
"sp-keyring",
@@ -13159,6 +13160,7 @@ version = "7.0.0"
dependencies = [
"bitvec",
"hex-literal",
"log",
"parity-scale-codec",
"polkadot-core-primitives",
"polkadot-parachain-primitives",
@@ -13556,6 +13558,7 @@ dependencies = [
"parity-scale-codec",
"polkadot-primitives",
"sp-core",
"tracing-gum",
]
[[package]]
+1
View File
@@ -22,6 +22,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
gum = { package = "tracing-gum", path = "../../gum" }
thiserror = { workspace = true }
fatality = "0.0.6"
schnellru = "0.2.1"
[dev-dependencies]
sp-core = { path = "../../../../substrate/primitives/core" }
+3
View File
@@ -48,6 +48,9 @@ pub enum Error {
#[error("Candidate is not found")]
CandidateNotFound,
#[error("CoreIndex cannot be determined for a candidate")]
CoreIndexUnavailable,
#[error("Signature is invalid")]
InvalidSignature,
+233 -52
View File
@@ -70,13 +70,14 @@ use std::{
sync::Arc,
};
use bitvec::vec::BitVec;
use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use futures::{
channel::{mpsc, oneshot},
future::BoxFuture,
stream::FuturesOrdered,
FutureExt, SinkExt, StreamExt, TryFutureExt,
};
use schnellru::{ByLength, LruMap};
use error::{Error, FatalResult};
use polkadot_node_primitives::{
@@ -104,10 +105,12 @@ use polkadot_node_subsystem_util::{
Validator,
};
use polkadot_primitives::{
vstaging::{node_features::FeatureIndex, NodeFeatures},
BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt,
CommittedCandidateReceipt, CoreIndex, CoreState, ExecutorParams, Hash, Id as ParaId,
PersistedValidationData, PvfExecKind, SigningContext, ValidationCode, ValidatorId,
ValidatorIndex, ValidatorSignature, ValidityAttestation,
CommittedCandidateReceipt, CoreIndex, CoreState, ExecutorParams, GroupIndex, GroupRotationInfo,
Hash, Id as ParaId, IndexedVec, PersistedValidationData, PvfExecKind, SessionIndex,
SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature,
ValidityAttestation,
};
use sp_keystore::KeystorePtr;
use statement_table::{
@@ -118,7 +121,7 @@ use statement_table::{
},
Config as TableConfig, Context as TableContextTrait, Table,
};
use util::vstaging::get_disabled_validators_with_fallback;
use util::{runtime::request_node_features, vstaging::get_disabled_validators_with_fallback};
mod error;
@@ -209,7 +212,9 @@ struct PerRelayParentState {
/// The hash of the relay parent on top of which this job is doing it's work.
parent: Hash,
/// The `ParaId` assigned to the local validator at this relay parent.
assignment: Option<ParaId>,
assigned_para: Option<ParaId>,
/// The `CoreIndex` assigned to the local validator at this relay parent.
assigned_core: Option<CoreIndex>,
/// The candidates that are backed by enough validators in their group, by hash.
backed: HashSet<CandidateHash>,
/// The table of candidates and statements under this relay-parent.
@@ -224,6 +229,15 @@ struct PerRelayParentState {
fallbacks: HashMap<CandidateHash, AttestingData>,
/// The minimum backing votes threshold.
minimum_backing_votes: u32,
/// If true, we're appending extra bits in the BackedCandidate validator indices bitfield,
/// which represent the assigned core index. True if ElasticScalingMVP is enabled.
inject_core_index: bool,
/// The core states for all cores.
cores: Vec<CoreState>,
/// The validator index -> group mapping at this relay parent.
validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
/// The associated group rotation information.
group_rotation_info: GroupRotationInfo,
}
struct PerCandidateState {
@@ -275,6 +289,9 @@ struct State {
/// This is guaranteed to have an entry for each candidate with a relay parent in the implicit
/// or explicit view for which a `Seconded` statement has been successfully imported.
per_candidate: HashMap<CandidateHash, PerCandidateState>,
/// Cache the per-session Validator->Group mapping.
validator_to_group_cache:
LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
/// A cloneable sender which is dispatched to background candidate validation tasks to inform
/// the main task of the result.
background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
@@ -292,6 +309,7 @@ impl State {
per_leaf: HashMap::default(),
per_relay_parent: HashMap::default(),
per_candidate: HashMap::new(),
validator_to_group_cache: LruMap::new(ByLength::new(2)),
background_validation_tx,
keystore,
}
@@ -379,10 +397,10 @@ struct AttestingData {
backing: Vec<ValidatorIndex>,
}
#[derive(Default)]
#[derive(Default, Debug)]
struct TableContext {
validator: Option<Validator>,
groups: HashMap<ParaId, Vec<ValidatorIndex>>,
groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
validators: Vec<ValidatorId>,
disabled_validators: Vec<ValidatorIndex>,
}
@@ -404,7 +422,7 @@ impl TableContext {
impl TableContextTrait for TableContext {
type AuthorityId = ValidatorIndex;
type Digest = CandidateHash;
type GroupId = ParaId;
type GroupId = CoreIndex;
type Signature = ValidatorSignature;
type Candidate = CommittedCandidateReceipt;
@@ -412,15 +430,11 @@ impl TableContextTrait for TableContext {
candidate.hash()
}
fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
candidate.descriptor().para_id
fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
}
fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.iter().any(|a| a == authority))
}
fn get_group_size(&self, group: &ParaId) -> Option<usize> {
fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
self.groups.get(group).map(|g| g.len())
}
}
@@ -442,19 +456,20 @@ fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedSt
fn table_attested_to_backed(
attested: TableAttestedCandidate<
ParaId,
CoreIndex,
CommittedCandidateReceipt,
ValidatorIndex,
ValidatorSignature,
>,
table_context: &TableContext,
inject_core_index: bool,
) -> Option<BackedCandidate> {
let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested;
let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
let group = table_context.groups.get(&para_id)?;
let group = table_context.groups.get(&core_index)?;
let mut validator_indices = BitVec::with_capacity(group.len());
@@ -479,6 +494,12 @@ fn table_attested_to_backed(
}
vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
if inject_core_index {
let core_index_to_inject: BitVec<u8, BitOrderLsb0> =
BitVec::from_vec(vec![core_index.0 as u8]);
validator_indices.extend(core_index_to_inject);
}
Some(BackedCandidate {
candidate,
validity_votes: vote_positions
@@ -971,7 +992,14 @@ async fn handle_active_leaves_update<Context>(
// construct a `PerRelayParent` from the runtime API
// and insert it.
let per = construct_per_relay_parent_state(ctx, maybe_new, &state.keystore, mode).await?;
let per = construct_per_relay_parent_state(
ctx,
maybe_new,
&state.keystore,
&mut state.validator_to_group_cache,
mode,
)
.await?;
if let Some(per) = per {
state.per_relay_parent.insert(maybe_new, per);
@@ -981,31 +1009,112 @@ async fn handle_active_leaves_update<Context>(
Ok(())
}
macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(err) => {
// Only bubble up fatal errors.
error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;
// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(None)
},
}
};
}
fn core_index_from_statement(
validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
group_rotation_info: &GroupRotationInfo,
cores: &[CoreState],
statement: &SignedFullStatementWithPVD,
) -> Option<CoreIndex> {
let compact_statement = statement.as_unchecked();
let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
let n_cores = cores.len();
gum::trace!(
target:LOG_TARGET,
?group_rotation_info,
?statement,
?validator_to_group,
n_cores = ?cores.len(),
?candidate_hash,
"Extracting core index from statement"
);
let statement_validator_index = statement.validator_index();
let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
gum::debug!(
target: LOG_TARGET,
?group_rotation_info,
?statement,
?validator_to_group,
n_cores = ?cores.len() ,
?candidate_hash,
"Invalid validator index: {:?}",
statement_validator_index
);
return None
};
// First check if the statement para id matches the core assignment.
let core_index = group_rotation_info.core_for_group(*group_index, n_cores);
if core_index.0 as usize > n_cores {
gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
return None
}
if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
let candidate_para_id = candidate.descriptor.para_id;
let assigned_para_id = match &cores[core_index.0 as usize] {
CoreState::Free => {
gum::debug!(target: LOG_TARGET, ?candidate_hash, "Invalid CoreIndex, core is not assigned to any para_id");
return None
},
CoreState::Occupied(occupied) =>
if let Some(next) = &occupied.next_up_on_available {
next.para_id
} else {
return None
},
CoreState::Scheduled(scheduled) => scheduled.para_id,
};
if assigned_para_id != candidate_para_id {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?core_index,
?assigned_para_id,
?candidate_para_id,
"Invalid CoreIndex, core is assigned to a different para_id"
);
return None
}
return Some(core_index)
} else {
return Some(core_index)
}
}
/// Load the data necessary to do backing work on top of a relay-parent.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn construct_per_relay_parent_state<Context>(
ctx: &mut Context,
relay_parent: Hash,
keystore: &KeystorePtr,
validator_to_group_cache: &mut LruMap<
SessionIndex,
Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
>,
mode: ProspectiveParachainsMode,
) -> Result<Option<PerRelayParentState>, Error> {
macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(err) => {
// Only bubble up fatal errors.
error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;
// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(None)
},
}
};
}
let parent = relay_parent;
let (session_index, validators, groups, cores) = futures::try_join!(
@@ -1020,6 +1129,16 @@ async fn construct_per_relay_parent_state<Context>(
.map_err(Error::JoinMultiple)?;
let session_index = try_runtime_api!(session_index);
let inject_core_index = request_node_features(parent, session_index, ctx.sender())
.await?
.unwrap_or(NodeFeatures::EMPTY)
.get(FeatureIndex::ElasticScalingMVP as usize)
.map(|b| *b)
.unwrap_or(false);
gum::debug!(target: LOG_TARGET, inject_core_index, ?parent, "New state");
let validators: Vec<_> = try_runtime_api!(validators);
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let cores = try_runtime_api!(cores);
@@ -1055,18 +1174,24 @@ async fn construct_per_relay_parent_state<Context>(
},
};
let mut groups = HashMap::new();
let n_cores = cores.len();
let mut assignment = None;
for (idx, core) in cores.into_iter().enumerate() {
let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
let mut assigned_core = None;
let mut assigned_para = None;
for (idx, core) in cores.iter().enumerate() {
let core_para_id = match core {
CoreState::Scheduled(scheduled) => scheduled.para_id,
CoreState::Occupied(occupied) =>
if mode.is_enabled() {
// Async backing makes it legal to build on top of
// occupied core.
occupied.candidate_descriptor.para_id
if let Some(next) = &occupied.next_up_on_available {
next.para_id
} else {
continue
}
} else {
continue
},
@@ -1077,11 +1202,27 @@ async fn construct_per_relay_parent_state<Context>(
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
if let Some(g) = validator_groups.get(group_index.0 as usize) {
if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
assignment = Some(core_para_id);
assigned_para = Some(core_para_id);
assigned_core = Some(core_index);
}
groups.insert(core_para_id, g.clone());
groups.insert(core_index, g.clone());
}
}
gum::debug!(target: LOG_TARGET, ?groups, "TableContext");
let validator_to_group = validator_to_group_cache
.get_or_insert(session_index, || {
let mut vector = vec![None; validators.len()];
for (group_idx, validator_group) in validator_groups.iter().enumerate() {
for validator in validator_group {
vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
}
}
Arc::new(IndexedVec::<_, _>::from(vector))
})
.expect("Just inserted");
let table_context = TableContext { validator, groups, validators, disabled_validators };
let table_config = TableConfig {
@@ -1094,7 +1235,8 @@ async fn construct_per_relay_parent_state<Context>(
Ok(Some(PerRelayParentState {
prospective_parachains_mode: mode,
parent,
assignment,
assigned_core,
assigned_para,
backed: HashSet::new(),
table: Table::new(table_config),
table_context,
@@ -1102,6 +1244,10 @@ async fn construct_per_relay_parent_state<Context>(
awaiting_validation: HashSet::new(),
fallbacks: HashMap::new(),
minimum_backing_votes,
inject_core_index,
cores,
validator_to_group: validator_to_group.clone(),
group_rotation_info,
}))
}
@@ -1519,15 +1665,16 @@ async fn import_statement<Context>(
per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
statement: &SignedFullStatementWithPVD,
) -> Result<Option<TableSummary>, Error> {
let candidate_hash = statement.payload().candidate_hash();
gum::debug!(
target: LOG_TARGET,
statement = ?statement.payload().to_compact(),
validator_index = statement.validator_index().0,
?candidate_hash,
"Importing statement",
);
let candidate_hash = statement.payload().candidate_hash();
// If this is a new candidate (statement is 'seconded' and candidate is unknown),
// we need to create an entry in the `PerCandidateState` map.
//
@@ -1593,7 +1740,15 @@ async fn import_statement<Context>(
let stmt = primitive_statement_to_table(statement);
Ok(rp_state.table.import_statement(&rp_state.table_context, stmt))
let core = core_index_from_statement(
&rp_state.validator_to_group,
&rp_state.group_rotation_info,
&rp_state.cores,
statement,
)
.ok_or(Error::CoreIndexUnavailable)?;
Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt))
}
/// Handles a summary received from [`import_statement`] and dispatches `Backed` notifications and
@@ -1615,7 +1770,11 @@ async fn post_import_statement_actions<Context>(
// `HashSet::insert` returns true if the thing wasn't in there already.
if rp_state.backed.insert(candidate_hash) {
if let Some(backed) = table_attested_to_backed(attested, &rp_state.table_context) {
if let Some(backed) = table_attested_to_backed(
attested,
&rp_state.table_context,
rp_state.inject_core_index,
) {
let para_id = backed.candidate.descriptor.para_id;
gum::debug!(
target: LOG_TARGET,
@@ -1654,8 +1813,14 @@ async fn post_import_statement_actions<Context>(
);
ctx.send_unbounded_message(message);
}
} else {
gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
}
} else {
gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
}
} else {
gum::debug!(target: LOG_TARGET, "No attested candidate");
}
issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);
@@ -1859,9 +2024,10 @@ async fn maybe_validate_and_import<Context>(
let candidate_hash = summary.candidate;
if Some(summary.group_id) != rp_state.assignment {
if Some(summary.group_id) != rp_state.assigned_core {
return Ok(())
}
let attesting = match statement.payload() {
StatementWithPVD::Seconded(receipt, _) => {
let attesting = AttestingData {
@@ -2004,10 +2170,11 @@ async fn handle_second_message<Context>(
}
// Sanity check that candidate is from our assignment.
if Some(candidate.descriptor().para_id) != rp_state.assignment {
if Some(candidate.descriptor().para_id) != rp_state.assigned_para {
gum::debug!(
target: LOG_TARGET,
our_assignment = ?rp_state.assignment,
our_assignment_core = ?rp_state.assigned_core,
our_assignment_para = ?rp_state.assigned_para,
collation = ?candidate.descriptor().para_id,
"Subsystem asked to second for para outside of our assignment",
);
@@ -2015,6 +2182,14 @@ async fn handle_second_message<Context>(
return Ok(())
}
gum::debug!(
target: LOG_TARGET,
our_assignment_core = ?rp_state.assigned_core,
our_assignment_para = ?rp_state.assigned_para,
collation = ?candidate.descriptor().para_id,
"Current assignments vs collation",
);
// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
// Seconded statement only if we have not signed a Valid statement for the requested candidate.
//
@@ -2087,7 +2262,13 @@ fn handle_get_backed_candidates_message(
&rp_state.table_context,
rp_state.minimum_backing_votes,
)
.and_then(|attested| table_attested_to_backed(attested, &rp_state.table_context))
.and_then(|attested| {
table_attested_to_backed(
attested,
&rp_state.table_context,
rp_state.inject_core_index,
)
})
})
.collect();
+123 -5
View File
@@ -65,13 +65,14 @@ fn dummy_pvd() -> PersistedValidationData {
}
}
struct TestState {
pub(crate) struct TestState {
chain_ids: Vec<ParaId>,
keystore: KeystorePtr,
validators: Vec<Sr25519Keyring>,
validator_public: Vec<ValidatorId>,
validation_data: PersistedValidationData,
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
validator_to_group: IndexedVec<ValidatorIndex, Option<GroupIndex>>,
availability_cores: Vec<CoreState>,
head_data: HashMap<ParaId, HeadData>,
signing_context: SigningContext,
@@ -114,6 +115,11 @@ impl Default for TestState {
.into_iter()
.map(|g| g.into_iter().map(ValidatorIndex).collect())
.collect();
let validator_to_group: IndexedVec<_, _> =
vec![Some(0), Some(1), Some(0), Some(0), None, Some(0)]
.into_iter()
.map(|x| x.map(|x| GroupIndex(x)))
.collect();
let group_rotation_info =
GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, now: 1 };
@@ -143,6 +149,7 @@ impl Default for TestState {
validators,
validator_public,
validator_groups: (validator_groups, group_rotation_info),
validator_to_group,
availability_cores,
head_data,
validation_data,
@@ -285,6 +292,16 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
}
);
// Node features request from runtime: all features are disabled.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(_parent, RuntimeApiRequest::NodeFeatures(_session_index, tx))
) => {
tx.send(Ok(Default::default())).unwrap();
}
);
// Check if subsystem job issues a request for the minimum backing votes.
assert_matches!(
virtual_overseer.recv().await,
@@ -639,6 +656,107 @@ fn backing_works() {
});
}
#[test]
fn extract_core_index_from_statement_works() {
let test_state = TestState::default();
let pov_a = PoV { block_data: BlockData(vec![42, 43, 44]) };
let pvd_a = dummy_pvd();
let validation_code_a = ValidationCode(vec![1, 2, 3]);
let pov_hash = pov_a.hash();
let mut candidate = TestCandidateBuilder {
para_id: test_state.chain_ids[0],
relay_parent: test_state.relay_parent,
pov_hash,
erasure_root: make_erasure_root(&test_state, pov_a.clone(), pvd_a.clone()),
persisted_validation_data_hash: pvd_a.hash(),
validation_code: validation_code_a.0.clone(),
..Default::default()
}
.build();
let public2 = Keystore::sr25519_generate_new(
&*test_state.keystore,
ValidatorId::ID,
Some(&test_state.validators[2].to_seed()),
)
.expect("Insert key into keystore");
let signed_statement_1 = SignedFullStatementWithPVD::sign(
&test_state.keystore,
StatementWithPVD::Seconded(candidate.clone(), pvd_a.clone()),
&test_state.signing_context,
ValidatorIndex(2),
&public2.into(),
)
.ok()
.flatten()
.expect("should be signed");
let public1 = Keystore::sr25519_generate_new(
&*test_state.keystore,
ValidatorId::ID,
Some(&test_state.validators[1].to_seed()),
)
.expect("Insert key into keystore");
let signed_statement_2 = SignedFullStatementWithPVD::sign(
&test_state.keystore,
StatementWithPVD::Seconded(candidate.clone(), pvd_a.clone()),
&test_state.signing_context,
ValidatorIndex(1),
&public1.into(),
)
.ok()
.flatten()
.expect("should be signed");
candidate.descriptor.para_id = test_state.chain_ids[1];
let signed_statement_3 = SignedFullStatementWithPVD::sign(
&test_state.keystore,
StatementWithPVD::Seconded(candidate, pvd_a.clone()),
&test_state.signing_context,
ValidatorIndex(1),
&public1.into(),
)
.ok()
.flatten()
.expect("should be signed");
let core_index_1 = core_index_from_statement(
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
&signed_statement_1,
)
.unwrap();
assert_eq!(core_index_1, CoreIndex(0));
let core_index_2 = core_index_from_statement(
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
&signed_statement_2,
);
// Must be none, para_id in descriptor is different than para assigned to core
assert_eq!(core_index_2, None);
let core_index_3 = core_index_from_statement(
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
&signed_statement_3,
)
.unwrap();
assert_eq!(core_index_3, CoreIndex(1));
}
#[test]
fn backing_works_while_validation_ongoing() {
let test_state = TestState::default();
@@ -1422,7 +1540,7 @@ fn backing_works_after_failed_validation() {
fn candidate_backing_reorders_votes() {
use sp_core::Encode;
let para_id = ParaId::from(10);
let core_idx = CoreIndex(10);
let validators = vec![
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
@@ -1436,7 +1554,7 @@ fn candidate_backing_reorders_votes() {
let validator_groups = {
let mut validator_groups = HashMap::new();
validator_groups
.insert(para_id, vec![0, 1, 2, 3, 4, 5].into_iter().map(ValidatorIndex).collect());
.insert(core_idx, vec![0, 1, 2, 3, 4, 5].into_iter().map(ValidatorIndex).collect());
validator_groups
};
@@ -1466,10 +1584,10 @@ fn candidate_backing_reorders_votes() {
(ValidatorIndex(3), fake_attestation(3)),
(ValidatorIndex(1), fake_attestation(1)),
],
group_id: para_id,
group_id: core_idx,
};
let backed = table_attested_to_backed(attested, &table_context).unwrap();
let backed = table_attested_to_backed(attested, &table_context, false).unwrap();
let expected_bitvec = {
let mut validator_indices = BitVec::<u8, bitvec::order::Lsb0>::with_capacity(6);
@@ -185,6 +185,16 @@ async fn activate_leaf(
}
);
// Node features request from runtime: all features are disabled.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::NodeFeatures(_session_index, tx))
) if parent == hash => {
tx.send(Ok(Default::default())).unwrap();
}
);
// Check if subsystem job issues a request for the minimum backing votes.
assert_matches!(
virtual_overseer.recv().await,
@@ -305,10 +315,11 @@ async fn assert_hypothetical_frontier_requests(
) => {
let idx = match expected_requests.iter().position(|r| r.0 == request) {
Some(idx) => idx,
None => panic!(
None =>
panic!(
"unexpected hypothetical frontier request, no match found for {:?}",
request
),
),
};
let resp = std::mem::take(&mut expected_requests[idx].1);
tx.send(resp).unwrap();
@@ -1268,6 +1279,7 @@ fn concurrent_dependent_candidates() {
let statement_b = CandidateBackingMessage::Statement(leaf_parent, signed_b.clone());
virtual_overseer.send(FromOrchestra::Communication { msg: statement_a }).await;
// At this point the subsystem waits for response, the previous message is received,
// send a second one without blocking.
let _ = virtual_overseer
@@ -1388,7 +1400,19 @@ fn concurrent_dependent_candidates() {
assert_eq!(sess_idx, 1);
tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
},
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_parent,
RuntimeApiRequest::ValidatorGroups(tx),
)) => {
tx.send(Ok(test_state.validator_groups.clone())).unwrap();
},
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_parent,
RuntimeApiRequest::AvailabilityCores(tx),
)) => {
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
},
_ => panic!("unexpected message received from overseer: {:?}", msg),
}
}
@@ -1419,7 +1443,6 @@ fn seconding_sanity_check_occupy_same_depth() {
let leaf_parent = get_parent_hash(leaf_hash);
let activated = new_leaf(leaf_hash, LEAF_BLOCK_NUMBER);
let min_block_number = LEAF_BLOCK_NUMBER - LEAF_ANCESTRY_LEN;
let min_relay_parents = vec![(para_id_a, min_block_number), (para_id_b, min_block_number)];
let test_leaf_a = TestLeaf { activated, min_relay_parents };
@@ -1555,13 +1578,14 @@ fn occupied_core_assignment() {
const LEAF_A_BLOCK_NUMBER: BlockNumber = 100;
const LEAF_A_ANCESTRY_LEN: BlockNumber = 3;
let para_id = test_state.chain_ids[0];
let previous_para_id = test_state.chain_ids[1];
// Set the core state to occupied.
let mut candidate_descriptor = ::test_helpers::dummy_candidate_descriptor(Hash::zero());
candidate_descriptor.para_id = para_id;
candidate_descriptor.para_id = previous_para_id;
test_state.availability_cores[0] = CoreState::Occupied(OccupiedCore {
group_responsible: Default::default(),
next_up_on_available: None,
next_up_on_available: Some(ScheduledCore { para_id, collator: None }),
occupied_since: 100_u32,
time_out_at: 200_u32,
next_up_on_time_out: None,
+10 -2
View File
@@ -681,10 +681,17 @@ async fn request_backable_candidates(
CoreState::Free => continue,
};
// We should be calling this once per para rather than per core.
// TODO: Will be fixed in https://github.com/paritytech/polkadot-sdk/pull/3233.
// For now, at least make sure we don't supply the same candidate multiple times in case a
// para has multiple cores scheduled.
let response = get_backable_candidate(relay_parent, para_id, required_path, sender).await?;
match response {
Some((hash, relay_parent)) => selected_candidates.push((hash, relay_parent)),
Some((hash, relay_parent)) => {
if !selected_candidates.iter().any(|bc| &(hash, relay_parent) == bc) {
selected_candidates.push((hash, relay_parent))
}
},
None => {
gum::debug!(
target: LOG_TARGET,
@@ -726,6 +733,7 @@ async fn select_candidates(
)
.await?,
};
gum::debug!(target: LOG_TARGET, ?selected_candidates, "Got backable candidates");
// now get the backed candidates corresponding to these candidate receipts
let (tx, rx) = oneshot::channel();
+2
View File
@@ -14,6 +14,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc", "se
hex-literal = "0.4.1"
parity-scale-codec = { version = "3.6.1", default-features = false, features = ["bit-vec", "derive"] }
scale-info = { version = "2.10.0", default-features = false, features = ["bit-vec", "derive", "serde"] }
log = { workspace = true, default-features = false }
serde = { features = ["alloc", "derive"], workspace = true }
application-crypto = { package = "sp-application-crypto", path = "../../substrate/primitives/application-crypto", default-features = false, features = ["serde"] }
@@ -38,6 +39,7 @@ std = [
"application-crypto/std",
"bitvec/std",
"inherents/std",
"log/std",
"parity-scale-codec/std",
"polkadot-core-primitives/std",
"polkadot-parachain-primitives/std",
+26 -1
View File
@@ -72,6 +72,7 @@ pub use metrics::{
/// The key type ID for a collator key.
pub const COLLATOR_KEY_TYPE_ID: KeyTypeId = KeyTypeId(*b"coll");
const LOG_TARGET: &str = "runtime::primitives";
mod collator_app {
use application_crypto::{app_crypto, sr25519};
@@ -746,17 +747,29 @@ impl<H> BackedCandidate<H> {
///
/// Returns either an error, indicating that one of the signatures was invalid or that the index
/// was out-of-bounds, or the number of signatures checked.
pub fn check_candidate_backing<H: AsRef<[u8]> + Clone + Encode>(
pub fn check_candidate_backing<H: AsRef<[u8]> + Clone + Encode + core::fmt::Debug>(
backed: &BackedCandidate<H>,
signing_context: &SigningContext<H>,
group_len: usize,
validator_lookup: impl Fn(usize) -> Option<ValidatorId>,
) -> Result<usize, ()> {
if backed.validator_indices.len() != group_len {
log::debug!(
target: LOG_TARGET,
"Check candidate backing: indices mismatch: group_len = {} , indices_len = {}",
group_len,
backed.validator_indices.len(),
);
return Err(())
}
if backed.validity_votes.len() > group_len {
log::debug!(
target: LOG_TARGET,
"Check candidate backing: Too many votes, expected: {}, found: {}",
group_len,
backed.validity_votes.len(),
);
return Err(())
}
@@ -778,11 +791,23 @@ pub fn check_candidate_backing<H: AsRef<[u8]> + Clone + Encode>(
if sig.verify(&payload[..], &validator_id) {
signed += 1;
} else {
log::debug!(
target: LOG_TARGET,
"Check candidate backing: Invalid signature. validator_id = {:?}, validator_index = {} ",
validator_id,
val_in_group_idx,
);
return Err(())
}
}
if signed != backed.validity_votes.len() {
log::error!(
target: LOG_TARGET,
"Check candidate backing: Too many signatures, expected = {}, found = {}",
backed.validity_votes.len() ,
signed,
);
return Err(())
}
+5 -1
View File
@@ -64,9 +64,13 @@ pub mod node_features {
/// Tells if tranch0 assignments could be sent in a single certificate.
/// Reserved for: `<https://github.com/paritytech/polkadot-sdk/issues/628>`
EnableAssignmentsV2 = 0,
/// This feature enables the extension of `BackedCandidate::validator_indices` by 8 bits.
/// The value stored there represents the assumed core index where the candidates
/// are backed. This is needed for the elastic scaling MVP.
ElasticScalingMVP = 1,
/// First unassigned feature bit.
/// Every time a new feature flag is assigned it should take this value.
/// and this should be incremented.
FirstUnassigned = 1,
FirstUnassigned = 2,
}
}
+1
View File
@@ -13,3 +13,4 @@ workspace = true
parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
sp-core = { path = "../../substrate/primitives/core" }
primitives = { package = "polkadot-primitives", path = "../primitives" }
gum = { package = "tracing-gum", path = "../node/gum" }
+23 -27
View File
@@ -36,6 +36,7 @@ use primitives::{
};
use parity_scale_codec::{Decode, Encode};
const LOG_TARGET: &str = "parachain::statement-table";
/// Context for the statement table.
pub trait Context {
@@ -53,9 +54,6 @@ pub trait Context {
/// get the digest of a candidate.
fn candidate_digest(candidate: &Self::Candidate) -> Self::Digest;
/// get the group of a candidate.
fn candidate_group(candidate: &Self::Candidate) -> Self::GroupId;
/// Whether a authority is a member of a group.
/// Members are meant to submit candidates and vote on validity.
fn is_member_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool;
@@ -342,13 +340,13 @@ impl<Ctx: Context> Table<Ctx> {
pub fn import_statement(
&mut self,
context: &Ctx,
group_id: Ctx::GroupId,
statement: SignedStatement<Ctx::Candidate, Ctx::Digest, Ctx::AuthorityId, Ctx::Signature>,
) -> Option<Summary<Ctx::Digest, Ctx::GroupId>> {
let SignedStatement { statement, signature, sender: signer } = statement;
let res = match statement {
Statement::Seconded(candidate) =>
self.import_candidate(context, signer.clone(), candidate, signature),
self.import_candidate(context, signer.clone(), candidate, signature, group_id),
Statement::Valid(digest) =>
self.validity_vote(context, signer.clone(), digest, ValidityVote::Valid(signature)),
};
@@ -387,9 +385,10 @@ impl<Ctx: Context> Table<Ctx> {
authority: Ctx::AuthorityId,
candidate: Ctx::Candidate,
signature: Ctx::Signature,
group: Ctx::GroupId,
) -> ImportResult<Ctx> {
let group = Ctx::candidate_group(&candidate);
if !context.is_member_of(&authority, &group) {
gum::debug!(target: LOG_TARGET, authority = ?authority, group = ?group, "New `Misbehavior::UnauthorizedStatement`, candidate backed by validator that doesn't belong to expected group" );
return Err(Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
statement: SignedStatement {
signature,
@@ -634,10 +633,6 @@ mod tests {
Digest(candidate.1)
}
fn candidate_group(candidate: &Candidate) -> GroupId {
GroupId(candidate.0)
}
fn is_member_of(&self, authority: &AuthorityId, group: &GroupId) -> bool {
self.authorities.get(authority).map(|v| v == group).unwrap_or(false)
}
@@ -675,10 +670,10 @@ mod tests {
sender: AuthorityId(1),
};
table.import_statement(&context, statement_a);
table.import_statement(&context, GroupId(2), statement_a);
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
table.import_statement(&context, statement_b);
table.import_statement(&context, GroupId(2), statement_b);
assert_eq!(
table.detected_misbehavior[&AuthorityId(1)][0],
Misbehavior::MultipleCandidates(MultipleCandidates {
@@ -711,10 +706,10 @@ mod tests {
sender: AuthorityId(1),
};
table.import_statement(&context, statement_a);
table.import_statement(&context, GroupId(2), statement_a);
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
table.import_statement(&context, statement_b);
table.import_statement(&context, GroupId(2), statement_b);
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
}
@@ -735,7 +730,7 @@ mod tests {
sender: AuthorityId(1),
};
table.import_statement(&context, statement);
table.import_statement(&context, GroupId(2), statement);
assert_eq!(
table.detected_misbehavior[&AuthorityId(1)][0],
@@ -769,7 +764,7 @@ mod tests {
};
let candidate_a_digest = Digest(100);
table.import_statement(&context, candidate_a);
table.import_statement(&context, GroupId(2), candidate_a);
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2)));
@@ -779,7 +774,7 @@ mod tests {
signature: Signature(2),
sender: AuthorityId(2),
};
table.import_statement(&context, bad_validity_vote);
table.import_statement(&context, GroupId(3), bad_validity_vote);
assert_eq!(
table.detected_misbehavior[&AuthorityId(2)][0],
@@ -811,7 +806,7 @@ mod tests {
sender: AuthorityId(1),
};
table.import_statement(&context, statement);
table.import_statement(&context, GroupId(2), statement);
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
let invalid_statement = SignedStatement {
@@ -820,7 +815,7 @@ mod tests {
sender: AuthorityId(1),
};
table.import_statement(&context, invalid_statement);
table.import_statement(&context, GroupId(2), invalid_statement);
assert!(table.detected_misbehavior.contains_key(&AuthorityId(1)));
}
@@ -842,7 +837,7 @@ mod tests {
};
let candidate_digest = Digest(100);
table.import_statement(&context, statement);
table.import_statement(&context, GroupId(2), statement);
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
let extra_vote = SignedStatement {
@@ -851,7 +846,7 @@ mod tests {
sender: AuthorityId(1),
};
table.import_statement(&context, extra_vote);
table.import_statement(&context, GroupId(2), extra_vote);
assert_eq!(
table.detected_misbehavior[&AuthorityId(1)][0],
Misbehavior::ValidityDoubleVote(ValidityDoubleVote::IssuedAndValidity(
@@ -910,7 +905,7 @@ mod tests {
};
let candidate_digest = Digest(100);
table.import_statement(&context, statement);
table.import_statement(&context, GroupId(2), statement);
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
assert!(table.attested_candidate(&candidate_digest, &context, 2).is_none());
@@ -921,7 +916,7 @@ mod tests {
sender: AuthorityId(2),
};
table.import_statement(&context, vote);
table.import_statement(&context, GroupId(2), vote);
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2)));
assert!(table.attested_candidate(&candidate_digest, &context, 2).is_some());
}
@@ -944,7 +939,7 @@ mod tests {
};
let summary = table
.import_statement(&context, statement)
.import_statement(&context, GroupId(2), statement)
.expect("candidate import to give summary");
assert_eq!(summary.candidate, Digest(100));
@@ -971,7 +966,7 @@ mod tests {
};
let candidate_digest = Digest(100);
table.import_statement(&context, statement);
table.import_statement(&context, GroupId(2), statement);
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
let vote = SignedStatement {
@@ -980,8 +975,9 @@ mod tests {
sender: AuthorityId(2),
};
let summary =
table.import_statement(&context, vote).expect("candidate vote to give summary");
let summary = table
.import_statement(&context, GroupId(2), vote)
.expect("candidate vote to give summary");
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2)));
+3 -3
View File
@@ -35,8 +35,8 @@ pub use generic::{Config, Context, Table};
pub mod v2 {
use crate::generic;
use primitives::{
CandidateHash, CommittedCandidateReceipt, CompactStatement as PrimitiveStatement, Id,
ValidatorIndex, ValidatorSignature,
CandidateHash, CommittedCandidateReceipt, CompactStatement as PrimitiveStatement,
CoreIndex, ValidatorIndex, ValidatorSignature,
};
/// Statements about candidates on the network.
@@ -59,7 +59,7 @@ pub mod v2 {
>;
/// A summary of import of a statement.
pub type Summary = generic::Summary<CandidateHash, Id>;
pub type Summary = generic::Summary<CandidateHash, CoreIndex>;
impl<'a> From<&'a Statement> for PrimitiveStatement {
fn from(s: &'a Statement) -> PrimitiveStatement {