diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 97efb3ba80..27b5972d98 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -1574,7 +1574,7 @@ async fn post_import_statement_actions( ctx: &mut Context, rp_state: &mut PerRelayParentState, summary: Option<&TableSummary>, -) -> Result<(), Error> { +) { if let Some(attested) = summary.as_ref().and_then(|s| { rp_state.table.attested_candidate( &s.candidate, @@ -1630,8 +1630,6 @@ async fn post_import_statement_actions( } issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table); - - Ok(()) } /// Check if there have happened any new misbehaviors and issue necessary messages. @@ -1674,7 +1672,7 @@ async fn sign_import_and_distribute_statement( let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone()); ctx.send_unbounded_message(smsg); - post_import_statement_actions(ctx, rp_state, summary.as_ref()).await?; + post_import_statement_actions(ctx, rp_state, summary.as_ref()).await; Ok(Some(signed_statement)) } else { @@ -1800,7 +1798,7 @@ async fn maybe_validate_and_import( } let summary = res?; - post_import_statement_actions(ctx, rp_state, summary.as_ref()).await?; + post_import_statement_actions(ctx, rp_state, summary.as_ref()).await; if let Some(summary) = summary { // import_statement already takes care of communicating with the diff --git a/polkadot/node/core/prospective-parachains/src/lib.rs b/polkadot/node/core/prospective-parachains/src/lib.rs index fcca0dd0b5..dabcfb80e0 100644 --- a/polkadot/node/core/prospective-parachains/src/lib.rs +++ b/polkadot/node/core/prospective-parachains/src/lib.rs @@ -290,6 +290,14 @@ async fn handle_active_leaves_update( ) .expect("ancestors are provided in reverse order and correctly; qed"); + gum::debug!( + target: LOG_TARGET, + relay_parent = ?hash, + min_relay_parent = scope.earliest_relay_parent().number, + para_id = ?para, + "Creating fragment tree" + ); + let tree = FragmentTree::populate(scope, &*candidate_storage); fragment_trees.insert(para, tree); diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 304cabbaac..e4b95f24d5 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -93,13 +93,18 @@ const COST_APPARENT_FLOOD: Rep = /// For considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386 const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150); -/// Ensure that collator issues a connection request at least once every this many seconds. -/// Usually it's done when advertising new collation. However, if the core stays occupied or -/// it's not our turn to produce a candidate, it's important to disconnect from previous -/// peers. +/// Ensure that collator updates its connection requests to validators +/// this long after the most recent leaf. +/// +/// The timeout is designed for substreams to be properly closed if they need to be +/// reopened shortly after the next leaf. +/// +/// Collators also update their connection requests on every new collation. +/// This timeout is mostly about removing stale connections while avoiding races +/// with new collations which may want to reactivate them. /// /// Validators are obtained from [`ValidatorGroupsBuffer::validators_to_connect`]. -const RECONNECT_TIMEOUT: Duration = Duration::from_secs(12); +const RECONNECT_AFTER_LEAF_TIMEOUT: Duration = Duration::from_secs(4); /// Future that when resolved indicates that we should update reserved peer-set /// of validators we want to be connected to. @@ -108,6 +113,13 @@ const RECONNECT_TIMEOUT: Duration = Duration::from_secs(12); /// connected. type ReconnectTimeout = Fuse; +#[derive(Debug)] +enum ShouldAdvertiseTo { + Yes, + NotAuthority, + AlreadyAdvertised, +} + /// Info about validators we are currently connected to. /// /// It keeps track to which validators we advertised our collation. @@ -129,10 +141,10 @@ impl ValidatorGroup { candidate_hash: &CandidateHash, peer_ids: &HashMap>, peer: &PeerId, - ) -> bool { + ) -> ShouldAdvertiseTo { let authority_ids = match peer_ids.get(peer) { Some(authority_ids) => authority_ids, - None => return false, + None => return ShouldAdvertiseTo::NotAuthority, }; for id in authority_ids { @@ -151,11 +163,13 @@ impl ValidatorGroup { .get(candidate_hash) .map_or(true, |advertised| !advertised[validator_index]) { - return true + return ShouldAdvertiseTo::Yes + } else { + return ShouldAdvertiseTo::AlreadyAdvertised } } - false + ShouldAdvertiseTo::NotAuthority } /// Should be called after we advertised our collation to the given `peer` to keep track of it. @@ -255,8 +269,8 @@ struct State { /// Tracks which validators we want to stay connected to. validator_groups_buf: ValidatorGroupsBuffer, - /// Timeout-future that enforces collator to update the peer-set at least once - /// every [`RECONNECT_TIMEOUT`] seconds. + /// Timeout-future which is reset after every leaf to [`RECONNECT_AFTER_LEAF_TIMEOUT`] seconds. + /// When it fires, we update our reserved peers. reconnect_timeout: ReconnectTimeout, /// Metrics. @@ -443,7 +457,7 @@ async fn distribute_collation( } // Update a set of connected validators if necessary. - state.reconnect_timeout = connect_to_validators(ctx, &state.validator_groups_buf).await; + connect_to_validators(ctx, &state.validator_groups_buf).await; if let Some(result_sender) = result_sender { state.collation_result_senders.insert(candidate_hash, result_sender); @@ -619,15 +633,12 @@ async fn declare( /// Updates a set of connected validators based on their advertisement-bits /// in a validators buffer. -/// -/// Should be called again once a returned future resolves. #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] async fn connect_to_validators( ctx: &mut Context, validator_groups_buf: &ValidatorGroupsBuffer, -) -> ReconnectTimeout { +) { let validator_ids = validator_groups_buf.validators_to_connect(); - let is_disconnect = validator_ids.is_empty(); // ignore address resolution failure // will reissue a new request on new collation @@ -638,14 +649,6 @@ async fn connect_to_validators( failed, }) .await; - - if is_disconnect { - gum::trace!(target: LOG_TARGET, "Disconnecting from all peers"); - // Never resolves. - Fuse::terminated() - } else { - futures_timer::Delay::new(RECONNECT_TIMEOUT).fuse() - } } /// Advertise collation to the given `peer`. @@ -685,22 +688,29 @@ async fn advertise_collation( .validator_group .should_advertise_to(candidate_hash, peer_ids, &peer); - if !should_advertise { - gum::debug!( - target: LOG_TARGET, - ?relay_parent, - peer_id = %peer, - "Not advertising collation since validator is not interested", - ); - continue + match should_advertise { + ShouldAdvertiseTo::Yes => {}, + ShouldAdvertiseTo::NotAuthority | ShouldAdvertiseTo::AlreadyAdvertised => { + gum::trace!( + target: LOG_TARGET, + ?relay_parent, + ?candidate_hash, + peer_id = %peer, + reason = ?should_advertise, + "Not advertising collation" + ); + continue + }, } gum::debug!( target: LOG_TARGET, ?relay_parent, + ?candidate_hash, peer_id = %peer, "Advertising collation.", ); + collation.status.advance_to_advertised(); let collation_message = match protocol_version { @@ -1149,7 +1159,7 @@ async fn handle_network_msg( PeerConnected(peer_id, observed_role, protocol_version, maybe_authority) => { // If it is possible that a disconnected validator would attempt a reconnect // it should be handled here. - gum::trace!(target: LOG_TARGET, ?peer_id, ?observed_role, "Peer connected"); + gum::trace!(target: LOG_TARGET, ?peer_id, ?observed_role, ?maybe_authority, "Peer connected"); let version = match protocol_version.try_into() { Ok(version) => version, @@ -1200,7 +1210,11 @@ async fn handle_network_msg( }, UpdatedAuthorityIds(peer_id, authority_ids) => { gum::trace!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Updated authority ids"); - state.peer_ids.insert(peer_id, authority_ids); + if let Some(version) = state.peer_data.get(&peer_id).map(|d| d.version) { + if state.peer_ids.insert(peer_id, authority_ids).is_none() { + declare(ctx, state, &peer_id, version).await; + } + } }, NewGossipTopology { .. } => { // impossible! @@ -1369,7 +1383,11 @@ async fn run_inner( "Failed to process message" )?; }, - FromOrchestra::Signal(ActiveLeaves(_update)) => {} + FromOrchestra::Signal(ActiveLeaves(update)) => { + if update.activated.is_some() { + *reconnect_timeout = futures_timer::Delay::new(RECONNECT_AFTER_LEAF_TIMEOUT).fuse(); + } + } FromOrchestra::Signal(BlockFinalized(..)) => {} FromOrchestra::Signal(Conclude) => return Ok(()), }, @@ -1390,7 +1408,7 @@ async fn run_inner( // The request it still alive, it should be kept in a waiting queue. } else { for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() { - // Timeout not hit, this peer is no longer interested in this relay parent. + // This peer has received the candidate. Not interested anymore. state.validator_groups_buf.reset_validator_interest(candidate_hash, authority_id); } waiting.waiting_peers.remove(&(peer_id, candidate_hash)); @@ -1446,12 +1464,11 @@ async fn run_inner( } } _ = reconnect_timeout => { - state.reconnect_timeout = - connect_to_validators(&mut ctx, &state.validator_groups_buf).await; + connect_to_validators(&mut ctx, &state.validator_groups_buf).await; gum::trace!( target: LOG_TARGET, - timeout = ?RECONNECT_TIMEOUT, + timeout = ?RECONNECT_AFTER_LEAF_TIMEOUT, "Peer-set updated due to a timeout" ); }, diff --git a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs index cfa7627038..5b88efc99d 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -133,7 +133,7 @@ impl ValidatorGroupsBuffer { } } - /// Note that a validator is no longer interested in a given relay parent. + /// Note that a validator is no longer interested in a given candidate. pub fn reset_validator_interest( &mut self, candidate_hash: CandidateHash, diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 4fa23507e8..2b2b2d0933 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -104,7 +104,7 @@ pub struct GossipSupport { /// By `PeerId`. /// /// Needed for efficient handling of disconnect events. - connected_authorities_by_peer_id: HashMap>, + connected_peers: HashMap>, /// Authority discovery service. authority_discovery: AD, @@ -130,7 +130,7 @@ where failure_start: None, resolved_authorities: HashMap::new(), connected_authorities: HashMap::new(), - connected_authorities_by_peer_id: HashMap::new(), + connected_peers: HashMap::new(), authority_discovery, metrics, } @@ -407,19 +407,42 @@ where } } - for (peer_id, auths) in authority_ids { - if self.connected_authorities_by_peer_id.get(&peer_id) != Some(&auths) { + // peer was authority and now isn't + for (peer_id, current) in self.connected_peers.iter_mut() { + // empty -> nonempty is handled in the next loop + if !current.is_empty() && !authority_ids.contains_key(peer_id) { sender .send_message(NetworkBridgeRxMessage::UpdatedAuthorityIds { - peer_id, - authority_ids: auths.clone(), + peer_id: *peer_id, + authority_ids: HashSet::new(), }) .await; - auths.iter().for_each(|a| { + for a in current.drain() { + self.connected_authorities.remove(&a); + } + } + } + + // peer has new authority set. + for (peer_id, new) in authority_ids { + // If the peer is connected _and_ the authority IDs have changed. + if let Some(prev) = self.connected_peers.get(&peer_id).filter(|x| x != &&new) { + sender + .send_message(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids: new.clone(), + }) + .await; + + prev.iter().for_each(|a| { + self.connected_authorities.remove(a); + }); + new.iter().for_each(|a| { self.connected_authorities.insert(a.clone(), peer_id); }); - self.connected_authorities_by_peer_id.insert(peer_id, auths); + + self.connected_peers.insert(peer_id, new); } } } @@ -431,12 +454,13 @@ where authority_ids.iter().for_each(|a| { self.connected_authorities.insert(a.clone(), peer_id); }); - self.connected_authorities_by_peer_id.insert(peer_id, authority_ids); + self.connected_peers.insert(peer_id, authority_ids); + } else { + self.connected_peers.insert(peer_id, HashSet::new()); } }, NetworkBridgeEvent::PeerDisconnected(peer_id) => { - if let Some(authority_ids) = self.connected_authorities_by_peer_id.remove(&peer_id) - { + if let Some(authority_ids) = self.connected_peers.remove(&peer_id) { authority_ids.into_iter().for_each(|a| { self.connected_authorities.remove(&a); }); diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index eead7df522..259c8f6a36 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -320,6 +320,7 @@ impl StatementDistributionSubsystem { let mode = prospective_parachains_mode(ctx.sender(), activated.hash).await?; if let ProspectiveParachainsMode::Enabled { .. } = mode { v2::handle_active_leaves_update(ctx, state, activated, mode).await?; + v2::handle_deactivate_leaves(state, &deactivated); } else if let ProspectiveParachainsMode::Disabled = mode { for deactivated in &deactivated { crate::legacy_v1::handle_deactivate_leaf(legacy_v1_state, *deactivated); diff --git a/polkadot/node/network/statement-distribution/src/v2/candidates.rs b/polkadot/node/network/statement-distribution/src/v2/candidates.rs index e660df5da1..ad56ad4a23 100644 --- a/polkadot/node/network/statement-distribution/src/v2/candidates.rs +++ b/polkadot/node/network/statement-distribution/src/v2/candidates.rs @@ -353,7 +353,13 @@ impl Candidates { ); c.has_claims() }, - }) + }); + + gum::trace!( + target: crate::LOG_TARGET, + "Candidates remaining after cleanup: {}", + self.candidates.len(), + ); } } diff --git a/polkadot/node/network/statement-distribution/src/v2/cluster.rs b/polkadot/node/network/statement-distribution/src/v2/cluster.rs index 8adb8353ca..619114de96 100644 --- a/polkadot/node/network/statement-distribution/src/v2/cluster.rs +++ b/polkadot/node/network/statement-distribution/src/v2/cluster.rs @@ -331,6 +331,13 @@ impl ClusterTracker { self.validator_seconded(validator, candidate_hash) } + /// Whether a validator can request a candidate from us. + pub fn can_request(&self, target: ValidatorIndex, candidate_hash: CandidateHash) -> bool { + self.validators.contains(&target) && + self.we_sent_seconded(target, candidate_hash) && + !self.they_sent_seconded(target, candidate_hash) + } + /// Returns a Vec of pending statements to be sent to a particular validator /// index. `Seconded` statements are sorted to the front of the vector. /// diff --git a/polkadot/node/network/statement-distribution/src/v2/grid.rs b/polkadot/node/network/statement-distribution/src/v2/grid.rs index 3d53ff6d32..19bad34c44 100644 --- a/polkadot/node/network/statement-distribution/src/v2/grid.rs +++ b/polkadot/node/network/statement-distribution/src/v2/grid.rs @@ -2245,4 +2245,73 @@ mod tests { ); assert_eq!(tracker.all_pending_statements_for(counterparty), vec![]); } + + #[test] + fn session_grid_topology_consistent() { + let n_validators = 300; + let group_size = 5; + + let validator_indices = + (0..n_validators).map(|i| ValidatorIndex(i as u32)).collect::>(); + let groups = validator_indices.chunks(group_size).map(|x| x.to_vec()).collect::>(); + + let topology = SessionGridTopology::new( + (0..n_validators).collect::>(), + (0..n_validators) + .map(|i| TopologyPeerInfo { + peer_ids: Vec::new(), + validator_index: ValidatorIndex(i as u32), + discovery_id: AuthorityDiscoveryPair::generate().0.public(), + }) + .collect(), + ); + + let computed_topologies = validator_indices + .iter() + .cloned() + .map(|v| build_session_topology(groups.iter(), &topology, Some(v))) + .collect::>(); + + let pairwise_check_topologies = |i, j| { + let v_i = ValidatorIndex(i); + let v_j = ValidatorIndex(j); + + for group in (0..groups.len()).map(|i| GroupIndex(i as u32)) { + let g_i = computed_topologies[i as usize].group_views.get(&group).unwrap(); + let g_j = computed_topologies[j as usize].group_views.get(&group).unwrap(); + + if g_i.sending.contains(&v_j) { + assert!( + g_j.receiving.contains(&v_i), + "{:?}: {:?}, sending but not receiving", + group, + &(i, j) + ); + } + + if g_j.sending.contains(&v_i) { + assert!( + g_i.receiving.contains(&v_j), + "{:?}: {:?}, sending but not receiving", + group, + &(j, i) + ); + } + + if g_i.receiving.contains(&v_j) { + assert!(g_j.sending.contains(&v_i), "{:?}, receiving but not sending", &(i, j)); + } + + if g_j.receiving.contains(&v_i) { + assert!(g_i.sending.contains(&v_j), "{:?}, receiving but not sending", &(j, i)); + } + } + }; + + for i in 0..n_validators { + for j in (i + 1)..n_validators { + pairwise_check_topologies(i as u32, j as u32); + } + } + } } diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index e11d66c41a..95104732df 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -100,6 +100,8 @@ const COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE: Rep = Rep::CostMinor("Unexpected Manifest, missing knowlege for relay parent"); const COST_UNEXPECTED_MANIFEST_DISALLOWED: Rep = Rep::CostMinor("Unexpected Manifest, Peer Disallowed"); +const COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN: Rep = + Rep::CostMinor("Unexpected Manifest, Peer Unknown"); const COST_CONFLICTING_MANIFEST: Rep = Rep::CostMajor("Manifest conflicts with previous"); const COST_INSUFFICIENT_MANIFEST: Rep = Rep::CostMajor("Manifest statements insufficient to back candidate"); @@ -185,11 +187,18 @@ impl PerSessionState { } } - fn supply_topology(&mut self, topology: &SessionGridTopology) { + fn supply_topology( + &mut self, + topology: &SessionGridTopology, + local_index: Option, + ) { + // Note: we use the local index rather than the `self.local_validator` as the + // former may be `Some` when the latter is `None`, due to the set of nodes in + // discovery being a superset of the active validators for consensus. let grid_view = grid::build_session_topology( self.session_info.validator_groups.iter(), topology, - self.local_validator, + local_index, ); self.grid_view = Some(grid_view); @@ -334,7 +343,7 @@ pub(crate) async fn handle_network_update( true }, Entry::Occupied(e) => { - gum::trace!( + gum::debug!( target: LOG_TARGET, authority_id = ?a, existing_peer = ?e.get(), @@ -366,9 +375,10 @@ pub(crate) async fn handle_network_update( NetworkBridgeEvent::NewGossipTopology(topology) => { let new_session_index = topology.session; let new_topology = topology.topology; + let local_index = topology.local_index; if let Some(per_session) = state.per_session.get_mut(&new_session_index) { - per_session.supply_topology(&new_topology); + per_session.supply_topology(&new_topology, local_index); } // TODO [https://github.com/paritytech/polkadot/issues/6194] @@ -409,6 +419,12 @@ pub(crate) async fn handle_network_update( "Updated `AuthorityDiscoveryId`s" ); + // defensive: ensure peers are actually connected + let peer_state = match state.peers.get_mut(&peer_id) { + None => return, + Some(p) => p, + }; + // Remove the authority IDs which were previously mapped to the peer // but aren't part of the new set. state.authorities.retain(|a, p| p != &peer_id || authority_ids.contains(a)); @@ -418,9 +434,7 @@ pub(crate) async fn handle_network_update( state.authorities.insert(a, peer_id); } - if let Some(peer_state) = state.peers.get_mut(&peer_id) { - peer_state.discovery_ids = Some(authority_ids); - } + peer_state.discovery_ids = Some(authority_ids); }, } } @@ -542,6 +556,13 @@ pub(crate) async fn handle_active_leaves_update( ); } + gum::debug!( + target: LOG_TARGET, + "Activated leaves. Now tracking {} relay-parents across {} sessions", + state.per_relay_parent.len(), + state.per_session.len(), + ); + // Reconcile all peers' views with the active leaf and any relay parents // it implies. If they learned about the block before we did, this reconciliation will give // non-empty results and we should send them messages concerning all activated relay-parents. @@ -599,25 +620,18 @@ fn find_local_validator_state( pub(crate) fn handle_deactivate_leaves(state: &mut State, leaves: &[Hash]) { // deactivate the leaf in the implicit view. for leaf in leaves { - state.implicit_view.deactivate_leaf(*leaf); + let pruned = state.implicit_view.deactivate_leaf(*leaf); + for pruned_rp in pruned { + // clean up per-relay-parent data based on everything removed. + state.per_relay_parent.remove(&pruned_rp); + // clean up requests related to this relay parent. + state.request_manager.remove_by_relay_parent(*leaf); + } } - let relay_parents = state.implicit_view.all_allowed_relay_parents().collect::>(); - - // fast exit for no-op. - if relay_parents.len() == state.per_relay_parent.len() { - return - } - - // clean up per-relay-parent data based on everything removed. - state.per_relay_parent.retain(|r, _| relay_parents.contains(r)); - - // Clean up all requests - for leaf in leaves { - state.request_manager.remove_by_relay_parent(*leaf); - } - - state.candidates.on_deactivate_leaves(&leaves, |h| relay_parents.contains(h)); + state + .candidates + .on_deactivate_leaves(&leaves, |h| state.per_relay_parent.contains_key(h)); // clean up sessions based on everything remaining. let sessions: HashSet<_> = state.per_relay_parent.values().map(|r| r.session).collect(); @@ -1734,6 +1748,7 @@ async fn provide_candidate_to_grid( gum::debug!( target: LOG_TARGET, ?candidate_hash, + local_validator = ?local_validator.index, n_peers = manifest_peers.len(), "Sending manifest to peers" ); @@ -1749,6 +1764,7 @@ async fn provide_candidate_to_grid( gum::debug!( target: LOG_TARGET, ?candidate_hash, + local_validator = ?local_validator.index, n_peers = ack_peers.len(), "Sending acknowledgement to peers" ); @@ -1974,8 +1990,13 @@ async fn handle_incoming_manifest_common<'a, Context>( let sender_index = match sender_index { None => { - modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_MANIFEST_DISALLOWED) - .await; + modify_reputation( + reputation, + ctx.sender(), + peer, + COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN, + ) + .await; return None }, Some(s) => s, @@ -2029,6 +2050,17 @@ async fn handle_incoming_manifest_common<'a, Context>( return None } + if acknowledge { + gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + from = ?sender_index, + local_index = ?local_validator.index, + ?manifest_kind, + "immediate ack, known candidate" + ); + } + Some(ManifestImportSuccess { relay_parent_state, per_session, acknowledge, sender_index }) } @@ -2558,6 +2590,13 @@ pub(crate) async fn handle_response( let &requests::CandidateIdentifier { relay_parent, candidate_hash, group_index } = response.candidate_identifier(); + gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + peer = ?response.requested_peer(), + "Received response", + ); + let post_confirmation = { let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) { None => return, @@ -2596,12 +2635,29 @@ pub(crate) async fn handle_response( let (candidate, pvd, statements) = match res.request_status { requests::CandidateRequestStatus::Outdated => return, - requests::CandidateRequestStatus::Incomplete => return, + requests::CandidateRequestStatus::Incomplete => { + gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + "Response incomplete. Retrying" + ); + + return + }, requests::CandidateRequestStatus::Complete { candidate, persisted_validation_data, statements, - } => (candidate, persisted_validation_data, statements), + } => { + gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + n_statements = statements.len(), + "Successfully received candidate" + ); + + (candidate, persisted_validation_data, statements) + }, }; for statement in statements { @@ -2673,6 +2729,13 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) { let ResponderMessage { request, sent_feedback } = message; let AttestedCandidateRequest { candidate_hash, ref mask } = &request.payload; + gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + peer = ?request.peer, + "Received request" + ); + // Signal to the responder that we started processing this request. let _ = sent_feedback.send(()); @@ -2681,12 +2744,12 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) { Some(c) => c, }; - let relay_parent_state = match state.per_relay_parent.get(&confirmed.relay_parent()) { + let relay_parent_state = match state.per_relay_parent.get_mut(&confirmed.relay_parent()) { None => return, Some(s) => s, }; - let local_validator = match relay_parent_state.local_validator.as_ref() { + let local_validator = match relay_parent_state.local_validator.as_mut() { None => return, Some(s) => s, }; @@ -2718,28 +2781,39 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) { return } - // check peer is allowed to request the candidate (i.e. we've sent them a manifest) - { - let mut can_request = false; - for validator_id in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| { + // check peer is allowed to request the candidate (i.e. they're in the cluster or we've sent + // them a manifest) + let (validator_id, is_cluster) = { + let mut validator_id = None; + let mut is_cluster = false; + for v in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| { per_session.authority_lookup.get(a) }) { - if local_validator.grid_tracker.can_request(validator_id, *candidate_hash) { - can_request = true; + if local_validator.cluster_tracker.can_request(v, *candidate_hash) { + validator_id = Some(v); + is_cluster = true; + break + } + + if local_validator.grid_tracker.can_request(v, *candidate_hash) { + validator_id = Some(v); break } } - if !can_request { - let _ = request.send_outgoing_response(OutgoingResponse { - result: Err(()), - reputation_changes: vec![COST_UNEXPECTED_REQUEST], - sent_feedback: None, - }); + match validator_id { + Some(v) => (v, is_cluster), + None => { + let _ = request.send_outgoing_response(OutgoingResponse { + result: Err(()), + reputation_changes: vec![COST_UNEXPECTED_REQUEST], + sent_feedback: None, + }); - return + return + }, } - } + }; // Transform mask with 'OR' semantics into one with 'AND' semantics for the API used // below. @@ -2748,19 +2822,34 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) { validated_in_group: !mask.validated_in_group.clone(), }; + let statements: Vec<_> = relay_parent_state + .statement_store + .group_statements(&per_session.groups, confirmed.group_index(), *candidate_hash, &and_mask) + .map(|s| s.as_unchecked().clone()) + .collect(); + + // Update bookkeeping about which statements peers have received. + for statement in &statements { + if is_cluster { + local_validator.cluster_tracker.note_sent( + validator_id, + statement.unchecked_validator_index(), + statement.unchecked_payload().clone(), + ); + } else { + local_validator.grid_tracker.sent_or_received_direct_statement( + &per_session.groups, + statement.unchecked_validator_index(), + validator_id, + statement.unchecked_payload(), + ); + } + } + let response = AttestedCandidateResponse { candidate_receipt: (&**confirmed.candidate_receipt()).clone(), persisted_validation_data: confirmed.persisted_validation_data().clone(), - statements: relay_parent_state - .statement_store - .group_statements( - &per_session.groups, - confirmed.group_index(), - *candidate_hash, - &and_mask, - ) - .map(|s| s.as_unchecked().clone()) - .collect(), + statements, }; let _ = request.send_response(response); diff --git a/polkadot/node/network/statement-distribution/src/v2/requests.rs b/polkadot/node/network/statement-distribution/src/v2/requests.rs index f13496024f..8507a4b827 100644 --- a/polkadot/node/network/statement-distribution/src/v2/requests.rs +++ b/polkadot/node/network/statement-distribution/src/v2/requests.rs @@ -265,6 +265,12 @@ impl RequestManager { HEntry::Vacant(_) => (), } } + + gum::debug!( + target: LOG_TARGET, + "Requests remaining after cleanup: {}", + self.by_priority.len(), + ); } /// Returns true if there are pending requests that are dispatchable. @@ -355,6 +361,13 @@ impl RequestManager { Some(t) => t, }; + gum::debug!( + target: crate::LOG_TARGET, + candidate_hash = ?id.candidate_hash, + peer = ?target, + "Issuing candidate request" + ); + let (request, response_fut) = OutgoingRequest::new( RequestRecipient::Peer(target), AttestedCandidateRequest { @@ -498,6 +511,11 @@ impl UnhandledResponse { &self.response.identifier } + /// Get the peer we made the request to. + pub fn requested_peer(&self) -> &PeerId { + &self.response.requested_peer + } + /// Validate the response. If the response is valid, this will yield the /// candidate, the [`PersistedValidationData`] of the candidate, and requested /// checked statements. @@ -582,12 +600,19 @@ impl UnhandledResponse { request_status: CandidateRequestStatus::Incomplete, } }, - Err(RequestError::NetworkError(_) | RequestError::Canceled(_)) => + Err(e @ RequestError::NetworkError(_) | e @ RequestError::Canceled(_)) => { + gum::trace!( + target: LOG_TARGET, + err = ?e, + peer = ?requested_peer, + "Request error" + ); return ResponseValidationOutput { requested_peer, reputation_changes: vec![], request_status: CandidateRequestStatus::Incomplete, - }, + } + }, Ok(response) => response, }; diff --git a/polkadot/node/network/statement-distribution/src/v2/statement_store.rs b/polkadot/node/network/statement-distribution/src/v2/statement_store.rs index 74db431eda..96d976e22c 100644 --- a/polkadot/node/network/statement-distribution/src/v2/statement_store.rs +++ b/polkadot/node/network/statement-distribution/src/v2/statement_store.rs @@ -212,6 +212,7 @@ impl StatementStore { } /// Get an iterator over all statements marked as being unknown by the backing subsystem. + /// This provides `Seconded` statements prior to `Valid` statements. pub fn fresh_statements_for_backing<'a>( &'a self, validators: &'a [ValidatorIndex], @@ -220,14 +221,15 @@ impl StatementStore { let s_st = CompactStatement::Seconded(candidate_hash); let v_st = CompactStatement::Valid(candidate_hash); - validators - .iter() - .flat_map(move |v| { - let a = self.known_statements.get(&(*v, s_st.clone())); - let b = self.known_statements.get(&(*v, v_st.clone())); + let fresh_seconded = + validators.iter().map(move |v| self.known_statements.get(&(*v, s_st.clone()))); - a.into_iter().chain(b) - }) + let fresh_valid = + validators.iter().map(move |v| self.known_statements.get(&(*v, v_st.clone()))); + + fresh_seconded + .chain(fresh_valid) + .flatten() .filter(|stored| !stored.known_by_backing) .map(|stored| &stored.statement) } @@ -250,6 +252,7 @@ impl StatementStore { } /// Error indicating that the validator was unknown. +#[derive(Debug)] pub struct ValidatorUnknown; type Fingerprint = (ValidatorIndex, CompactStatement); @@ -281,3 +284,78 @@ impl GroupStatements { self.valid.set(within_group_index, true); } } + +#[cfg(test)] +mod tests { + use super::*; + + use polkadot_primitives::v6::{Hash, SigningContext, ValidatorPair}; + use sp_application_crypto::Pair as PairT; + + #[test] + fn always_provides_fresh_statements_in_order() { + let validator_a = ValidatorIndex(1); + let validator_b = ValidatorIndex(2); + let candidate_hash = CandidateHash(Hash::repeat_byte(42)); + + let valid_statement = CompactStatement::Valid(candidate_hash); + let seconded_statement = CompactStatement::Seconded(candidate_hash); + let signing_context = + SigningContext { parent_hash: Hash::repeat_byte(0), session_index: 1 }; + + let groups = Groups::new(vec![vec![validator_a, validator_b]].into(), 2); + + let mut store = StatementStore::new(&groups); + + // import a Valid statement from A and a Seconded statement from B. + let signed_valid_by_a = { + let payload = valid_statement.signing_payload(&signing_context); + let pair = ValidatorPair::generate().0; + let signature = pair.sign(&payload[..]); + + SignedStatement::new( + valid_statement.clone(), + validator_a, + signature, + &signing_context, + &pair.public(), + ) + .unwrap() + }; + store.insert(&groups, signed_valid_by_a, StatementOrigin::Remote).unwrap(); + + let signed_seconded_by_b = { + let payload = seconded_statement.signing_payload(&signing_context); + let pair = ValidatorPair::generate().0; + let signature = pair.sign(&payload[..]); + + SignedStatement::new( + seconded_statement.clone(), + validator_b, + signature, + &signing_context, + &pair.public(), + ) + .unwrap() + }; + store.insert(&groups, signed_seconded_by_b, StatementOrigin::Remote).unwrap(); + + // Regardless of the order statements are requested, + // we will get them in the order [B, A] because seconded statements must be first. + let vals = &[validator_a, validator_b]; + let statements = + store.fresh_statements_for_backing(vals, candidate_hash).collect::>(); + + assert_eq!(statements.len(), 2); + assert_eq!(statements[0].payload(), &seconded_statement); + assert_eq!(statements[1].payload(), &valid_statement); + + let vals = &[validator_b, validator_a]; + let statements = + store.fresh_statements_for_backing(vals, candidate_hash).collect::>(); + + assert_eq!(statements.len(), 2); + assert_eq!(statements[0].payload(), &seconded_statement); + assert_eq!(statements[1].payload(), &valid_statement); + } +} diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs index a0af957982..5b1dabfc8a 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs @@ -1797,6 +1797,7 @@ fn grid_statements_imported_to_backing() { #[test] fn advertisements_rejected_from_incorrect_peers() { + sp_tracing::try_init_simple(); let validator_count = 6; let group_size = 3; let config = TestConfig { @@ -1831,12 +1832,12 @@ fn advertisements_rejected_from_incorrect_peers() { ); let candidate_hash = candidate.hash(); - let other_group_validators = state.group_validators(local_validator.group_index, true); - let target_group_validators = state.group_validators(other_group, true); - let v_a = other_group_validators[0]; - let v_b = other_group_validators[1]; - let v_c = target_group_validators[0]; - let v_d = target_group_validators[1]; + let target_group_validators = state.group_validators(local_validator.group_index, true); + let other_group_validators = state.group_validators(other_group, true); + let v_a = target_group_validators[0]; + let v_b = target_group_validators[1]; + let v_c = other_group_validators[0]; + let v_d = other_group_validators[1]; // peer A is in group, has relay parent in view. // peer B is in group, has no relay parent in view. @@ -1911,10 +1912,11 @@ fn advertisements_rejected_from_incorrect_peers() { ) .await; + // Message not expected from peers of our own group. assert_matches!( overseer.recv().await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) - if p == peer_a && r == COST_UNEXPECTED_MANIFEST_DISALLOWED.into() => { } + if p == peer_a && r == COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN.into() => { } ); } @@ -1927,10 +1929,11 @@ fn advertisements_rejected_from_incorrect_peers() { ) .await; + // Message not expected from peers of our own group. assert_matches!( overseer.recv().await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) - if p == peer_b && r == COST_UNEXPECTED_MANIFEST_DISALLOWED.into() => { } + if p == peer_b && r == COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN.into() => { } ); } diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs index 0734b75c97..4734d7a0f9 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs @@ -1306,6 +1306,208 @@ fn local_node_sanity_checks_incoming_requests() { }); } +#[test] +fn local_node_checks_that_peer_can_request_before_responding() { + let config = TestConfig { + validator_count: 20, + group_size: 3, + local_validator: true, + async_backing_params: None, + }; + + let relay_parent = Hash::repeat_byte(1); + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + test_harness(config, |mut state, mut overseer| async move { + let local_validator = state.local.clone().unwrap(); + let local_para = ParaId::from(local_validator.group_index.0); + + let test_leaf = state.make_dummy_leaf(relay_parent); + + let (candidate, pvd) = make_candidate( + relay_parent, + 1, + local_para, + test_leaf.para_data(local_para).head_data.clone(), + vec![4, 5, 6].into(), + Hash::repeat_byte(42).into(), + ); + let candidate_hash = candidate.hash(); + + // Peers A and B are in group and have relay parent in view. + let other_group_validators = state.group_validators(local_validator.group_index, true); + + connect_peer( + &mut overseer, + peer_a.clone(), + Some(vec![state.discovery_id(other_group_validators[0])].into_iter().collect()), + ) + .await; + + connect_peer( + &mut overseer, + peer_b.clone(), + Some(vec![state.discovery_id(other_group_validators[1])].into_iter().collect()), + ) + .await; + let peer_b_index = other_group_validators[1]; + + send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; + send_peer_view_change(&mut overseer, peer_b.clone(), view![relay_parent]).await; + + // Finish setup + activate_leaf(&mut overseer, &test_leaf, &state, true).await; + + answer_expected_hypothetical_depth_request( + &mut overseer, + vec![], + Some(relay_parent), + false, + ) + .await; + + let mask = StatementFilter::blank(state.config.group_size); + + // Confirm candidate. + let signed = state.sign_statement( + local_validator.validator_index, + CompactStatement::Seconded(candidate_hash), + &SigningContext { session_index: 1, parent_hash: relay_parent }, + ); + let full_signed = signed + .clone() + .convert_to_superpayload(StatementWithPVD::Seconded(candidate.clone(), pvd.clone())) + .unwrap(); + + overseer + .send(FromOrchestra::Communication { + msg: StatementDistributionMessage::Share(relay_parent, full_signed), + }) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V2(protocol_v2::ValidationProtocol::StatementDistribution( + protocol_v2::StatementDistributionMessage::Statement( + r, + s, + ) + )) + )) => { + assert_eq!(peers, vec![peer_a.clone(), peer_b.clone()]); + assert_eq!(r, relay_parent); + assert_eq!(s.unchecked_payload(), &CompactStatement::Seconded(candidate_hash)); + assert_eq!(s.unchecked_validator_index(), local_validator.validator_index); + } + ); + + answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + + // Local node should respond to requests from peers in the same group + // which appear to not have already seen the candidate + { + // Peer requests candidate and local responds + let response = state + .send_request( + peer_a, + request_v2::AttestedCandidateRequest { + candidate_hash: candidate.hash(), + mask: mask.clone(), + }, + ) + .await + .await; + + let expected_statements = vec![signed.into_unchecked()]; + assert_matches!(response, full_response => { + // Response is the same for vstaging. + let request_v2::AttestedCandidateResponse { candidate_receipt, persisted_validation_data, statements } = + request_v2::AttestedCandidateResponse::decode( + &mut full_response.result.expect("We should have a proper answer").as_ref(), + ).expect("Decoding should work"); + assert_eq!(candidate_receipt, candidate); + assert_eq!(persisted_validation_data, pvd); + assert_eq!(statements, expected_statements); + }); + } + + // Local node should reject requests if the requester appears to know + // the candidate (has sent them a Seconded statement) + { + let statement = state + .sign_statement( + peer_b_index, + CompactStatement::Seconded(candidate_hash), + &SigningContext { parent_hash: relay_parent, session_index: 1 }, + ) + .as_unchecked() + .clone(); + + send_peer_message( + &mut overseer, + peer_b.clone(), + protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement), + ) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_b && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { } + ); + + let response = state + .send_request( + peer_b, + request_v2::AttestedCandidateRequest { + candidate_hash: candidate.hash(), + mask: mask.clone(), + }, + ) + .await + .await; + + // Peer already knows about this candidate. Should reject. + assert_matches!( + response, + RawOutgoingResponse { + result, + reputation_changes, + sent_feedback + } => { + assert_matches!(result, Err(())); + assert_eq!(reputation_changes, vec![COST_UNEXPECTED_REQUEST.into()]); + assert_matches!(sent_feedback, None); + } + ); + + // Handling leftover statement distribution message + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V2(protocol_v2::ValidationProtocol::StatementDistribution( + protocol_v2::StatementDistributionMessage::Statement( + r, + s, + ) + )) + )) => { + assert_eq!(peers, vec![peer_a.clone()]); + assert_eq!(r, relay_parent); + assert_eq!(s.unchecked_payload(), &CompactStatement::Seconded(candidate_hash)); + assert_eq!(s.unchecked_validator_index(), peer_b_index); + } + ); + } + + overseer + }); +} + #[test] fn local_node_respects_statement_mask() { let validator_count = 6;