statement-distribution: support inactive local validator in grid (#1571)

Fixes #1437

Co-authored-by: Sophia Gold <sophia@parity.io>
This commit is contained in:
Kristian Sosnin
2023-11-14 22:14:59 +04:00
committed by GitHub
parent 54f84285bf
commit 31c38cea3d
5 changed files with 514 additions and 259 deletions
@@ -142,8 +142,27 @@ struct PerRelayParentState {
session: SessionIndex,
}
impl PerRelayParentState {
fn active_validator_state(&self) -> Option<&ActiveValidatorState> {
self.local_validator.as_ref().and_then(|local| local.active.as_ref())
}
fn active_validator_state_mut(&mut self) -> Option<&mut ActiveValidatorState> {
self.local_validator.as_mut().and_then(|local| local.active.as_mut())
}
}
// per-relay-parent local validator state.
struct LocalValidatorState {
// the grid-level communication at this relay-parent.
grid_tracker: GridTracker,
// additional fields in case local node is an active validator.
active: Option<ActiveValidatorState>,
// local index actually exists in case node is inactive validator, however,
// it's not needed outside of `build_session_topology`, where it's known.
}
struct ActiveValidatorState {
// The index of the validator.
index: ValidatorIndex,
// our validator group
@@ -152,8 +171,14 @@ struct LocalValidatorState {
assignment: Option<ParaId>,
// the 'direct-in-group' communication at this relay-parent.
cluster_tracker: ClusterTracker,
// the grid-level communication at this relay-parent.
grid_tracker: GridTracker,
}
#[derive(Debug, Copy, Clone)]
enum LocalValidatorIndex {
// Local node is an active validator.
Active(ValidatorIndex),
// Local node is not in active validator set.
Inactive,
}
#[derive(Debug)]
@@ -164,7 +189,7 @@ struct PerSessionState {
// is only `None` in the time between seeing a session and
// getting the topology from the gossip-support subsystem
grid_view: Option<grid::SessionTopologyView>,
local_validator: Option<ValidatorIndex>,
local_validator: Option<LocalValidatorIndex>,
}
impl PerSessionState {
@@ -178,15 +203,10 @@ impl PerSessionState {
let local_validator = polkadot_node_subsystem_util::signing_key_and_index(
session_info.validators.iter(),
keystore,
);
)
.map(|(_, index)| LocalValidatorIndex::Active(index));
PerSessionState {
session_info,
groups,
authority_lookup,
grid_view: None,
local_validator: local_validator.map(|(_key, index)| index),
}
PerSessionState { session_info, groups, authority_lookup, grid_view: None, local_validator }
}
fn supply_topology(
@@ -204,6 +224,16 @@ impl PerSessionState {
);
self.grid_view = Some(grid_view);
if local_index.is_some() {
self.local_validator.get_or_insert(LocalValidatorIndex::Inactive);
}
}
/// Returns `true` if local is neither active or inactive validator node.
///
/// `false` is also returned if session topology is not known yet.
fn is_not_validator(&self) -> bool {
self.grid_view.is_some() && self.local_validator.is_none()
}
}
@@ -554,13 +584,17 @@ pub(crate) async fn handle_active_leaves_update<Context>(
.expect("either existed or just inserted; qed");
let local_validator = per_session.local_validator.and_then(|v| {
find_local_validator_state(
v,
&per_session.groups,
&availability_cores,
&group_rotation_info,
seconding_limit,
)
if let LocalValidatorIndex::Active(idx) = v {
find_active_validator_state(
idx,
&per_session.groups,
&availability_cores,
&group_rotation_info,
seconding_limit,
)
} else {
Some(LocalValidatorState { grid_tracker: GridTracker::default(), active: None })
}
});
state.per_relay_parent.insert(
@@ -607,7 +641,7 @@ pub(crate) async fn handle_active_leaves_update<Context>(
Ok(())
}
fn find_local_validator_state(
fn find_active_validator_state(
validator_index: ValidatorIndex,
groups: &Groups,
availability_cores: &[CoreState],
@@ -628,11 +662,13 @@ fn find_local_validator_state(
let group_validators = groups.get(our_group)?.to_owned();
Some(LocalValidatorState {
index: validator_index,
group: our_group,
assignment: para,
cluster_tracker: ClusterTracker::new(group_validators, seconding_limit)
.expect("group is non-empty because we are in it; qed"),
active: Some(ActiveValidatorState {
index: validator_index,
group: our_group,
assignment: para,
cluster_tracker: ClusterTracker::new(group_validators, seconding_limit)
.expect("group is non-empty because we are in it; qed"),
}),
grid_tracker: GridTracker::default(),
})
}
@@ -725,13 +761,17 @@ async fn send_peer_messages_for_relay_parent<Context>(
for validator_id in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
per_session_state.authority_lookup.get(a)
}) {
if let Some(local_validator_state) = relay_parent_state.local_validator.as_mut() {
if let Some(active) = relay_parent_state
.local_validator
.as_mut()
.and_then(|local| local.active.as_mut())
{
send_pending_cluster_statements(
ctx,
relay_parent,
&(peer, peer_data.protocol_version),
validator_id,
&mut local_validator_state.cluster_tracker,
&mut active.cluster_tracker,
&state.candidates,
&relay_parent_state.statement_store,
)
@@ -1009,7 +1049,7 @@ pub(crate) async fn share_local_statement<Context>(
};
let (local_index, local_assignment, local_group) =
match per_relay_parent.local_validator.as_ref() {
match per_relay_parent.active_validator_state() {
None => return Err(JfyiError::InvalidShare),
Some(l) => (l.index, l.assignment, l.group),
};
@@ -1086,7 +1126,7 @@ pub(crate) async fn share_local_statement<Context>(
}
{
let l = per_relay_parent.local_validator.as_mut().expect("checked above; qed");
let l = per_relay_parent.active_validator_state_mut().expect("checked above; qed");
l.cluster_tracker.note_issued(local_index, compact_statement.payload().clone());
}
@@ -1173,31 +1213,41 @@ async fn circulate_statement<Context>(
// We're not meant to circulate statements in the cluster until we have the confirmed
// candidate.
let cluster_relevant = Some(local_validator.group) == statement_group;
let cluster_targets = if is_confirmed && cluster_relevant {
Some(
local_validator
.cluster_tracker
.targets()
.iter()
.filter(|&&v| {
local_validator
//
// Cluster is only relevant if local node is an active validator.
let (cluster_relevant, cluster_targets, all_cluster_targets) = local_validator
.active
.as_mut()
.map(|active| {
let cluster_relevant = Some(active.group) == statement_group;
let cluster_targets = if is_confirmed && cluster_relevant {
Some(
active
.cluster_tracker
.can_send(v, originator, compact_statement.clone())
.is_ok()
})
.filter(|&v| v != &local_validator.index)
.map(|v| (*v, DirectTargetKind::Cluster)),
)
} else {
None
};
.targets()
.iter()
.filter(|&&v| {
active
.cluster_tracker
.can_send(v, originator, compact_statement.clone())
.is_ok()
})
.filter(|&v| v != &active.index)
.map(|v| (*v, DirectTargetKind::Cluster)),
)
} else {
None
};
let all_cluster_targets = active.cluster_tracker.targets();
(cluster_relevant, cluster_targets, all_cluster_targets)
})
.unwrap_or((false, None, &[]));
let grid_targets = local_validator
.grid_tracker
.direct_statement_targets(&per_session.groups, originator, &compact_statement)
.into_iter()
.filter(|v| !cluster_relevant || !local_validator.cluster_tracker.targets().contains(v))
.filter(|v| !cluster_relevant || !all_cluster_targets.contains(v))
.map(|v| (v, DirectTargetKind::Grid));
let targets = cluster_targets
@@ -1229,18 +1279,17 @@ async fn circulate_statement<Context>(
match kind {
DirectTargetKind::Cluster => {
let active = local_validator
.active
.as_mut()
.expect("cluster target means local is active validator; qed");
// At this point, all peers in the cluster should 'know'
// the candidate, so we don't expect for this to fail.
if let Ok(()) = local_validator.cluster_tracker.can_send(
target,
originator,
compact_statement.clone(),
) {
local_validator.cluster_tracker.note_sent(
target,
originator,
compact_statement.clone(),
);
if let Ok(()) =
active.cluster_tracker.can_send(target, originator, compact_statement.clone())
{
active.cluster_tracker.note_sent(target, originator, compact_statement.clone());
statement_to_peers.push(peer_id);
}
},
@@ -1387,7 +1436,9 @@ async fn handle_incoming_statement<Context>(
None => {
// we shouldn't be receiving statements unless we're a validator
// this session.
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
if per_session.is_not_validator() {
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
}
return
},
Some(l) => l,
@@ -1402,73 +1453,81 @@ async fn handle_incoming_statement<Context>(
},
};
let cluster_sender_index = {
let (active, cluster_sender_index) = {
// This block of code only returns `Some` when both the originator and
// the sending peer are in the cluster.
let active = local_validator.active.as_mut();
let allowed_senders = local_validator
.cluster_tracker
.senders_for_originator(statement.unchecked_validator_index());
let allowed_senders = active
.as_ref()
.map(|active| {
active
.cluster_tracker
.senders_for_originator(statement.unchecked_validator_index())
})
.unwrap_or_default();
allowed_senders
let idx = allowed_senders
.iter()
.filter_map(|i| session_info.discovery_keys.get(i.0 as usize).map(|ad| (*i, ad)))
.filter(|(_, ad)| peer_state.is_authority(ad))
.map(|(i, _)| i)
.next()
.next();
(active, idx)
};
let checked_statement = if let Some(cluster_sender_index) = cluster_sender_index {
match handle_cluster_statement(
relay_parent,
&mut local_validator.cluster_tracker,
per_relay_parent.session,
&per_session.session_info,
statement,
cluster_sender_index,
) {
Ok(Some(s)) => s,
Ok(None) => return,
Err(rep) => {
modify_reputation(reputation, ctx.sender(), peer, rep).await;
return
},
}
} else {
let grid_sender_index = local_validator
.grid_tracker
.direct_statement_providers(
&per_session.groups,
statement.unchecked_validator_index(),
statement.unchecked_payload(),
)
.into_iter()
.filter_map(|i| session_info.discovery_keys.get(i.0 as usize).map(|ad| (i, ad)))
.filter(|(_, ad)| peer_state.is_authority(ad))
.map(|(i, _)| i)
.next();
if let Some(grid_sender_index) = grid_sender_index {
match handle_grid_statement(
let checked_statement =
if let Some((active, cluster_sender_index)) = active.zip(cluster_sender_index) {
match handle_cluster_statement(
relay_parent,
&mut local_validator.grid_tracker,
&mut active.cluster_tracker,
per_relay_parent.session,
&per_session,
&per_session.session_info,
statement,
grid_sender_index,
cluster_sender_index,
) {
Ok(s) => s,
Ok(Some(s)) => s,
Ok(None) => return,
Err(rep) => {
modify_reputation(reputation, ctx.sender(), peer, rep).await;
return
},
}
} else {
// Not a cluster or grid peer.
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
return
}
};
let grid_sender_index = local_validator
.grid_tracker
.direct_statement_providers(
&per_session.groups,
statement.unchecked_validator_index(),
statement.unchecked_payload(),
)
.into_iter()
.filter_map(|i| session_info.discovery_keys.get(i.0 as usize).map(|ad| (i, ad)))
.filter(|(_, ad)| peer_state.is_authority(ad))
.map(|(i, _)| i)
.next();
if let Some(grid_sender_index) = grid_sender_index {
match handle_grid_statement(
relay_parent,
&mut local_validator.grid_tracker,
per_relay_parent.session,
&per_session,
statement,
grid_sender_index,
) {
Ok(s) => s,
Err(rep) => {
modify_reputation(reputation, ctx.sender(), peer, rep).await;
return
},
}
} else {
// Not a cluster or grid peer.
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
return
}
};
let statement = checked_statement.payload().clone();
let originator_index = checked_statement.validator_index();
@@ -1536,7 +1595,7 @@ async fn handle_incoming_statement<Context>(
local_validator.grid_tracker.learned_fresh_statement(
&per_session.groups,
session_topology,
local_validator.index,
originator_index,
&statement,
);
}
@@ -1834,7 +1893,7 @@ async fn provide_candidate_to_grid<Context>(
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
local_validator = ?local_validator.index,
local_validator = ?per_session.local_validator,
n_peers = manifest_peers_v2.len(),
"Sending manifest to v2 peers"
);
@@ -1853,7 +1912,7 @@ async fn provide_candidate_to_grid<Context>(
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
local_validator = ?local_validator.index,
local_validator = ?per_session.local_validator,
n_peers = manifest_peers_vstaging.len(),
"Sending manifest to vstaging peers"
);
@@ -1874,7 +1933,7 @@ async fn provide_candidate_to_grid<Context>(
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
local_validator = ?local_validator.index,
local_validator = ?per_session.local_validator,
n_peers = ack_peers_v2.len(),
"Sending acknowledgement to v2 peers"
);
@@ -1893,7 +1952,7 @@ async fn provide_candidate_to_grid<Context>(
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
local_validator = ?local_validator.index,
local_validator = ?per_session.local_validator,
n_peers = ack_peers_vstaging.len(),
"Sending acknowledgement to vstaging peers"
);
@@ -2086,13 +2145,15 @@ async fn handle_incoming_manifest_common<'a, Context>(
let local_validator = match relay_parent_state.local_validator.as_mut() {
None => {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
)
.await;
if per_session.is_not_validator() {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
)
.await;
}
return None
},
Some(x) => x,
@@ -2188,7 +2249,7 @@ async fn handle_incoming_manifest_common<'a, Context>(
target: LOG_TARGET,
?candidate_hash,
from = ?sender_index,
local_index = ?local_validator.index,
local_index = ?per_session.local_validator,
?manifest_kind,
"immediate ack, known candidate"
);
@@ -2593,7 +2654,7 @@ async fn send_cluster_candidate_statements<Context>(
Some(s) => s,
};
let local_group = match relay_parent_state.local_validator.as_mut() {
let local_group = match relay_parent_state.active_validator_state_mut() {
None => return,
Some(v) => v.group,
};
@@ -2680,11 +2741,10 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
}) {
// For cluster members, they haven't advertised any statements in particular,
// but have surely sent us some.
if local_validator
.cluster_tracker
.knows_candidate(validator_id, identifier.candidate_hash)
{
return Some(StatementFilter::blank(local_validator.cluster_tracker.targets().len()))
if let Some(active) = local_validator.active.as_ref() {
if active.cluster_tracker.knows_candidate(validator_id, identifier.candidate_hash) {
return Some(StatementFilter::blank(active.cluster_tracker.targets().len()))
}
}
let filter = local_validator
@@ -2715,7 +2775,11 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
}
// don't require a backing threshold for cluster candidates.
let require_backing = relay_parent_state.local_validator.as_ref()?.group != group_index;
let local_validator = relay_parent_state.local_validator.as_ref()?;
let require_backing = local_validator
.active
.as_ref()
.map_or(true, |active| active.group != group_index);
Some(RequestProperties {
unwanted_mask,
@@ -2973,7 +3037,11 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
for v in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
per_session.authority_lookup.get(a)
}) {
if local_validator.cluster_tracker.can_request(v, *candidate_hash) {
if local_validator
.active
.as_ref()
.map_or(false, |active| active.cluster_tracker.can_request(v, *candidate_hash))
{
validator_id = Some(v);
is_cluster = true;
break
@@ -3015,11 +3083,16 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
// Update bookkeeping about which statements peers have received.
for statement in &statements {
if is_cluster {
local_validator.cluster_tracker.note_sent(
validator_id,
statement.unchecked_validator_index(),
statement.unchecked_payload().clone(),
);
local_validator
.active
.as_mut()
.expect("cluster peer means local is active validator; qed")
.cluster_tracker
.note_sent(
validator_id,
statement.unchecked_validator_index(),
statement.unchecked_payload().clone(),
);
} else {
local_validator.grid_tracker.sent_or_received_direct_statement(
&per_session.groups,