collator protocol changes for elastic scaling (validator side) (#3302)

Fixes #3128.

This introduces a new variant for the collation response from the
collator that includes the parent head data. For now, collators won't
send this new variant. We'll need to change the collator side of the
collator protocol to detect all the cores assigned to a para and send
the parent head data in the case when it's more than 1 core.

- [x] validate approach
- [x] check head data hash
This commit is contained in:
ordian
2024-03-15 20:43:28 +01:00
committed by GitHub
parent e8b51f60cc
commit 02e1a7f476
16 changed files with 409 additions and 136 deletions
@@ -27,7 +27,7 @@ use polkadot_node_network_protocol::{
PeerId,
};
use polkadot_node_primitives::PoV;
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, Id as ParaId};
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, HeadData, Id as ParaId};
/// The status of a collation as seen from the collator.
pub enum CollationStatus {
@@ -63,6 +63,8 @@ pub struct Collation {
pub parent_head_data_hash: Hash,
/// Proof to verify the state transition of the parachain.
pub pov: PoV,
/// Parent head-data needed for elastic scaling.
pub parent_head_data: HeadData,
/// Collation status.
pub status: CollationStatus,
}
@@ -55,7 +55,7 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::{
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState,
GroupIndex, Hash, Id as ParaId, SessionIndex,
GroupIndex, Hash, HeadData, Id as ParaId, SessionIndex,
};
use super::LOG_TARGET;
@@ -347,6 +347,7 @@ async fn distribute_collation<Context>(
receipt: CandidateReceipt,
parent_head_data_hash: Hash,
pov: PoV,
parent_head_data: HeadData,
result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
) -> Result<()> {
let candidate_relay_parent = receipt.descriptor.relay_parent;
@@ -465,7 +466,13 @@ async fn distribute_collation<Context>(
per_relay_parent.collations.insert(
candidate_hash,
Collation { receipt, parent_head_data_hash, pov, status: CollationStatus::Created },
Collation {
receipt,
parent_head_data_hash,
pov,
parent_head_data,
status: CollationStatus::Created,
},
);
// If prospective parachains are disabled, a leaf should be known to peer.
@@ -763,20 +770,26 @@ async fn process_msg<Context>(
CollateOn(id) => {
state.collating_on = Some(id);
},
DistributeCollation(receipt, parent_head_data_hash, pov, result_sender) => {
DistributeCollation {
candidate_receipt,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
} => {
let _span1 = state
.span_per_relay_parent
.get(&receipt.descriptor.relay_parent)
.get(&candidate_receipt.descriptor.relay_parent)
.map(|s| s.child("distributing-collation"));
let _span2 = jaeger::Span::new(&pov, "distributing-collation");
match state.collating_on {
Some(id) if receipt.descriptor.para_id != id => {
Some(id) if candidate_receipt.descriptor.para_id != id => {
// If the ParaId of a collation requested to be distributed does not match
// the one we expect, we ignore the message.
gum::warn!(
target: LOG_TARGET,
para_id = %receipt.descriptor.para_id,
para_id = %candidate_receipt.descriptor.para_id,
collating_on = %id,
"DistributeCollation for unexpected para_id",
);
@@ -788,9 +801,10 @@ async fn process_msg<Context>(
runtime,
state,
id,
receipt,
candidate_receipt,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
)
.await?;
@@ -798,7 +812,7 @@ async fn process_msg<Context>(
None => {
gum::warn!(
target: LOG_TARGET,
para_id = %receipt.descriptor.para_id,
para_id = %candidate_receipt.descriptor.para_id,
"DistributeCollation message while not collating on any",
);
},
@@ -835,6 +849,7 @@ async fn send_collation(
request: VersionedCollationRequest,
receipt: CandidateReceipt,
pov: PoV,
_parent_head_data: HeadData,
) {
let (tx, rx) = oneshot::channel();
@@ -842,13 +857,22 @@ async fn send_collation(
let peer_id = request.peer_id();
let candidate_hash = receipt.hash();
// The response payload is the same for both versions of protocol
// The response payload is the same for v1 and v2 versions of protocol
// and doesn't have v2 alias for simplicity.
let response = OutgoingResponse {
result: Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov)),
reputation_changes: Vec::new(),
sent_feedback: Some(tx),
};
// For now, we don't send parent head data to the collation requester.
let result =
// if assigned_multiple_cores {
// Ok(request_v1::CollationFetchingResponse::CollationWithParentHeadData {
// receipt,
// pov,
// parent_head_data,
// })
// } else {
Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov))
// }
;
let response =
OutgoingResponse { result, reputation_changes: Vec::new(), sent_feedback: Some(tx) };
if let Err(_) = request.send_outgoing_response(response) {
gum::warn!(target: LOG_TARGET, "Sending collation response failed");
@@ -1027,9 +1051,13 @@ async fn handle_incoming_request<Context>(
return Ok(())
},
};
let (receipt, pov) = if let Some(collation) = collation {
let (receipt, pov, parent_head_data) = if let Some(collation) = collation {
collation.status.advance_to_requested();
(collation.receipt.clone(), collation.pov.clone())
(
collation.receipt.clone(),
collation.pov.clone(),
collation.parent_head_data.clone(),
)
} else {
gum::warn!(
target: LOG_TARGET,
@@ -1068,7 +1096,7 @@ async fn handle_incoming_request<Context>(
waiting.collation_fetch_active = true;
// Obtain a timer for sending collation
let _ = state.metrics.time_collation_distribution("send");
send_collation(state, req, receipt, pov).await;
send_collation(state, req, receipt, pov, parent_head_data).await;
}
},
Some(our_para_id) => {
@@ -1453,8 +1481,9 @@ async fn run_inner<Context>(
if let Some(collation) = next_collation {
let receipt = collation.receipt.clone();
let pov = collation.pov.clone();
let parent_head_data = collation.parent_head_data.clone();
send_collation(&mut state, next, receipt, pov).await;
send_collation(&mut state, next, receipt, pov, parent_head_data).await;
}
},
(candidate_hash, peer_id) = state.advertisement_timeouts.select_next_some() => {
@@ -356,12 +356,13 @@ async fn distribute_collation_with_receipt(
) -> DistributeCollation {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::DistributeCollation(
candidate.clone(),
CollatorProtocolMessage::DistributeCollation {
candidate_receipt: candidate.clone(),
parent_head_data_hash,
pov.clone(),
None,
),
pov: pov.clone(),
parent_head_data: HeadData(vec![1, 2, 3]),
result_sender: None,
},
)
.await;
@@ -627,6 +628,18 @@ async fn send_peer_view_change(
.await;
}
fn decode_collation_response(bytes: &[u8]) -> (CandidateReceipt, PoV) {
let response: request_v1::CollationFetchingResponse =
request_v1::CollationFetchingResponse::decode(&mut &bytes[..])
.expect("Decoding should work");
match response {
request_v1::CollationFetchingResponse::Collation(receipt, pov) => (receipt, pov),
request_v1::CollationFetchingResponse::CollationWithParentHeadData {
receipt, pov, ..
} => (receipt, pov),
}
}
#[test]
fn advertise_and_send_collation() {
let mut test_state = TestState::default();
@@ -736,12 +749,10 @@ fn advertise_and_send_collation() {
assert_matches!(
rx.await,
Ok(full_response) => {
let request_v1::CollationFetchingResponse::Collation(receipt, pov): request_v1::CollationFetchingResponse
= request_v1::CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
let (receipt, pov) = decode_collation_response(
full_response.result
.expect("We should have a proper answer").as_ref()
);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
}
@@ -1338,12 +1349,10 @@ where
let feedback_tx = assert_matches!(
rx.await,
Ok(full_response) => {
let request_v1::CollationFetchingResponse::Collation(receipt, pov): request_v1::CollationFetchingResponse
= request_v1::CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
let (receipt, pov) = decode_collation_response(
full_response.result
.expect("We should have a proper answer").as_ref()
);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
@@ -1375,12 +1384,10 @@ where
assert_matches!(
rx.await,
Ok(full_response) => {
let request_v1::CollationFetchingResponse::Collation(receipt, pov): request_v1::CollationFetchingResponse
= request_v1::CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
let (receipt, pov) = decode_collation_response(
full_response.result
.expect("We should have a proper answer").as_ref()
);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
@@ -1469,11 +1476,10 @@ fn connect_to_buffered_groups() {
assert_matches!(
rx.await,
Ok(full_response) => {
let request_v1::CollationFetchingResponse::Collation(..) =
request_v1::CollationFetchingResponse::decode(
&mut full_response.result.expect("We should have a proper answer").as_ref(),
)
.expect("Decoding should work");
let _ = decode_collation_response(
full_response.result
.expect("We should have a proper answer").as_ref()
);
}
);
@@ -271,12 +271,13 @@ fn distribute_collation_from_implicit_view() {
.build();
overseer_send(
virtual_overseer,
CollatorProtocolMessage::DistributeCollation(
candidate.clone(),
CollatorProtocolMessage::DistributeCollation {
candidate_receipt: candidate.clone(),
parent_head_data_hash,
pov.clone(),
None,
),
pov: pov.clone(),
parent_head_data: HeadData(vec![1, 2, 3]),
result_sender: None,
},
)
.await;
@@ -351,12 +352,13 @@ fn distribute_collation_up_to_limit() {
.build();
overseer_send(
virtual_overseer,
CollatorProtocolMessage::DistributeCollation(
candidate.clone(),
CollatorProtocolMessage::DistributeCollation {
candidate_receipt: candidate.clone(),
parent_head_data_hash,
pov.clone(),
None,
),
pov: pov.clone(),
parent_head_data: HeadData(vec![1, 2, 3]),
result_sender: None,
},
)
.await;
@@ -469,12 +471,10 @@ fn advertise_and_send_collation_by_hash() {
rx.await,
Ok(full_response) => {
// Response is the same for v2.
let request_v1::CollationFetchingResponse::Collation(receipt, pov): request_v1::CollationFetchingResponse
= request_v1::CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
let (receipt, pov) = decode_collation_response(
full_response.result
.expect("We should have a proper answer").as_ref()
);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
}
@@ -89,13 +89,21 @@ pub enum SecondingError {
#[error("Received duplicate collation from the peer")]
Duplicate,
#[error("The provided parent head data does not match the hash")]
ParentHeadDataMismatch,
}
impl SecondingError {
/// Returns true if an error indicates that a peer is malicious.
pub fn is_malicious(&self) -> bool {
use SecondingError::*;
matches!(self, PersistedValidationDataMismatch | CandidateHashMismatch | Duplicate)
matches!(
self,
PersistedValidationDataMismatch |
CandidateHashMismatch |
Duplicate | ParentHeadDataMismatch
)
}
}
@@ -41,7 +41,8 @@ use polkadot_node_subsystem_util::{
metrics::prometheus::prometheus::HistogramTimer, runtime::ProspectiveParachainsMode,
};
use polkadot_primitives::{
CandidateHash, CandidateReceipt, CollatorId, Hash, Id as ParaId, PersistedValidationData,
CandidateHash, CandidateReceipt, CollatorId, Hash, HeadData, Id as ParaId,
PersistedValidationData,
};
use tokio_util::sync::CancellationToken;
@@ -120,7 +121,7 @@ impl PendingCollation {
}
}
/// v2 advertisement that was rejected by the backing
/// v2 or v3 advertisement that was rejected by the backing
/// subsystem. Validator may fetch it later if its fragment
/// membership gets recognized before relay parent goes out of view.
#[derive(Debug, Clone)]
@@ -143,6 +144,7 @@ pub fn fetched_collation_sanity_check(
advertised: &PendingCollation,
fetched: &CandidateReceipt,
persisted_validation_data: &PersistedValidationData,
maybe_parent_head_and_hash: Option<(HeadData, Hash)>,
) -> Result<(), SecondingError> {
if persisted_validation_data.hash() != fetched.descriptor().persisted_validation_data_hash {
Err(SecondingError::PersistedValidationDataMismatch)
@@ -151,6 +153,8 @@ pub fn fetched_collation_sanity_check(
.map_or(false, |pc| pc.candidate_hash() != fetched.hash())
{
Err(SecondingError::CandidateHashMismatch)
} else if maybe_parent_head_and_hash.map_or(false, |(head, hash)| head.hash() != hash) {
Err(SecondingError::ParentHeadDataMismatch)
} else {
Ok(())
}
@@ -176,6 +180,9 @@ pub struct PendingCollationFetch {
pub candidate_receipt: CandidateReceipt,
/// Proof of validity.
pub pov: PoV,
/// Optional parachain parent head data.
/// Only needed for elastic scaling.
pub maybe_parent_head_data: Option<HeadData>,
}
/// The status of the collations in [`CollationsPerRelayParent`].
@@ -359,7 +366,7 @@ impl Future for CollationFetchRequest {
});
match &res {
Poll::Ready((_, Ok(request_v1::CollationFetchingResponse::Collation(..)))) => {
Poll::Ready((_, Ok(_))) => {
self.span.as_mut().map(|s| s.add_string_tag("success", "true"));
},
Poll::Ready((_, Err(_))) => {
@@ -44,7 +44,7 @@ use polkadot_node_subsystem::{
jaeger,
messages::{
CanSecondRequest, CandidateBackingMessage, CollatorProtocolMessage, IfDisconnected,
NetworkBridgeEvent, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
NetworkBridgeEvent, NetworkBridgeTxMessage, ParentHeadData, ProspectiveParachainsMessage,
ProspectiveValidationDataRequest,
},
overseer, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal, PerLeafSpan,
@@ -55,7 +55,7 @@ use polkadot_node_subsystem_util::{
runtime::{prospective_parachains_mode, ProspectiveParachainsMode},
};
use polkadot_primitives::{
CandidateHash, CollatorId, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption,
CandidateHash, CollatorId, CoreState, Hash, HeadData, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData,
};
@@ -723,7 +723,7 @@ async fn request_collation(
pending_collation,
collator_id: collator_id.clone(),
collator_protocol_version: peer_protocol_version,
from_collator: response_recv.boxed(),
from_collator: response_recv,
cancellation_token: cancellation_token.clone(),
span: state
.span_per_relay_parent
@@ -889,16 +889,16 @@ async fn process_incoming_peer_message<Context>(
modify_reputation(&mut state.reputation, ctx.sender(), origin, rep).await;
}
},
Versioned::V2(V2::AdvertiseCollation {
relay_parent,
candidate_hash,
parent_head_data_hash,
}) |
Versioned::V3(V2::AdvertiseCollation {
relay_parent,
candidate_hash,
parent_head_data_hash,
}) =>
}) |
Versioned::V2(V2::AdvertiseCollation {
relay_parent,
candidate_hash,
parent_head_data_hash,
}) => {
if let Err(err) = handle_advertisement(
ctx.sender(),
state,
@@ -920,7 +920,8 @@ async fn process_incoming_peer_message<Context>(
if let Some(rep) = err.reputation_changes() {
modify_reputation(&mut state.reputation, ctx.sender(), origin, rep).await;
}
},
}
},
Versioned::V1(V1::CollationSeconded(..)) |
Versioned::V2(V2::CollationSeconded(..)) |
Versioned::V3(V2::CollationSeconded(..)) => {
@@ -1477,7 +1478,7 @@ async fn process_msg<Context>(
"CollateOn message is not expected on the validator side of the protocol",
);
},
DistributeCollation(..) => {
DistributeCollation { .. } => {
gum::warn!(
target: LOG_TARGET,
"DistributeCollation message is not expected on the validator side of the protocol",
@@ -1776,14 +1777,21 @@ async fn request_prospective_validation_data<Sender>(
candidate_relay_parent: Hash,
parent_head_data_hash: Hash,
para_id: ParaId,
maybe_parent_head_data: Option<HeadData>,
) -> std::result::Result<Option<PersistedValidationData>, SecondingError>
where
Sender: CollatorProtocolSenderTrait,
{
let (tx, rx) = oneshot::channel();
let parent_head_data = if let Some(head_data) = maybe_parent_head_data {
ParentHeadData::WithData { head_data, hash: parent_head_data_hash }
} else {
ParentHeadData::OnlyHash(parent_head_data_hash)
};
let request =
ProspectiveValidationDataRequest { para_id, candidate_relay_parent, parent_head_data_hash };
ProspectiveValidationDataRequest { para_id, candidate_relay_parent, parent_head_data };
sender
.send_message(ProspectiveParachainsMessage::GetProspectiveValidationData(request, tx))
@@ -1797,7 +1805,7 @@ where
async fn kick_off_seconding<Context>(
ctx: &mut Context,
state: &mut State,
PendingCollationFetch { mut collation_event, candidate_receipt, pov }: PendingCollationFetch,
PendingCollationFetch { mut collation_event, candidate_receipt, pov, maybe_parent_head_data }: PendingCollationFetch,
) -> std::result::Result<(), SecondingError> {
let pending_collation = collation_event.pending_collation;
let relay_parent = pending_collation.relay_parent;
@@ -1821,38 +1829,46 @@ async fn kick_off_seconding<Context>(
collation_event.pending_collation.commitments_hash =
Some(candidate_receipt.commitments_hash);
let pvd = match (
let (maybe_pvd, maybe_parent_head_and_hash) = match (
collation_event.collator_protocol_version,
collation_event.pending_collation.prospective_candidate,
) {
(CollationVersion::V2, Some(ProspectiveCandidate { parent_head_data_hash, .. }))
if per_relay_parent.prospective_parachains_mode.is_enabled() =>
request_prospective_validation_data(
{
let pvd = request_prospective_validation_data(
ctx.sender(),
relay_parent,
parent_head_data_hash,
pending_collation.para_id,
maybe_parent_head_data.clone(),
)
.await?,
.await?;
(pvd, maybe_parent_head_data.map(|head_data| (head_data, parent_head_data_hash)))
},
// Support V2 collators without async backing enabled.
(CollationVersion::V2, Some(_)) | (CollationVersion::V1, _) =>
request_persisted_validation_data(
(CollationVersion::V2, Some(_)) | (CollationVersion::V1, _) => {
let pvd = request_persisted_validation_data(
ctx.sender(),
candidate_receipt.descriptor().relay_parent,
candidate_receipt.descriptor().para_id,
)
.await?,
.await?;
(pvd, None)
},
_ => {
// `handle_advertisement` checks for protocol mismatch.
return Ok(())
},
}
.ok_or(SecondingError::PersistedValidationDataNotFound)?;
};
let pvd = maybe_pvd.ok_or(SecondingError::PersistedValidationDataNotFound)?;
fetched_collation_sanity_check(
&collation_event.pending_collation,
&candidate_receipt,
&pvd,
maybe_parent_head_and_hash,
)?;
ctx.send_message(CandidateBackingMessage::Second(
@@ -1978,9 +1994,10 @@ async fn handle_collation_fetch_response(
);
Err(None)
},
Ok(request_v1::CollationFetchingResponse::Collation(receipt, _))
if receipt.descriptor().para_id != pending_collation.para_id =>
{
Ok(
request_v1::CollationFetchingResponse::Collation(receipt, _) |
request_v1::CollationFetchingResponse::CollationWithParentHeadData { receipt, .. },
) if receipt.descriptor().para_id != pending_collation.para_id => {
gum::debug!(
target: LOG_TARGET,
expected_para_id = ?pending_collation.para_id,
@@ -2010,6 +2027,33 @@ async fn handle_collation_fetch_response(
},
candidate_receipt,
pov,
maybe_parent_head_data: None,
})
},
Ok(request_v2::CollationFetchingResponse::CollationWithParentHeadData {
receipt,
pov,
parent_head_data,
}) => {
gum::debug!(
target: LOG_TARGET,
para_id = %pending_collation.para_id,
hash = ?pending_collation.relay_parent,
candidate_hash = ?receipt.hash(),
"Received collation (v3)",
);
let _span = jaeger::Span::new(&pov, "received-collation");
metrics_result = Ok(());
Ok(PendingCollationFetch {
collation_event: CollationEvent {
collator_id,
pending_collation,
collator_protocol_version,
},
candidate_receipt: receipt,
pov,
maybe_parent_head_data: Some(parent_head_data),
})
},
};
@@ -754,6 +754,126 @@ fn fetched_collation_sanity_check() {
});
}
#[test]
fn sanity_check_invalid_parent_head_data() {
let test_state = TestState::default();
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer, .. } = test_harness;
let pair = CollatorPair::generate().0;
let head_c = Hash::from_low_u64_be(130);
let head_c_num = 3;
update_view(&mut virtual_overseer, &test_state, vec![(head_c, head_c_num)], 1).await;
let peer_a = PeerId::random();
connect_and_declare_collator(
&mut virtual_overseer,
peer_a,
pair.clone(),
test_state.chain_ids[0],
CollationVersion::V2,
)
.await;
let mut candidate = dummy_candidate_receipt_bad_sig(head_c, Some(Default::default()));
candidate.descriptor.para_id = test_state.chain_ids[0];
let commitments = CandidateCommitments {
head_data: HeadData(vec![1, 2, 3]),
horizontal_messages: Default::default(),
upward_messages: Default::default(),
new_validation_code: None,
processed_downward_messages: 0,
hrmp_watermark: 0,
};
candidate.commitments_hash = commitments.hash();
let parent_head_data = HeadData(vec![4, 2, 0]);
let parent_head_data_hash = parent_head_data.hash();
let wrong_parent_head_data = HeadData(vec![4, 2]);
let mut pvd = dummy_pvd();
pvd.parent_head = parent_head_data;
candidate.descriptor.persisted_validation_data_hash = pvd.hash();
let candidate_hash = candidate.hash();
advertise_collation(
&mut virtual_overseer,
peer_a,
head_c,
Some((candidate_hash, parent_head_data_hash)),
)
.await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateBacking(
CandidateBackingMessage::CanSecond(request, tx),
) => {
assert_eq!(request.candidate_hash, candidate_hash);
assert_eq!(request.candidate_para_id, test_state.chain_ids[0]);
assert_eq!(request.parent_head_data_hash, parent_head_data_hash);
tx.send(true).expect("receiving side should be alive");
}
);
let response_channel = assert_fetch_collation_request(
&mut virtual_overseer,
head_c,
test_state.chain_ids[0],
Some(candidate_hash),
)
.await;
let pov = PoV { block_data: BlockData(vec![1]) };
response_channel
.send(Ok((
request_v2::CollationFetchingResponse::CollationWithParentHeadData {
receipt: candidate.clone(),
pov: pov.clone(),
parent_head_data: wrong_parent_head_data,
}
.encode(),
ProtocolName::from(""),
)))
.expect("Sending response should succeed");
// PVD request.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ProspectiveParachains(
ProspectiveParachainsMessage::GetProspectiveValidationData(request, tx),
) => {
assert_eq!(head_c, request.candidate_relay_parent);
assert_eq!(test_state.chain_ids[0], request.para_id);
tx.send(Some(pvd)).unwrap();
}
);
// Reported malicious.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer_id, rep)),
) => {
assert_eq!(peer_a, peer_id);
assert_eq!(rep.value, COST_REPORT_BAD.cost_or_benefit());
}
);
test_helpers::Yield::new().await;
assert_matches!(virtual_overseer.recv().now_or_never(), None);
virtual_overseer
});
}
#[test]
fn advertisement_spam_protection() {
let test_state = TestState::default();