mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 02:17:58 +00:00
More tests for new request based statement distribution (#2875)
* More test coverage. * Preserve peer order. * Better test coverage. * Even more test coverage. * Add doc comment to `IndexMap`. * Fix flaky test. * Review remarks. * Review remarks.
This commit is contained in:
@@ -53,7 +53,7 @@ use polkadot_node_network_protocol::{
|
||||
|
||||
use futures::{channel::mpsc, future::RemoteHandle, prelude::*};
|
||||
use futures::channel::oneshot;
|
||||
use indexmap::IndexSet;
|
||||
use indexmap::{IndexSet, IndexMap, map::Entry as IEntry};
|
||||
|
||||
use std::collections::{HashMap, HashSet, hash_map::Entry};
|
||||
|
||||
@@ -508,7 +508,10 @@ enum LargeStatementStatus {
|
||||
struct FetchingInfo {
|
||||
/// All peers that send us a `LargeStatement` or a `Valid` statement for the given
|
||||
/// `CandidateHash`, together with their originally sent messages.
|
||||
available_peers: HashMap<PeerId, Vec<protocol_v1::StatementDistributionMessage>>,
|
||||
///
|
||||
/// We use an `IndexMap` here to preserve the ordering of peers sending us messages. This is
|
||||
/// desirable because we reward first sending peers with reputation.
|
||||
available_peers: IndexMap<PeerId, Vec<protocol_v1::StatementDistributionMessage>>,
|
||||
/// Peers left to try in case the background task needs it.
|
||||
peers_to_try: Vec<PeerId>,
|
||||
/// Sender for sending fresh peers to the fetching task in case of failure.
|
||||
@@ -1057,22 +1060,24 @@ async fn retrieve_statement_from_message<'a>(
|
||||
match occupied.get_mut() {
|
||||
LargeStatementStatus::Fetching(info) => {
|
||||
|
||||
let is_new_peer = !info.available_peers.contains_key(&peer);
|
||||
let is_large_statement = message.is_large_statement();
|
||||
|
||||
match info.available_peers.entry(peer) {
|
||||
Entry::Occupied(mut occupied) => {
|
||||
occupied.get_mut().push(message);
|
||||
}
|
||||
Entry::Vacant(vacant) => {
|
||||
vacant.insert(vec![message]);
|
||||
}
|
||||
}
|
||||
let is_new_peer =
|
||||
match info.available_peers.entry(peer) {
|
||||
IEntry::Occupied(mut occupied) => {
|
||||
occupied.get_mut().push(message);
|
||||
false
|
||||
}
|
||||
IEntry::Vacant(vacant) => {
|
||||
vacant.insert(vec![message]);
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if is_new_peer & is_large_statement {
|
||||
info.peers_to_try.push(peer);
|
||||
// Answer any pending request for more peers:
|
||||
if let Some(sender) = std::mem::take(&mut info.peer_sender) {
|
||||
if let Some(sender) = info.peer_sender.take() {
|
||||
let to_send = std::mem::take(&mut info.peers_to_try);
|
||||
if let Err(peers) = sender.send(to_send) {
|
||||
// Requester no longer interested for now, might want them
|
||||
@@ -1181,7 +1186,7 @@ async fn launch_request(
|
||||
return None
|
||||
}
|
||||
let available_peers = {
|
||||
let mut m = HashMap::new();
|
||||
let mut m = IndexMap::new();
|
||||
m.insert(peer, vec![protocol_v1::StatementDistributionMessage::LargeStatement(meta)]);
|
||||
m
|
||||
};
|
||||
@@ -1950,6 +1955,7 @@ impl metrics::Metrics for Metrics {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
use super::*;
|
||||
use std::sync::Arc;
|
||||
@@ -1959,9 +1965,10 @@ mod tests {
|
||||
use polkadot_primitives::v1::{CommittedCandidateReceipt, ValidationCode};
|
||||
use assert_matches::assert_matches;
|
||||
use futures::executor::{self, block_on};
|
||||
use futures_timer::Delay;
|
||||
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
|
||||
use sc_keystore::LocalKeystore;
|
||||
use polkadot_node_network_protocol::{view, ObservedRole};
|
||||
use polkadot_node_network_protocol::{view, ObservedRole, request_response::Recipient};
|
||||
use polkadot_subsystem::{jaeger, ActivatedLeaf};
|
||||
use polkadot_node_network_protocol::request_response::{
|
||||
Requests,
|
||||
@@ -2699,6 +2706,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing() {
|
||||
sp_tracing::try_init_simple();
|
||||
let hash_a = Hash::repeat_byte(1);
|
||||
let hash_b = Hash::repeat_byte(2);
|
||||
|
||||
@@ -2712,6 +2720,8 @@ mod tests {
|
||||
|
||||
let peer_a = PeerId::random();
|
||||
let peer_b = PeerId::random();
|
||||
let peer_c = PeerId::random();
|
||||
let peer_bad = PeerId::random();
|
||||
|
||||
let validators = vec![
|
||||
Sr25519Keyring::Alice.public().into(),
|
||||
@@ -2780,6 +2790,16 @@ mod tests {
|
||||
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
|
||||
)
|
||||
}).await;
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full)
|
||||
)
|
||||
}).await;
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full)
|
||||
)
|
||||
}).await;
|
||||
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
@@ -2792,8 +2812,19 @@ mod tests {
|
||||
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a])
|
||||
)
|
||||
}).await;
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a])
|
||||
)
|
||||
}).await;
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a])
|
||||
)
|
||||
}).await;
|
||||
|
||||
// receive a seconded statement from peer A. it should be propagated onwards to peer B and to
|
||||
// receive a seconded statement from peer A, which does not provide the request data,
|
||||
// then get that data from peer C. It should be propagated onwards to peer B and to
|
||||
// candidate backing.
|
||||
let statement = {
|
||||
let signing_context = SigningContext {
|
||||
@@ -2815,7 +2846,7 @@ mod tests {
|
||||
).await.ok().flatten().expect("should be signed")
|
||||
};
|
||||
|
||||
let metadata =
|
||||
let metadata =
|
||||
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()).get_metadata();
|
||||
|
||||
handle.send(FromOverseer::Communication {
|
||||
@@ -2842,6 +2873,150 @@ mod tests {
|
||||
let req = outgoing.payload;
|
||||
assert_eq!(req.relay_parent, metadata.relay_parent);
|
||||
assert_eq!(req.candidate_hash, metadata.candidate_hash);
|
||||
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
|
||||
// Just drop request - should trigger error.
|
||||
}
|
||||
);
|
||||
|
||||
// There is a race between request handler asking for more peers and processing of the
|
||||
// coming `PeerMessage`s, we want the request handler to ask first here for better test
|
||||
// coverage:
|
||||
Delay::new(Duration::from_millis(20)).await;
|
||||
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_c.clone(),
|
||||
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
|
||||
)
|
||||
)
|
||||
}).await;
|
||||
|
||||
// Malicious peer:
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_bad.clone(),
|
||||
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
|
||||
)
|
||||
)
|
||||
}).await;
|
||||
|
||||
// Let c fail once too:
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendRequests(
|
||||
mut reqs, IfDisconnected::ImmediateError
|
||||
)
|
||||
) => {
|
||||
let reqs = reqs.pop().unwrap();
|
||||
let outgoing = match reqs {
|
||||
Requests::StatementFetching(outgoing) => outgoing,
|
||||
_ => panic!("Unexpected request"),
|
||||
};
|
||||
let req = outgoing.payload;
|
||||
assert_eq!(req.relay_parent, metadata.relay_parent);
|
||||
assert_eq!(req.candidate_hash, metadata.candidate_hash);
|
||||
assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
|
||||
}
|
||||
);
|
||||
|
||||
// a fails again:
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendRequests(
|
||||
mut reqs, IfDisconnected::ImmediateError
|
||||
)
|
||||
) => {
|
||||
let reqs = reqs.pop().unwrap();
|
||||
let outgoing = match reqs {
|
||||
Requests::StatementFetching(outgoing) => outgoing,
|
||||
_ => panic!("Unexpected request"),
|
||||
};
|
||||
let req = outgoing.payload;
|
||||
assert_eq!(req.relay_parent, metadata.relay_parent);
|
||||
assert_eq!(req.candidate_hash, metadata.candidate_hash);
|
||||
// On retry, we should have reverse order:
|
||||
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
|
||||
}
|
||||
);
|
||||
|
||||
// Send invalid response (all other peers have been tried now):
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendRequests(
|
||||
mut reqs, IfDisconnected::ImmediateError
|
||||
)
|
||||
) => {
|
||||
let reqs = reqs.pop().unwrap();
|
||||
let outgoing = match reqs {
|
||||
Requests::StatementFetching(outgoing) => outgoing,
|
||||
_ => panic!("Unexpected request"),
|
||||
};
|
||||
let req = outgoing.payload;
|
||||
assert_eq!(req.relay_parent, metadata.relay_parent);
|
||||
assert_eq!(req.candidate_hash, metadata.candidate_hash);
|
||||
assert_eq!(outgoing.peer, Recipient::Peer(peer_bad));
|
||||
let bad_candidate = {
|
||||
let mut bad = candidate.clone();
|
||||
bad.descriptor.para_id = 0xeadbeaf.into();
|
||||
bad
|
||||
};
|
||||
let response = StatementFetchingResponse::Statement(bad_candidate);
|
||||
outgoing.pending_response.send(Ok(response.encode())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// Should get punished and never tried again:
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ReportPeer(p, r)
|
||||
) if p == peer_bad && r == COST_WRONG_HASH => {}
|
||||
);
|
||||
|
||||
// a is tried again (retried in reverse order):
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendRequests(
|
||||
mut reqs, IfDisconnected::ImmediateError
|
||||
)
|
||||
) => {
|
||||
let reqs = reqs.pop().unwrap();
|
||||
let outgoing = match reqs {
|
||||
Requests::StatementFetching(outgoing) => outgoing,
|
||||
_ => panic!("Unexpected request"),
|
||||
};
|
||||
let req = outgoing.payload;
|
||||
assert_eq!(req.relay_parent, metadata.relay_parent);
|
||||
assert_eq!(req.candidate_hash, metadata.candidate_hash);
|
||||
// On retry, we should have reverse order:
|
||||
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
|
||||
}
|
||||
);
|
||||
|
||||
// c succeeds now:
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendRequests(
|
||||
mut reqs, IfDisconnected::ImmediateError
|
||||
)
|
||||
) => {
|
||||
let reqs = reqs.pop().unwrap();
|
||||
let outgoing = match reqs {
|
||||
Requests::StatementFetching(outgoing) => outgoing,
|
||||
_ => panic!("Unexpected request"),
|
||||
};
|
||||
let req = outgoing.payload;
|
||||
assert_eq!(req.relay_parent, metadata.relay_parent);
|
||||
assert_eq!(req.candidate_hash, metadata.candidate_hash);
|
||||
// On retry, we should have reverse order:
|
||||
assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
|
||||
let response = StatementFetchingResponse::Statement(candidate.clone());
|
||||
outgoing.pending_response.send(Ok(response.encode())).unwrap();
|
||||
}
|
||||
@@ -2851,7 +3026,14 @@ mod tests {
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ReportPeer(p, r)
|
||||
) if p == peer_a && r == BENEFIT_VALID_RESPONSE => {}
|
||||
) if p == peer_a && r == COST_FETCH_FAIL => {}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ReportPeer(p, r)
|
||||
) if p == peer_c && r == BENEFIT_VALID_RESPONSE => {}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
@@ -2869,17 +3051,18 @@ mod tests {
|
||||
);
|
||||
|
||||
|
||||
// Now messages should go out:
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendValidationMessage(
|
||||
recipients,
|
||||
mut recipients,
|
||||
protocol_v1::ValidationProtocol::StatementDistribution(
|
||||
protocol_v1::StatementDistributionMessage::LargeStatement(meta)
|
||||
),
|
||||
)
|
||||
) => {
|
||||
assert_eq!(recipients, vec![peer_b.clone()]);
|
||||
assert_eq!(recipients.sort(), vec![peer_b.clone(), peer_c.clone()].sort());
|
||||
assert_eq!(meta.relay_parent, hash_a);
|
||||
assert_eq!(meta.candidate_hash, statement.payload().candidate_hash());
|
||||
assert_eq!(meta.signed_by, statement.validator_index());
|
||||
@@ -2889,7 +3072,7 @@ mod tests {
|
||||
|
||||
// Now that it has the candidate it should answer requests accordingly (even after a
|
||||
// failed request):
|
||||
|
||||
|
||||
// Failing request first:
|
||||
let (pending_response, response_rx) = oneshot::channel();
|
||||
let inner_req = StatementFetchingRequest {
|
||||
@@ -2906,7 +3089,7 @@ mod tests {
|
||||
response_rx.await.unwrap().result,
|
||||
Err(()) => {}
|
||||
);
|
||||
|
||||
|
||||
// Now the working one:
|
||||
let (pending_response, response_rx) = oneshot::channel();
|
||||
let inner_req = StatementFetchingRequest {
|
||||
|
||||
Reference in New Issue
Block a user