From 8362a6810dd66be5ec02d7de32c521a095cc8bd6 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Mon, 12 Feb 2024 12:44:16 +0200 Subject: [PATCH] statement-distribution: Fix CostMinor("Unexpected Statement") (#3223) On grid distribution messages have two paths of reaching a node, so there is the possiblity of a race when two peers send each other the same statement around the same time. Statement local_knowledge will tell us that the peer should have not send the statement because we sent it to it. Fix it by also keeping track only of the statement we received from a given peer and penalize it only if it sends it to us more than once. Fixes: https://github.com/paritytech/polkadot-sdk/issues/2346 Additionally, also use different Cost labels for different paths to make it easier to debug things. --------- Signed-off-by: Alexandru Gheorghe --- .../statement-distribution/src/v2/grid.rs | 59 ++++++-- .../statement-distribution/src/v2/mod.rs | 135 ++++++++++++------ .../src/v2/tests/cluster.rs | 6 +- 3 files changed, 143 insertions(+), 57 deletions(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/grid.rs b/polkadot/node/network/statement-distribution/src/v2/grid.rs index 19f2305319..24d846c840 100644 --- a/polkadot/node/network/statement-distribution/src/v2/grid.rs +++ b/polkadot/node/network/statement-distribution/src/v2/grid.rs @@ -527,12 +527,16 @@ impl GridTracker { } /// Determine the validators which can send a statement to us by direct broadcast. + /// + /// Returns a list of tuples representing each potential sender(ValidatorIndex) + /// and if the sender should already know about the statement, because we just + /// sent it to it. pub fn direct_statement_providers( &self, groups: &Groups, originator: ValidatorIndex, statement: &CompactStatement, - ) -> Vec { + ) -> Vec<(ValidatorIndex, bool)> { let (g, c_h, kind, in_group) = match extract_statement_and_group_info(groups, originator, statement) { None => return Vec::new(), @@ -616,12 +620,13 @@ impl GridTracker { originator: ValidatorIndex, counterparty: ValidatorIndex, statement: &CompactStatement, + received: bool, ) { if let Some((_, c_h, kind, in_group)) = extract_statement_and_group_info(groups, originator, statement) { if let Some(known) = self.confirmed_backed.get_mut(&c_h) { - known.sent_or_received_direct_statement(counterparty, in_group, kind); + known.sent_or_received_direct_statement(counterparty, in_group, kind, received); if let Some(pending) = self.pending_statements.get_mut(&counterparty) { pending.remove(&(originator, statement.clone())); @@ -908,6 +913,12 @@ struct MutualKnowledge { /// `Some` only if we have advertised, acknowledged, or requested the candidate /// from them. local_knowledge: Option, + /// Knowledge peer circulated to us, this is different from `local_knowledge` and + /// `remote_knowledge`, through the fact that includes only statements that we received from + /// peer while the other two, after manifest exchange part will include both what we sent to + /// the peer and what we received from peer, see `sent_or_received_direct_statement` for more + /// details. + received_knowledge: Option, } // A utility struct for keeping track of metadata about candidates @@ -933,10 +944,13 @@ impl KnownBackedCandidate { } fn manifest_sent_to(&mut self, validator: ValidatorIndex, local_knowledge: StatementFilter) { - let k = self - .mutual_knowledge - .entry(validator) - .or_insert_with(|| MutualKnowledge { remote_knowledge: None, local_knowledge: None }); + let k = self.mutual_knowledge.entry(validator).or_insert_with(|| MutualKnowledge { + remote_knowledge: None, + local_knowledge: None, + received_knowledge: None, + }); + k.received_knowledge = + Some(StatementFilter::blank(local_knowledge.seconded_in_group.len())); k.local_knowledge = Some(local_knowledge); } @@ -946,20 +960,24 @@ impl KnownBackedCandidate { validator: ValidatorIndex, remote_knowledge: StatementFilter, ) { - let k = self - .mutual_knowledge - .entry(validator) - .or_insert_with(|| MutualKnowledge { remote_knowledge: None, local_knowledge: None }); + let k = self.mutual_knowledge.entry(validator).or_insert_with(|| MutualKnowledge { + remote_knowledge: None, + local_knowledge: None, + received_knowledge: None, + }); k.remote_knowledge = Some(remote_knowledge); } + /// Returns a list of tuples representing each potential sender(ValidatorIndex) + /// and if the sender should already know about the statement, because we just + /// sent it to it. fn direct_statement_senders( &self, group_index: GroupIndex, originator_index_in_group: usize, statement_kind: StatementKind, - ) -> Vec { + ) -> Vec<(ValidatorIndex, bool)> { if group_index != self.group_index { return Vec::new() } @@ -968,11 +986,18 @@ impl KnownBackedCandidate { .iter() .filter(|(_, k)| k.remote_knowledge.is_some()) .filter(|(_, k)| { - k.local_knowledge + k.received_knowledge .as_ref() .map_or(false, |r| !r.contains(originator_index_in_group, statement_kind)) }) - .map(|(v, _)| *v) + .map(|(v, k)| { + ( + *v, + k.local_knowledge + .as_ref() + .map_or(false, |r| r.contains(originator_index_in_group, statement_kind)), + ) + }) .collect() } @@ -1014,12 +1039,19 @@ impl KnownBackedCandidate { validator: ValidatorIndex, statement_index_in_group: usize, statement_kind: StatementKind, + received: bool, ) { if let Some(k) = self.mutual_knowledge.get_mut(&validator) { if let (Some(r), Some(l)) = (k.remote_knowledge.as_mut(), k.local_knowledge.as_mut()) { r.set(statement_index_in_group, statement_kind); l.set(statement_index_in_group, statement_kind); } + + if received { + k.received_knowledge + .as_mut() + .map(|knowledge| knowledge.set(statement_index_in_group, statement_kind)); + } } } @@ -2238,6 +2270,7 @@ mod tests { validator_index, counterparty, &statement, + false, ); // There should be no pending statements now (for the counterparty). diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index 5285265787..dc29c19a48 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -93,7 +93,19 @@ mod statement_store; #[cfg(test)] mod tests; -const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement"); +const COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR: Rep = + Rep::CostMinor("Unexpected Statement, not a validator"); +const COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND: Rep = + Rep::CostMinor("Unexpected Statement, validator not found"); +const COST_UNEXPECTED_STATEMENT_INVALID_SENDER: Rep = + Rep::CostMinor("Unexpected Statement, invalid sender"); +const COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE: Rep = + Rep::CostMinor("Unexpected Statement, bad advertise"); +const COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED: Rep = + Rep::CostMinor("Unexpected Statement, cluster rejected"); +const COST_UNEXPECTED_STATEMENT_NOT_IN_GROUP: Rep = + Rep::CostMinor("Unexpected Statement, not in group"); + const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep = Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent"); const COST_EXCESSIVE_SECONDED: Rep = Rep::CostMinor("Sent Excessive `Seconded` Statements"); @@ -1086,6 +1098,7 @@ async fn send_pending_grid_messages( originator, peer_validator_id, &compact, + false, ); } @@ -1378,6 +1391,7 @@ async fn circulate_statement( originator, target, &compact_statement, + false, ); }, } @@ -1515,7 +1529,13 @@ async fn handle_incoming_statement( // we shouldn't be receiving statements unless we're a validator // this session. if per_session.is_not_validator() { - modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await; + modify_reputation( + reputation, + ctx.sender(), + peer, + COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR, + ) + .await; } return }, @@ -1526,7 +1546,13 @@ async fn handle_incoming_statement( match per_session.groups.by_validator_index(statement.unchecked_validator_index()) { Some(g) => g, None => { - modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await; + modify_reputation( + reputation, + ctx.sender(), + peer, + COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND, + ) + .await; return }, }; @@ -1565,38 +1591,45 @@ async fn handle_incoming_statement( (active, idx) }; - let checked_statement = - if let Some((active, cluster_sender_index)) = active.zip(cluster_sender_index) { - match handle_cluster_statement( - relay_parent, - &mut active.cluster_tracker, - per_relay_parent.session, - &per_session.session_info, - statement, - cluster_sender_index, - ) { - Ok(Some(s)) => s, - Ok(None) => return, - Err(rep) => { - modify_reputation(reputation, ctx.sender(), peer, rep).await; - return - }, - } - } else { - let grid_sender_index = local_validator - .grid_tracker - .direct_statement_providers( - &per_session.groups, - statement.unchecked_validator_index(), - statement.unchecked_payload(), - ) - .into_iter() - .filter_map(|i| session_info.discovery_keys.get(i.0 as usize).map(|ad| (i, ad))) - .filter(|(_, ad)| peer_state.is_authority(ad)) - .map(|(i, _)| i) - .next(); + let checked_statement = if let Some((active, cluster_sender_index)) = + active.zip(cluster_sender_index) + { + match handle_cluster_statement( + relay_parent, + &mut active.cluster_tracker, + per_relay_parent.session, + &per_session.session_info, + statement, + cluster_sender_index, + ) { + Ok(Some(s)) => s, + Ok(None) => return, + Err(rep) => { + modify_reputation(reputation, ctx.sender(), peer, rep).await; + return + }, + } + } else { + let grid_sender_index = local_validator + .grid_tracker + .direct_statement_providers( + &per_session.groups, + statement.unchecked_validator_index(), + statement.unchecked_payload(), + ) + .into_iter() + .filter_map(|(i, validator_knows_statement)| { + session_info + .discovery_keys + .get(i.0 as usize) + .map(|ad| (i, ad, validator_knows_statement)) + }) + .filter(|(_, ad, _)| peer_state.is_authority(ad)) + .map(|(i, _, validator_knows_statement)| (i, validator_knows_statement)) + .next(); - if let Some(grid_sender_index) = grid_sender_index { + if let Some((grid_sender_index, validator_knows_statement)) = grid_sender_index { + if !validator_knows_statement { match handle_grid_statement( relay_parent, &mut local_validator.grid_tracker, @@ -1612,11 +1645,22 @@ async fn handle_incoming_statement( }, } } else { - // Not a cluster or grid peer. - modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await; - return + // Reward the peer for sending us the statement + modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await; + return; } - }; + } else { + // Not a cluster or grid peer. + modify_reputation( + reputation, + ctx.sender(), + peer, + COST_UNEXPECTED_STATEMENT_INVALID_SENDER, + ) + .await; + return + } + }; let statement = checked_statement.payload().clone(); let originator_index = checked_statement.validator_index(); @@ -1635,7 +1679,13 @@ async fn handle_incoming_statement( ); if let Err(BadAdvertisement) = res { - modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await; + modify_reputation( + reputation, + ctx.sender(), + peer, + COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE, + ) + .await; return } } @@ -1743,11 +1793,11 @@ fn handle_cluster_statement( Ok(ClusterAccept::WithPrejudice) => false, Err(ClusterRejectIncoming::ExcessiveSeconded) => return Err(COST_EXCESSIVE_SECONDED), Err(ClusterRejectIncoming::CandidateUnknown | ClusterRejectIncoming::Duplicate) => - return Err(COST_UNEXPECTED_STATEMENT), + return Err(COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED), Err(ClusterRejectIncoming::NotInGroup) => { // sanity: shouldn't be possible; we already filtered this // out above. - return Err(COST_UNEXPECTED_STATEMENT) + return Err(COST_UNEXPECTED_STATEMENT_NOT_IN_GROUP) }, } }; @@ -1798,6 +1848,7 @@ fn handle_grid_statement( checked_statement.validator_index(), grid_sender_index, &checked_statement.payload(), + true, ); Ok(checked_statement) @@ -2376,6 +2427,7 @@ fn post_acknowledgement_statement_messages( statement.validator_index(), recipient, statement.payload(), + false, ); match peer.1.into() { ValidationVersion::V2 => messages.push(Versioned::V2( @@ -3255,6 +3307,7 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) { statement.unchecked_validator_index(), validator_id, statement.unchecked_payload(), + false, ); } } diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs index 7ffed9d47d..a944a9cd6d 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs @@ -170,7 +170,7 @@ fn cluster_valid_statement_before_seconded_ignored() { overseer.recv().await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) => { assert_eq!(p, peer_a); - assert_eq!(r, COST_UNEXPECTED_STATEMENT.into()); + assert_eq!(r, COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED.into()); } ); @@ -305,7 +305,7 @@ fn useful_cluster_statement_from_non_cluster_peer_rejected() { assert_matches!( overseer.recv().await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) - if p == peer_a && r == COST_UNEXPECTED_STATEMENT.into() => { } + if p == peer_a && r == COST_UNEXPECTED_STATEMENT_INVALID_SENDER.into() => { } ); overseer @@ -359,7 +359,7 @@ fn statement_from_non_cluster_originator_unexpected() { assert_matches!( overseer.recv().await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) - if p == peer_a && r == COST_UNEXPECTED_STATEMENT.into() => { } + if p == peer_a && r == COST_UNEXPECTED_STATEMENT_INVALID_SENDER.into() => { } ); overseer