mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-18 17:41:01 +00:00
Only accept requests from peers that got notified by us. (#2889)
This commit is contained in:
@@ -167,9 +167,11 @@ fn note_hash(
|
|||||||
/// knowledge that a peer has about goings-on in a relay parent.
|
/// knowledge that a peer has about goings-on in a relay parent.
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct PeerRelayParentKnowledge {
|
struct PeerRelayParentKnowledge {
|
||||||
/// candidates that the peer is aware of. This indicates that we can
|
/// candidates that the peer is aware of because we sent statements to it. This indicates that we can
|
||||||
/// send other statements pertaining to that candidate.
|
/// send other statements pertaining to that candidate.
|
||||||
known_candidates: HashSet<CandidateHash>,
|
sent_candidates: HashSet<CandidateHash>,
|
||||||
|
/// candidates that peer is aware of, because we received statements from it.
|
||||||
|
received_candidates: HashSet<CandidateHash>,
|
||||||
/// fingerprints of all statements a peer should be aware of: those that
|
/// fingerprints of all statements a peer should be aware of: those that
|
||||||
/// were sent to the peer by us.
|
/// were sent to the peer by us.
|
||||||
sent_statements: HashSet<(CompactStatement, ValidatorIndex)>,
|
sent_statements: HashSet<(CompactStatement, ValidatorIndex)>,
|
||||||
@@ -205,7 +207,7 @@ impl PeerRelayParentKnowledge {
|
|||||||
.or_default()
|
.or_default()
|
||||||
.note_local(h.clone());
|
.note_local(h.clone());
|
||||||
|
|
||||||
self.known_candidates.insert(h.clone())
|
self.sent_candidates.insert(h.clone())
|
||||||
},
|
},
|
||||||
CompactStatement::Valid(_) => {
|
CompactStatement::Valid(_) => {
|
||||||
false
|
false
|
||||||
@@ -231,7 +233,7 @@ impl PeerRelayParentKnowledge {
|
|||||||
CompactStatement::Valid(ref h) => {
|
CompactStatement::Valid(ref h) => {
|
||||||
// The peer can only accept Valid and Invalid statements for which it is aware
|
// The peer can only accept Valid and Invalid statements for which it is aware
|
||||||
// of the corresponding candidate.
|
// of the corresponding candidate.
|
||||||
self.known_candidates.contains(h)
|
self.is_known_candidate(h)
|
||||||
}
|
}
|
||||||
CompactStatement::Seconded(_) => {
|
CompactStatement::Seconded(_) => {
|
||||||
true
|
true
|
||||||
@@ -280,7 +282,7 @@ impl PeerRelayParentKnowledge {
|
|||||||
h
|
h
|
||||||
}
|
}
|
||||||
CompactStatement::Valid(ref h) => {
|
CompactStatement::Valid(ref h) => {
|
||||||
if !self.known_candidates.contains(&h) {
|
if !self.is_known_candidate(&h) {
|
||||||
return Err(COST_UNEXPECTED_STATEMENT);
|
return Err(COST_UNEXPECTED_STATEMENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -301,7 +303,7 @@ impl PeerRelayParentKnowledge {
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.received_statements.insert(fingerprint.clone());
|
self.received_statements.insert(fingerprint.clone());
|
||||||
Ok(self.known_candidates.insert(candidate_hash.clone()))
|
Ok(self.received_candidates.insert(candidate_hash.clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This method does the same checks as `receive` without modifying the internal state.
|
/// This method does the same checks as `receive` without modifying the internal state.
|
||||||
@@ -330,7 +332,7 @@ impl PeerRelayParentKnowledge {
|
|||||||
h
|
h
|
||||||
}
|
}
|
||||||
CompactStatement::Valid(ref h) => {
|
CompactStatement::Valid(ref h) => {
|
||||||
if !self.known_candidates.contains(&h) {
|
if !self.is_known_candidate(&h) {
|
||||||
return Err(COST_UNEXPECTED_STATEMENT);
|
return Err(COST_UNEXPECTED_STATEMENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -348,6 +350,12 @@ impl PeerRelayParentKnowledge {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check for candidates that the peer is aware of. This indicates that we can
|
||||||
|
/// send other statements pertaining to that candidate.
|
||||||
|
fn is_known_candidate(&self, candidate: &CandidateHash) -> bool {
|
||||||
|
self.sent_candidates.contains(candidate) || self.received_candidates.contains(candidate)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct PeerData {
|
struct PeerData {
|
||||||
@@ -1527,6 +1535,7 @@ impl StatementDistribution {
|
|||||||
.await?,
|
.await?,
|
||||||
Message::Responder(result) =>
|
Message::Responder(result) =>
|
||||||
self.handle_responder_message(
|
self.handle_responder_message(
|
||||||
|
&peers,
|
||||||
&mut active_heads,
|
&mut active_heads,
|
||||||
result.ok_or(SubsystemError::Context(
|
result.ok_or(SubsystemError::Context(
|
||||||
"Failed to read from responder receiver (stream finished)"
|
"Failed to read from responder receiver (stream finished)"
|
||||||
@@ -1545,15 +1554,30 @@ impl StatementDistribution {
|
|||||||
/// Handle messages from responder background task.
|
/// Handle messages from responder background task.
|
||||||
async fn handle_responder_message(
|
async fn handle_responder_message(
|
||||||
&self,
|
&self,
|
||||||
|
peers: &HashMap<PeerId, PeerData>,
|
||||||
active_heads: &mut HashMap<Hash, ActiveHeadData>,
|
active_heads: &mut HashMap<Hash, ActiveHeadData>,
|
||||||
message: ResponderMessage,
|
message: ResponderMessage,
|
||||||
) -> SubsystemResult<bool> {
|
) -> SubsystemResult<bool> {
|
||||||
match message {
|
match message {
|
||||||
ResponderMessage::GetData {
|
ResponderMessage::GetData {
|
||||||
|
requesting_peer,
|
||||||
relay_parent,
|
relay_parent,
|
||||||
candidate_hash,
|
candidate_hash,
|
||||||
tx,
|
tx,
|
||||||
} => {
|
} => {
|
||||||
|
if !requesting_peer_knows_about_candidate(
|
||||||
|
peers,
|
||||||
|
&requesting_peer,
|
||||||
|
&relay_parent,
|
||||||
|
&candidate_hash
|
||||||
|
) {
|
||||||
|
tracing::warn!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
"Peer requested candidate, although we never announced it to that peer."
|
||||||
|
);
|
||||||
|
return Ok(false)
|
||||||
|
}
|
||||||
|
|
||||||
let active_head = match active_heads.get(&relay_parent) {
|
let active_head = match active_heads.get(&relay_parent) {
|
||||||
Some(head) => head,
|
Some(head) => head,
|
||||||
None => return Ok(false),
|
None => return Ok(false),
|
||||||
@@ -1844,6 +1868,36 @@ impl StatementDistribution {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check whether a peer knows about a candidate from us.
|
||||||
|
///
|
||||||
|
/// If not, it is deemed illegal for it to request corresponding data from us.
|
||||||
|
fn requesting_peer_knows_about_candidate(
|
||||||
|
peers: &HashMap<PeerId, PeerData>,
|
||||||
|
requesting_peer: &PeerId,
|
||||||
|
relay_parent: &Hash,
|
||||||
|
candidate_hash: &CandidateHash,
|
||||||
|
) -> bool {
|
||||||
|
requesting_peer_knows_about_candidate_inner(
|
||||||
|
peers,
|
||||||
|
requesting_peer,
|
||||||
|
relay_parent,
|
||||||
|
candidate_hash,
|
||||||
|
).is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper function for `requesting_peer_knows_about_statement`.
|
||||||
|
fn requesting_peer_knows_about_candidate_inner(
|
||||||
|
peers: &HashMap<PeerId, PeerData>,
|
||||||
|
requesting_peer: &PeerId,
|
||||||
|
relay_parent: &Hash,
|
||||||
|
candidate_hash: &CandidateHash,
|
||||||
|
) -> Option<()> {
|
||||||
|
let peer_data = peers.get(requesting_peer)?;
|
||||||
|
let knowledge = peer_data.view_knowledge.get(relay_parent)?;
|
||||||
|
knowledge.sent_candidates.get(&candidate_hash)?;
|
||||||
|
Some(())
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct MetricsInner {
|
struct MetricsInner {
|
||||||
statements_distributed: prometheus::Counter<prometheus::U64>,
|
statements_distributed: prometheus::Counter<prometheus::U64>,
|
||||||
@@ -2146,7 +2200,7 @@ mod tests {
|
|||||||
|
|
||||||
// Sending an un-pinned statement should not work and should have no effect.
|
// Sending an un-pinned statement should not work and should have no effect.
|
||||||
assert!(!knowledge.can_send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))));
|
assert!(!knowledge.can_send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))));
|
||||||
assert!(!knowledge.known_candidates.contains(&hash_a));
|
assert!(!knowledge.is_known_candidate(&hash_a));
|
||||||
assert!(knowledge.sent_statements.is_empty());
|
assert!(knowledge.sent_statements.is_empty());
|
||||||
assert!(knowledge.received_statements.is_empty());
|
assert!(knowledge.received_statements.is_empty());
|
||||||
assert!(knowledge.seconded_counts.is_empty());
|
assert!(knowledge.seconded_counts.is_empty());
|
||||||
@@ -2155,7 +2209,7 @@ mod tests {
|
|||||||
// Make the peer aware of the candidate.
|
// Make the peer aware of the candidate.
|
||||||
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), true);
|
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), true);
|
||||||
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), false);
|
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), false);
|
||||||
assert!(knowledge.known_candidates.contains(&hash_a));
|
assert!(knowledge.is_known_candidate(&hash_a));
|
||||||
assert_eq!(knowledge.sent_statements.len(), 2);
|
assert_eq!(knowledge.sent_statements.len(), 2);
|
||||||
assert!(knowledge.received_statements.is_empty());
|
assert!(knowledge.received_statements.is_empty());
|
||||||
assert_eq!(knowledge.seconded_counts.len(), 2);
|
assert_eq!(knowledge.seconded_counts.len(), 2);
|
||||||
@@ -2163,7 +2217,7 @@ mod tests {
|
|||||||
|
|
||||||
// And now it should accept the dependent message.
|
// And now it should accept the dependent message.
|
||||||
assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), false);
|
assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), false);
|
||||||
assert!(knowledge.known_candidates.contains(&hash_a));
|
assert!(knowledge.is_known_candidate(&hash_a));
|
||||||
assert_eq!(knowledge.sent_statements.len(), 3);
|
assert_eq!(knowledge.sent_statements.len(), 3);
|
||||||
assert!(knowledge.received_statements.is_empty());
|
assert!(knowledge.received_statements.is_empty());
|
||||||
assert_eq!(knowledge.seconded_counts.len(), 2);
|
assert_eq!(knowledge.seconded_counts.len(), 2);
|
||||||
@@ -2208,7 +2262,7 @@ mod tests {
|
|||||||
Ok(false),
|
Ok(false),
|
||||||
);
|
);
|
||||||
|
|
||||||
assert!(knowledge.known_candidates.contains(&hash_a));
|
assert!(knowledge.is_known_candidate(&hash_a));
|
||||||
assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 2);
|
assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 2);
|
||||||
|
|
||||||
assert!(knowledge.check_can_receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(2)), 3).is_ok());
|
assert!(knowledge.check_can_receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(2)), 3).is_ok());
|
||||||
@@ -2394,7 +2448,7 @@ mod tests {
|
|||||||
|
|
||||||
let c_knowledge = peer_data.view_knowledge.get(&hash_c).unwrap();
|
let c_knowledge = peer_data.view_knowledge.get(&hash_c).unwrap();
|
||||||
|
|
||||||
assert!(c_knowledge.known_candidates.contains(&candidate_hash));
|
assert!(c_knowledge.is_known_candidate(&candidate_hash));
|
||||||
assert!(c_knowledge.sent_statements.contains(
|
assert!(c_knowledge.sent_statements.contains(
|
||||||
&(CompactStatement::Seconded(candidate_hash), ValidatorIndex(0))
|
&(CompactStatement::Seconded(candidate_hash), ValidatorIndex(0))
|
||||||
));
|
));
|
||||||
@@ -3073,12 +3127,30 @@ mod tests {
|
|||||||
// Now that it has the candidate it should answer requests accordingly (even after a
|
// Now that it has the candidate it should answer requests accordingly (even after a
|
||||||
// failed request):
|
// failed request):
|
||||||
|
|
||||||
// Failing request first:
|
// Failing request first (wrong relay parent hash):
|
||||||
let (pending_response, response_rx) = oneshot::channel();
|
let (pending_response, response_rx) = oneshot::channel();
|
||||||
let inner_req = StatementFetchingRequest {
|
let inner_req = StatementFetchingRequest {
|
||||||
relay_parent: hash_b,
|
relay_parent: hash_b,
|
||||||
candidate_hash: metadata.candidate_hash,
|
candidate_hash: metadata.candidate_hash,
|
||||||
};
|
};
|
||||||
|
let req = sc_network::config::IncomingRequest {
|
||||||
|
peer: peer_b,
|
||||||
|
payload: inner_req.encode(),
|
||||||
|
pending_response,
|
||||||
|
};
|
||||||
|
tx_reqs.send(req).await.unwrap();
|
||||||
|
assert_matches!(
|
||||||
|
response_rx.await.unwrap().result,
|
||||||
|
Err(()) => {}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Another failing request (peer_a never received a statement from us, so it is not
|
||||||
|
// allowed to request the data):
|
||||||
|
let (pending_response, response_rx) = oneshot::channel();
|
||||||
|
let inner_req = StatementFetchingRequest {
|
||||||
|
relay_parent: metadata.relay_parent,
|
||||||
|
candidate_hash: metadata.candidate_hash,
|
||||||
|
};
|
||||||
let req = sc_network::config::IncomingRequest {
|
let req = sc_network::config::IncomingRequest {
|
||||||
peer: peer_a,
|
peer: peer_a,
|
||||||
payload: inner_req.encode(),
|
payload: inner_req.encode(),
|
||||||
@@ -3090,19 +3162,18 @@ mod tests {
|
|||||||
Err(()) => {}
|
Err(()) => {}
|
||||||
);
|
);
|
||||||
|
|
||||||
// Now the working one:
|
// And now the succeding request from peer_b:
|
||||||
let (pending_response, response_rx) = oneshot::channel();
|
let (pending_response, response_rx) = oneshot::channel();
|
||||||
let inner_req = StatementFetchingRequest {
|
let inner_req = StatementFetchingRequest {
|
||||||
relay_parent: metadata.relay_parent,
|
relay_parent: metadata.relay_parent,
|
||||||
candidate_hash: metadata.candidate_hash,
|
candidate_hash: metadata.candidate_hash,
|
||||||
};
|
};
|
||||||
let req = sc_network::config::IncomingRequest {
|
let req = sc_network::config::IncomingRequest {
|
||||||
peer: peer_a,
|
peer: peer_b,
|
||||||
payload: inner_req.encode(),
|
payload: inner_req.encode(),
|
||||||
pending_response,
|
pending_response,
|
||||||
};
|
};
|
||||||
tx_reqs.send(req).await.unwrap();
|
tx_reqs.send(req).await.unwrap();
|
||||||
|
|
||||||
let StatementFetchingResponse::Statement(committed) =
|
let StatementFetchingResponse::Statement(committed) =
|
||||||
Decode::decode(&mut response_rx.await.unwrap().result.unwrap().as_ref()).unwrap();
|
Decode::decode(&mut response_rx.await.unwrap().result.unwrap().as_ref()).unwrap();
|
||||||
assert_eq!(committed, candidate);
|
assert_eq!(committed, candidate);
|
||||||
|
|||||||
@@ -19,13 +19,13 @@ use futures::{SinkExt, StreamExt, channel::{mpsc, oneshot}, stream::FuturesUnord
|
|||||||
use parity_scale_codec::Decode;
|
use parity_scale_codec::Decode;
|
||||||
|
|
||||||
use polkadot_node_network_protocol::{
|
use polkadot_node_network_protocol::{
|
||||||
UnifiedReputationChange as Rep,
|
PeerId, UnifiedReputationChange as Rep,
|
||||||
request_response::{
|
request_response::{
|
||||||
IncomingRequest, MAX_PARALLEL_STATEMENT_REQUESTS, request::OutgoingResponse,
|
IncomingRequest, MAX_PARALLEL_STATEMENT_REQUESTS, request::OutgoingResponse,
|
||||||
v1::{
|
v1::{
|
||||||
StatementFetchingRequest, StatementFetchingResponse
|
StatementFetchingRequest, StatementFetchingResponse
|
||||||
}
|
},
|
||||||
}
|
},
|
||||||
};
|
};
|
||||||
use polkadot_primitives::v1::{CandidateHash, CommittedCandidateReceipt, Hash};
|
use polkadot_primitives::v1::{CandidateHash, CommittedCandidateReceipt, Hash};
|
||||||
|
|
||||||
@@ -37,6 +37,7 @@ const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request")
|
|||||||
pub enum ResponderMessage {
|
pub enum ResponderMessage {
|
||||||
/// Get an update of availble peers to try for fetching a given statement.
|
/// Get an update of availble peers to try for fetching a given statement.
|
||||||
GetData {
|
GetData {
|
||||||
|
requesting_peer: PeerId,
|
||||||
relay_parent: Hash,
|
relay_parent: Hash,
|
||||||
candidate_hash: CandidateHash,
|
candidate_hash: CandidateHash,
|
||||||
tx: oneshot::Sender<CommittedCandidateReceipt>
|
tx: oneshot::Sender<CommittedCandidateReceipt>
|
||||||
@@ -112,6 +113,7 @@ pub async fn respond(
|
|||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
if let Err(err) = sender.feed(
|
if let Err(err) = sender.feed(
|
||||||
ResponderMessage::GetData {
|
ResponderMessage::GetData {
|
||||||
|
requesting_peer: peer,
|
||||||
relay_parent: req.payload.relay_parent,
|
relay_parent: req.payload.relay_parent,
|
||||||
candidate_hash: req.payload.candidate_hash,
|
candidate_hash: req.payload.candidate_hash,
|
||||||
tx,
|
tx,
|
||||||
@@ -131,7 +133,7 @@ pub async fn respond(
|
|||||||
?err,
|
?err,
|
||||||
"Requested data not found."
|
"Requested data not found."
|
||||||
);
|
);
|
||||||
Err(())
|
Err(())
|
||||||
}
|
}
|
||||||
Ok(v) => Ok(StatementFetchingResponse::Statement(v)),
|
Ok(v) => Ok(StatementFetchingResponse::Statement(v)),
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user