Vstaging statement distribution omnibus (#1436)

in-progress PR adding new tests and solving bugs

---------

Co-authored-by: Bradley Olson <34992650+BradleyOlson64@users.noreply.github.com>
Co-authored-by: eskimor <eskimor@no-such-url.com>
Co-authored-by: eskimor <eskimor@users.noreply.github.com>
Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
This commit is contained in:
asynchronous rob
2023-10-21 04:01:14 -05:00
committed by GitHub
parent 76994356fc
commit a46183c706
14 changed files with 655 additions and 128 deletions
@@ -100,6 +100,8 @@ const COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE: Rep =
Rep::CostMinor("Unexpected Manifest, missing knowlege for relay parent");
const COST_UNEXPECTED_MANIFEST_DISALLOWED: Rep =
Rep::CostMinor("Unexpected Manifest, Peer Disallowed");
const COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN: Rep =
Rep::CostMinor("Unexpected Manifest, Peer Unknown");
const COST_CONFLICTING_MANIFEST: Rep = Rep::CostMajor("Manifest conflicts with previous");
const COST_INSUFFICIENT_MANIFEST: Rep =
Rep::CostMajor("Manifest statements insufficient to back candidate");
@@ -185,11 +187,18 @@ impl PerSessionState {
}
}
fn supply_topology(&mut self, topology: &SessionGridTopology) {
fn supply_topology(
&mut self,
topology: &SessionGridTopology,
local_index: Option<ValidatorIndex>,
) {
// Note: we use the local index rather than the `self.local_validator` as the
// former may be `Some` when the latter is `None`, due to the set of nodes in
// discovery being a superset of the active validators for consensus.
let grid_view = grid::build_session_topology(
self.session_info.validator_groups.iter(),
topology,
self.local_validator,
local_index,
);
self.grid_view = Some(grid_view);
@@ -334,7 +343,7 @@ pub(crate) async fn handle_network_update<Context>(
true
},
Entry::Occupied(e) => {
gum::trace!(
gum::debug!(
target: LOG_TARGET,
authority_id = ?a,
existing_peer = ?e.get(),
@@ -366,9 +375,10 @@ pub(crate) async fn handle_network_update<Context>(
NetworkBridgeEvent::NewGossipTopology(topology) => {
let new_session_index = topology.session;
let new_topology = topology.topology;
let local_index = topology.local_index;
if let Some(per_session) = state.per_session.get_mut(&new_session_index) {
per_session.supply_topology(&new_topology);
per_session.supply_topology(&new_topology, local_index);
}
// TODO [https://github.com/paritytech/polkadot/issues/6194]
@@ -409,6 +419,12 @@ pub(crate) async fn handle_network_update<Context>(
"Updated `AuthorityDiscoveryId`s"
);
// defensive: ensure peers are actually connected
let peer_state = match state.peers.get_mut(&peer_id) {
None => return,
Some(p) => p,
};
// Remove the authority IDs which were previously mapped to the peer
// but aren't part of the new set.
state.authorities.retain(|a, p| p != &peer_id || authority_ids.contains(a));
@@ -418,9 +434,7 @@ pub(crate) async fn handle_network_update<Context>(
state.authorities.insert(a, peer_id);
}
if let Some(peer_state) = state.peers.get_mut(&peer_id) {
peer_state.discovery_ids = Some(authority_ids);
}
peer_state.discovery_ids = Some(authority_ids);
},
}
}
@@ -542,6 +556,13 @@ pub(crate) async fn handle_active_leaves_update<Context>(
);
}
gum::debug!(
target: LOG_TARGET,
"Activated leaves. Now tracking {} relay-parents across {} sessions",
state.per_relay_parent.len(),
state.per_session.len(),
);
// Reconcile all peers' views with the active leaf and any relay parents
// it implies. If they learned about the block before we did, this reconciliation will give
// non-empty results and we should send them messages concerning all activated relay-parents.
@@ -599,25 +620,18 @@ fn find_local_validator_state(
pub(crate) fn handle_deactivate_leaves(state: &mut State, leaves: &[Hash]) {
// deactivate the leaf in the implicit view.
for leaf in leaves {
state.implicit_view.deactivate_leaf(*leaf);
let pruned = state.implicit_view.deactivate_leaf(*leaf);
for pruned_rp in pruned {
// clean up per-relay-parent data based on everything removed.
state.per_relay_parent.remove(&pruned_rp);
// clean up requests related to this relay parent.
state.request_manager.remove_by_relay_parent(*leaf);
}
}
let relay_parents = state.implicit_view.all_allowed_relay_parents().collect::<HashSet<_>>();
// fast exit for no-op.
if relay_parents.len() == state.per_relay_parent.len() {
return
}
// clean up per-relay-parent data based on everything removed.
state.per_relay_parent.retain(|r, _| relay_parents.contains(r));
// Clean up all requests
for leaf in leaves {
state.request_manager.remove_by_relay_parent(*leaf);
}
state.candidates.on_deactivate_leaves(&leaves, |h| relay_parents.contains(h));
state
.candidates
.on_deactivate_leaves(&leaves, |h| state.per_relay_parent.contains_key(h));
// clean up sessions based on everything remaining.
let sessions: HashSet<_> = state.per_relay_parent.values().map(|r| r.session).collect();
@@ -1734,6 +1748,7 @@ async fn provide_candidate_to_grid<Context>(
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
local_validator = ?local_validator.index,
n_peers = manifest_peers.len(),
"Sending manifest to peers"
);
@@ -1749,6 +1764,7 @@ async fn provide_candidate_to_grid<Context>(
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
local_validator = ?local_validator.index,
n_peers = ack_peers.len(),
"Sending acknowledgement to peers"
);
@@ -1974,8 +1990,13 @@ async fn handle_incoming_manifest_common<'a, Context>(
let sender_index = match sender_index {
None => {
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_MANIFEST_DISALLOWED)
.await;
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN,
)
.await;
return None
},
Some(s) => s,
@@ -2029,6 +2050,17 @@ async fn handle_incoming_manifest_common<'a, Context>(
return None
}
if acknowledge {
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
from = ?sender_index,
local_index = ?local_validator.index,
?manifest_kind,
"immediate ack, known candidate"
);
}
Some(ManifestImportSuccess { relay_parent_state, per_session, acknowledge, sender_index })
}
@@ -2558,6 +2590,13 @@ pub(crate) async fn handle_response<Context>(
let &requests::CandidateIdentifier { relay_parent, candidate_hash, group_index } =
response.candidate_identifier();
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
peer = ?response.requested_peer(),
"Received response",
);
let post_confirmation = {
let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
None => return,
@@ -2596,12 +2635,29 @@ pub(crate) async fn handle_response<Context>(
let (candidate, pvd, statements) = match res.request_status {
requests::CandidateRequestStatus::Outdated => return,
requests::CandidateRequestStatus::Incomplete => return,
requests::CandidateRequestStatus::Incomplete => {
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
"Response incomplete. Retrying"
);
return
},
requests::CandidateRequestStatus::Complete {
candidate,
persisted_validation_data,
statements,
} => (candidate, persisted_validation_data, statements),
} => {
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
n_statements = statements.len(),
"Successfully received candidate"
);
(candidate, persisted_validation_data, statements)
},
};
for statement in statements {
@@ -2673,6 +2729,13 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
let ResponderMessage { request, sent_feedback } = message;
let AttestedCandidateRequest { candidate_hash, ref mask } = &request.payload;
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
peer = ?request.peer,
"Received request"
);
// Signal to the responder that we started processing this request.
let _ = sent_feedback.send(());
@@ -2681,12 +2744,12 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
Some(c) => c,
};
let relay_parent_state = match state.per_relay_parent.get(&confirmed.relay_parent()) {
let relay_parent_state = match state.per_relay_parent.get_mut(&confirmed.relay_parent()) {
None => return,
Some(s) => s,
};
let local_validator = match relay_parent_state.local_validator.as_ref() {
let local_validator = match relay_parent_state.local_validator.as_mut() {
None => return,
Some(s) => s,
};
@@ -2718,28 +2781,39 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
return
}
// check peer is allowed to request the candidate (i.e. we've sent them a manifest)
{
let mut can_request = false;
for validator_id in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
// check peer is allowed to request the candidate (i.e. they're in the cluster or we've sent
// them a manifest)
let (validator_id, is_cluster) = {
let mut validator_id = None;
let mut is_cluster = false;
for v in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
per_session.authority_lookup.get(a)
}) {
if local_validator.grid_tracker.can_request(validator_id, *candidate_hash) {
can_request = true;
if local_validator.cluster_tracker.can_request(v, *candidate_hash) {
validator_id = Some(v);
is_cluster = true;
break
}
if local_validator.grid_tracker.can_request(v, *candidate_hash) {
validator_id = Some(v);
break
}
}
if !can_request {
let _ = request.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_UNEXPECTED_REQUEST],
sent_feedback: None,
});
match validator_id {
Some(v) => (v, is_cluster),
None => {
let _ = request.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_UNEXPECTED_REQUEST],
sent_feedback: None,
});
return
return
},
}
}
};
// Transform mask with 'OR' semantics into one with 'AND' semantics for the API used
// below.
@@ -2748,19 +2822,34 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
validated_in_group: !mask.validated_in_group.clone(),
};
let statements: Vec<_> = relay_parent_state
.statement_store
.group_statements(&per_session.groups, confirmed.group_index(), *candidate_hash, &and_mask)
.map(|s| s.as_unchecked().clone())
.collect();
// Update bookkeeping about which statements peers have received.
for statement in &statements {
if is_cluster {
local_validator.cluster_tracker.note_sent(
validator_id,
statement.unchecked_validator_index(),
statement.unchecked_payload().clone(),
);
} else {
local_validator.grid_tracker.sent_or_received_direct_statement(
&per_session.groups,
statement.unchecked_validator_index(),
validator_id,
statement.unchecked_payload(),
);
}
}
let response = AttestedCandidateResponse {
candidate_receipt: (&**confirmed.candidate_receipt()).clone(),
persisted_validation_data: confirmed.persisted_validation_data().clone(),
statements: relay_parent_state
.statement_store
.group_statements(
&per_session.groups,
confirmed.group_index(),
*candidate_hash,
&and_mask,
)
.map(|s| s.as_unchecked().clone())
.collect(),
statements,
};
let _ = request.send_response(response);