Files
pezkuwi-subxt/polkadot/node/network/statement-distribution/src/tests.rs
T
Bernhard Schuster 0f1a9fb1eb remove Default from CandidateDescriptor (#4484)
* remove Default from CandidateHash

* Apply suggestions from code review

Co-authored-by: Andronik Ordian <write@reusable.software>

* chore: fmt

* remove backed candidate default

* Partial migration away from CandidateReceipt::default

* Remove more CandidateReceipt defaults

* fmt

* Mostly remove CommittedCandidateReceipt default usage

* Remove CommittedCandidateReceipt

* Remove more Defaults from polakdot primitives v1 + fmt

* Remove more Default from polkadot primites v1

* WIP trying to get overseer example + tests to compile

* feat: add primitives test helpers

* reduce deps of helper

* update primitive helpers

* make candidate validation compile

* fixup cargo lock

* make av-store compile

* fixup disputes coordinator tests

* test: fixup backing

* test: fixup approval voting

* fixup bitfield signing

* test: fixup runtime-api

* test: fixup availability dist

* foxi[ pverseer test]

* remove some Defaults, remove bounds from `dummy`

All `fn dummy` in primitives need to be removed anyways.
This aids in the transition.

* it's a test helper, so always use std

* test: fixup parachains runtime tests

Excluding benches.

* fix keyring

* fix paras runtime properly, no more default

* Remove fn dummy() usage from approval voting

* Move TestCandidateBuilder out of av store to test helpers

* Make candidate validation tests pass

* Make most dispute coirdinator tests pass

* Make provisioner tests work

* Make availability recovery tests work with test helpers

* Update polkadot-collator-protocol tests

* Update statement distribution tests

* Update polkadot overseer examples and tests

* Derive default for validation code so we don't break unrelated things

* Make para runtime test pass (no bench)

* Some more work

* chore: cargo fmt

* cargo fix

* avoid some Default::default

* fixup dispute coordinator test

* remove unused crate deps

* remove Default::default wherever possible, replace by dummy_* for the most part

* chore: cargo fmt

* Remove some warnings

* Remove CommittedCandidateReceipt dummy

* Remove CandidateReceipt dummy

* Remove CandidateDescriptor dummy

* Remove commented out code

* Fix para runtime tests

* chore: nightly

* Some updates to the builder

* Dynamically adjust mock head data size

* Make dispute cooridinator tests work

* Fix test candidate_backing_reorders_votes work

* +nightly-2021-10-29 fmt

* Spelling and remove a default use in builder

* Various clean up

* More small updates

* fmt

* More small updates

* Doc comments for test helpers

* cargo run --quiet --release --features=runtime-benchmarks -- benchmark --chain=kusama-dev --steps=50 --repeat=20 --pallet=runtime_parachains::paras_inherent --extrinsic=* --execution=wasm --wasm-execution=compiled --heap-pages=4096 --header=./file_header.txt --output=./runtime/kusama/src/weights/runtime_parachains_paras_inherent.rs

* cargo run --quiet --release --features=runtime-benchmarks -- benchmark --chain=polkadot-dev --steps=50 --repeat=20 --pallet=runtime_parachains::paras_inherent --extrinsic=* --execution=wasm --wasm-execution=compiled --heap-pages=4096 --header=./file_header.txt --output=./runtime/polkadot/src/weights/runtime_parachains_paras_inherent.rs

* Update lib.rs

* review comments

* fix warnings

* fix test by using correct candidate receipt relay parent

Co-authored-by: Andronik Ordian <write@reusable.software>
Co-authored-by: emostov <32168567+emostov@users.noreply.github.com>
Co-authored-by: Parity Bot <admin@parity.io>
Co-authored-by: Gavin Wood <gavin@parity.io>
2021-12-10 12:12:07 +00:00

1856 lines
52 KiB
Rust

// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use assert_matches::assert_matches;
use futures::executor::{self, block_on};
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use polkadot_node_network_protocol::{
request_response::{
v1::{StatementFetchingRequest, StatementFetchingResponse},
IncomingRequest, Recipient, Requests,
},
view, ObservedRole,
};
use polkadot_node_primitives::Statement;
use polkadot_node_subsystem_test_helpers::mock::make_ferdie_keystore;
use polkadot_primitives::v1::{SessionInfo, ValidationCode};
use polkadot_primitives_test_helpers::{dummy_committed_candidate_receipt, dummy_hash};
use polkadot_subsystem::{
jaeger,
messages::{RuntimeApiMessage, RuntimeApiRequest},
ActivatedLeaf, LeafStatus,
};
use sc_keystore::LocalKeystore;
use sp_application_crypto::{sr25519::Pair, AppKey, Pair as TraitPair};
use sp_keyring::Sr25519Keyring;
use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr};
use std::{iter::FromIterator as _, sync::Arc, time::Duration};
#[test]
fn active_head_accepts_only_2_seconded_per_validator() {
let validators = vec![
Sr25519Keyring::Alice.public().into(),
Sr25519Keyring::Bob.public().into(),
Sr25519Keyring::Charlie.public().into(),
];
let parent_hash: Hash = [1; 32].into();
let session_index = 1;
let signing_context = SigningContext { parent_hash, session_index };
let candidate_a = {
let mut c = dummy_committed_candidate_receipt(dummy_hash());
c.descriptor.relay_parent = parent_hash;
c.descriptor.para_id = 1.into();
c
};
let candidate_b = {
let mut c = dummy_committed_candidate_receipt(dummy_hash());
c.descriptor.relay_parent = parent_hash;
c.descriptor.para_id = 2.into();
c
};
let candidate_c = {
let mut c = dummy_committed_candidate_receipt(dummy_hash());
c.descriptor.relay_parent = parent_hash;
c.descriptor.para_id = 3.into();
c
};
let mut head_data = ActiveHeadData::new(
validators,
session_index,
PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
);
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let bob_public = SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Bob.to_seed()),
)
.unwrap();
// note A
let a_seconded_val_0 = block_on(SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate_a.clone()),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
))
.ok()
.flatten()
.expect("should be signed");
assert!(head_data.check_useful_or_unknown(&a_seconded_val_0.clone().into()).is_ok());
let noted = head_data.note_statement(a_seconded_val_0.clone());
assert_matches!(noted, NotedStatement::Fresh(_));
// note A (duplicate)
assert_eq!(
head_data.check_useful_or_unknown(&a_seconded_val_0.clone().into()),
Err(DeniedStatement::UsefulButKnown),
);
let noted = head_data.note_statement(a_seconded_val_0);
assert_matches!(noted, NotedStatement::UsefulButKnown);
// note B
let statement = block_on(SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate_b.clone()),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
))
.ok()
.flatten()
.expect("should be signed");
assert!(head_data.check_useful_or_unknown(&statement.clone().into()).is_ok());
let noted = head_data.note_statement(statement);
assert_matches!(noted, NotedStatement::Fresh(_));
// note C (beyond 2 - ignored)
let statement = block_on(SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate_c.clone()),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
))
.ok()
.flatten()
.expect("should be signed");
assert_eq!(
head_data.check_useful_or_unknown(&statement.clone().into()),
Err(DeniedStatement::NotUseful),
);
let noted = head_data.note_statement(statement);
assert_matches!(noted, NotedStatement::NotUseful);
// note B (new validator)
let statement = block_on(SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate_b.clone()),
&signing_context,
ValidatorIndex(1),
&bob_public.into(),
))
.ok()
.flatten()
.expect("should be signed");
assert!(head_data.check_useful_or_unknown(&statement.clone().into()).is_ok());
let noted = head_data.note_statement(statement);
assert_matches!(noted, NotedStatement::Fresh(_));
// note C (new validator)
let statement = block_on(SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate_c.clone()),
&signing_context,
ValidatorIndex(1),
&bob_public.into(),
))
.ok()
.flatten()
.expect("should be signed");
assert!(head_data.check_useful_or_unknown(&statement.clone().into()).is_ok());
let noted = head_data.note_statement(statement);
assert_matches!(noted, NotedStatement::Fresh(_));
}
#[test]
fn note_local_works() {
let hash_a = CandidateHash([1; 32].into());
let hash_b = CandidateHash([2; 32].into());
let mut per_peer_tracker = VcPerPeerTracker::default();
per_peer_tracker.note_local(hash_a.clone());
per_peer_tracker.note_local(hash_b.clone());
assert!(per_peer_tracker.local_observed.contains(&hash_a));
assert!(per_peer_tracker.local_observed.contains(&hash_b));
assert!(!per_peer_tracker.remote_observed.contains(&hash_a));
assert!(!per_peer_tracker.remote_observed.contains(&hash_b));
}
#[test]
fn note_remote_works() {
let hash_a = CandidateHash([1; 32].into());
let hash_b = CandidateHash([2; 32].into());
let hash_c = CandidateHash([3; 32].into());
let mut per_peer_tracker = VcPerPeerTracker::default();
assert!(per_peer_tracker.note_remote(hash_a.clone()));
assert!(per_peer_tracker.note_remote(hash_b.clone()));
assert!(!per_peer_tracker.note_remote(hash_c.clone()));
assert!(per_peer_tracker.remote_observed.contains(&hash_a));
assert!(per_peer_tracker.remote_observed.contains(&hash_b));
assert!(!per_peer_tracker.remote_observed.contains(&hash_c));
assert!(!per_peer_tracker.local_observed.contains(&hash_a));
assert!(!per_peer_tracker.local_observed.contains(&hash_b));
assert!(!per_peer_tracker.local_observed.contains(&hash_c));
}
#[test]
fn per_peer_relay_parent_knowledge_send() {
let mut knowledge = PeerRelayParentKnowledge::default();
let hash_a = CandidateHash([1; 32].into());
// Sending an un-pinned statement should not work and should have no effect.
assert!(!knowledge.can_send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))));
assert!(!knowledge.is_known_candidate(&hash_a));
assert!(knowledge.sent_statements.is_empty());
assert!(knowledge.received_statements.is_empty());
assert!(knowledge.seconded_counts.is_empty());
assert!(knowledge.received_message_count.is_empty());
// Make the peer aware of the candidate.
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), true);
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), false);
assert!(knowledge.is_known_candidate(&hash_a));
assert_eq!(knowledge.sent_statements.len(), 2);
assert!(knowledge.received_statements.is_empty());
assert_eq!(knowledge.seconded_counts.len(), 2);
assert!(knowledge.received_message_count.get(&hash_a).is_none());
// And now it should accept the dependent message.
assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), false);
assert!(knowledge.is_known_candidate(&hash_a));
assert_eq!(knowledge.sent_statements.len(), 3);
assert!(knowledge.received_statements.is_empty());
assert_eq!(knowledge.seconded_counts.len(), 2);
assert!(knowledge.received_message_count.get(&hash_a).is_none());
}
#[test]
fn cant_send_after_receiving() {
let mut knowledge = PeerRelayParentKnowledge::default();
let hash_a = CandidateHash([1; 32].into());
assert!(knowledge
.check_can_receive(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0)), 3)
.is_ok());
assert!(knowledge
.receive(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0)), 3)
.unwrap());
assert!(!knowledge.can_send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))));
}
#[test]
fn per_peer_relay_parent_knowledge_receive() {
let mut knowledge = PeerRelayParentKnowledge::default();
let hash_a = CandidateHash([1; 32].into());
assert_eq!(
knowledge.check_can_receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(0)), 3),
Err(COST_UNEXPECTED_STATEMENT),
);
assert_eq!(
knowledge.receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(0)), 3),
Err(COST_UNEXPECTED_STATEMENT),
);
assert!(knowledge
.check_can_receive(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0)), 3)
.is_ok());
assert_eq!(
knowledge.receive(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0)), 3),
Ok(true),
);
// Push statements up to the flood limit.
assert!(knowledge
.check_can_receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(1)), 3)
.is_ok());
assert_eq!(
knowledge.receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(1)), 3),
Ok(false),
);
assert!(knowledge.is_known_candidate(&hash_a));
assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 2);
assert!(knowledge
.check_can_receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(2)), 3)
.is_ok());
assert_eq!(
knowledge.receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(2)), 3),
Ok(false),
);
assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 3);
assert_eq!(
knowledge.check_can_receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(7)), 3),
Err(COST_APPARENT_FLOOD),
);
assert_eq!(
knowledge.receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(7)), 3),
Err(COST_APPARENT_FLOOD),
);
assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 3);
assert_eq!(knowledge.received_statements.len(), 3); // number of prior `Ok`s.
// Now make sure that the seconding limit is respected.
let hash_b = CandidateHash([2; 32].into());
let hash_c = CandidateHash([3; 32].into());
assert!(knowledge
.check_can_receive(&(CompactStatement::Seconded(hash_b), ValidatorIndex(0)), 3)
.is_ok());
assert_eq!(
knowledge.receive(&(CompactStatement::Seconded(hash_b), ValidatorIndex(0)), 3),
Ok(true),
);
assert_eq!(
knowledge.check_can_receive(&(CompactStatement::Seconded(hash_c), ValidatorIndex(0)), 3),
Err(COST_UNEXPECTED_STATEMENT),
);
assert_eq!(
knowledge.receive(&(CompactStatement::Seconded(hash_c), ValidatorIndex(0)), 3),
Err(COST_UNEXPECTED_STATEMENT),
);
// Last, make sure that already-known statements are disregarded.
assert_eq!(
knowledge.check_can_receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(2)), 3),
Err(COST_DUPLICATE_STATEMENT),
);
assert_eq!(
knowledge.receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(2)), 3),
Err(COST_DUPLICATE_STATEMENT),
);
assert_eq!(
knowledge.check_can_receive(&(CompactStatement::Seconded(hash_b), ValidatorIndex(0)), 3),
Err(COST_DUPLICATE_STATEMENT),
);
assert_eq!(
knowledge.receive(&(CompactStatement::Seconded(hash_b), ValidatorIndex(0)), 3),
Err(COST_DUPLICATE_STATEMENT),
);
}
#[test]
fn peer_view_update_sends_messages() {
let hash_a = Hash::repeat_byte(1);
let hash_b = Hash::repeat_byte(2);
let hash_c = Hash::repeat_byte(3);
let candidate = {
let mut c = dummy_committed_candidate_receipt(dummy_hash());
c.descriptor.relay_parent = hash_c;
c.descriptor.para_id = 1.into();
c
};
let candidate_hash = candidate.hash();
let old_view = view![hash_a, hash_b];
let new_view = view![hash_b, hash_c];
let mut active_heads = HashMap::new();
let validators = vec![
Sr25519Keyring::Alice.public().into(),
Sr25519Keyring::Bob.public().into(),
Sr25519Keyring::Charlie.public().into(),
];
let session_index = 1;
let signing_context = SigningContext { parent_hash: hash_c, session_index };
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let bob_public = SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Bob.to_seed()),
)
.unwrap();
let charlie_public = SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Charlie.to_seed()),
)
.unwrap();
let new_head_data = {
let mut data = ActiveHeadData::new(
validators,
session_index,
PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
);
let statement = block_on(SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate.clone()),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
))
.ok()
.flatten()
.expect("should be signed");
assert!(data.check_useful_or_unknown(&statement.clone().into()).is_ok());
let noted = data.note_statement(statement);
assert_matches!(noted, NotedStatement::Fresh(_));
let statement = block_on(SignedFullStatement::sign(
&keystore,
Statement::Valid(candidate_hash),
&signing_context,
ValidatorIndex(1),
&bob_public.into(),
))
.ok()
.flatten()
.expect("should be signed");
assert!(data.check_useful_or_unknown(&statement.clone().into()).is_ok());
let noted = data.note_statement(statement);
assert_matches!(noted, NotedStatement::Fresh(_));
let statement = block_on(SignedFullStatement::sign(
&keystore,
Statement::Valid(candidate_hash),
&signing_context,
ValidatorIndex(2),
&charlie_public.into(),
))
.ok()
.flatten()
.expect("should be signed");
assert!(data.check_useful_or_unknown(&statement.clone().into()).is_ok());
let noted = data.note_statement(statement);
assert_matches!(noted, NotedStatement::Fresh(_));
data
};
active_heads.insert(hash_c, new_head_data);
let mut peer_data = PeerData {
view: old_view,
view_knowledge: {
let mut k = HashMap::new();
k.insert(hash_a, Default::default());
k.insert(hash_b, Default::default());
k
},
maybe_authority: None,
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::<
StatementDistributionMessage,
_,
>(pool);
let peer = PeerId::random();
executor::block_on(async move {
let gossip_peers = HashSet::from_iter(vec![peer.clone()].into_iter());
update_peer_view_and_maybe_send_unlocked(
peer.clone(),
&gossip_peers,
&mut peer_data,
&mut ctx,
&active_heads,
new_view.clone(),
&Default::default(),
)
.await;
assert_eq!(peer_data.view, new_view);
assert!(!peer_data.view_knowledge.contains_key(&hash_a));
assert!(peer_data.view_knowledge.contains_key(&hash_b));
let c_knowledge = peer_data.view_knowledge.get(&hash_c).unwrap();
assert!(c_knowledge.is_known_candidate(&candidate_hash));
assert!(c_knowledge
.sent_statements
.contains(&(CompactStatement::Seconded(candidate_hash), ValidatorIndex(0))));
assert!(c_knowledge
.sent_statements
.contains(&(CompactStatement::Valid(candidate_hash), ValidatorIndex(1))));
assert!(c_knowledge
.sent_statements
.contains(&(CompactStatement::Valid(candidate_hash), ValidatorIndex(2))));
// now see if we got the 3 messages from the active head data.
let active_head = active_heads.get(&hash_c).unwrap();
// semi-fragile because hashmap iterator ordering is undefined, but in practice
// it will not change between runs of the program.
for statement in active_head.statements_about(candidate_hash) {
let message = handle.recv().await;
let expected_to = vec![peer.clone()];
let expected_payload = statement_message(hash_c, statement.statement.clone());
assert_matches!(
message,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
to,
payload,
)) => {
assert_eq!(to, expected_to);
assert_eq!(payload, expected_payload)
}
)
}
});
}
#[test]
fn circulated_statement_goes_to_all_peers_with_view() {
let hash_a = Hash::repeat_byte(1);
let hash_b = Hash::repeat_byte(2);
let hash_c = Hash::repeat_byte(3);
let candidate = {
let mut c = dummy_committed_candidate_receipt(dummy_hash());
c.descriptor.relay_parent = hash_b;
c.descriptor.para_id = 1.into();
c
};
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let peer_c = PeerId::random();
let peer_a_view = view![hash_a];
let peer_b_view = view![hash_a, hash_b];
let peer_c_view = view![hash_b, hash_c];
let session_index = 1;
let peer_data_from_view = |view: View| PeerData {
view: view.clone(),
view_knowledge: view.iter().map(|v| (v.clone(), Default::default())).collect(),
maybe_authority: None,
};
let mut peer_data: HashMap<_, _> = vec![
(peer_a.clone(), peer_data_from_view(peer_a_view)),
(peer_b.clone(), peer_data_from_view(peer_b_view)),
(peer_c.clone(), peer_data_from_view(peer_c_view)),
]
.into_iter()
.collect();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::<
StatementDistributionMessage,
_,
>(pool);
executor::block_on(async move {
let signing_context = SigningContext { parent_hash: hash_b, session_index };
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = CryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.await
.unwrap();
let statement = SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
)
.await
.ok()
.flatten()
.expect("should be signed");
let comparator = StoredStatementComparator {
compact: statement.payload().to_compact(),
validator_index: ValidatorIndex(0),
signature: statement.signature().clone(),
};
let statement = StoredStatement { comparator: &comparator, statement: &statement };
let gossip_peers =
HashSet::from_iter(vec![peer_a.clone(), peer_b.clone(), peer_c.clone()].into_iter());
let needs_dependents = circulate_statement(
&gossip_peers,
&mut peer_data,
&mut ctx,
hash_b,
statement,
Vec::new(),
)
.await;
{
assert_eq!(needs_dependents.len(), 2);
assert!(needs_dependents.contains(&peer_b));
assert!(needs_dependents.contains(&peer_c));
}
let fingerprint = (statement.compact().clone(), ValidatorIndex(0));
assert!(peer_data
.get(&peer_b)
.unwrap()
.view_knowledge
.get(&hash_b)
.unwrap()
.sent_statements
.contains(&fingerprint));
assert!(peer_data
.get(&peer_c)
.unwrap()
.view_knowledge
.get(&hash_b)
.unwrap()
.sent_statements
.contains(&fingerprint));
let message = handle.recv().await;
assert_matches!(
message,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
to,
payload,
)) => {
assert_eq!(to.len(), 2);
assert!(to.contains(&peer_b));
assert!(to.contains(&peer_c));
assert_eq!(
payload,
statement_message(hash_b, statement.statement.clone()),
);
}
)
});
}
#[test]
fn receiving_from_one_sends_to_another_and_to_candidate_backing() {
let hash_a = Hash::repeat_byte(1);
let candidate = {
let mut c = dummy_committed_candidate_receipt(dummy_hash());
c.descriptor.relay_parent = hash_a;
c.descriptor.para_id = 1.into();
c
};
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let validators = vec![
Sr25519Keyring::Alice.pair(),
Sr25519Keyring::Bob.pair(),
Sr25519Keyring::Charlie.pair(),
];
let session_info = make_session_info(validators, vec![]);
let session_index = 1;
let pool = sp_core::testing::TaskExecutor::new();
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let (statement_req_receiver, _) = IncomingRequest::get_config_receiver();
let bg = async move {
let s = StatementDistribution::new(
Arc::new(LocalKeystore::in_memory()),
statement_req_receiver,
Default::default(),
);
s.run(ctx).await.unwrap();
};
let test_fut = async move {
// register our active heads.
handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: hash_a,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
)
if r == hash_a
=> {
let _ = tx.send(Ok(session_index));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
)
if r == hash_a && sess_index == session_index
=> {
let _ = tx.send(Ok(Some(session_info)));
}
);
// notify of peers and view
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]),
),
})
.await;
// receive a seconded statement from peer A. it should be propagated onwards to peer B and to
// candidate backing.
let statement = {
let signing_context = SigningContext { parent_hash: hash_a, session_index };
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = CryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.await
.unwrap();
SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
)
.await
.ok()
.flatten()
.expect("should be signed")
};
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
protocol_v1::StatementDistributionMessage::Statement(
hash_a,
statement.clone().into(),
),
),
),
})
.await;
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {}
);
assert_matches!(
handle.recv().await,
AllMessages::CandidateBacking(
CandidateBackingMessage::Statement(r, s)
) if r == hash_a && s == statement => {}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
recipients,
protocol_v1::ValidationProtocol::StatementDistribution(
protocol_v1::StatementDistributionMessage::Statement(r, s)
),
)
) => {
assert_eq!(recipients, vec![peer_b.clone()]);
assert_eq!(r, hash_a);
assert_eq!(s, statement.into());
}
);
handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::pin_mut!(test_fut);
futures::pin_mut!(bg);
executor::block_on(future::join(test_fut, bg));
}
#[test]
fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing() {
sp_tracing::try_init_simple();
let hash_a = Hash::repeat_byte(1);
let hash_b = Hash::repeat_byte(2);
let candidate = {
let mut c = dummy_committed_candidate_receipt(dummy_hash());
c.descriptor.relay_parent = hash_a;
c.descriptor.para_id = 1.into();
c.commitments.new_validation_code = Some(ValidationCode(vec![1, 2, 3]));
c
};
let peer_a = PeerId::random(); // Alice
let peer_b = PeerId::random(); // Bob
let peer_c = PeerId::random(); // Charlie
let peer_bad = PeerId::random(); // No validator
let validators = vec![
Sr25519Keyring::Alice.pair(),
Sr25519Keyring::Bob.pair(),
Sr25519Keyring::Charlie.pair(),
// We:
Sr25519Keyring::Ferdie.pair(),
];
let session_info = make_session_info(validators, vec![vec![0, 1, 2, 4], vec![3]]);
let session_index = 1;
let pool = sp_core::testing::TaskExecutor::new();
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let (statement_req_receiver, mut req_cfg) = IncomingRequest::get_config_receiver();
let bg = async move {
let s = StatementDistribution::new(
make_ferdie_keystore(),
statement_req_receiver,
Default::default(),
);
s.run(ctx).await.unwrap();
};
let test_fut = async move {
// register our active heads.
handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: hash_a,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
)
if r == hash_a
=> {
let _ = tx.send(Ok(session_index));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
)
if r == hash_a && sess_index == session_index
=> {
let _ = tx.send(Ok(Some(session_info)));
}
);
// notify of peers and view
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_a.clone(),
ObservedRole::Full,
Some(HashSet::from([Sr25519Keyring::Alice.public().into()])),
),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
Some(HashSet::from([Sr25519Keyring::Bob.public().into()])),
),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_c.clone(),
ObservedRole::Full,
Some(HashSet::from([Sr25519Keyring::Charlie.public().into()])),
),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full, None),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a]),
),
})
.await;
// receive a seconded statement from peer A, which does not provide the request data,
// then get that data from peer C. It should be propagated onwards to peer B and to
// candidate backing.
let statement = {
let signing_context = SigningContext { parent_hash: hash_a, session_index };
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = CryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.await
.unwrap();
SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate.clone()),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
)
.await
.ok()
.flatten()
.expect("should be signed")
};
let metadata =
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone().into())
.get_metadata();
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
),
),
})
.await;
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
// Just drop request - should trigger error.
}
);
// There is a race between request handler asking for more peers and processing of the
// coming `PeerMessage`s, we want the request handler to ask first here for better test
// coverage:
Delay::new(Duration::from_millis(20)).await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_c.clone(),
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
),
),
})
.await;
// Malicious peer:
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_bad.clone(),
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
),
),
})
.await;
// Let c fail once too:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
}
);
// a fails again:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
}
);
// Send invalid response (all other peers have been tried now):
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_bad));
let bad_candidate = {
let mut bad = candidate.clone();
bad.descriptor.para_id = 0xeadbeaf.into();
bad
};
let response = StatementFetchingResponse::Statement(bad_candidate);
outgoing.pending_response.send(Ok(response.encode())).unwrap();
}
);
// Should get punished and never tried again:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_bad && r == COST_WRONG_HASH => {}
);
// a is tried again (retried in reverse order):
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
}
);
// c succeeds now:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
let response = StatementFetchingResponse::Statement(candidate.clone());
outgoing.pending_response.send(Ok(response.encode())).unwrap();
}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_a && r == COST_FETCH_FAIL => {}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_c && r == BENEFIT_VALID_RESPONSE => {}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {}
);
assert_matches!(
handle.recv().await,
AllMessages::CandidateBacking(
CandidateBackingMessage::Statement(r, s)
) if r == hash_a && s == statement => {}
);
// Now messages should go out:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
mut recipients,
protocol_v1::ValidationProtocol::StatementDistribution(
protocol_v1::StatementDistributionMessage::LargeStatement(meta)
),
)
) => {
tracing::debug!(
target: LOG_TARGET,
?recipients,
"Recipients received"
);
recipients.sort();
let mut expected = vec![peer_b, peer_c, peer_bad];
expected.sort();
assert_eq!(recipients, expected);
assert_eq!(meta.relay_parent, hash_a);
assert_eq!(meta.candidate_hash, statement.payload().candidate_hash());
assert_eq!(meta.signed_by, statement.validator_index());
assert_eq!(&meta.signature, statement.signature());
}
);
// Now that it has the candidate it should answer requests accordingly (even after a
// failed request):
// Failing request first (wrong relay parent hash):
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
relay_parent: hash_b,
candidate_hash: metadata.candidate_hash,
};
let req = sc_network::config::IncomingRequest {
peer: peer_b,
payload: inner_req.encode(),
pending_response,
};
req_cfg.inbound_queue.as_mut().unwrap().send(req).await.unwrap();
assert_matches!(
response_rx.await.unwrap().result,
Err(()) => {}
);
// Another failing request (peer_a never received a statement from us, so it is not
// allowed to request the data):
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
relay_parent: metadata.relay_parent,
candidate_hash: metadata.candidate_hash,
};
let req = sc_network::config::IncomingRequest {
peer: peer_a,
payload: inner_req.encode(),
pending_response,
};
req_cfg.inbound_queue.as_mut().unwrap().send(req).await.unwrap();
assert_matches!(
response_rx.await.unwrap().result,
Err(()) => {}
);
// And now the succeding request from peer_b:
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
relay_parent: metadata.relay_parent,
candidate_hash: metadata.candidate_hash,
};
let req = sc_network::config::IncomingRequest {
peer: peer_b,
payload: inner_req.encode(),
pending_response,
};
req_cfg.inbound_queue.as_mut().unwrap().send(req).await.unwrap();
let StatementFetchingResponse::Statement(committed) =
Decode::decode(&mut response_rx.await.unwrap().result.unwrap().as_ref()).unwrap();
assert_eq!(committed, candidate);
handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::pin_mut!(test_fut);
futures::pin_mut!(bg);
executor::block_on(future::join(test_fut, bg));
}
#[test]
fn share_prioritizes_backing_group() {
sp_tracing::try_init_simple();
let hash_a = Hash::repeat_byte(1);
let candidate = {
let mut c = dummy_committed_candidate_receipt(dummy_hash());
c.descriptor.relay_parent = hash_a;
c.descriptor.para_id = 1.into();
c.commitments.new_validation_code = Some(ValidationCode(vec![1, 2, 3]));
c
};
let peer_a = PeerId::random(); // Alice
let peer_b = PeerId::random(); // Bob
let peer_c = PeerId::random(); // Charlie
let peer_bad = PeerId::random(); // No validator
let peer_other_group = PeerId::random(); //Ferdie
let mut validators = vec![
Sr25519Keyring::Alice.pair(),
Sr25519Keyring::Bob.pair(),
Sr25519Keyring::Charlie.pair(),
// other group
Sr25519Keyring::Dave.pair(),
// We:
Sr25519Keyring::Ferdie.pair(),
];
// Strictly speaking we only need MIN_GOSSIP_PEERS - 3 to make sure only priority peers
// will be served, but by using a larger value we test for overflow errors:
let dummy_count = MIN_GOSSIP_PEERS;
// We artificially inflate our group, so there won't be any free slots for other peers. (We
// want to test that our group is prioritized):
let dummy_pairs: Vec<_> =
std::iter::repeat_with(|| Pair::generate().0).take(dummy_count).collect();
let dummy_peers: Vec<_> =
std::iter::repeat_with(|| PeerId::random()).take(dummy_count).collect();
validators = validators.into_iter().chain(dummy_pairs.clone()).collect();
let mut first_group = vec![0, 1, 2, 4];
first_group.append(&mut (0..dummy_count as u32).map(|v| v + 5).collect());
let session_info = make_session_info(validators, vec![first_group, vec![3]]);
let session_index = 1;
let pool = sp_core::testing::TaskExecutor::new();
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let (statement_req_receiver, mut req_cfg) = IncomingRequest::get_config_receiver();
let bg = async move {
let s = StatementDistribution::new(
make_ferdie_keystore(),
statement_req_receiver,
Default::default(),
);
s.run(ctx).await.unwrap();
};
let test_fut = async move {
// register our active heads.
handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: hash_a,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
)
if r == hash_a
=> {
let _ = tx.send(Ok(session_index));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
)
if r == hash_a && sess_index == session_index
=> {
let _ = tx.send(Ok(Some(session_info)));
}
);
// notify of dummy peers and view
for (peer, pair) in dummy_peers.clone().into_iter().zip(dummy_pairs) {
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer,
ObservedRole::Full,
Some(HashSet::from([pair.public().into()])),
),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer, view![hash_a]),
),
})
.await;
}
// notify of peers and view
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_a.clone(),
ObservedRole::Full,
Some(HashSet::from([Sr25519Keyring::Alice.public().into()])),
),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
Some(HashSet::from([Sr25519Keyring::Bob.public().into()])),
),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_c.clone(),
ObservedRole::Full,
Some(HashSet::from([Sr25519Keyring::Charlie.public().into()])),
),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full, None),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_other_group.clone(),
ObservedRole::Full,
Some(HashSet::from([Sr25519Keyring::Dave.public().into()])),
),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a]),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_other_group.clone(), view![hash_a]),
),
})
.await;
// receive a seconded statement from peer A, which does not provide the request data,
// then get that data from peer C. It should be propagated onwards to peer B and to
// candidate backing.
let statement = {
let signing_context = SigningContext { parent_hash: hash_a, session_index };
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let ferdie_public = CryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Ferdie.to_seed()),
)
.await
.unwrap();
SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate.clone()),
&signing_context,
ValidatorIndex(4),
&ferdie_public.into(),
)
.await
.ok()
.flatten()
.expect("should be signed")
};
let metadata =
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone().into())
.get_metadata();
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::Share(hash_a, statement.clone()),
})
.await;
// Messages should go out:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
mut recipients,
protocol_v1::ValidationProtocol::StatementDistribution(
protocol_v1::StatementDistributionMessage::LargeStatement(meta)
),
)
) => {
tracing::debug!(
target: LOG_TARGET,
?recipients,
"Recipients received"
);
recipients.sort();
// We expect only our backing group to be the recipients, du to the inflated
// test group above:
let mut expected: Vec<_> = vec![peer_a, peer_b, peer_c].into_iter().chain(dummy_peers).collect();
expected.sort();
assert_eq!(recipients.len(), expected.len());
assert_eq!(recipients, expected);
assert_eq!(meta.relay_parent, hash_a);
assert_eq!(meta.candidate_hash, statement.payload().candidate_hash());
assert_eq!(meta.signed_by, statement.validator_index());
assert_eq!(&meta.signature, statement.signature());
}
);
// Now that it has the candidate it should answer requests accordingly:
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
relay_parent: metadata.relay_parent,
candidate_hash: metadata.candidate_hash,
};
let req = sc_network::config::IncomingRequest {
peer: peer_b,
payload: inner_req.encode(),
pending_response,
};
req_cfg.inbound_queue.as_mut().unwrap().send(req).await.unwrap();
let StatementFetchingResponse::Statement(committed) =
Decode::decode(&mut response_rx.await.unwrap().result.unwrap().as_ref()).unwrap();
assert_eq!(committed, candidate);
handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::pin_mut!(test_fut);
futures::pin_mut!(bg);
executor::block_on(future::join(test_fut, bg));
}
#[test]
fn peer_cant_flood_with_large_statements() {
sp_tracing::try_init_simple();
let hash_a = Hash::repeat_byte(1);
let candidate = {
let mut c = dummy_committed_candidate_receipt(dummy_hash());
c.descriptor.relay_parent = hash_a;
c.descriptor.para_id = 1.into();
c.commitments.new_validation_code = Some(ValidationCode(vec![1, 2, 3]));
c
};
let peer_a = PeerId::random(); // Alice
let validators = vec![
Sr25519Keyring::Alice.pair(),
Sr25519Keyring::Bob.pair(),
Sr25519Keyring::Charlie.pair(),
// other group
Sr25519Keyring::Dave.pair(),
// We:
Sr25519Keyring::Ferdie.pair(),
];
let first_group = vec![0, 1, 2, 4];
let session_info = make_session_info(validators, vec![first_group, vec![3]]);
let session_index = 1;
let pool = sp_core::testing::TaskExecutor::new();
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let (statement_req_receiver, _) = IncomingRequest::get_config_receiver();
let bg = async move {
let s = StatementDistribution::new(
make_ferdie_keystore(),
statement_req_receiver,
Default::default(),
);
s.run(ctx).await.unwrap();
};
let test_fut = async move {
// register our active heads.
handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: hash_a,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
)
if r == hash_a
=> {
let _ = tx.send(Ok(session_index));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
)
if r == hash_a && sess_index == session_index
=> {
let _ = tx.send(Ok(Some(session_info)));
}
);
// notify of peers and view
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_a.clone(),
ObservedRole::Full,
Some(HashSet::from([Sr25519Keyring::Alice.public().into()])),
),
),
})
.await;
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]),
),
})
.await;
// receive a seconded statement from peer A.
let statement = {
let signing_context = SigningContext { parent_hash: hash_a, session_index };
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = CryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.await
.unwrap();
SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate.clone()),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
)
.await
.ok()
.flatten()
.expect("should be signed")
};
let metadata =
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone().into())
.get_metadata();
for _ in 0..MAX_LARGE_STATEMENTS_PER_SENDER + 1 {
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
protocol_v1::StatementDistributionMessage::LargeStatement(
metadata.clone(),
),
),
),
})
.await;
}
// We should try to fetch the data and punish the peer (but we don't know what comes
// first):
let mut requested = false;
let mut punished = false;
for _ in 0..2 {
match handle.recv().await {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(
mut reqs,
IfDisconnected::ImmediateError,
)) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
// Just drop request - should trigger error.
requested = true;
},
AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(p, r))
if p == peer_a && r == COST_APPARENT_FLOOD =>
{
punished = true;
},
m => panic!("Unexpected message: {:?}", m),
}
}
assert!(requested, "large data has not been requested.");
assert!(punished, "Peer should have been punished for flooding.");
handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::pin_mut!(test_fut);
futures::pin_mut!(bg);
executor::block_on(future::join(test_fut, bg));
}
fn make_session_info(validators: Vec<Pair>, groups: Vec<Vec<u32>>) -> SessionInfo {
let validator_groups: Vec<Vec<ValidatorIndex>> = groups
.iter()
.map(|g| g.into_iter().map(|v| ValidatorIndex(*v)).collect())
.collect();
SessionInfo {
discovery_keys: validators.iter().map(|k| k.public().into()).collect(),
// Not used:
n_cores: validator_groups.len() as u32,
validator_groups,
validators: validators.iter().map(|k| k.public().into()).collect(),
// Not used values:
assignment_keys: Vec::new(),
zeroth_delay_tranche_width: 0,
relay_vrf_modulo_samples: 0,
n_delay_tranches: 0,
no_show_slots: 0,
needed_approvals: 0,
}
}