mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 05:17:58 +00:00
#1259 was merged into a feature branch, but we've decided to merge node-side changes for disabling straight into master. This is a dependency of #1841 and #2637. --------- Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
This commit is contained in:
@@ -118,6 +118,7 @@ use statement_table::{
|
||||
},
|
||||
Config as TableConfig, Context as TableContextTrait, Table,
|
||||
};
|
||||
use util::vstaging::get_disabled_validators_with_fallback;
|
||||
|
||||
mod error;
|
||||
|
||||
@@ -383,6 +384,21 @@ struct TableContext {
|
||||
validator: Option<Validator>,
|
||||
groups: HashMap<ParaId, Vec<ValidatorIndex>>,
|
||||
validators: Vec<ValidatorId>,
|
||||
disabled_validators: Vec<ValidatorIndex>,
|
||||
}
|
||||
|
||||
impl TableContext {
|
||||
// Returns `true` if the provided `ValidatorIndex` is in the disabled validators list
|
||||
pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
|
||||
self.disabled_validators
|
||||
.iter()
|
||||
.any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
|
||||
}
|
||||
|
||||
// Returns `true` if the local validator is in the disabled validators list
|
||||
pub fn local_validator_is_disabled(&self) -> Option<bool> {
|
||||
self.validator.as_ref().map(|v| v.disabled())
|
||||
}
|
||||
}
|
||||
|
||||
impl TableContextTrait for TableContext {
|
||||
@@ -1010,21 +1026,33 @@ async fn construct_per_relay_parent_state<Context>(
|
||||
let minimum_backing_votes =
|
||||
try_runtime_api!(request_min_backing_votes(parent, session_index, ctx.sender()).await);
|
||||
|
||||
let signing_context = SigningContext { parent_hash: parent, session_index };
|
||||
let validator =
|
||||
match Validator::construct(&validators, signing_context.clone(), keystore.clone()) {
|
||||
Ok(v) => Some(v),
|
||||
Err(util::Error::NotAValidator) => None,
|
||||
Err(e) => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
err = ?e,
|
||||
"Cannot participate in candidate backing",
|
||||
);
|
||||
// TODO: https://github.com/paritytech/polkadot-sdk/issues/1940
|
||||
// Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this call to
|
||||
// `get_disabled_validators_with_fallback`, add `request_disabled_validators` call to the
|
||||
// `try_join!` above and use `try_runtime_api!` to get `disabled_validators`
|
||||
let disabled_validators = get_disabled_validators_with_fallback(ctx.sender(), parent)
|
||||
.await
|
||||
.map_err(Error::UtilError)?;
|
||||
|
||||
return Ok(None)
|
||||
},
|
||||
};
|
||||
let signing_context = SigningContext { parent_hash: parent, session_index };
|
||||
let validator = match Validator::construct(
|
||||
&validators,
|
||||
&disabled_validators,
|
||||
signing_context.clone(),
|
||||
keystore.clone(),
|
||||
) {
|
||||
Ok(v) => Some(v),
|
||||
Err(util::Error::NotAValidator) => None,
|
||||
Err(e) => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
err = ?e,
|
||||
"Cannot participate in candidate backing",
|
||||
);
|
||||
|
||||
return Ok(None)
|
||||
},
|
||||
};
|
||||
|
||||
let mut groups = HashMap::new();
|
||||
let n_cores = cores.len();
|
||||
@@ -1054,7 +1082,7 @@ async fn construct_per_relay_parent_state<Context>(
|
||||
}
|
||||
}
|
||||
|
||||
let table_context = TableContext { groups, validators, validator };
|
||||
let table_context = TableContext { validator, groups, validators, disabled_validators };
|
||||
let table_config = TableConfig {
|
||||
allow_multiple_seconded: match mode {
|
||||
ProspectiveParachainsMode::Enabled { .. } => true,
|
||||
@@ -1726,6 +1754,19 @@ async fn kick_off_validation_work<Context>(
|
||||
background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
|
||||
attesting: AttestingData,
|
||||
) -> Result<(), Error> {
|
||||
// Do nothing if the local validator is disabled or not a validator at all
|
||||
match rp_state.table_context.local_validator_is_disabled() {
|
||||
Some(true) => {
|
||||
gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
|
||||
return Ok(())
|
||||
},
|
||||
Some(false) => {}, // we are not disabled - move on
|
||||
None => {
|
||||
gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
|
||||
return Ok(())
|
||||
},
|
||||
}
|
||||
|
||||
let candidate_hash = attesting.candidate.hash();
|
||||
if rp_state.issued_statements.contains(&candidate_hash) {
|
||||
return Ok(())
|
||||
@@ -1783,6 +1824,16 @@ async fn maybe_validate_and_import<Context>(
|
||||
},
|
||||
};
|
||||
|
||||
// Don't import statement if the sender is disabled
|
||||
if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
sender_validator_idx = ?statement.validator_index(),
|
||||
"Not importing statement because the sender is disabled"
|
||||
);
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
|
||||
|
||||
// if we get an Error::RejectedByProspectiveParachains,
|
||||
@@ -1944,6 +1995,13 @@ async fn handle_second_message<Context>(
|
||||
Some(r) => r,
|
||||
};
|
||||
|
||||
// Just return if the local validator is disabled. If we are here the local node should be a
|
||||
// validator but defensively use `unwrap_or(false)` to continue processing in this case.
|
||||
if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
|
||||
gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
// Sanity check that candidate is from our assignment.
|
||||
if Some(candidate.descriptor().para_id) != rp_state.assignment {
|
||||
gum::debug!(
|
||||
@@ -1990,6 +2048,7 @@ async fn handle_statement_message<Context>(
|
||||
) -> Result<(), Error> {
|
||||
let _timer = metrics.time_process_statement();
|
||||
|
||||
// Validator disabling is handled in `maybe_validate_and_import`
|
||||
match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
|
||||
Err(Error::ValidationFailed(_)) => Ok(()),
|
||||
Err(e) => Err(e),
|
||||
|
||||
@@ -41,7 +41,7 @@ use sp_keyring::Sr25519Keyring;
|
||||
use sp_keystore::Keystore;
|
||||
use sp_tracing as _;
|
||||
use statement_table::v2::Misbehavior;
|
||||
use std::collections::HashMap;
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
mod prospective_parachains;
|
||||
|
||||
@@ -77,6 +77,7 @@ struct TestState {
|
||||
signing_context: SigningContext,
|
||||
relay_parent: Hash,
|
||||
minimum_backing_votes: u32,
|
||||
disabled_validators: Vec<ValidatorIndex>,
|
||||
}
|
||||
|
||||
impl TestState {
|
||||
@@ -148,6 +149,7 @@ impl Default for TestState {
|
||||
signing_context,
|
||||
relay_parent,
|
||||
minimum_backing_votes: LEGACY_MIN_BACKING_VOTES,
|
||||
disabled_validators: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -293,6 +295,26 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
|
||||
tx.send(Ok(test_state.minimum_backing_votes)).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// Check that subsystem job issues a request for the runtime version.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx))
|
||||
) if parent == test_state.relay_parent => {
|
||||
tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// Check that subsystem job issues a request for the disabled validators.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(parent, RuntimeApiRequest::DisabledValidators(tx))
|
||||
) if parent == test_state.relay_parent => {
|
||||
tx.send(Ok(test_state.disabled_validators.clone())).unwrap();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
async fn assert_validation_requests(
|
||||
@@ -1420,6 +1442,7 @@ fn candidate_backing_reorders_votes() {
|
||||
|
||||
let table_context = TableContext {
|
||||
validator: None,
|
||||
disabled_validators: Vec::new(),
|
||||
groups: validator_groups,
|
||||
validators: validator_public.clone(),
|
||||
};
|
||||
@@ -1957,3 +1980,307 @@ fn new_leaf_view_doesnt_clobber_old() {
|
||||
virtual_overseer
|
||||
});
|
||||
}
|
||||
|
||||
// Test that a disabled local validator doesn't do any work on `CandidateBackingMessage::Second`
|
||||
#[test]
|
||||
fn disabled_validator_doesnt_distribute_statement_on_receiving_second() {
|
||||
let mut test_state = TestState::default();
|
||||
test_state.disabled_validators.push(ValidatorIndex(0));
|
||||
|
||||
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
|
||||
test_startup(&mut virtual_overseer, &test_state).await;
|
||||
|
||||
let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
|
||||
let pvd = dummy_pvd();
|
||||
let validation_code = ValidationCode(vec![1, 2, 3]);
|
||||
|
||||
let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
|
||||
|
||||
let pov_hash = pov.hash();
|
||||
let candidate = TestCandidateBuilder {
|
||||
para_id: test_state.chain_ids[0],
|
||||
relay_parent: test_state.relay_parent,
|
||||
pov_hash,
|
||||
head_data: expected_head_data.clone(),
|
||||
erasure_root: make_erasure_root(&test_state, pov.clone(), pvd.clone()),
|
||||
persisted_validation_data_hash: pvd.hash(),
|
||||
validation_code: validation_code.0.clone(),
|
||||
}
|
||||
.build();
|
||||
|
||||
let second = CandidateBackingMessage::Second(
|
||||
test_state.relay_parent,
|
||||
candidate.to_plain(),
|
||||
pvd.clone(),
|
||||
pov.clone(),
|
||||
);
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: second }).await;
|
||||
|
||||
// Ensure backing subsystem is not doing any work
|
||||
assert_matches!(virtual_overseer.recv().timeout(Duration::from_secs(1)).await, None);
|
||||
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(test_state.relay_parent),
|
||||
)))
|
||||
.await;
|
||||
virtual_overseer
|
||||
});
|
||||
}
|
||||
|
||||
// Test that a disabled local validator doesn't do any work on `CandidateBackingMessage::Statement`
|
||||
#[test]
|
||||
fn disabled_validator_doesnt_distribute_statement_on_receiving_statement() {
|
||||
let mut test_state = TestState::default();
|
||||
test_state.disabled_validators.push(ValidatorIndex(0));
|
||||
|
||||
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
|
||||
test_startup(&mut virtual_overseer, &test_state).await;
|
||||
|
||||
let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
|
||||
let pvd = dummy_pvd();
|
||||
let validation_code = ValidationCode(vec![1, 2, 3]);
|
||||
|
||||
let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
|
||||
|
||||
let pov_hash = pov.hash();
|
||||
let candidate = TestCandidateBuilder {
|
||||
para_id: test_state.chain_ids[0],
|
||||
relay_parent: test_state.relay_parent,
|
||||
pov_hash,
|
||||
head_data: expected_head_data.clone(),
|
||||
erasure_root: make_erasure_root(&test_state, pov.clone(), pvd.clone()),
|
||||
persisted_validation_data_hash: pvd.hash(),
|
||||
validation_code: validation_code.0.clone(),
|
||||
}
|
||||
.build();
|
||||
|
||||
let public2 = Keystore::sr25519_generate_new(
|
||||
&*test_state.keystore,
|
||||
ValidatorId::ID,
|
||||
Some(&test_state.validators[2].to_seed()),
|
||||
)
|
||||
.expect("Insert key into keystore");
|
||||
|
||||
let signed = SignedFullStatementWithPVD::sign(
|
||||
&test_state.keystore,
|
||||
StatementWithPVD::Seconded(candidate.clone(), pvd.clone()),
|
||||
&test_state.signing_context,
|
||||
ValidatorIndex(2),
|
||||
&public2.into(),
|
||||
)
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("should be signed");
|
||||
|
||||
let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed.clone());
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
// Ensure backing subsystem is not doing any work
|
||||
assert_matches!(virtual_overseer.recv().timeout(Duration::from_secs(1)).await, None);
|
||||
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(test_state.relay_parent),
|
||||
)))
|
||||
.await;
|
||||
virtual_overseer
|
||||
});
|
||||
}
|
||||
|
||||
// Test that a validator doesn't do any work on receiving a `CandidateBackingMessage::Statement`
|
||||
// from a disabled validator
|
||||
#[test]
|
||||
fn validator_ignores_statements_from_disabled_validators() {
|
||||
let mut test_state = TestState::default();
|
||||
test_state.disabled_validators.push(ValidatorIndex(2));
|
||||
|
||||
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
|
||||
test_startup(&mut virtual_overseer, &test_state).await;
|
||||
|
||||
let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
|
||||
let pvd = dummy_pvd();
|
||||
let validation_code = ValidationCode(vec![1, 2, 3]);
|
||||
|
||||
let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
|
||||
|
||||
let pov_hash = pov.hash();
|
||||
let candidate = TestCandidateBuilder {
|
||||
para_id: test_state.chain_ids[0],
|
||||
relay_parent: test_state.relay_parent,
|
||||
pov_hash,
|
||||
head_data: expected_head_data.clone(),
|
||||
erasure_root: make_erasure_root(&test_state, pov.clone(), pvd.clone()),
|
||||
persisted_validation_data_hash: pvd.hash(),
|
||||
validation_code: validation_code.0.clone(),
|
||||
}
|
||||
.build();
|
||||
let candidate_commitments_hash = candidate.commitments.hash();
|
||||
|
||||
let public2 = Keystore::sr25519_generate_new(
|
||||
&*test_state.keystore,
|
||||
ValidatorId::ID,
|
||||
Some(&test_state.validators[2].to_seed()),
|
||||
)
|
||||
.expect("Insert key into keystore");
|
||||
|
||||
let signed_2 = SignedFullStatementWithPVD::sign(
|
||||
&test_state.keystore,
|
||||
StatementWithPVD::Seconded(candidate.clone(), pvd.clone()),
|
||||
&test_state.signing_context,
|
||||
ValidatorIndex(2),
|
||||
&public2.into(),
|
||||
)
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("should be signed");
|
||||
|
||||
let statement_2 =
|
||||
CandidateBackingMessage::Statement(test_state.relay_parent, signed_2.clone());
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement_2 }).await;
|
||||
|
||||
// Ensure backing subsystem is not doing any work
|
||||
assert_matches!(virtual_overseer.recv().timeout(Duration::from_secs(1)).await, None);
|
||||
|
||||
// Now send a statement from a honest validator and make sure it gets processed
|
||||
let public3 = Keystore::sr25519_generate_new(
|
||||
&*test_state.keystore,
|
||||
ValidatorId::ID,
|
||||
Some(&test_state.validators[3].to_seed()),
|
||||
)
|
||||
.expect("Insert key into keystore");
|
||||
|
||||
let signed_3 = SignedFullStatementWithPVD::sign(
|
||||
&test_state.keystore,
|
||||
StatementWithPVD::Seconded(candidate.clone(), pvd.clone()),
|
||||
&test_state.signing_context,
|
||||
ValidatorIndex(3),
|
||||
&public3.into(),
|
||||
)
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("should be signed");
|
||||
|
||||
let statement_3 =
|
||||
CandidateBackingMessage::Statement(test_state.relay_parent, signed_3.clone());
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement_3 }).await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(_, RuntimeApiRequest::ValidationCodeByHash(hash, tx))
|
||||
) if hash == validation_code.hash() => {
|
||||
tx.send(Ok(Some(validation_code.clone()))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx))
|
||||
) => {
|
||||
tx.send(Ok(1u32.into())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx))
|
||||
) if sess_idx == 1 => {
|
||||
tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// Sending a `Statement::Seconded` for our assignment will start
|
||||
// validation process. The first thing requested is the PoV.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityDistribution(
|
||||
AvailabilityDistributionMessage::FetchPoV {
|
||||
relay_parent,
|
||||
tx,
|
||||
..
|
||||
}
|
||||
) if relay_parent == test_state.relay_parent => {
|
||||
tx.send(pov.clone()).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// The next step is the actual request to Validation subsystem
|
||||
// to validate the `Seconded` candidate.
|
||||
let expected_pov = pov;
|
||||
let expected_validation_code = validation_code;
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::CandidateValidation(
|
||||
CandidateValidationMessage::ValidateFromExhaustive {
|
||||
validation_data,
|
||||
validation_code,
|
||||
candidate_receipt,
|
||||
pov,
|
||||
executor_params: _,
|
||||
exec_kind,
|
||||
response_sender,
|
||||
}
|
||||
) if validation_data == pvd &&
|
||||
validation_code == expected_validation_code &&
|
||||
*pov == expected_pov && &candidate_receipt.descriptor == candidate.descriptor() &&
|
||||
exec_kind == PvfExecKind::Backing &&
|
||||
candidate_commitments_hash == candidate_receipt.commitments_hash =>
|
||||
{
|
||||
response_sender.send(Ok(
|
||||
ValidationResult::Valid(CandidateCommitments {
|
||||
head_data: expected_head_data.clone(),
|
||||
upward_messages: Default::default(),
|
||||
horizontal_messages: Default::default(),
|
||||
new_validation_code: None,
|
||||
processed_downward_messages: 0,
|
||||
hrmp_watermark: 0,
|
||||
}, test_state.validation_data.clone()),
|
||||
)).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityStore(
|
||||
AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. }
|
||||
) if candidate_hash == candidate.hash() => {
|
||||
tx.send(Ok(())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::StatementDistribution(
|
||||
StatementDistributionMessage::Share(hash, _stmt)
|
||||
) => {
|
||||
assert_eq!(test_state.relay_parent, hash);
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::Provisioner(
|
||||
ProvisionerMessage::ProvisionableData(
|
||||
_,
|
||||
ProvisionableData::BackedCandidate(candidate_receipt)
|
||||
)
|
||||
) => {
|
||||
assert_eq!(candidate_receipt, candidate.to_plain());
|
||||
}
|
||||
);
|
||||
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(test_state.relay_parent),
|
||||
)))
|
||||
.await;
|
||||
virtual_overseer
|
||||
});
|
||||
}
|
||||
|
||||
@@ -195,6 +195,26 @@ async fn activate_leaf(
|
||||
tx.send(Ok(test_state.minimum_backing_votes)).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// Check that subsystem job issues a request for the runtime version.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx))
|
||||
) if parent == hash => {
|
||||
tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// Check that the subsystem job issues a request for the disabled validators.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(parent, RuntimeApiRequest::DisabledValidators(tx))
|
||||
) if parent == hash => {
|
||||
tx.send(Ok(Vec::new())).unwrap();
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user