chore: move tests into separate files (#3206)

Moves tests into separate files
in order to limit the loc per file.
This commit is contained in:
Bernhard Schuster
2021-06-11 13:50:19 +01:00
committed by GitHub
parent 1d3a9d81d6
commit f698bf8548
16 changed files with 7747 additions and 7627 deletions
+1 -358
View File
@@ -508,361 +508,4 @@ impl metrics::Metrics for Metrics {
}
#[cfg(test)]
mod tests {
mod handle_new_activations {
use super::super::*;
use futures::{
lock::Mutex,
task::{Context as FuturesContext, Poll},
Future,
};
use polkadot_node_primitives::{Collation, CollationResult, BlockData, PoV, POV_BOMB_LIMIT};
use polkadot_node_subsystem::messages::{
AllMessages, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_node_subsystem_test_helpers::{
subsystem_test_harness, TestSubsystemContextHandle,
};
use polkadot_primitives::v1::{
CollatorPair, Id as ParaId, PersistedValidationData, ScheduledCore, ValidationCode,
};
use std::pin::Pin;
fn test_collation() -> Collation {
Collation {
upward_messages: Default::default(),
horizontal_messages: Default::default(),
new_validation_code: Default::default(),
head_data: Default::default(),
proof_of_validity: PoV {
block_data: BlockData(Vec::new()),
},
processed_downward_messages: Default::default(),
hrmp_watermark: Default::default(),
}
}
fn test_collation_compressed() -> Collation {
let mut collation = test_collation();
let compressed = PoV {
block_data: BlockData(sp_maybe_compressed_blob::compress(
&collation.proof_of_validity.block_data.0,
POV_BOMB_LIMIT,
).unwrap())
};
collation.proof_of_validity = compressed;
collation
}
fn test_validation_data() -> PersistedValidationData {
let mut persisted_validation_data: PersistedValidationData = Default::default();
persisted_validation_data.max_pov_size = 1024;
persisted_validation_data
}
// Box<dyn Future<Output = Collation> + Unpin + Send
struct TestCollator;
impl Future for TestCollator {
type Output = Option<CollationResult>;
fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None }))
}
}
impl Unpin for TestCollator {}
fn test_config<Id: Into<ParaId>>(para_id: Id) -> Arc<CollationGenerationConfig> {
Arc::new(CollationGenerationConfig {
key: CollatorPair::generate().0,
collator: Box::new(|_: Hash, _vd: &PersistedValidationData| {
TestCollator.boxed()
}),
para_id: para_id.into(),
})
}
fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
ScheduledCore {
para_id: para_id.into(),
collator: None,
}
}
#[test]
fn requests_availability_per_relay_parent() {
let activated_hashes: Vec<Hash> = vec![
[1; 32].into(),
[4; 32].into(),
[9; 32].into(),
[16; 32].into(),
];
let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));
let overseer_requested_availability_cores = requested_availability_cores.clone();
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
loop {
match handle.try_recv().await {
None => break,
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
overseer_requested_availability_cores.lock().await.push(hash);
tx.send(Ok(vec![])).unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
tx.send(Ok(vec![Default::default(); 3])).unwrap();
}
Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
}
}
};
let (tx, _rx) = mpsc::channel(0);
let subsystem_activated_hashes = activated_hashes.clone();
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(
test_config(123u32),
subsystem_activated_hashes,
&mut ctx,
Metrics(None),
&tx,
)
.await
.unwrap();
});
let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
.expect("overseer should have shut down by now")
.into_inner();
requested_availability_cores.sort();
assert_eq!(requested_availability_cores, activated_hashes);
}
#[test]
fn requests_validation_data_for_scheduled_matches() {
let activated_hashes: Vec<Hash> = vec![
Hash::repeat_byte(1),
Hash::repeat_byte(4),
Hash::repeat_byte(9),
Hash::repeat_byte(16),
];
let requested_validation_data = Arc::new(Mutex::new(Vec::new()));
let overseer_requested_validation_data = requested_validation_data.clone();
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
loop {
match handle.try_recv().await {
None => break,
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::AvailabilityCores(tx),
))) => {
tx.send(Ok(vec![
CoreState::Free,
// this is weird, see explanation below
CoreState::Scheduled(scheduled_core_for(
(hash.as_fixed_bytes()[0] * 4) as u32,
)),
CoreState::Scheduled(scheduled_core_for(
(hash.as_fixed_bytes()[0] * 5) as u32,
)),
]))
.unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::PersistedValidationData(
_para_id,
_occupied_core_assumption,
tx,
),
))) => {
overseer_requested_validation_data
.lock()
.await
.push(hash);
tx.send(Ok(Default::default())).unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::Validators(tx),
))) => {
tx.send(Ok(vec![Default::default(); 3])).unwrap();
}
Some(msg) => {
panic!("didn't expect any other overseer requests; got {:?}", msg)
}
}
}
};
let (tx, _rx) = mpsc::channel(0);
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(test_config(16), activated_hashes, &mut ctx, Metrics(None), &tx)
.await
.unwrap();
});
let requested_validation_data = Arc::try_unwrap(requested_validation_data)
.expect("overseer should have shut down by now")
.into_inner();
// the only activated hash should be from the 4 hash:
// each activated hash generates two scheduled cores: one with its value * 4, one with its value * 5
// given that the test configuration has a para_id of 16, there's only one way to get that value: with the 4
// hash.
assert_eq!(requested_validation_data, vec![[4; 32].into()]);
}
#[test]
fn sends_distribute_collation_message() {
let activated_hashes: Vec<Hash> = vec![
Hash::repeat_byte(1),
Hash::repeat_byte(4),
Hash::repeat_byte(9),
Hash::repeat_byte(16),
];
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
loop {
match handle.try_recv().await {
None => break,
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::AvailabilityCores(tx),
))) => {
tx.send(Ok(vec![
CoreState::Free,
// this is weird, see explanation below
CoreState::Scheduled(scheduled_core_for(
(hash.as_fixed_bytes()[0] * 4) as u32,
)),
CoreState::Scheduled(scheduled_core_for(
(hash.as_fixed_bytes()[0] * 5) as u32,
)),
]))
.unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::PersistedValidationData(
_para_id,
_occupied_core_assumption,
tx,
),
))) => {
tx.send(Ok(Some(test_validation_data()))).unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::Validators(tx),
))) => {
tx.send(Ok(vec![Default::default(); 3])).unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::ValidationCode(
_para_id,
OccupiedCoreAssumption::Free,
tx,
),
))) => {
tx.send(Ok(Some(ValidationCode(vec![1, 2, 3])))).unwrap();
}
Some(msg) => {
panic!("didn't expect any other overseer requests; got {:?}", msg)
}
}
}
};
let config = test_config(16);
let subsystem_config = config.clone();
let (tx, rx) = mpsc::channel(0);
// empty vec doesn't allocate on the heap, so it's ok we throw it away
let sent_messages = Arc::new(Mutex::new(Vec::new()));
let subsystem_sent_messages = sent_messages.clone();
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None), &tx)
.await
.unwrap();
std::mem::drop(tx);
// collect all sent messages
*subsystem_sent_messages.lock().await = rx.collect().await;
});
let sent_messages = Arc::try_unwrap(sent_messages)
.expect("subsystem should have shut down by now")
.into_inner();
// we expect a single message to be sent, containing a candidate receipt.
// we don't care too much about the commitments_hash right now, but let's ensure that we've calculated the
// correct descriptor
let expect_pov_hash = test_collation_compressed().proof_of_validity.hash();
let expect_validation_data_hash = test_validation_data().hash();
let expect_relay_parent = Hash::repeat_byte(4);
let expect_validation_code_hash = ValidationCode(vec![1, 2, 3]).hash();
let expect_payload = collator_signature_payload(
&expect_relay_parent,
&config.para_id,
&expect_validation_data_hash,
&expect_pov_hash,
&expect_validation_code_hash,
);
let expect_descriptor = CandidateDescriptor {
signature: config.key.sign(&expect_payload),
para_id: config.para_id,
relay_parent: expect_relay_parent,
collator: config.key.public(),
persisted_validation_data_hash: expect_validation_data_hash,
pov_hash: expect_pov_hash,
erasure_root: Default::default(), // this isn't something we're checking right now
para_head: test_collation().head_data.hash(),
validation_code_hash: expect_validation_code_hash,
};
assert_eq!(sent_messages.len(), 1);
match &sent_messages[0] {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..
)) => {
// signature generation is non-deterministic, so we can't just assert that the
// expected descriptor is correct. What we can do is validate that the produced
// descriptor has a valid signature, then just copy in the generated signature
// and check the rest of the fields for equality.
assert!(CollatorPair::verify(
&descriptor.signature,
&collator_signature_payload(
&descriptor.relay_parent,
&descriptor.para_id,
&descriptor.persisted_validation_data_hash,
&descriptor.pov_hash,
&descriptor.validation_code_hash,
)
.as_ref(),
&descriptor.collator,
));
let expect_descriptor = {
let mut expect_descriptor = expect_descriptor;
expect_descriptor.signature = descriptor.signature.clone();
expect_descriptor.erasure_root = descriptor.erasure_root.clone();
expect_descriptor
};
assert_eq!(descriptor, &expect_descriptor);
}
_ => panic!("received wrong message type"),
}
}
}
}
mod tests;
@@ -0,0 +1,372 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
mod handle_new_activations {
use super::super::*;
use futures::{
lock::Mutex,
task::{Context as FuturesContext, Poll},
Future,
};
use polkadot_node_primitives::{Collation, CollationResult, BlockData, PoV, POV_BOMB_LIMIT};
use polkadot_node_subsystem::messages::{
AllMessages, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_node_subsystem_test_helpers::{
subsystem_test_harness, TestSubsystemContextHandle,
};
use polkadot_primitives::v1::{
CollatorPair, Id as ParaId, PersistedValidationData, ScheduledCore, ValidationCode,
};
use std::pin::Pin;
fn test_collation() -> Collation {
Collation {
upward_messages: Default::default(),
horizontal_messages: Default::default(),
new_validation_code: Default::default(),
head_data: Default::default(),
proof_of_validity: PoV {
block_data: BlockData(Vec::new()),
},
processed_downward_messages: Default::default(),
hrmp_watermark: Default::default(),
}
}
fn test_collation_compressed() -> Collation {
let mut collation = test_collation();
let compressed = PoV {
block_data: BlockData(sp_maybe_compressed_blob::compress(
&collation.proof_of_validity.block_data.0,
POV_BOMB_LIMIT,
).unwrap())
};
collation.proof_of_validity = compressed;
collation
}
fn test_validation_data() -> PersistedValidationData {
let mut persisted_validation_data: PersistedValidationData = Default::default();
persisted_validation_data.max_pov_size = 1024;
persisted_validation_data
}
// Box<dyn Future<Output = Collation> + Unpin + Send
struct TestCollator;
impl Future for TestCollator {
type Output = Option<CollationResult>;
fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None }))
}
}
impl Unpin for TestCollator {}
fn test_config<Id: Into<ParaId>>(para_id: Id) -> Arc<CollationGenerationConfig> {
Arc::new(CollationGenerationConfig {
key: CollatorPair::generate().0,
collator: Box::new(|_: Hash, _vd: &PersistedValidationData| {
TestCollator.boxed()
}),
para_id: para_id.into(),
})
}
fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
ScheduledCore {
para_id: para_id.into(),
collator: None,
}
}
#[test]
fn requests_availability_per_relay_parent() {
let activated_hashes: Vec<Hash> = vec![
[1; 32].into(),
[4; 32].into(),
[9; 32].into(),
[16; 32].into(),
];
let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));
let overseer_requested_availability_cores = requested_availability_cores.clone();
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
loop {
match handle.try_recv().await {
None => break,
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
overseer_requested_availability_cores.lock().await.push(hash);
tx.send(Ok(vec![])).unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
tx.send(Ok(vec![Default::default(); 3])).unwrap();
}
Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
}
}
};
let (tx, _rx) = mpsc::channel(0);
let subsystem_activated_hashes = activated_hashes.clone();
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(
test_config(123u32),
subsystem_activated_hashes,
&mut ctx,
Metrics(None),
&tx,
)
.await
.unwrap();
});
let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
.expect("overseer should have shut down by now")
.into_inner();
requested_availability_cores.sort();
assert_eq!(requested_availability_cores, activated_hashes);
}
#[test]
fn requests_validation_data_for_scheduled_matches() {
let activated_hashes: Vec<Hash> = vec![
Hash::repeat_byte(1),
Hash::repeat_byte(4),
Hash::repeat_byte(9),
Hash::repeat_byte(16),
];
let requested_validation_data = Arc::new(Mutex::new(Vec::new()));
let overseer_requested_validation_data = requested_validation_data.clone();
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
loop {
match handle.try_recv().await {
None => break,
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::AvailabilityCores(tx),
))) => {
tx.send(Ok(vec![
CoreState::Free,
// this is weird, see explanation below
CoreState::Scheduled(scheduled_core_for(
(hash.as_fixed_bytes()[0] * 4) as u32,
)),
CoreState::Scheduled(scheduled_core_for(
(hash.as_fixed_bytes()[0] * 5) as u32,
)),
]))
.unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::PersistedValidationData(
_para_id,
_occupied_core_assumption,
tx,
),
))) => {
overseer_requested_validation_data
.lock()
.await
.push(hash);
tx.send(Ok(Default::default())).unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::Validators(tx),
))) => {
tx.send(Ok(vec![Default::default(); 3])).unwrap();
}
Some(msg) => {
panic!("didn't expect any other overseer requests; got {:?}", msg)
}
}
}
};
let (tx, _rx) = mpsc::channel(0);
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(test_config(16), activated_hashes, &mut ctx, Metrics(None), &tx)
.await
.unwrap();
});
let requested_validation_data = Arc::try_unwrap(requested_validation_data)
.expect("overseer should have shut down by now")
.into_inner();
// the only activated hash should be from the 4 hash:
// each activated hash generates two scheduled cores: one with its value * 4, one with its value * 5
// given that the test configuration has a para_id of 16, there's only one way to get that value: with the 4
// hash.
assert_eq!(requested_validation_data, vec![[4; 32].into()]);
}
#[test]
fn sends_distribute_collation_message() {
let activated_hashes: Vec<Hash> = vec![
Hash::repeat_byte(1),
Hash::repeat_byte(4),
Hash::repeat_byte(9),
Hash::repeat_byte(16),
];
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
loop {
match handle.try_recv().await {
None => break,
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::AvailabilityCores(tx),
))) => {
tx.send(Ok(vec![
CoreState::Free,
// this is weird, see explanation below
CoreState::Scheduled(scheduled_core_for(
(hash.as_fixed_bytes()[0] * 4) as u32,
)),
CoreState::Scheduled(scheduled_core_for(
(hash.as_fixed_bytes()[0] * 5) as u32,
)),
]))
.unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::PersistedValidationData(
_para_id,
_occupied_core_assumption,
tx,
),
))) => {
tx.send(Ok(Some(test_validation_data()))).unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::Validators(tx),
))) => {
tx.send(Ok(vec![Default::default(); 3])).unwrap();
}
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::ValidationCode(
_para_id,
OccupiedCoreAssumption::Free,
tx,
),
))) => {
tx.send(Ok(Some(ValidationCode(vec![1, 2, 3])))).unwrap();
}
Some(msg) => {
panic!("didn't expect any other overseer requests; got {:?}", msg)
}
}
}
};
let config = test_config(16);
let subsystem_config = config.clone();
let (tx, rx) = mpsc::channel(0);
// empty vec doesn't allocate on the heap, so it's ok we throw it away
let sent_messages = Arc::new(Mutex::new(Vec::new()));
let subsystem_sent_messages = sent_messages.clone();
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None), &tx)
.await
.unwrap();
std::mem::drop(tx);
// collect all sent messages
*subsystem_sent_messages.lock().await = rx.collect().await;
});
let sent_messages = Arc::try_unwrap(sent_messages)
.expect("subsystem should have shut down by now")
.into_inner();
// we expect a single message to be sent, containing a candidate receipt.
// we don't care too much about the commitments_hash right now, but let's ensure that we've calculated the
// correct descriptor
let expect_pov_hash = test_collation_compressed().proof_of_validity.hash();
let expect_validation_data_hash = test_validation_data().hash();
let expect_relay_parent = Hash::repeat_byte(4);
let expect_validation_code_hash = ValidationCode(vec![1, 2, 3]).hash();
let expect_payload = collator_signature_payload(
&expect_relay_parent,
&config.para_id,
&expect_validation_data_hash,
&expect_pov_hash,
&expect_validation_code_hash,
);
let expect_descriptor = CandidateDescriptor {
signature: config.key.sign(&expect_payload),
para_id: config.para_id,
relay_parent: expect_relay_parent,
collator: config.key.public(),
persisted_validation_data_hash: expect_validation_data_hash,
pov_hash: expect_pov_hash,
erasure_root: Default::default(), // this isn't something we're checking right now
para_head: test_collation().head_data.hash(),
validation_code_hash: expect_validation_code_hash,
};
assert_eq!(sent_messages.len(), 1);
match &sent_messages[0] {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..
)) => {
// signature generation is non-deterministic, so we can't just assert that the
// expected descriptor is correct. What we can do is validate that the produced
// descriptor has a valid signature, then just copy in the generated signature
// and check the rest of the fields for equality.
assert!(CollatorPair::verify(
&descriptor.signature,
&collator_signature_payload(
&descriptor.relay_parent,
&descriptor.para_id,
&descriptor.persisted_validation_data_hash,
&descriptor.pov_hash,
&descriptor.validation_code_hash,
)
.as_ref(),
&descriptor.collator,
));
let expect_descriptor = {
let mut expect_descriptor = expect_descriptor;
expect_descriptor.signature = descriptor.signature.clone();
expect_descriptor.erasure_root = descriptor.erasure_root.clone();
expect_descriptor
};
assert_eq!(descriptor, &expect_descriptor);
}
_ => panic!("received wrong message type"),
}
}
}
+1 -815
View File
@@ -413,818 +413,4 @@ impl metrics::Metrics for Metrics {
}
#[cfg(test)]
mod tests {
use super::*;
use polkadot_primitives::v1::{
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
Id as ParaId, OccupiedCoreAssumption, SessionIndex, ValidationCode,
CommittedCandidateReceipt, CandidateEvent, InboundDownwardMessage,
BlockNumber, InboundHrmpMessage, SessionInfo, AuthorityDiscoveryId,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_core::testing::TaskExecutor;
use std::{collections::{HashMap, BTreeMap}, sync::{Arc, Mutex}};
use futures::channel::oneshot;
use polkadot_node_primitives::{
BabeEpoch, BabeEpochConfiguration, BabeAllowedSlots,
};
#[derive(Default, Clone)]
struct MockRuntimeApi {
authorities: Vec<AuthorityDiscoveryId>,
validators: Vec<ValidatorId>,
validator_groups: Vec<Vec<ValidatorIndex>>,
availability_cores: Vec<CoreState>,
availability_cores_wait: Arc<Mutex<()>>,
validation_data: HashMap<ParaId, PersistedValidationData>,
session_index_for_child: SessionIndex,
session_info: HashMap<SessionIndex, SessionInfo>,
validation_code: HashMap<ParaId, ValidationCode>,
historical_validation_code: HashMap<ParaId, Vec<(BlockNumber, ValidationCode)>>,
validation_outputs_results: HashMap<ParaId, bool>,
candidate_pending_availability: HashMap<ParaId, CommittedCandidateReceipt>,
candidate_events: Vec<CandidateEvent>,
dmq_contents: HashMap<ParaId, Vec<InboundDownwardMessage>>,
hrmp_channels: HashMap<ParaId, BTreeMap<ParaId, Vec<InboundHrmpMessage>>>,
babe_epoch: Option<BabeEpoch>,
}
impl ProvideRuntimeApi<Block> for MockRuntimeApi {
type Api = Self;
fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> {
self.clone().into()
}
}
sp_api::mock_impl_runtime_apis! {
impl ParachainHost<Block> for MockRuntimeApi {
fn validators(&self) -> Vec<ValidatorId> {
self.validators.clone()
}
fn validator_groups(&self) -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo) {
(
self.validator_groups.clone(),
GroupRotationInfo {
session_start_block: 1,
group_rotation_frequency: 100,
now: 10,
},
)
}
fn availability_cores(&self) -> Vec<CoreState> {
let _ = self.availability_cores_wait.lock().unwrap();
self.availability_cores.clone()
}
fn persisted_validation_data(
&self,
para: ParaId,
_assumption: OccupiedCoreAssumption,
) -> Option<PersistedValidationData> {
self.validation_data.get(&para).cloned()
}
fn check_validation_outputs(
&self,
para_id: ParaId,
_commitments: polkadot_primitives::v1::CandidateCommitments,
) -> bool {
self.validation_outputs_results
.get(&para_id)
.cloned()
.expect(
"`check_validation_outputs` called but the expected result hasn't been supplied"
)
}
fn session_index_for_child(&self) -> SessionIndex {
self.session_index_for_child.clone()
}
fn session_info(&self, index: SessionIndex) -> Option<SessionInfo> {
self.session_info.get(&index).cloned()
}
fn validation_code(
&self,
para: ParaId,
_assumption: OccupiedCoreAssumption,
) -> Option<ValidationCode> {
self.validation_code.get(&para).map(|c| c.clone())
}
fn historical_validation_code(
&self,
para: ParaId,
at: BlockNumber,
) -> Option<ValidationCode> {
self.historical_validation_code.get(&para).and_then(|h_code| {
h_code.iter()
.take_while(|(changed_at, _)| changed_at <= &at)
.last()
.map(|(_, code)| code.clone())
})
}
fn candidate_pending_availability(
&self,
para: ParaId,
) -> Option<CommittedCandidateReceipt> {
self.candidate_pending_availability.get(&para).map(|c| c.clone())
}
fn candidate_events(&self) -> Vec<CandidateEvent> {
self.candidate_events.clone()
}
fn dmq_contents(
&self,
recipient: ParaId,
) -> Vec<InboundDownwardMessage> {
self.dmq_contents.get(&recipient).map(|q| q.clone()).unwrap_or_default()
}
fn inbound_hrmp_channels_contents(
&self,
recipient: ParaId
) -> BTreeMap<ParaId, Vec<InboundHrmpMessage>> {
self.hrmp_channels.get(&recipient).map(|q| q.clone()).unwrap_or_default()
}
fn validation_code_by_hash(
&self,
_hash: Hash,
) -> Option<ValidationCode> {
unreachable!("not used in tests");
}
}
impl BabeApi<Block> for MockRuntimeApi {
fn configuration(&self) -> sp_consensus_babe::BabeGenesisConfiguration {
unimplemented!()
}
fn current_epoch_start(&self) -> sp_consensus_babe::Slot {
self.babe_epoch.as_ref().unwrap().start_slot
}
fn current_epoch(&self) -> BabeEpoch {
self.babe_epoch.as_ref().unwrap().clone()
}
fn next_epoch(&self) -> BabeEpoch {
unimplemented!()
}
fn generate_key_ownership_proof(
_slot: sp_consensus_babe::Slot,
_authority_id: sp_consensus_babe::AuthorityId,
) -> Option<sp_consensus_babe::OpaqueKeyOwnershipProof> {
None
}
fn submit_report_equivocation_unsigned_extrinsic(
_equivocation_proof: sp_consensus_babe::EquivocationProof<polkadot_primitives::v1::Header>,
_key_owner_proof: sp_consensus_babe::OpaqueKeyOwnershipProof,
) -> Option<()> {
None
}
}
impl AuthorityDiscoveryApi<Block> for MockRuntimeApi {
fn authorities(&self) -> Vec<AuthorityDiscoveryId> {
self.authorities.clone()
}
}
}
#[test]
fn requests_authorities() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::Authorities(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.authorities);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_validators() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::Validators(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.validators);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_validator_groups() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::ValidatorGroups(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap().0, runtime_api.validator_groups);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_availability_cores() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.availability_cores);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_persisted_validation_data() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let mut runtime_api = MockRuntimeApi::default();
runtime_api.validation_data.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::PersistedValidationData(para_a, OccupiedCoreAssumption::Included, tx)
),
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::PersistedValidationData(para_b, OccupiedCoreAssumption::Included, tx)
),
}).await;
assert_eq!(rx.await.unwrap().unwrap(), None);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_check_validation_outputs() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let commitments = polkadot_primitives::v1::CandidateCommitments::default();
let spawner = sp_core::testing::TaskExecutor::new();
runtime_api.validation_outputs_results.insert(para_a, false);
runtime_api.validation_outputs_results.insert(para_b, true);
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::CheckValidationOutputs(
para_a,
commitments.clone(),
tx,
),
)
}).await;
assert_eq!(
rx.await.unwrap().unwrap(),
runtime_api.validation_outputs_results[&para_a],
);
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::CheckValidationOutputs(
para_b,
commitments,
tx,
),
)
}).await;
assert_eq!(
rx.await.unwrap().unwrap(),
runtime_api.validation_outputs_results[&para_b],
);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_session_index_for_child() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::SessionIndexForChild(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.session_index_for_child);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_session_info() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let session_index = 1;
runtime_api.session_info.insert(session_index, Default::default());
let runtime_api = Arc::new(runtime_api);
let spawner = sp_core::testing::TaskExecutor::new();
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::SessionInfo(session_index, tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_validation_code() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let mut runtime_api = MockRuntimeApi::default();
runtime_api.validation_code.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::ValidationCode(para_a, OccupiedCoreAssumption::Included, tx)
),
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::ValidationCode(para_b, OccupiedCoreAssumption::Included, tx)
),
}).await;
assert_eq!(rx.await.unwrap().unwrap(), None);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_candidate_pending_availability() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let mut runtime_api = MockRuntimeApi::default();
runtime_api.candidate_pending_availability.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::CandidatePendingAvailability(para_a, tx),
)
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::CandidatePendingAvailability(para_b, tx),
)
}).await;
assert_eq!(rx.await.unwrap().unwrap(), None);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_candidate_events() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::CandidateEvents(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.candidate_events);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_dmq_contents() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let runtime_api = Arc::new({
let mut runtime_api = MockRuntimeApi::default();
runtime_api.dmq_contents.insert(para_a, vec![]);
runtime_api.dmq_contents.insert(
para_b,
vec![InboundDownwardMessage {
sent_at: 228,
msg: b"Novus Ordo Seclorum".to_vec(),
}],
);
runtime_api
});
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle
.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::DmqContents(para_a, tx)),
})
.await;
assert_eq!(rx.await.unwrap().unwrap(), vec![]);
let (tx, rx) = oneshot::channel();
ctx_handle
.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::DmqContents(para_b, tx)),
})
.await;
assert_eq!(
rx.await.unwrap().unwrap(),
vec![InboundDownwardMessage {
sent_at: 228,
msg: b"Novus Ordo Seclorum".to_vec(),
}]
);
ctx_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_inbound_hrmp_channels_contents() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let relay_parent = [1; 32].into();
let para_a = 99.into();
let para_b = 66.into();
let para_c = 33.into();
let spawner = sp_core::testing::TaskExecutor::new();
let para_b_inbound_channels = [
(para_a, vec![]),
(
para_c,
vec![InboundHrmpMessage {
sent_at: 1,
data: "𝙀=𝙈𝘾²".as_bytes().to_owned(),
}],
),
]
.iter()
.cloned()
.collect::<BTreeMap<_, _>>();
let runtime_api = Arc::new({
let mut runtime_api = MockRuntimeApi::default();
runtime_api.hrmp_channels.insert(para_a, BTreeMap::new());
runtime_api
.hrmp_channels
.insert(para_b, para_b_inbound_channels.clone());
runtime_api
});
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle
.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::InboundHrmpChannelsContents(para_a, tx),
),
})
.await;
assert_eq!(rx.await.unwrap().unwrap(), BTreeMap::new());
let (tx, rx) = oneshot::channel();
ctx_handle
.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::InboundHrmpChannelsContents(para_b, tx),
),
})
.await;
assert_eq!(rx.await.unwrap().unwrap(), para_b_inbound_channels,);
ctx_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_historical_code() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let runtime_api = Arc::new({
let mut runtime_api = MockRuntimeApi::default();
runtime_api.historical_validation_code.insert(
para_a,
vec![(1, vec![1, 2, 3].into()), (10, vec![4, 5, 6].into())],
);
runtime_api.historical_validation_code.insert(
para_b,
vec![(5, vec![7, 8, 9].into())],
);
runtime_api
});
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
{
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::HistoricalValidationCode(para_a, 5, tx),
)
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(ValidationCode::from(vec![1, 2, 3])));
}
{
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::HistoricalValidationCode(para_a, 10, tx),
)
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(ValidationCode::from(vec![4, 5, 6])));
}
{
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::HistoricalValidationCode(para_b, 1, tx),
)
}).await;
assert!(rx.await.unwrap().unwrap().is_none());
}
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn multiple_requests_in_parallel_are_working() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let mutex = runtime_api.availability_cores_wait.clone();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
// Make all requests block until we release this mutex.
let lock = mutex.lock().unwrap();
let mut receivers = Vec::new();
for _ in 0..MAX_PARALLEL_REQUESTS * 10 {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx))
}).await;
receivers.push(rx);
}
let join = future::join_all(receivers);
drop(lock);
join.await
.into_iter()
.for_each(|r| assert_eq!(r.unwrap().unwrap(), runtime_api.availability_cores));
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn request_babe_epoch() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let epoch = BabeEpoch {
epoch_index: 100,
start_slot: sp_consensus_babe::Slot::from(1000),
duration: 10,
authorities: Vec::new(),
randomness: [1u8; 32],
config: BabeEpochConfiguration {
c: (1, 4),
allowed_slots: BabeAllowedSlots::PrimarySlots,
},
};
runtime_api.babe_epoch = Some(epoch.clone());
let runtime_api = Arc::new(runtime_api);
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::CurrentBabeEpoch(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), epoch);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
}
mod tests;
+829
View File
@@ -0,0 +1,829 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use polkadot_primitives::v1::{
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
Id as ParaId, OccupiedCoreAssumption, SessionIndex, ValidationCode,
CommittedCandidateReceipt, CandidateEvent, InboundDownwardMessage,
BlockNumber, InboundHrmpMessage, SessionInfo, AuthorityDiscoveryId,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_core::testing::TaskExecutor;
use std::{collections::{HashMap, BTreeMap}, sync::{Arc, Mutex}};
use futures::channel::oneshot;
use polkadot_node_primitives::{
BabeEpoch, BabeEpochConfiguration, BabeAllowedSlots,
};
#[derive(Default, Clone)]
struct MockRuntimeApi {
authorities: Vec<AuthorityDiscoveryId>,
validators: Vec<ValidatorId>,
validator_groups: Vec<Vec<ValidatorIndex>>,
availability_cores: Vec<CoreState>,
availability_cores_wait: Arc<Mutex<()>>,
validation_data: HashMap<ParaId, PersistedValidationData>,
session_index_for_child: SessionIndex,
session_info: HashMap<SessionIndex, SessionInfo>,
validation_code: HashMap<ParaId, ValidationCode>,
historical_validation_code: HashMap<ParaId, Vec<(BlockNumber, ValidationCode)>>,
validation_outputs_results: HashMap<ParaId, bool>,
candidate_pending_availability: HashMap<ParaId, CommittedCandidateReceipt>,
candidate_events: Vec<CandidateEvent>,
dmq_contents: HashMap<ParaId, Vec<InboundDownwardMessage>>,
hrmp_channels: HashMap<ParaId, BTreeMap<ParaId, Vec<InboundHrmpMessage>>>,
babe_epoch: Option<BabeEpoch>,
}
impl ProvideRuntimeApi<Block> for MockRuntimeApi {
type Api = Self;
fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> {
self.clone().into()
}
}
sp_api::mock_impl_runtime_apis! {
impl ParachainHost<Block> for MockRuntimeApi {
fn validators(&self) -> Vec<ValidatorId> {
self.validators.clone()
}
fn validator_groups(&self) -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo) {
(
self.validator_groups.clone(),
GroupRotationInfo {
session_start_block: 1,
group_rotation_frequency: 100,
now: 10,
},
)
}
fn availability_cores(&self) -> Vec<CoreState> {
let _ = self.availability_cores_wait.lock().unwrap();
self.availability_cores.clone()
}
fn persisted_validation_data(
&self,
para: ParaId,
_assumption: OccupiedCoreAssumption,
) -> Option<PersistedValidationData> {
self.validation_data.get(&para).cloned()
}
fn check_validation_outputs(
&self,
para_id: ParaId,
_commitments: polkadot_primitives::v1::CandidateCommitments,
) -> bool {
self.validation_outputs_results
.get(&para_id)
.cloned()
.expect(
"`check_validation_outputs` called but the expected result hasn't been supplied"
)
}
fn session_index_for_child(&self) -> SessionIndex {
self.session_index_for_child.clone()
}
fn session_info(&self, index: SessionIndex) -> Option<SessionInfo> {
self.session_info.get(&index).cloned()
}
fn validation_code(
&self,
para: ParaId,
_assumption: OccupiedCoreAssumption,
) -> Option<ValidationCode> {
self.validation_code.get(&para).map(|c| c.clone())
}
fn historical_validation_code(
&self,
para: ParaId,
at: BlockNumber,
) -> Option<ValidationCode> {
self.historical_validation_code.get(&para).and_then(|h_code| {
h_code.iter()
.take_while(|(changed_at, _)| changed_at <= &at)
.last()
.map(|(_, code)| code.clone())
})
}
fn candidate_pending_availability(
&self,
para: ParaId,
) -> Option<CommittedCandidateReceipt> {
self.candidate_pending_availability.get(&para).map(|c| c.clone())
}
fn candidate_events(&self) -> Vec<CandidateEvent> {
self.candidate_events.clone()
}
fn dmq_contents(
&self,
recipient: ParaId,
) -> Vec<InboundDownwardMessage> {
self.dmq_contents.get(&recipient).map(|q| q.clone()).unwrap_or_default()
}
fn inbound_hrmp_channels_contents(
&self,
recipient: ParaId
) -> BTreeMap<ParaId, Vec<InboundHrmpMessage>> {
self.hrmp_channels.get(&recipient).map(|q| q.clone()).unwrap_or_default()
}
fn validation_code_by_hash(
&self,
_hash: Hash,
) -> Option<ValidationCode> {
unreachable!("not used in tests");
}
}
impl BabeApi<Block> for MockRuntimeApi {
fn configuration(&self) -> sp_consensus_babe::BabeGenesisConfiguration {
unimplemented!()
}
fn current_epoch_start(&self) -> sp_consensus_babe::Slot {
self.babe_epoch.as_ref().unwrap().start_slot
}
fn current_epoch(&self) -> BabeEpoch {
self.babe_epoch.as_ref().unwrap().clone()
}
fn next_epoch(&self) -> BabeEpoch {
unimplemented!()
}
fn generate_key_ownership_proof(
_slot: sp_consensus_babe::Slot,
_authority_id: sp_consensus_babe::AuthorityId,
) -> Option<sp_consensus_babe::OpaqueKeyOwnershipProof> {
None
}
fn submit_report_equivocation_unsigned_extrinsic(
_equivocation_proof: sp_consensus_babe::EquivocationProof<polkadot_primitives::v1::Header>,
_key_owner_proof: sp_consensus_babe::OpaqueKeyOwnershipProof,
) -> Option<()> {
None
}
}
impl AuthorityDiscoveryApi<Block> for MockRuntimeApi {
fn authorities(&self) -> Vec<AuthorityDiscoveryId> {
self.authorities.clone()
}
}
}
#[test]
fn requests_authorities() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::Authorities(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.authorities);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_validators() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::Validators(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.validators);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_validator_groups() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::ValidatorGroups(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap().0, runtime_api.validator_groups);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_availability_cores() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.availability_cores);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_persisted_validation_data() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let mut runtime_api = MockRuntimeApi::default();
runtime_api.validation_data.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::PersistedValidationData(para_a, OccupiedCoreAssumption::Included, tx)
),
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::PersistedValidationData(para_b, OccupiedCoreAssumption::Included, tx)
),
}).await;
assert_eq!(rx.await.unwrap().unwrap(), None);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_check_validation_outputs() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let commitments = polkadot_primitives::v1::CandidateCommitments::default();
let spawner = sp_core::testing::TaskExecutor::new();
runtime_api.validation_outputs_results.insert(para_a, false);
runtime_api.validation_outputs_results.insert(para_b, true);
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::CheckValidationOutputs(
para_a,
commitments.clone(),
tx,
),
)
}).await;
assert_eq!(
rx.await.unwrap().unwrap(),
runtime_api.validation_outputs_results[&para_a],
);
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::CheckValidationOutputs(
para_b,
commitments,
tx,
),
)
}).await;
assert_eq!(
rx.await.unwrap().unwrap(),
runtime_api.validation_outputs_results[&para_b],
);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_session_index_for_child() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::SessionIndexForChild(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.session_index_for_child);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_session_info() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let session_index = 1;
runtime_api.session_info.insert(session_index, Default::default());
let runtime_api = Arc::new(runtime_api);
let spawner = sp_core::testing::TaskExecutor::new();
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::SessionInfo(session_index, tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_validation_code() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let mut runtime_api = MockRuntimeApi::default();
runtime_api.validation_code.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::ValidationCode(para_a, OccupiedCoreAssumption::Included, tx)
),
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::ValidationCode(para_b, OccupiedCoreAssumption::Included, tx)
),
}).await;
assert_eq!(rx.await.unwrap().unwrap(), None);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_candidate_pending_availability() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let mut runtime_api = MockRuntimeApi::default();
runtime_api.candidate_pending_availability.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::CandidatePendingAvailability(para_a, tx),
)
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::CandidatePendingAvailability(para_b, tx),
)
}).await;
assert_eq!(rx.await.unwrap().unwrap(), None);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_candidate_events() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::CandidateEvents(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.candidate_events);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_dmq_contents() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let runtime_api = Arc::new({
let mut runtime_api = MockRuntimeApi::default();
runtime_api.dmq_contents.insert(para_a, vec![]);
runtime_api.dmq_contents.insert(
para_b,
vec![InboundDownwardMessage {
sent_at: 228,
msg: b"Novus Ordo Seclorum".to_vec(),
}],
);
runtime_api
});
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle
.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::DmqContents(para_a, tx)),
})
.await;
assert_eq!(rx.await.unwrap().unwrap(), vec![]);
let (tx, rx) = oneshot::channel();
ctx_handle
.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::DmqContents(para_b, tx)),
})
.await;
assert_eq!(
rx.await.unwrap().unwrap(),
vec![InboundDownwardMessage {
sent_at: 228,
msg: b"Novus Ordo Seclorum".to_vec(),
}]
);
ctx_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_inbound_hrmp_channels_contents() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let relay_parent = [1; 32].into();
let para_a = 99.into();
let para_b = 66.into();
let para_c = 33.into();
let spawner = sp_core::testing::TaskExecutor::new();
let para_b_inbound_channels = [
(para_a, vec![]),
(
para_c,
vec![InboundHrmpMessage {
sent_at: 1,
data: "𝙀=𝙈𝘾²".as_bytes().to_owned(),
}],
),
]
.iter()
.cloned()
.collect::<BTreeMap<_, _>>();
let runtime_api = Arc::new({
let mut runtime_api = MockRuntimeApi::default();
runtime_api.hrmp_channels.insert(para_a, BTreeMap::new());
runtime_api
.hrmp_channels
.insert(para_b, para_b_inbound_channels.clone());
runtime_api
});
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle
.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::InboundHrmpChannelsContents(para_a, tx),
),
})
.await;
assert_eq!(rx.await.unwrap().unwrap(), BTreeMap::new());
let (tx, rx) = oneshot::channel();
ctx_handle
.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::InboundHrmpChannelsContents(para_b, tx),
),
})
.await;
assert_eq!(rx.await.unwrap().unwrap(), para_b_inbound_channels,);
ctx_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_historical_code() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let runtime_api = Arc::new({
let mut runtime_api = MockRuntimeApi::default();
runtime_api.historical_validation_code.insert(
para_a,
vec![(1, vec![1, 2, 3].into()), (10, vec![4, 5, 6].into())],
);
runtime_api.historical_validation_code.insert(
para_b,
vec![(5, vec![7, 8, 9].into())],
);
runtime_api
});
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
{
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::HistoricalValidationCode(para_a, 5, tx),
)
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(ValidationCode::from(vec![1, 2, 3])));
}
{
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::HistoricalValidationCode(para_a, 10, tx),
)
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(ValidationCode::from(vec![4, 5, 6])));
}
{
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(
relay_parent,
Request::HistoricalValidationCode(para_b, 1, tx),
)
}).await;
assert!(rx.await.unwrap().unwrap().is_none());
}
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn multiple_requests_in_parallel_are_working() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let mutex = runtime_api.availability_cores_wait.clone();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
// Make all requests block until we release this mutex.
let lock = mutex.lock().unwrap();
let mut receivers = Vec::new();
for _ in 0..MAX_PARALLEL_REQUESTS * 10 {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx))
}).await;
receivers.push(rx);
}
let join = future::join_all(receivers);
drop(lock);
join.await
.into_iter()
.for_each(|r| assert_eq!(r.unwrap().unwrap(), runtime_api.availability_cores));
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn request_babe_epoch() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let epoch = BabeEpoch {
epoch_index: 100,
start_slot: sp_consensus_babe::Slot::from(1000),
duration: 10,
authorities: Vec::new(),
randomness: [1u8; 32],
config: BabeEpochConfiguration {
c: (1, 4),
allowed_slots: BabeAllowedSlots::PrimarySlots,
},
};
runtime_api.babe_epoch = Some(epoch.clone());
let runtime_api = Arc::new(runtime_api);
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::CurrentBabeEpoch(tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), epoch);
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -868,782 +868,4 @@ pub(crate) async fn run(
}
#[cfg(test)]
mod tests {
use super::*;
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use sp_core::{crypto::Pair, Decode};
use sp_keyring::Sr25519Keyring;
use sp_runtime::traits::AppVerify;
use polkadot_node_network_protocol::{
our_view,
view,
request_response::request::IncomingRequest,
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateDescriptor, CollatorPair, GroupRotationInfo, ScheduledCore, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex};
use polkadot_node_primitives::BlockData;
use polkadot_subsystem::{
jaeger,
messages::{RuntimeApiMessage, RuntimeApiRequest},
ActiveLeavesUpdate, ActivatedLeaf, LeafStatus,
};
use polkadot_subsystem_testhelpers as test_helpers;
#[derive(Default)]
struct TestCandidateBuilder {
para_id: ParaId,
pov_hash: Hash,
relay_parent: Hash,
commitments_hash: Hash,
}
impl TestCandidateBuilder {
fn build(self) -> CandidateReceipt {
CandidateReceipt {
descriptor: CandidateDescriptor {
para_id: self.para_id,
pov_hash: self.pov_hash,
relay_parent: self.relay_parent,
..Default::default()
},
commitments_hash: self.commitments_hash,
}
}
}
#[derive(Clone)]
struct TestState {
para_id: ParaId,
validators: Vec<Sr25519Keyring>,
session_info: SessionInfo,
group_rotation_info: GroupRotationInfo,
validator_peer_id: Vec<PeerId>,
relay_parent: Hash,
availability_core: CoreState,
local_peer_id: PeerId,
collator_pair: CollatorPair,
session_index: SessionIndex,
}
fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
val_ids.iter().map(|v| v.public().into()).collect()
}
fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryId> {
val_ids.iter().map(|v| v.public().into()).collect()
}
impl Default for TestState {
fn default() -> Self {
let para_id = ParaId::from(1);
let validators = vec![
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Ferdie,
];
let validator_public = validator_pubkeys(&validators);
let discovery_keys = validator_authority_id(&validators);
let validator_peer_id = std::iter::repeat_with(|| PeerId::random())
.take(discovery_keys.len())
.collect();
let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]]
.into_iter().map(|g| g.into_iter().map(ValidatorIndex).collect()).collect();
let group_rotation_info = GroupRotationInfo {
session_start_block: 0,
group_rotation_frequency: 100,
now: 1,
};
let availability_core = CoreState::Scheduled(ScheduledCore {
para_id,
collator: None,
});
let relay_parent = Hash::random();
let local_peer_id = PeerId::random();
let collator_pair = CollatorPair::generate().0;
Self {
para_id,
validators,
session_info: SessionInfo {
validators: validator_public,
discovery_keys,
validator_groups,
..Default::default()
},
group_rotation_info,
validator_peer_id,
relay_parent,
availability_core,
local_peer_id,
collator_pair,
session_index: 1,
}
}
}
impl TestState {
fn current_group_validator_indices(&self) -> &[ValidatorIndex] {
&self.session_info.validator_groups[0]
}
fn current_session_index(&self) -> SessionIndex {
self.session_index
}
fn current_group_validator_peer_ids(&self) -> Vec<PeerId> {
self.current_group_validator_indices().iter().map(|i| self.validator_peer_id[i.0 as usize].clone()).collect()
}
fn current_group_validator_authority_ids(&self) -> Vec<AuthorityDiscoveryId> {
self.current_group_validator_indices()
.iter()
.map(|i| self.session_info.discovery_keys[i.0 as usize].clone())
.collect()
}
/// Generate a new relay parent and inform the subsystem about the new view.
///
/// If `merge_views == true` it means the subsystem will be informed that we are working on the old `relay_parent`
/// and the new one.
async fn advance_to_new_round(&mut self, virtual_overseer: &mut VirtualOverseer, merge_views: bool) {
let old_relay_parent = self.relay_parent;
while self.relay_parent == old_relay_parent {
self.relay_parent.randomize();
}
let our_view = if merge_views {
our_view![old_relay_parent, self.relay_parent]
} else {
our_view![self.relay_parent]
};
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(our_view)),
).await;
}
}
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>;
struct TestHarness {
virtual_overseer: VirtualOverseer,
}
fn test_harness<T: Future<Output = VirtualOverseer>>(
local_peer_id: PeerId,
collator_pair: CollatorPair,
test: impl FnOnce(TestHarness) -> T,
) {
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = run(context, local_peer_id, collator_pair, Metrics::default());
let test_fut = test(TestHarness { virtual_overseer });
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::join(async move {
let mut overseer = test_fut.await;
overseer_signal(&mut overseer, OverseerSignal::Conclude).await;
}, subsystem)).1.unwrap();
}
const TIMEOUT: Duration = Duration::from_millis(100);
async fn overseer_send(
overseer: &mut VirtualOverseer,
msg: CollatorProtocolMessage,
) {
tracing::trace!(?msg, "sending message");
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
.await
.expect(&format!("{:?} is more than enough for sending messages.", TIMEOUT));
}
async fn overseer_recv(
overseer: &mut VirtualOverseer,
) -> AllMessages {
let msg = overseer_recv_with_timeout(overseer, TIMEOUT)
.await
.expect(&format!("{:?} is more than enough to receive messages", TIMEOUT));
tracing::trace!(?msg, "received message");
msg
}
async fn overseer_recv_with_timeout(
overseer: &mut VirtualOverseer,
timeout: Duration,
) -> Option<AllMessages> {
tracing::trace!("waiting for message...");
overseer
.recv()
.timeout(timeout)
.await
}
async fn overseer_signal(
overseer: &mut VirtualOverseer,
signal: OverseerSignal,
) {
overseer
.send(FromOverseer::Signal(signal))
.timeout(TIMEOUT)
.await
.expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT));
}
// Setup the system by sending the `CollateOn`, `ActiveLeaves` and `OurViewChange` messages.
async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestState) {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::CollateOn(test_state.para_id),
).await;
overseer_signal(
virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: vec![ActivatedLeaf {
hash: test_state.relay_parent,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}].into(),
deactivated: [][..].into(),
}),
).await;
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]),
),
).await;
}
/// Result of [`distribute_collation`]
struct DistributeCollation {
candidate: CandidateReceipt,
pov_block: PoV,
}
/// Create some PoV and distribute it.
async fn distribute_collation(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
// whether or not we expect a connection request or not.
should_connect: bool,
) -> DistributeCollation {
// Now we want to distribute a PoVBlock
let pov_block = PoV {
block_data: BlockData(vec![42, 43, 44]),
};
let pov_hash = pov_block.hash();
let candidate = TestCandidateBuilder {
para_id: test_state.para_id,
relay_parent: test_state.relay_parent,
pov_hash,
..Default::default()
}.build();
overseer_send(
virtual_overseer,
CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone(), None),
).await;
// obtain the availability cores.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx)
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
}
);
// We don't know precisely what is going to come as session info might be cached:
loop {
match overseer_recv(virtual_overseer).await {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok(test_state.current_session_index())).unwrap();
}
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionInfo(index, tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(index, test_state.current_session_index());
tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
}
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorGroups(tx)
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok((
test_state.session_info.validator_groups.clone(),
test_state.group_rotation_info.clone(),
))).unwrap();
// This call is mandatory - we are done:
break;
}
other =>
panic!("Unexpected message received: {:?}", other),
}
}
if should_connect {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
..
}
) => {}
);
}
DistributeCollation {
candidate,
pov_block,
}
}
/// Connect a peer
async fn connect_peer(
virtual_overseer: &mut VirtualOverseer,
peer: PeerId,
authority_id: Option<AuthorityDiscoveryId>
) {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer.clone(),
polkadot_node_network_protocol::ObservedRole::Authority,
authority_id,
),
),
).await;
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer, view![]),
),
).await;
}
/// Disconnect a peer
async fn disconnect_peer(virtual_overseer: &mut VirtualOverseer, peer: PeerId) {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected(peer)),
).await;
}
/// Check that the next received message is a `Declare` message.
async fn expect_declare_msg(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
peer: &PeerId,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
) => {
assert_eq!(to[0], *peer);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::Declare(
collator_id,
para_id,
signature,
) => {
assert!(signature.verify(
&*protocol_v1::declare_signature_payload(&test_state.local_peer_id),
&collator_id),
);
assert_eq!(collator_id, test_state.collator_pair.public());
assert_eq!(para_id, test_state.para_id);
}
);
}
);
}
/// Check that the next received message is a collation advertisement message.
async fn expect_advertise_collation_msg(
virtual_overseer: &mut VirtualOverseer,
peer: &PeerId,
expected_relay_parent: Hash,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
) => {
assert_eq!(to[0], *peer);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
relay_parent,
) => {
assert_eq!(relay_parent, expected_relay_parent);
}
);
}
);
}
/// Send a message that the given peer's view changed.
async fn send_peer_view_change(virtual_overseer: &mut VirtualOverseer, peer: &PeerId, hashes: Vec<Hash>) {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer.clone(), View::new(hashes, 0)),
),
).await;
}
#[test]
fn advertise_and_send_collation() {
let mut test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
setup_system(&mut virtual_overseer, &test_state).await;
let DistributeCollation { candidate, pov_block } =
distribute_collation(&mut virtual_overseer, &test_state, true).await;
for (val, peer) in test_state.current_group_validator_authority_ids()
.into_iter()
.zip(test_state.current_group_validator_peer_ids())
{
connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
}
// We declare to the connected validators that we are a collator.
// We need to catch all `Declare` messages to the validators we've
// previosly connected to.
for peer_id in test_state.current_group_validator_peer_ids() {
expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await;
}
let peer = test_state.current_group_validator_peer_ids()[0].clone();
// Send info about peer's view.
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
// The peer is interested in a leaf that we have a collation for;
// advertise it.
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
// Request a collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
peer,
CollationFetchingRequest {
relay_parent: test_state.relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
assert_matches!(
rx.await,
Ok(full_response) => {
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
= CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
}
);
let old_relay_parent = test_state.relay_parent;
test_state.advance_to_new_round(&mut virtual_overseer, false).await;
let peer = test_state.validator_peer_id[2].clone();
// Re-request a collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
peer,
CollationFetchingRequest {
relay_parent: old_relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
// Re-requesting collation should fail:
assert_matches!(
rx.await,
Err(_) => {}
);
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
distribute_collation(&mut virtual_overseer, &test_state, true).await;
// Send info about peer's view.
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(
peer.clone(),
view![test_state.relay_parent],
)
)
).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
virtual_overseer
});
}
#[test]
fn collators_declare_to_connected_peers() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.validator_peer_id[0].clone();
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
virtual_overseer
})
}
#[test]
fn collations_are_only_advertised_to_validators_with_correct_view() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
let peer2 = test_state.current_group_validator_peer_ids()[1].clone();
let validator_id2 = test_state.current_group_validator_authority_ids()[1].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
// Connect the second validator
connect_peer(&mut virtual_overseer, peer2.clone(), Some(validator_id2)).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
// And let it tell us that it is has the same view.
send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
distribute_collation(&mut virtual_overseer, &test_state, true).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
// The other validator announces that it changed its view.
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
// After changing the view we should receive the advertisement
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
virtual_overseer
})
}
#[test]
fn collate_on_two_different_relay_chain_blocks() {
let mut test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
let peer2 = test_state.current_group_validator_peer_ids()[1].clone();
let validator_id2 = test_state.current_group_validator_authority_ids()[1].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
// Connect the second validator
connect_peer(&mut virtual_overseer, peer2.clone(), Some(validator_id2)).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
distribute_collation(&mut virtual_overseer, &test_state, true).await;
let old_relay_parent = test_state.relay_parent;
// Advance to a new round, while informing the subsystem that the old and the new relay parent are active.
test_state.advance_to_new_round(&mut virtual_overseer, true).await;
distribute_collation(&mut virtual_overseer, &test_state, true).await;
send_peer_view_change(&mut virtual_overseer, &peer, vec![old_relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, old_relay_parent).await;
send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
virtual_overseer
})
}
#[test]
fn validator_reconnect_does_not_advertise_a_second_time() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id.clone())).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
distribute_collation(&mut virtual_overseer, &test_state, true).await;
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
// Disconnect and reconnect directly
disconnect_peer(&mut virtual_overseer, peer.clone()).await;
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
virtual_overseer
})
}
#[test]
fn collators_reject_declare_messages() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
let collator_pair2 = CollatorPair::generate().0;
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer.clone(),
protocol_v1::CollatorProtocolMessage::Declare(
collator_pair2.public(),
ParaId::from(5),
collator_pair2.sign(b"garbage"),
),
)
)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(
p,
PeerSet::Collation,
)) if p == peer
);
virtual_overseer
})
}
}
mod tests;
@@ -0,0 +1,793 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use sp_core::{crypto::Pair, Decode};
use sp_keyring::Sr25519Keyring;
use sp_runtime::traits::AppVerify;
use polkadot_node_network_protocol::{
our_view,
view,
request_response::request::IncomingRequest,
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateDescriptor, CollatorPair, GroupRotationInfo, ScheduledCore, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex};
use polkadot_node_primitives::BlockData;
use polkadot_subsystem::{
jaeger,
messages::{RuntimeApiMessage, RuntimeApiRequest},
ActiveLeavesUpdate, ActivatedLeaf, LeafStatus,
};
use polkadot_subsystem_testhelpers as test_helpers;
#[derive(Default)]
struct TestCandidateBuilder {
para_id: ParaId,
pov_hash: Hash,
relay_parent: Hash,
commitments_hash: Hash,
}
impl TestCandidateBuilder {
fn build(self) -> CandidateReceipt {
CandidateReceipt {
descriptor: CandidateDescriptor {
para_id: self.para_id,
pov_hash: self.pov_hash,
relay_parent: self.relay_parent,
..Default::default()
},
commitments_hash: self.commitments_hash,
}
}
}
#[derive(Clone)]
struct TestState {
para_id: ParaId,
validators: Vec<Sr25519Keyring>,
session_info: SessionInfo,
group_rotation_info: GroupRotationInfo,
validator_peer_id: Vec<PeerId>,
relay_parent: Hash,
availability_core: CoreState,
local_peer_id: PeerId,
collator_pair: CollatorPair,
session_index: SessionIndex,
}
fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
val_ids.iter().map(|v| v.public().into()).collect()
}
fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryId> {
val_ids.iter().map(|v| v.public().into()).collect()
}
impl Default for TestState {
fn default() -> Self {
let para_id = ParaId::from(1);
let validators = vec![
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Ferdie,
];
let validator_public = validator_pubkeys(&validators);
let discovery_keys = validator_authority_id(&validators);
let validator_peer_id = std::iter::repeat_with(|| PeerId::random())
.take(discovery_keys.len())
.collect();
let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]]
.into_iter().map(|g| g.into_iter().map(ValidatorIndex).collect()).collect();
let group_rotation_info = GroupRotationInfo {
session_start_block: 0,
group_rotation_frequency: 100,
now: 1,
};
let availability_core = CoreState::Scheduled(ScheduledCore {
para_id,
collator: None,
});
let relay_parent = Hash::random();
let local_peer_id = PeerId::random();
let collator_pair = CollatorPair::generate().0;
Self {
para_id,
validators,
session_info: SessionInfo {
validators: validator_public,
discovery_keys,
validator_groups,
..Default::default()
},
group_rotation_info,
validator_peer_id,
relay_parent,
availability_core,
local_peer_id,
collator_pair,
session_index: 1,
}
}
}
impl TestState {
fn current_group_validator_indices(&self) -> &[ValidatorIndex] {
&self.session_info.validator_groups[0]
}
fn current_session_index(&self) -> SessionIndex {
self.session_index
}
fn current_group_validator_peer_ids(&self) -> Vec<PeerId> {
self.current_group_validator_indices().iter().map(|i| self.validator_peer_id[i.0 as usize].clone()).collect()
}
fn current_group_validator_authority_ids(&self) -> Vec<AuthorityDiscoveryId> {
self.current_group_validator_indices()
.iter()
.map(|i| self.session_info.discovery_keys[i.0 as usize].clone())
.collect()
}
/// Generate a new relay parent and inform the subsystem about the new view.
///
/// If `merge_views == true` it means the subsystem will be informed that we are working on the old `relay_parent`
/// and the new one.
async fn advance_to_new_round(&mut self, virtual_overseer: &mut VirtualOverseer, merge_views: bool) {
let old_relay_parent = self.relay_parent;
while self.relay_parent == old_relay_parent {
self.relay_parent.randomize();
}
let our_view = if merge_views {
our_view![old_relay_parent, self.relay_parent]
} else {
our_view![self.relay_parent]
};
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(our_view)),
).await;
}
}
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>;
struct TestHarness {
virtual_overseer: VirtualOverseer,
}
fn test_harness<T: Future<Output = VirtualOverseer>>(
local_peer_id: PeerId,
collator_pair: CollatorPair,
test: impl FnOnce(TestHarness) -> T,
) {
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = run(context, local_peer_id, collator_pair, Metrics::default());
let test_fut = test(TestHarness { virtual_overseer });
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::join(async move {
let mut overseer = test_fut.await;
overseer_signal(&mut overseer, OverseerSignal::Conclude).await;
}, subsystem)).1.unwrap();
}
const TIMEOUT: Duration = Duration::from_millis(100);
async fn overseer_send(
overseer: &mut VirtualOverseer,
msg: CollatorProtocolMessage,
) {
tracing::trace!(?msg, "sending message");
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
.await
.expect(&format!("{:?} is more than enough for sending messages.", TIMEOUT));
}
async fn overseer_recv(
overseer: &mut VirtualOverseer,
) -> AllMessages {
let msg = overseer_recv_with_timeout(overseer, TIMEOUT)
.await
.expect(&format!("{:?} is more than enough to receive messages", TIMEOUT));
tracing::trace!(?msg, "received message");
msg
}
async fn overseer_recv_with_timeout(
overseer: &mut VirtualOverseer,
timeout: Duration,
) -> Option<AllMessages> {
tracing::trace!("waiting for message...");
overseer
.recv()
.timeout(timeout)
.await
}
async fn overseer_signal(
overseer: &mut VirtualOverseer,
signal: OverseerSignal,
) {
overseer
.send(FromOverseer::Signal(signal))
.timeout(TIMEOUT)
.await
.expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT));
}
// Setup the system by sending the `CollateOn`, `ActiveLeaves` and `OurViewChange` messages.
async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestState) {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::CollateOn(test_state.para_id),
).await;
overseer_signal(
virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: vec![ActivatedLeaf {
hash: test_state.relay_parent,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}].into(),
deactivated: [][..].into(),
}),
).await;
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]),
),
).await;
}
/// Result of [`distribute_collation`]
struct DistributeCollation {
candidate: CandidateReceipt,
pov_block: PoV,
}
/// Create some PoV and distribute it.
async fn distribute_collation(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
// whether or not we expect a connection request or not.
should_connect: bool,
) -> DistributeCollation {
// Now we want to distribute a PoVBlock
let pov_block = PoV {
block_data: BlockData(vec![42, 43, 44]),
};
let pov_hash = pov_block.hash();
let candidate = TestCandidateBuilder {
para_id: test_state.para_id,
relay_parent: test_state.relay_parent,
pov_hash,
..Default::default()
}.build();
overseer_send(
virtual_overseer,
CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone(), None),
).await;
// obtain the availability cores.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx)
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
}
);
// We don't know precisely what is going to come as session info might be cached:
loop {
match overseer_recv(virtual_overseer).await {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok(test_state.current_session_index())).unwrap();
}
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionInfo(index, tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(index, test_state.current_session_index());
tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
}
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorGroups(tx)
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok((
test_state.session_info.validator_groups.clone(),
test_state.group_rotation_info.clone(),
))).unwrap();
// This call is mandatory - we are done:
break;
}
other =>
panic!("Unexpected message received: {:?}", other),
}
}
if should_connect {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
..
}
) => {}
);
}
DistributeCollation {
candidate,
pov_block,
}
}
/// Connect a peer
async fn connect_peer(
virtual_overseer: &mut VirtualOverseer,
peer: PeerId,
authority_id: Option<AuthorityDiscoveryId>
) {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer.clone(),
polkadot_node_network_protocol::ObservedRole::Authority,
authority_id,
),
),
).await;
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer, view![]),
),
).await;
}
/// Disconnect a peer
async fn disconnect_peer(virtual_overseer: &mut VirtualOverseer, peer: PeerId) {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected(peer)),
).await;
}
/// Check that the next received message is a `Declare` message.
async fn expect_declare_msg(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
peer: &PeerId,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
) => {
assert_eq!(to[0], *peer);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::Declare(
collator_id,
para_id,
signature,
) => {
assert!(signature.verify(
&*protocol_v1::declare_signature_payload(&test_state.local_peer_id),
&collator_id),
);
assert_eq!(collator_id, test_state.collator_pair.public());
assert_eq!(para_id, test_state.para_id);
}
);
}
);
}
/// Check that the next received message is a collation advertisement message.
async fn expect_advertise_collation_msg(
virtual_overseer: &mut VirtualOverseer,
peer: &PeerId,
expected_relay_parent: Hash,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
) => {
assert_eq!(to[0], *peer);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
relay_parent,
) => {
assert_eq!(relay_parent, expected_relay_parent);
}
);
}
);
}
/// Send a message that the given peer's view changed.
async fn send_peer_view_change(virtual_overseer: &mut VirtualOverseer, peer: &PeerId, hashes: Vec<Hash>) {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer.clone(), View::new(hashes, 0)),
),
).await;
}
#[test]
fn advertise_and_send_collation() {
let mut test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
setup_system(&mut virtual_overseer, &test_state).await;
let DistributeCollation { candidate, pov_block } =
distribute_collation(&mut virtual_overseer, &test_state, true).await;
for (val, peer) in test_state.current_group_validator_authority_ids()
.into_iter()
.zip(test_state.current_group_validator_peer_ids())
{
connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
}
// We declare to the connected validators that we are a collator.
// We need to catch all `Declare` messages to the validators we've
// previosly connected to.
for peer_id in test_state.current_group_validator_peer_ids() {
expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await;
}
let peer = test_state.current_group_validator_peer_ids()[0].clone();
// Send info about peer's view.
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
// The peer is interested in a leaf that we have a collation for;
// advertise it.
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
// Request a collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
peer,
CollationFetchingRequest {
relay_parent: test_state.relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
assert_matches!(
rx.await,
Ok(full_response) => {
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
= CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
}
);
let old_relay_parent = test_state.relay_parent;
test_state.advance_to_new_round(&mut virtual_overseer, false).await;
let peer = test_state.validator_peer_id[2].clone();
// Re-request a collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
peer,
CollationFetchingRequest {
relay_parent: old_relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
// Re-requesting collation should fail:
assert_matches!(
rx.await,
Err(_) => {}
);
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
distribute_collation(&mut virtual_overseer, &test_state, true).await;
// Send info about peer's view.
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(
peer.clone(),
view![test_state.relay_parent],
)
)
).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
virtual_overseer
});
}
#[test]
fn collators_declare_to_connected_peers() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.validator_peer_id[0].clone();
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
virtual_overseer
})
}
#[test]
fn collations_are_only_advertised_to_validators_with_correct_view() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
let peer2 = test_state.current_group_validator_peer_ids()[1].clone();
let validator_id2 = test_state.current_group_validator_authority_ids()[1].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
// Connect the second validator
connect_peer(&mut virtual_overseer, peer2.clone(), Some(validator_id2)).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
// And let it tell us that it is has the same view.
send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
distribute_collation(&mut virtual_overseer, &test_state, true).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
// The other validator announces that it changed its view.
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
// After changing the view we should receive the advertisement
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
virtual_overseer
})
}
#[test]
fn collate_on_two_different_relay_chain_blocks() {
let mut test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
let peer2 = test_state.current_group_validator_peer_ids()[1].clone();
let validator_id2 = test_state.current_group_validator_authority_ids()[1].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
// Connect the second validator
connect_peer(&mut virtual_overseer, peer2.clone(), Some(validator_id2)).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
distribute_collation(&mut virtual_overseer, &test_state, true).await;
let old_relay_parent = test_state.relay_parent;
// Advance to a new round, while informing the subsystem that the old and the new relay parent are active.
test_state.advance_to_new_round(&mut virtual_overseer, true).await;
distribute_collation(&mut virtual_overseer, &test_state, true).await;
send_peer_view_change(&mut virtual_overseer, &peer, vec![old_relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, old_relay_parent).await;
send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
virtual_overseer
})
}
#[test]
fn validator_reconnect_does_not_advertise_a_second_time() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id.clone())).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
distribute_collation(&mut virtual_overseer, &test_state, true).await;
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
// Disconnect and reconnect directly
disconnect_peer(&mut virtual_overseer, peer.clone()).await;
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
virtual_overseer
})
}
#[test]
fn collators_reject_declare_messages() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
let collator_pair2 = CollatorPair::generate().0;
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer.clone(),
protocol_v1::CollatorProtocolMessage::Declare(
collator_pair2.public(),
ParaId::from(5),
collator_pair2.sign(b"garbage"),
),
)
)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(
p,
PeerSet::Collation,
)) if p == peer
);
virtual_overseer
})
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+1 -245
View File
@@ -861,248 +861,4 @@ impl futures::Stream for Metronome
}
#[cfg(test)]
mod tests {
use super::*;
use executor::block_on;
use thiserror::Error;
use polkadot_node_jaeger as jaeger;
use polkadot_node_subsystem::{
messages::{AllMessages, CollatorProtocolMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
SpawnedSubsystem, ActivatedLeaf, LeafStatus,
};
use assert_matches::assert_matches;
use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt};
use polkadot_primitives::v1::Hash;
use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context};
use std::{pin::Pin, sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration};
// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do;
// you can leave the subsystem itself to the job manager.
// for purposes of demonstration, we're going to whip up a fake subsystem.
// this will 'select' candidates which are pre-loaded in the job
// job structs are constructed within JobTrait::run
// most will want to retain the sender and receiver, as well as whatever other data they like
struct FakeCollatorProtocolJob {
receiver: mpsc::Receiver<CollatorProtocolMessage>,
}
// Error will mostly be a wrapper to make the try operator more convenient;
// deriving From implementations for most variants is recommended.
// It must implement Debug for logging.
#[derive(Debug, Error)]
enum Error {
#[error(transparent)]
Sending(#[from]mpsc::SendError),
}
impl JobTrait for FakeCollatorProtocolJob {
type ToJob = CollatorProtocolMessage;
type Error = Error;
type RunArgs = bool;
type Metrics = ();
const NAME: &'static str = "FakeCollatorProtocolJob";
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
fn run<S: SubsystemSender>(
_: Hash,
_: Arc<jaeger::Span>,
run_args: Self::RunArgs,
_metrics: Self::Metrics,
receiver: mpsc::Receiver<CollatorProtocolMessage>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = FakeCollatorProtocolJob { receiver };
if run_args {
sender.send_message(CollatorProtocolMessage::Invalid(
Default::default(),
Default::default(),
).into()).await;
}
// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
job.run_loop().await
}
.boxed()
}
}
impl FakeCollatorProtocolJob {
async fn run_loop(mut self) -> Result<(), Error> {
loop {
match self.receiver.next().await {
Some(_csm) => {
unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet");
}
None => break,
}
}
Ok(())
}
}
// with the job defined, it's straightforward to get a subsystem implementation.
type FakeCollatorProtocolSubsystem<Spawner> =
JobSubsystem<FakeCollatorProtocolJob, Spawner>;
// this type lets us pretend to be the overseer
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>;
fn test_harness<T: Future<Output = ()>>(
run_args: bool,
test: impl FnOnce(OverseerHandle) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(
None,
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
let (context, overseer_handle) = make_subsystem_context(pool.clone());
let subsystem = FakeCollatorProtocolSubsystem::new(
pool,
run_args,
(),
).run(context);
let test_future = test(overseer_handle);
futures::pin_mut!(subsystem, test_future);
executor::block_on(async move {
future::join(subsystem, test_future)
.timeout(Duration::from_secs(2))
.await
.expect("test timed out instead of completing")
});
}
#[test]
fn starting_and_stopping_job_works() {
let relay_parent: Hash = [0; 32].into();
test_harness(true, |mut overseer_handle| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: relay_parent,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
assert_matches!(
overseer_handle.recv().await,
AllMessages::CollatorProtocol(_)
);
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::stop_work(relay_parent),
)))
.await;
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
});
}
#[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
let relay_parent = Hash::repeat_byte(0x01);
test_harness(true, |mut overseer_handle| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: relay_parent,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
// send to a non running job
overseer_handle
.send(FromOverseer::Communication {
msg: Default::default(),
})
.await;
// the subsystem is still alive
assert_matches!(
overseer_handle.recv().await,
AllMessages::CollatorProtocol(_)
);
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
});
}
#[test]
fn test_subsystem_impl_and_name_derivation() {
let pool = sp_core::testing::TaskExecutor::new();
let (context, _) = make_subsystem_context::<CollatorProtocolMessage, _>(pool.clone());
let SpawnedSubsystem { name, .. } =
FakeCollatorProtocolSubsystem::new(pool, false, ()).start(context);
assert_eq!(name, "FakeCollatorProtocol");
}
#[test]
fn tick_tack_metronome() {
let n = Arc::new(AtomicUsize::default());
let (tick, mut block) = mpsc::unbounded();
let metronome = {
let n = n.clone();
let stream = Metronome::new(Duration::from_millis(137_u64));
stream.for_each(move |_res| {
let _ = n.fetch_add(1, Ordering::Relaxed);
let mut tick = tick.clone();
async move {
tick.send(()).await.expect("Test helper channel works. qed");
}
}).fuse()
};
let f2 = async move {
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 1_usize);
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 2_usize);
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 3_usize);
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 4_usize);
}.fuse();
futures::pin_mut!(f2);
futures::pin_mut!(metronome);
block_on(async move {
// futures::join!(metronome, f2)
futures::select!(
_ = metronome => unreachable!("Metronome never stops. qed"),
_ = f2 => (),
)
});
}
}
mod tests;
+259
View File
@@ -0,0 +1,259 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use executor::block_on;
use thiserror::Error;
use polkadot_node_jaeger as jaeger;
use polkadot_node_subsystem::{
messages::{AllMessages, CollatorProtocolMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
SpawnedSubsystem, ActivatedLeaf, LeafStatus,
};
use assert_matches::assert_matches;
use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt};
use polkadot_primitives::v1::Hash;
use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context};
use std::{pin::Pin, sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration};
// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do;
// you can leave the subsystem itself to the job manager.
// for purposes of demonstration, we're going to whip up a fake subsystem.
// this will 'select' candidates which are pre-loaded in the job
// job structs are constructed within JobTrait::run
// most will want to retain the sender and receiver, as well as whatever other data they like
struct FakeCollatorProtocolJob {
receiver: mpsc::Receiver<CollatorProtocolMessage>,
}
// Error will mostly be a wrapper to make the try operator more convenient;
// deriving From implementations for most variants is recommended.
// It must implement Debug for logging.
#[derive(Debug, Error)]
enum Error {
#[error(transparent)]
Sending(#[from]mpsc::SendError),
}
impl JobTrait for FakeCollatorProtocolJob {
type ToJob = CollatorProtocolMessage;
type Error = Error;
type RunArgs = bool;
type Metrics = ();
const NAME: &'static str = "FakeCollatorProtocolJob";
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
fn run<S: SubsystemSender>(
_: Hash,
_: Arc<jaeger::Span>,
run_args: Self::RunArgs,
_metrics: Self::Metrics,
receiver: mpsc::Receiver<CollatorProtocolMessage>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = FakeCollatorProtocolJob { receiver };
if run_args {
sender.send_message(CollatorProtocolMessage::Invalid(
Default::default(),
Default::default(),
).into()).await;
}
// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
job.run_loop().await
}
.boxed()
}
}
impl FakeCollatorProtocolJob {
async fn run_loop(mut self) -> Result<(), Error> {
loop {
match self.receiver.next().await {
Some(_csm) => {
unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet");
}
None => break,
}
}
Ok(())
}
}
// with the job defined, it's straightforward to get a subsystem implementation.
type FakeCollatorProtocolSubsystem<Spawner> =
JobSubsystem<FakeCollatorProtocolJob, Spawner>;
// this type lets us pretend to be the overseer
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>;
fn test_harness<T: Future<Output = ()>>(
run_args: bool,
test: impl FnOnce(OverseerHandle) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(
None,
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
let (context, overseer_handle) = make_subsystem_context(pool.clone());
let subsystem = FakeCollatorProtocolSubsystem::new(
pool,
run_args,
(),
).run(context);
let test_future = test(overseer_handle);
futures::pin_mut!(subsystem, test_future);
executor::block_on(async move {
future::join(subsystem, test_future)
.timeout(Duration::from_secs(2))
.await
.expect("test timed out instead of completing")
});
}
#[test]
fn starting_and_stopping_job_works() {
let relay_parent: Hash = [0; 32].into();
test_harness(true, |mut overseer_handle| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: relay_parent,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
assert_matches!(
overseer_handle.recv().await,
AllMessages::CollatorProtocol(_)
);
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::stop_work(relay_parent),
)))
.await;
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
});
}
#[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
let relay_parent = Hash::repeat_byte(0x01);
test_harness(true, |mut overseer_handle| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: relay_parent,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
// send to a non running job
overseer_handle
.send(FromOverseer::Communication {
msg: Default::default(),
})
.await;
// the subsystem is still alive
assert_matches!(
overseer_handle.recv().await,
AllMessages::CollatorProtocol(_)
);
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
});
}
#[test]
fn test_subsystem_impl_and_name_derivation() {
let pool = sp_core::testing::TaskExecutor::new();
let (context, _) = make_subsystem_context::<CollatorProtocolMessage, _>(pool.clone());
let SpawnedSubsystem { name, .. } =
FakeCollatorProtocolSubsystem::new(pool, false, ()).start(context);
assert_eq!(name, "FakeCollatorProtocol");
}
#[test]
fn tick_tack_metronome() {
let n = Arc::new(AtomicUsize::default());
let (tick, mut block) = mpsc::unbounded();
let metronome = {
let n = n.clone();
let stream = Metronome::new(Duration::from_millis(137_u64));
stream.for_each(move |_res| {
let _ = n.fetch_add(1, Ordering::Relaxed);
let mut tick = tick.clone();
async move {
tick.send(()).await.expect("Test helper channel works. qed");
}
}).fuse()
};
let f2 = async move {
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 1_usize);
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 2_usize);
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 3_usize);
block.next().await;
assert_eq!(n.load(Ordering::Relaxed), 4_usize);
}.fuse();
futures::pin_mut!(f2);
futures::pin_mut!(metronome);
block_on(async move {
// futures::join!(metronome, f2)
futures::select!(
_ = metronome => unreachable!("Metronome never stops. qed"),
_ = f2 => (),
)
});
}