statement-distribution: fix filtering of statements for elastic parachains (#3879)

fixes https://github.com/paritytech/polkadot-sdk/issues/3775

Additionally moves the claim queue fetch utilities into
`subsystem-util`.
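
For context, here is a minimal sketch of the idea behind the new per-relay-parent mapping, using simplified `u32` stand-ins rather than the real `ParaId`/`CoreIndex`/`GroupIndex` primitives or the snapshot type returned by `fetch_claim_queue` (assumptions for illustration, not the node's actual API): with elastic scaling a parachain can claim several cores at the same relay parent, so each para maps to the *set* of backing groups assigned to those cores.

```rust
use std::collections::{BTreeMap, HashMap, VecDeque};

// Simplified stand-ins; the real code uses `ParaId`, `CoreIndex` and `GroupIndex`
// from `polkadot-primitives` and the claim-queue snapshot from `fetch_claim_queue`.
type ParaId = u32;
type CoreIndex = u32;
type GroupIndex = u32;

/// Build the per-relay-parent `ParaId` -> backing-groups mapping from a claim-queue
/// snapshot. Only the para at the front of each core's claim queue matters for
/// statement filtering, but one para may now appear on several cores (elastic
/// scaling), so the value is a `Vec<GroupIndex>` rather than a single group.
fn groups_per_para(
    claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
    group_for_core: impl Fn(CoreIndex) -> GroupIndex,
) -> HashMap<ParaId, Vec<GroupIndex>> {
    let mut out: HashMap<ParaId, Vec<GroupIndex>> = HashMap::new();
    for (&core, paras) in claim_queue {
        if let Some(&para) = paras.front() {
            out.entry(para).or_default().push(group_for_core(core));
        }
    }
    out
}
```

The actual `determine_groups_per_para` in the diff below additionally falls back to iterating the availability cores when the `claim_queue` runtime API is not available.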

TODO:
- [x] fix tests
- [x] add elastic scaling tests

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
Author: Andrei Sandu
Committed: 2024-04-03 11:34:50 +03:00 (committed by GitHub)
Parent: 665e3654ce
Commit: e8e201f0ff
9 changed files with 265 additions and 98 deletions
@@ -81,6 +81,9 @@ pub enum Error {
     #[error("Fetching validator groups failed {0:?}")]
     FetchValidatorGroups(RuntimeApiError),
 
+    #[error("Fetching claim queue failed {0:?}")]
+    FetchClaimQueue(runtime::Error),
+
     #[error("Attempted to share statement when not a validator or not assigned")]
     InvalidShare,
@@ -46,6 +46,7 @@ use polkadot_node_subsystem_util::{
     backing_implicit_view::View as ImplicitView,
     reputation::ReputationAggregator,
     runtime::{request_min_backing_votes, ProspectiveParachainsMode},
+    vstaging::fetch_claim_queue,
 };
 use polkadot_primitives::{
     AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex,
@@ -149,10 +150,9 @@ pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);
 struct PerRelayParentState {
     local_validator: Option<LocalValidatorState>,
     statement_store: StatementStore,
-    availability_cores: Vec<CoreState>,
-    group_rotation_info: GroupRotationInfo,
     seconding_limit: usize,
     session: SessionIndex,
+    groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
 }
 
 impl PerRelayParentState {
@@ -563,11 +563,13 @@ pub(crate) async fn handle_active_leaves_update<Context>(
     activated: &ActivatedLeaf,
     leaf_mode: ProspectiveParachainsMode,
 ) -> JfyiErrorResult<()> {
-    let seconding_limit = match leaf_mode {
+    let max_candidate_depth = match leaf_mode {
         ProspectiveParachainsMode::Disabled => return Ok(()),
-        ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth + 1,
+        ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth,
     };
 
+    let seconding_limit = max_candidate_depth + 1;
+
     state
         .implicit_view
         .activate_leaf(ctx.sender(), activated.hash)
@@ -693,15 +695,23 @@ pub(crate) async fn handle_active_leaves_update<Context>(
             }
         });
 
+        let groups_per_para = determine_groups_per_para(
+            ctx.sender(),
+            new_relay_parent,
+            availability_cores,
+            group_rotation_info,
+            max_candidate_depth,
+        )
+        .await;
+
         state.per_relay_parent.insert(
             new_relay_parent,
             PerRelayParentState {
                 local_validator,
                 statement_store: StatementStore::new(&per_session.groups),
-                availability_cores,
-                group_rotation_info,
                 seconding_limit,
                 session: session_index,
+                groups_per_para,
             },
         );
     }
@@ -2126,17 +2136,64 @@ async fn provide_candidate_to_grid<Context>(
     }
 }
 
-fn group_for_para(
-    availability_cores: &[CoreState],
-    group_rotation_info: &GroupRotationInfo,
-    para_id: ParaId,
-) -> Option<GroupIndex> {
-    // Note: this won't work well for on-demand parachains as it assumes that core assignments are
-    // fixed across blocks.
-    let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id));
+// Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings.
+async fn determine_groups_per_para(
+    sender: &mut impl overseer::StatementDistributionSenderTrait,
+    relay_parent: Hash,
+    availability_cores: Vec<CoreState>,
+    group_rotation_info: GroupRotationInfo,
+    max_candidate_depth: usize,
+) -> HashMap<ParaId, Vec<GroupIndex>> {
+    let maybe_claim_queue = fetch_claim_queue(sender, relay_parent)
+        .await
+        .unwrap_or_else(|err| {
+            gum::debug!(
+                target: LOG_TARGET,
+                ?relay_parent,
+                ?err,
+                "determine_groups_per_para: `claim_queue` API not available, falling back to iterating availability cores"
+            );
+            None
+        });
 
-    core_index
-        .map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len()))
+    let n_cores = availability_cores.len();
+
+    // Determine the core indices occupied by each para at the current relay parent. To support
+    // on-demand parachains we also consider the core indices at next block if core has a candidate
+    // pending availability.
+    let para_core_indices: Vec<_> = if let Some(claim_queue) = maybe_claim_queue {
+        claim_queue
+            .into_iter()
+            .filter_map(|(core_index, paras)| Some((*paras.front()?, core_index)))
+            .collect()
+    } else {
+        availability_cores
+            .into_iter()
+            .enumerate()
+            .filter_map(|(index, core)| match core {
+                CoreState::Scheduled(scheduled_core) =>
+                    Some((scheduled_core.para_id, CoreIndex(index as u32))),
+                CoreState::Occupied(occupied_core) =>
+                    if max_candidate_depth >= 1 {
+                        occupied_core
+                            .next_up_on_available
+                            .map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32)))
+                    } else {
+                        None
+                    },
+                CoreState::Free => None,
+            })
+            .collect()
+    };
+
+    let mut groups_per_para = HashMap::new();
+    // Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
+    for (para, core_index) in para_core_indices {
+        let group_index = group_rotation_info.group_for_core(core_index, n_cores);
+        groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index)
+    }
+
+    groups_per_para
 }
 
 #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
@@ -2192,18 +2249,23 @@ async fn fragment_tree_update_inner<Context>(
         let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash);
         let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent);
         if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) {
-            let group_index = group_for_para(
-                &prs.availability_cores,
-                &prs.group_rotation_info,
-                receipt.descriptor().para_id,
-            );
-
             let per_session = state.per_session.get(&prs.session);
-            if let (Some(per_session), Some(group_index)) = (per_session, group_index) {
+            let group_index = confirmed.group_index();
+
+            // Sanity check if group_index is valid for this para at relay parent.
+            let Some(expected_groups) = prs.groups_per_para.get(&receipt.descriptor().para_id)
+            else {
+                continue
+            };
+
+            if !expected_groups.iter().any(|g| *g == group_index) {
+                continue
+            }
+
+            if let Some(per_session) = per_session {
                 send_backing_fresh_statements(
                     ctx,
                     candidate_hash,
-                    group_index,
+                    confirmed.group_index(),
                     &receipt.descriptor().relay_parent,
                     prs,
                     confirmed,
@@ -2311,13 +2373,12 @@ async fn handle_incoming_manifest_common<'a, Context>(
         Some(x) => x,
     };
 
-    let expected_group = group_for_para(
-        &relay_parent_state.availability_cores,
-        &relay_parent_state.group_rotation_info,
-        para_id,
-    );
+    let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para_id) else {
+        modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
+        return None
+    };
 
-    if expected_group != Some(manifest_summary.claimed_group_index) {
+    if !expected_groups.iter().any(|g| g == &manifest_summary.claimed_group_index) {
         modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
         return None
     }
@@ -3037,13 +3098,11 @@ pub(crate) async fn handle_response<Context>(
             relay_parent_state.session,
             |v| per_session.session_info.validators.get(v).map(|x| x.clone()),
             |para, g_index| {
-                let expected_group = group_for_para(
-                    &relay_parent_state.availability_cores,
-                    &relay_parent_state.group_rotation_info,
-                    para,
-                );
+                let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para) else {
+                    return false
+                };
 
-                Some(g_index) == expected_group
+                expected_groups.iter().any(|g| g == &g_index)
             },
             disabled_mask,
         );
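
The three hunks above all reduce to the same membership test: instead of comparing against the single group expected for a para's core, the claimed group must be one of the groups assigned to any core claimed by that para. A hedged, self-contained illustration with `u32` stand-ins (not the node's actual types):

```rust
use std::collections::HashMap;

/// Simplified validity check: a statement or manifest for `para` claiming backing group
/// `claimed_group` is accepted only if that group backs one of the cores claimed by `para`.
fn claimed_group_is_valid(
    groups_per_para: &HashMap<u32, Vec<u32>>, // ParaId -> assigned GroupIndex values
    para: u32,
    claimed_group: u32,
) -> bool {
    groups_per_para
        .get(&para)
        .map_or(false, |groups| groups.iter().any(|g| *g == claimed_group))
}

fn main() {
    // Elastic scaling: para 0 claims cores 0 and 1, backed (without rotation) by
    // groups 0 and 1; para 2 claims core 2 only.
    let groups_per_para: HashMap<u32, Vec<u32>> =
        [(0, vec![0, 1]), (2, vec![2])].into_iter().collect();

    assert!(claimed_group_is_valid(&groups_per_para, 0, 1)); // second group of the elastic para
    assert!(!claimed_group_is_valid(&groups_per_para, 2, 1)); // wrong group for para 2
    assert!(!claimed_group_is_valid(&groups_per_para, 7, 0)); // unknown para at this relay parent
}
```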
@@ -312,6 +312,66 @@ fn useful_cluster_statement_from_non_cluster_peer_rejected() {
     });
 }
 
+// Both validators in the test are part of backing groups assigned to same parachain
+#[test]
+fn elastic_scaling_useful_cluster_statement_from_non_cluster_peer_rejected() {
+    let config = TestConfig {
+        validator_count: 20,
+        group_size: 3,
+        local_validator: LocalRole::Validator,
+        async_backing_params: None,
+    };
+
+    let relay_parent = Hash::repeat_byte(1);
+    let peer_a = PeerId::random();
+
+    test_harness(config, |state, mut overseer| async move {
+        let candidate_hash = CandidateHash(Hash::repeat_byte(42));
+        let test_leaf = state.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, 3);
+
+        // Peer A is not in our group, but its group is assigned to same para as we are.
+        let not_our_group = GroupIndex(1);
+
+        let that_group_validators = state.group_validators(not_our_group, false);
+        let v_non = that_group_validators[0];
+
+        connect_peer(
+            &mut overseer,
+            peer_a.clone(),
+            Some(vec![state.discovery_id(v_non)].into_iter().collect()),
+        )
+        .await;
+
+        send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
+        activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
+
+        let statement = state
+            .sign_statement(
+                v_non,
+                CompactStatement::Seconded(candidate_hash),
+                &SigningContext { parent_hash: relay_parent, session_index: 1 },
+            )
+            .as_unchecked()
+            .clone();
+
+        send_peer_message(
+            &mut overseer,
+            peer_a.clone(),
+            protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement),
+        )
+        .await;
+
+        assert_matches!(
+            overseer.recv().await,
+            AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+                if p == peer_a && r == COST_UNEXPECTED_STATEMENT_INVALID_SENDER.into() => { }
+        );
+
+        overseer
+    });
+}
+
 #[test]
 fn statement_from_non_cluster_originator_unexpected() {
     let config = TestConfig {
@@ -1829,9 +1829,7 @@ fn advertisement_not_re_sent_when_peer_re_enters_view() {
     });
 }
 
-// Grid statements imported to backing once candidate enters hypothetical frontier.
-#[test]
-fn grid_statements_imported_to_backing() {
+fn inner_grid_statements_imported_to_backing(groups_for_first_para: usize) {
     let validator_count = 6;
     let group_size = 3;
     let config = TestConfig {
@@ -1851,9 +1849,12 @@ fn grid_statements_imported_to_backing() {
     let local_group_index = local_validator.group_index.unwrap();
     let other_group = next_group_index(local_group_index, validator_count, group_size);
-    let other_para = ParaId::from(other_group.0);
 
-    let test_leaf = state.make_dummy_leaf(relay_parent);
+    // Other para is same para for elastic scaling test (groups_for_first_para > 1)
+    let other_para = ParaId::from((groups_for_first_para == 1) as u32);
+
+    let test_leaf =
+        state.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, groups_for_first_para);
 
     let (candidate, pvd) = make_candidate(
         relay_parent,
@@ -2018,6 +2019,18 @@ fn grid_statements_imported_to_backing() {
         overseer
     });
 }
 
+// Grid statements imported to backing once candidate enters hypothetical frontier.
+#[test]
+fn grid_statements_imported_to_backing() {
+    inner_grid_statements_imported_to_backing(1);
+}
+
+// Grid statements imported to backing once candidate enters hypothetical frontier.
+// All statements are for candidates of the same parachain but from different backing groups.
+#[test]
+fn elastic_scaling_grid_statements_imported_to_backing() {
+    inner_grid_statements_imported_to_backing(2);
+}
 
 #[test]
 fn advertisements_rejected_from_incorrect_peers() {
@@ -177,20 +177,39 @@ impl TestState {
     }
 
     fn make_dummy_leaf(&self, relay_parent: Hash) -> TestLeaf {
+        self.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, 1)
+    }
+
+    fn make_dummy_leaf_with_multiple_cores_per_para(
+        &self,
+        relay_parent: Hash,
+        groups_for_first_para: usize,
+    ) -> TestLeaf {
         TestLeaf {
             number: 1,
             hash: relay_parent,
             parent_hash: Hash::repeat_byte(0),
             session: 1,
             availability_cores: self.make_availability_cores(|i| {
-                CoreState::Scheduled(ScheduledCore {
-                    para_id: ParaId::from(i as u32),
-                    collator: None,
-                })
+                let para_id = if i < groups_for_first_para {
+                    ParaId::from(0u32)
+                } else {
+                    ParaId::from(i as u32)
+                };
+
+                CoreState::Scheduled(ScheduledCore { para_id, collator: None })
             }),
             disabled_validators: Default::default(),
             para_data: (0..self.session_info.validator_groups.len())
-                .map(|i| (ParaId::from(i as u32), PerParaData::new(1, vec![1, 2, 3].into())))
+                .map(|i| {
+                    let para_id = if i < groups_for_first_para {
+                        ParaId::from(0u32)
+                    } else {
+                        ParaId::from(i as u32)
+                    };
+
+                    (para_id, PerParaData::new(1, vec![1, 2, 3].into()))
+                })
                 .collect(),
             minimum_backing_votes: 2,
         }