mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 09:17:58 +00:00
statement-distribution: Fix CostMinor("Unexpected Statement") (#3223)
On grid distribution messages have two paths of reaching a node, so there is the possiblity of a race when two peers send each other the same statement around the same time. Statement local_knowledge will tell us that the peer should have not send the statement because we sent it to it. Fix it by also keeping track only of the statement we received from a given peer and penalize it only if it sends it to us more than once. Fixes: https://github.com/paritytech/polkadot-sdk/issues/2346 Additionally, also use different Cost labels for different paths to make it easier to debug things. --------- Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
This commit is contained in:
committed by
GitHub
parent
c3489aaf2f
commit
8362a6810d
@@ -527,12 +527,16 @@ impl GridTracker {
|
||||
}
|
||||
|
||||
/// Determine the validators which can send a statement to us by direct broadcast.
|
||||
///
|
||||
/// Returns a list of tuples representing each potential sender(ValidatorIndex)
|
||||
/// and if the sender should already know about the statement, because we just
|
||||
/// sent it to it.
|
||||
pub fn direct_statement_providers(
|
||||
&self,
|
||||
groups: &Groups,
|
||||
originator: ValidatorIndex,
|
||||
statement: &CompactStatement,
|
||||
) -> Vec<ValidatorIndex> {
|
||||
) -> Vec<(ValidatorIndex, bool)> {
|
||||
let (g, c_h, kind, in_group) =
|
||||
match extract_statement_and_group_info(groups, originator, statement) {
|
||||
None => return Vec::new(),
|
||||
@@ -616,12 +620,13 @@ impl GridTracker {
|
||||
originator: ValidatorIndex,
|
||||
counterparty: ValidatorIndex,
|
||||
statement: &CompactStatement,
|
||||
received: bool,
|
||||
) {
|
||||
if let Some((_, c_h, kind, in_group)) =
|
||||
extract_statement_and_group_info(groups, originator, statement)
|
||||
{
|
||||
if let Some(known) = self.confirmed_backed.get_mut(&c_h) {
|
||||
known.sent_or_received_direct_statement(counterparty, in_group, kind);
|
||||
known.sent_or_received_direct_statement(counterparty, in_group, kind, received);
|
||||
|
||||
if let Some(pending) = self.pending_statements.get_mut(&counterparty) {
|
||||
pending.remove(&(originator, statement.clone()));
|
||||
@@ -908,6 +913,12 @@ struct MutualKnowledge {
|
||||
/// `Some` only if we have advertised, acknowledged, or requested the candidate
|
||||
/// from them.
|
||||
local_knowledge: Option<StatementFilter>,
|
||||
/// Knowledge peer circulated to us, this is different from `local_knowledge` and
|
||||
/// `remote_knowledge`, through the fact that includes only statements that we received from
|
||||
/// peer while the other two, after manifest exchange part will include both what we sent to
|
||||
/// the peer and what we received from peer, see `sent_or_received_direct_statement` for more
|
||||
/// details.
|
||||
received_knowledge: Option<StatementFilter>,
|
||||
}
|
||||
|
||||
// A utility struct for keeping track of metadata about candidates
|
||||
@@ -933,10 +944,13 @@ impl KnownBackedCandidate {
|
||||
}
|
||||
|
||||
fn manifest_sent_to(&mut self, validator: ValidatorIndex, local_knowledge: StatementFilter) {
|
||||
let k = self
|
||||
.mutual_knowledge
|
||||
.entry(validator)
|
||||
.or_insert_with(|| MutualKnowledge { remote_knowledge: None, local_knowledge: None });
|
||||
let k = self.mutual_knowledge.entry(validator).or_insert_with(|| MutualKnowledge {
|
||||
remote_knowledge: None,
|
||||
local_knowledge: None,
|
||||
received_knowledge: None,
|
||||
});
|
||||
k.received_knowledge =
|
||||
Some(StatementFilter::blank(local_knowledge.seconded_in_group.len()));
|
||||
|
||||
k.local_knowledge = Some(local_knowledge);
|
||||
}
|
||||
@@ -946,20 +960,24 @@ impl KnownBackedCandidate {
|
||||
validator: ValidatorIndex,
|
||||
remote_knowledge: StatementFilter,
|
||||
) {
|
||||
let k = self
|
||||
.mutual_knowledge
|
||||
.entry(validator)
|
||||
.or_insert_with(|| MutualKnowledge { remote_knowledge: None, local_knowledge: None });
|
||||
let k = self.mutual_knowledge.entry(validator).or_insert_with(|| MutualKnowledge {
|
||||
remote_knowledge: None,
|
||||
local_knowledge: None,
|
||||
received_knowledge: None,
|
||||
});
|
||||
|
||||
k.remote_knowledge = Some(remote_knowledge);
|
||||
}
|
||||
|
||||
/// Returns a list of tuples representing each potential sender(ValidatorIndex)
|
||||
/// and if the sender should already know about the statement, because we just
|
||||
/// sent it to it.
|
||||
fn direct_statement_senders(
|
||||
&self,
|
||||
group_index: GroupIndex,
|
||||
originator_index_in_group: usize,
|
||||
statement_kind: StatementKind,
|
||||
) -> Vec<ValidatorIndex> {
|
||||
) -> Vec<(ValidatorIndex, bool)> {
|
||||
if group_index != self.group_index {
|
||||
return Vec::new()
|
||||
}
|
||||
@@ -968,11 +986,18 @@ impl KnownBackedCandidate {
|
||||
.iter()
|
||||
.filter(|(_, k)| k.remote_knowledge.is_some())
|
||||
.filter(|(_, k)| {
|
||||
k.local_knowledge
|
||||
k.received_knowledge
|
||||
.as_ref()
|
||||
.map_or(false, |r| !r.contains(originator_index_in_group, statement_kind))
|
||||
})
|
||||
.map(|(v, _)| *v)
|
||||
.map(|(v, k)| {
|
||||
(
|
||||
*v,
|
||||
k.local_knowledge
|
||||
.as_ref()
|
||||
.map_or(false, |r| r.contains(originator_index_in_group, statement_kind)),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -1014,12 +1039,19 @@ impl KnownBackedCandidate {
|
||||
validator: ValidatorIndex,
|
||||
statement_index_in_group: usize,
|
||||
statement_kind: StatementKind,
|
||||
received: bool,
|
||||
) {
|
||||
if let Some(k) = self.mutual_knowledge.get_mut(&validator) {
|
||||
if let (Some(r), Some(l)) = (k.remote_knowledge.as_mut(), k.local_knowledge.as_mut()) {
|
||||
r.set(statement_index_in_group, statement_kind);
|
||||
l.set(statement_index_in_group, statement_kind);
|
||||
}
|
||||
|
||||
if received {
|
||||
k.received_knowledge
|
||||
.as_mut()
|
||||
.map(|knowledge| knowledge.set(statement_index_in_group, statement_kind));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2238,6 +2270,7 @@ mod tests {
|
||||
validator_index,
|
||||
counterparty,
|
||||
&statement,
|
||||
false,
|
||||
);
|
||||
|
||||
// There should be no pending statements now (for the counterparty).
|
||||
|
||||
@@ -93,7 +93,19 @@ mod statement_store;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement");
|
||||
const COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR: Rep =
|
||||
Rep::CostMinor("Unexpected Statement, not a validator");
|
||||
const COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND: Rep =
|
||||
Rep::CostMinor("Unexpected Statement, validator not found");
|
||||
const COST_UNEXPECTED_STATEMENT_INVALID_SENDER: Rep =
|
||||
Rep::CostMinor("Unexpected Statement, invalid sender");
|
||||
const COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE: Rep =
|
||||
Rep::CostMinor("Unexpected Statement, bad advertise");
|
||||
const COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED: Rep =
|
||||
Rep::CostMinor("Unexpected Statement, cluster rejected");
|
||||
const COST_UNEXPECTED_STATEMENT_NOT_IN_GROUP: Rep =
|
||||
Rep::CostMinor("Unexpected Statement, not in group");
|
||||
|
||||
const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep =
|
||||
Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent");
|
||||
const COST_EXCESSIVE_SECONDED: Rep = Rep::CostMinor("Sent Excessive `Seconded` Statements");
|
||||
@@ -1086,6 +1098,7 @@ async fn send_pending_grid_messages<Context>(
|
||||
originator,
|
||||
peer_validator_id,
|
||||
&compact,
|
||||
false,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1378,6 +1391,7 @@ async fn circulate_statement<Context>(
|
||||
originator,
|
||||
target,
|
||||
&compact_statement,
|
||||
false,
|
||||
);
|
||||
},
|
||||
}
|
||||
@@ -1515,7 +1529,13 @@ async fn handle_incoming_statement<Context>(
|
||||
// we shouldn't be receiving statements unless we're a validator
|
||||
// this session.
|
||||
if per_session.is_not_validator() {
|
||||
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
|
||||
modify_reputation(
|
||||
reputation,
|
||||
ctx.sender(),
|
||||
peer,
|
||||
COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return
|
||||
},
|
||||
@@ -1526,7 +1546,13 @@ async fn handle_incoming_statement<Context>(
|
||||
match per_session.groups.by_validator_index(statement.unchecked_validator_index()) {
|
||||
Some(g) => g,
|
||||
None => {
|
||||
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
|
||||
modify_reputation(
|
||||
reputation,
|
||||
ctx.sender(),
|
||||
peer,
|
||||
COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND,
|
||||
)
|
||||
.await;
|
||||
return
|
||||
},
|
||||
};
|
||||
@@ -1565,38 +1591,45 @@ async fn handle_incoming_statement<Context>(
|
||||
(active, idx)
|
||||
};
|
||||
|
||||
let checked_statement =
|
||||
if let Some((active, cluster_sender_index)) = active.zip(cluster_sender_index) {
|
||||
match handle_cluster_statement(
|
||||
relay_parent,
|
||||
&mut active.cluster_tracker,
|
||||
per_relay_parent.session,
|
||||
&per_session.session_info,
|
||||
statement,
|
||||
cluster_sender_index,
|
||||
) {
|
||||
Ok(Some(s)) => s,
|
||||
Ok(None) => return,
|
||||
Err(rep) => {
|
||||
modify_reputation(reputation, ctx.sender(), peer, rep).await;
|
||||
return
|
||||
},
|
||||
}
|
||||
} else {
|
||||
let grid_sender_index = local_validator
|
||||
.grid_tracker
|
||||
.direct_statement_providers(
|
||||
&per_session.groups,
|
||||
statement.unchecked_validator_index(),
|
||||
statement.unchecked_payload(),
|
||||
)
|
||||
.into_iter()
|
||||
.filter_map(|i| session_info.discovery_keys.get(i.0 as usize).map(|ad| (i, ad)))
|
||||
.filter(|(_, ad)| peer_state.is_authority(ad))
|
||||
.map(|(i, _)| i)
|
||||
.next();
|
||||
let checked_statement = if let Some((active, cluster_sender_index)) =
|
||||
active.zip(cluster_sender_index)
|
||||
{
|
||||
match handle_cluster_statement(
|
||||
relay_parent,
|
||||
&mut active.cluster_tracker,
|
||||
per_relay_parent.session,
|
||||
&per_session.session_info,
|
||||
statement,
|
||||
cluster_sender_index,
|
||||
) {
|
||||
Ok(Some(s)) => s,
|
||||
Ok(None) => return,
|
||||
Err(rep) => {
|
||||
modify_reputation(reputation, ctx.sender(), peer, rep).await;
|
||||
return
|
||||
},
|
||||
}
|
||||
} else {
|
||||
let grid_sender_index = local_validator
|
||||
.grid_tracker
|
||||
.direct_statement_providers(
|
||||
&per_session.groups,
|
||||
statement.unchecked_validator_index(),
|
||||
statement.unchecked_payload(),
|
||||
)
|
||||
.into_iter()
|
||||
.filter_map(|(i, validator_knows_statement)| {
|
||||
session_info
|
||||
.discovery_keys
|
||||
.get(i.0 as usize)
|
||||
.map(|ad| (i, ad, validator_knows_statement))
|
||||
})
|
||||
.filter(|(_, ad, _)| peer_state.is_authority(ad))
|
||||
.map(|(i, _, validator_knows_statement)| (i, validator_knows_statement))
|
||||
.next();
|
||||
|
||||
if let Some(grid_sender_index) = grid_sender_index {
|
||||
if let Some((grid_sender_index, validator_knows_statement)) = grid_sender_index {
|
||||
if !validator_knows_statement {
|
||||
match handle_grid_statement(
|
||||
relay_parent,
|
||||
&mut local_validator.grid_tracker,
|
||||
@@ -1612,11 +1645,22 @@ async fn handle_incoming_statement<Context>(
|
||||
},
|
||||
}
|
||||
} else {
|
||||
// Not a cluster or grid peer.
|
||||
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
|
||||
return
|
||||
// Reward the peer for sending us the statement
|
||||
modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
// Not a cluster or grid peer.
|
||||
modify_reputation(
|
||||
reputation,
|
||||
ctx.sender(),
|
||||
peer,
|
||||
COST_UNEXPECTED_STATEMENT_INVALID_SENDER,
|
||||
)
|
||||
.await;
|
||||
return
|
||||
}
|
||||
};
|
||||
|
||||
let statement = checked_statement.payload().clone();
|
||||
let originator_index = checked_statement.validator_index();
|
||||
@@ -1635,7 +1679,13 @@ async fn handle_incoming_statement<Context>(
|
||||
);
|
||||
|
||||
if let Err(BadAdvertisement) = res {
|
||||
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
|
||||
modify_reputation(
|
||||
reputation,
|
||||
ctx.sender(),
|
||||
peer,
|
||||
COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE,
|
||||
)
|
||||
.await;
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -1743,11 +1793,11 @@ fn handle_cluster_statement(
|
||||
Ok(ClusterAccept::WithPrejudice) => false,
|
||||
Err(ClusterRejectIncoming::ExcessiveSeconded) => return Err(COST_EXCESSIVE_SECONDED),
|
||||
Err(ClusterRejectIncoming::CandidateUnknown | ClusterRejectIncoming::Duplicate) =>
|
||||
return Err(COST_UNEXPECTED_STATEMENT),
|
||||
return Err(COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED),
|
||||
Err(ClusterRejectIncoming::NotInGroup) => {
|
||||
// sanity: shouldn't be possible; we already filtered this
|
||||
// out above.
|
||||
return Err(COST_UNEXPECTED_STATEMENT)
|
||||
return Err(COST_UNEXPECTED_STATEMENT_NOT_IN_GROUP)
|
||||
},
|
||||
}
|
||||
};
|
||||
@@ -1798,6 +1848,7 @@ fn handle_grid_statement(
|
||||
checked_statement.validator_index(),
|
||||
grid_sender_index,
|
||||
&checked_statement.payload(),
|
||||
true,
|
||||
);
|
||||
|
||||
Ok(checked_statement)
|
||||
@@ -2376,6 +2427,7 @@ fn post_acknowledgement_statement_messages(
|
||||
statement.validator_index(),
|
||||
recipient,
|
||||
statement.payload(),
|
||||
false,
|
||||
);
|
||||
match peer.1.into() {
|
||||
ValidationVersion::V2 => messages.push(Versioned::V2(
|
||||
@@ -3255,6 +3307,7 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
|
||||
statement.unchecked_validator_index(),
|
||||
validator_id,
|
||||
statement.unchecked_payload(),
|
||||
false,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,7 +170,7 @@ fn cluster_valid_statement_before_seconded_ignored() {
|
||||
overseer.recv().await,
|
||||
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) => {
|
||||
assert_eq!(p, peer_a);
|
||||
assert_eq!(r, COST_UNEXPECTED_STATEMENT.into());
|
||||
assert_eq!(r, COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED.into());
|
||||
}
|
||||
);
|
||||
|
||||
@@ -305,7 +305,7 @@ fn useful_cluster_statement_from_non_cluster_peer_rejected() {
|
||||
assert_matches!(
|
||||
overseer.recv().await,
|
||||
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
|
||||
if p == peer_a && r == COST_UNEXPECTED_STATEMENT.into() => { }
|
||||
if p == peer_a && r == COST_UNEXPECTED_STATEMENT_INVALID_SENDER.into() => { }
|
||||
);
|
||||
|
||||
overseer
|
||||
@@ -359,7 +359,7 @@ fn statement_from_non_cluster_originator_unexpected() {
|
||||
assert_matches!(
|
||||
overseer.recv().await,
|
||||
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
|
||||
if p == peer_a && r == COST_UNEXPECTED_STATEMENT.into() => { }
|
||||
if p == peer_a && r == COST_UNEXPECTED_STATEMENT_INVALID_SENDER.into() => { }
|
||||
);
|
||||
|
||||
overseer
|
||||
|
||||
Reference in New Issue
Block a user