mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 07:41:08 +00:00
@@ -49,6 +49,9 @@ use std::sync::Arc;
|
||||
|
||||
mod error;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
const LOG_TARGET: &'static str = "parachain::collation-generation";
|
||||
|
||||
/// Collation Generation Subsystem
|
||||
@@ -506,6 +509,3 @@ impl metrics::Metrics for Metrics {
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
@@ -15,358 +15,358 @@
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
mod handle_new_activations {
|
||||
use super::super::*;
|
||||
use futures::{
|
||||
lock::Mutex,
|
||||
task::{Context as FuturesContext, Poll},
|
||||
Future,
|
||||
};
|
||||
use polkadot_node_primitives::{Collation, CollationResult, BlockData, PoV, POV_BOMB_LIMIT};
|
||||
use polkadot_node_subsystem::messages::{
|
||||
AllMessages, RuntimeApiMessage, RuntimeApiRequest,
|
||||
};
|
||||
use polkadot_node_subsystem_test_helpers::{
|
||||
subsystem_test_harness, TestSubsystemContextHandle,
|
||||
};
|
||||
use polkadot_primitives::v1::{
|
||||
CollatorPair, Id as ParaId, PersistedValidationData, ScheduledCore, ValidationCode,
|
||||
};
|
||||
use std::pin::Pin;
|
||||
use super::super::*;
|
||||
use futures::{
|
||||
lock::Mutex,
|
||||
task::{Context as FuturesContext, Poll},
|
||||
Future,
|
||||
};
|
||||
use polkadot_node_primitives::{Collation, CollationResult, BlockData, PoV, POV_BOMB_LIMIT};
|
||||
use polkadot_node_subsystem::messages::{
|
||||
AllMessages, RuntimeApiMessage, RuntimeApiRequest,
|
||||
};
|
||||
use polkadot_node_subsystem_test_helpers::{
|
||||
subsystem_test_harness, TestSubsystemContextHandle,
|
||||
};
|
||||
use polkadot_primitives::v1::{
|
||||
CollatorPair, Id as ParaId, PersistedValidationData, ScheduledCore, ValidationCode,
|
||||
};
|
||||
use std::pin::Pin;
|
||||
|
||||
fn test_collation() -> Collation {
|
||||
Collation {
|
||||
upward_messages: Default::default(),
|
||||
horizontal_messages: Default::default(),
|
||||
new_validation_code: Default::default(),
|
||||
head_data: Default::default(),
|
||||
proof_of_validity: PoV {
|
||||
block_data: BlockData(Vec::new()),
|
||||
},
|
||||
processed_downward_messages: Default::default(),
|
||||
hrmp_watermark: Default::default(),
|
||||
}
|
||||
}
|
||||
fn test_collation() -> Collation {
|
||||
Collation {
|
||||
upward_messages: Default::default(),
|
||||
horizontal_messages: Default::default(),
|
||||
new_validation_code: Default::default(),
|
||||
head_data: Default::default(),
|
||||
proof_of_validity: PoV {
|
||||
block_data: BlockData(Vec::new()),
|
||||
},
|
||||
processed_downward_messages: Default::default(),
|
||||
hrmp_watermark: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn test_collation_compressed() -> Collation {
|
||||
let mut collation = test_collation();
|
||||
let compressed = PoV {
|
||||
block_data: BlockData(sp_maybe_compressed_blob::compress(
|
||||
&collation.proof_of_validity.block_data.0,
|
||||
POV_BOMB_LIMIT,
|
||||
).unwrap())
|
||||
};
|
||||
collation.proof_of_validity = compressed;
|
||||
collation
|
||||
}
|
||||
fn test_collation_compressed() -> Collation {
|
||||
let mut collation = test_collation();
|
||||
let compressed = PoV {
|
||||
block_data: BlockData(sp_maybe_compressed_blob::compress(
|
||||
&collation.proof_of_validity.block_data.0,
|
||||
POV_BOMB_LIMIT,
|
||||
).unwrap())
|
||||
};
|
||||
collation.proof_of_validity = compressed;
|
||||
collation
|
||||
}
|
||||
|
||||
fn test_validation_data() -> PersistedValidationData {
|
||||
let mut persisted_validation_data: PersistedValidationData = Default::default();
|
||||
persisted_validation_data.max_pov_size = 1024;
|
||||
persisted_validation_data
|
||||
}
|
||||
fn test_validation_data() -> PersistedValidationData {
|
||||
let mut persisted_validation_data: PersistedValidationData = Default::default();
|
||||
persisted_validation_data.max_pov_size = 1024;
|
||||
persisted_validation_data
|
||||
}
|
||||
|
||||
// Box<dyn Future<Output = Collation> + Unpin + Send
|
||||
struct TestCollator;
|
||||
// Box<dyn Future<Output = Collation> + Unpin + Send
|
||||
struct TestCollator;
|
||||
|
||||
impl Future for TestCollator {
|
||||
type Output = Option<CollationResult>;
|
||||
impl Future for TestCollator {
|
||||
type Output = Option<CollationResult>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
|
||||
Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None }))
|
||||
}
|
||||
}
|
||||
fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
|
||||
Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None }))
|
||||
}
|
||||
}
|
||||
|
||||
impl Unpin for TestCollator {}
|
||||
impl Unpin for TestCollator {}
|
||||
|
||||
fn test_config<Id: Into<ParaId>>(para_id: Id) -> Arc<CollationGenerationConfig> {
|
||||
Arc::new(CollationGenerationConfig {
|
||||
key: CollatorPair::generate().0,
|
||||
collator: Box::new(|_: Hash, _vd: &PersistedValidationData| {
|
||||
TestCollator.boxed()
|
||||
}),
|
||||
para_id: para_id.into(),
|
||||
})
|
||||
}
|
||||
fn test_config<Id: Into<ParaId>>(para_id: Id) -> Arc<CollationGenerationConfig> {
|
||||
Arc::new(CollationGenerationConfig {
|
||||
key: CollatorPair::generate().0,
|
||||
collator: Box::new(|_: Hash, _vd: &PersistedValidationData| {
|
||||
TestCollator.boxed()
|
||||
}),
|
||||
para_id: para_id.into(),
|
||||
})
|
||||
}
|
||||
|
||||
fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
|
||||
ScheduledCore {
|
||||
para_id: para_id.into(),
|
||||
collator: None,
|
||||
}
|
||||
}
|
||||
fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
|
||||
ScheduledCore {
|
||||
para_id: para_id.into(),
|
||||
collator: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn requests_availability_per_relay_parent() {
|
||||
let activated_hashes: Vec<Hash> = vec![
|
||||
[1; 32].into(),
|
||||
[4; 32].into(),
|
||||
[9; 32].into(),
|
||||
[16; 32].into(),
|
||||
];
|
||||
#[test]
|
||||
fn requests_availability_per_relay_parent() {
|
||||
let activated_hashes: Vec<Hash> = vec![
|
||||
[1; 32].into(),
|
||||
[4; 32].into(),
|
||||
[9; 32].into(),
|
||||
[16; 32].into(),
|
||||
];
|
||||
|
||||
let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));
|
||||
let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let overseer_requested_availability_cores = requested_availability_cores.clone();
|
||||
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
|
||||
loop {
|
||||
match handle.try_recv().await {
|
||||
None => break,
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
|
||||
overseer_requested_availability_cores.lock().await.push(hash);
|
||||
tx.send(Ok(vec![])).unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
|
||||
tx.send(Ok(vec![Default::default(); 3])).unwrap();
|
||||
}
|
||||
Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
|
||||
}
|
||||
}
|
||||
};
|
||||
let overseer_requested_availability_cores = requested_availability_cores.clone();
|
||||
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
|
||||
loop {
|
||||
match handle.try_recv().await {
|
||||
None => break,
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
|
||||
overseer_requested_availability_cores.lock().await.push(hash);
|
||||
tx.send(Ok(vec![])).unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
|
||||
tx.send(Ok(vec![Default::default(); 3])).unwrap();
|
||||
}
|
||||
Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let (tx, _rx) = mpsc::channel(0);
|
||||
let (tx, _rx) = mpsc::channel(0);
|
||||
|
||||
let subsystem_activated_hashes = activated_hashes.clone();
|
||||
subsystem_test_harness(overseer, |mut ctx| async move {
|
||||
handle_new_activations(
|
||||
test_config(123u32),
|
||||
subsystem_activated_hashes,
|
||||
&mut ctx,
|
||||
Metrics(None),
|
||||
&tx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
let subsystem_activated_hashes = activated_hashes.clone();
|
||||
subsystem_test_harness(overseer, |mut ctx| async move {
|
||||
handle_new_activations(
|
||||
test_config(123u32),
|
||||
subsystem_activated_hashes,
|
||||
&mut ctx,
|
||||
Metrics(None),
|
||||
&tx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
|
||||
.expect("overseer should have shut down by now")
|
||||
.into_inner();
|
||||
requested_availability_cores.sort();
|
||||
let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
|
||||
.expect("overseer should have shut down by now")
|
||||
.into_inner();
|
||||
requested_availability_cores.sort();
|
||||
|
||||
assert_eq!(requested_availability_cores, activated_hashes);
|
||||
}
|
||||
assert_eq!(requested_availability_cores, activated_hashes);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn requests_validation_data_for_scheduled_matches() {
|
||||
let activated_hashes: Vec<Hash> = vec![
|
||||
Hash::repeat_byte(1),
|
||||
Hash::repeat_byte(4),
|
||||
Hash::repeat_byte(9),
|
||||
Hash::repeat_byte(16),
|
||||
];
|
||||
#[test]
|
||||
fn requests_validation_data_for_scheduled_matches() {
|
||||
let activated_hashes: Vec<Hash> = vec![
|
||||
Hash::repeat_byte(1),
|
||||
Hash::repeat_byte(4),
|
||||
Hash::repeat_byte(9),
|
||||
Hash::repeat_byte(16),
|
||||
];
|
||||
|
||||
let requested_validation_data = Arc::new(Mutex::new(Vec::new()));
|
||||
let requested_validation_data = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let overseer_requested_validation_data = requested_validation_data.clone();
|
||||
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
|
||||
loop {
|
||||
match handle.try_recv().await {
|
||||
None => break,
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
hash,
|
||||
RuntimeApiRequest::AvailabilityCores(tx),
|
||||
))) => {
|
||||
tx.send(Ok(vec![
|
||||
CoreState::Free,
|
||||
// this is weird, see explanation below
|
||||
CoreState::Scheduled(scheduled_core_for(
|
||||
(hash.as_fixed_bytes()[0] * 4) as u32,
|
||||
)),
|
||||
CoreState::Scheduled(scheduled_core_for(
|
||||
(hash.as_fixed_bytes()[0] * 5) as u32,
|
||||
)),
|
||||
]))
|
||||
.unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
hash,
|
||||
RuntimeApiRequest::PersistedValidationData(
|
||||
_para_id,
|
||||
_occupied_core_assumption,
|
||||
tx,
|
||||
),
|
||||
))) => {
|
||||
overseer_requested_validation_data
|
||||
.lock()
|
||||
.await
|
||||
.push(hash);
|
||||
tx.send(Ok(Default::default())).unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_hash,
|
||||
RuntimeApiRequest::Validators(tx),
|
||||
))) => {
|
||||
tx.send(Ok(vec![Default::default(); 3])).unwrap();
|
||||
}
|
||||
Some(msg) => {
|
||||
panic!("didn't expect any other overseer requests; got {:?}", msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
let overseer_requested_validation_data = requested_validation_data.clone();
|
||||
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
|
||||
loop {
|
||||
match handle.try_recv().await {
|
||||
None => break,
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
hash,
|
||||
RuntimeApiRequest::AvailabilityCores(tx),
|
||||
))) => {
|
||||
tx.send(Ok(vec![
|
||||
CoreState::Free,
|
||||
// this is weird, see explanation below
|
||||
CoreState::Scheduled(scheduled_core_for(
|
||||
(hash.as_fixed_bytes()[0] * 4) as u32,
|
||||
)),
|
||||
CoreState::Scheduled(scheduled_core_for(
|
||||
(hash.as_fixed_bytes()[0] * 5) as u32,
|
||||
)),
|
||||
]))
|
||||
.unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
hash,
|
||||
RuntimeApiRequest::PersistedValidationData(
|
||||
_para_id,
|
||||
_occupied_core_assumption,
|
||||
tx,
|
||||
),
|
||||
))) => {
|
||||
overseer_requested_validation_data
|
||||
.lock()
|
||||
.await
|
||||
.push(hash);
|
||||
tx.send(Ok(Default::default())).unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_hash,
|
||||
RuntimeApiRequest::Validators(tx),
|
||||
))) => {
|
||||
tx.send(Ok(vec![Default::default(); 3])).unwrap();
|
||||
}
|
||||
Some(msg) => {
|
||||
panic!("didn't expect any other overseer requests; got {:?}", msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let (tx, _rx) = mpsc::channel(0);
|
||||
let (tx, _rx) = mpsc::channel(0);
|
||||
|
||||
subsystem_test_harness(overseer, |mut ctx| async move {
|
||||
handle_new_activations(test_config(16), activated_hashes, &mut ctx, Metrics(None), &tx)
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
subsystem_test_harness(overseer, |mut ctx| async move {
|
||||
handle_new_activations(test_config(16), activated_hashes, &mut ctx, Metrics(None), &tx)
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let requested_validation_data = Arc::try_unwrap(requested_validation_data)
|
||||
.expect("overseer should have shut down by now")
|
||||
.into_inner();
|
||||
let requested_validation_data = Arc::try_unwrap(requested_validation_data)
|
||||
.expect("overseer should have shut down by now")
|
||||
.into_inner();
|
||||
|
||||
// the only activated hash should be from the 4 hash:
|
||||
// each activated hash generates two scheduled cores: one with its value * 4, one with its value * 5
|
||||
// given that the test configuration has a para_id of 16, there's only one way to get that value: with the 4
|
||||
// hash.
|
||||
assert_eq!(requested_validation_data, vec![[4; 32].into()]);
|
||||
}
|
||||
// the only activated hash should be from the 4 hash:
|
||||
// each activated hash generates two scheduled cores: one with its value * 4, one with its value * 5
|
||||
// given that the test configuration has a para_id of 16, there's only one way to get that value: with the 4
|
||||
// hash.
|
||||
assert_eq!(requested_validation_data, vec![[4; 32].into()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sends_distribute_collation_message() {
|
||||
let activated_hashes: Vec<Hash> = vec![
|
||||
Hash::repeat_byte(1),
|
||||
Hash::repeat_byte(4),
|
||||
Hash::repeat_byte(9),
|
||||
Hash::repeat_byte(16),
|
||||
];
|
||||
#[test]
|
||||
fn sends_distribute_collation_message() {
|
||||
let activated_hashes: Vec<Hash> = vec![
|
||||
Hash::repeat_byte(1),
|
||||
Hash::repeat_byte(4),
|
||||
Hash::repeat_byte(9),
|
||||
Hash::repeat_byte(16),
|
||||
];
|
||||
|
||||
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
|
||||
loop {
|
||||
match handle.try_recv().await {
|
||||
None => break,
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
hash,
|
||||
RuntimeApiRequest::AvailabilityCores(tx),
|
||||
))) => {
|
||||
tx.send(Ok(vec![
|
||||
CoreState::Free,
|
||||
// this is weird, see explanation below
|
||||
CoreState::Scheduled(scheduled_core_for(
|
||||
(hash.as_fixed_bytes()[0] * 4) as u32,
|
||||
)),
|
||||
CoreState::Scheduled(scheduled_core_for(
|
||||
(hash.as_fixed_bytes()[0] * 5) as u32,
|
||||
)),
|
||||
]))
|
||||
.unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_hash,
|
||||
RuntimeApiRequest::PersistedValidationData(
|
||||
_para_id,
|
||||
_occupied_core_assumption,
|
||||
tx,
|
||||
),
|
||||
))) => {
|
||||
tx.send(Ok(Some(test_validation_data()))).unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_hash,
|
||||
RuntimeApiRequest::Validators(tx),
|
||||
))) => {
|
||||
tx.send(Ok(vec![Default::default(); 3])).unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_hash,
|
||||
RuntimeApiRequest::ValidationCode(
|
||||
_para_id,
|
||||
OccupiedCoreAssumption::Free,
|
||||
tx,
|
||||
),
|
||||
))) => {
|
||||
tx.send(Ok(Some(ValidationCode(vec![1, 2, 3])))).unwrap();
|
||||
}
|
||||
Some(msg) => {
|
||||
panic!("didn't expect any other overseer requests; got {:?}", msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
|
||||
loop {
|
||||
match handle.try_recv().await {
|
||||
None => break,
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
hash,
|
||||
RuntimeApiRequest::AvailabilityCores(tx),
|
||||
))) => {
|
||||
tx.send(Ok(vec![
|
||||
CoreState::Free,
|
||||
// this is weird, see explanation below
|
||||
CoreState::Scheduled(scheduled_core_for(
|
||||
(hash.as_fixed_bytes()[0] * 4) as u32,
|
||||
)),
|
||||
CoreState::Scheduled(scheduled_core_for(
|
||||
(hash.as_fixed_bytes()[0] * 5) as u32,
|
||||
)),
|
||||
]))
|
||||
.unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_hash,
|
||||
RuntimeApiRequest::PersistedValidationData(
|
||||
_para_id,
|
||||
_occupied_core_assumption,
|
||||
tx,
|
||||
),
|
||||
))) => {
|
||||
tx.send(Ok(Some(test_validation_data()))).unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_hash,
|
||||
RuntimeApiRequest::Validators(tx),
|
||||
))) => {
|
||||
tx.send(Ok(vec![Default::default(); 3])).unwrap();
|
||||
}
|
||||
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_hash,
|
||||
RuntimeApiRequest::ValidationCode(
|
||||
_para_id,
|
||||
OccupiedCoreAssumption::Free,
|
||||
tx,
|
||||
),
|
||||
))) => {
|
||||
tx.send(Ok(Some(ValidationCode(vec![1, 2, 3])))).unwrap();
|
||||
}
|
||||
Some(msg) => {
|
||||
panic!("didn't expect any other overseer requests; got {:?}", msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let config = test_config(16);
|
||||
let subsystem_config = config.clone();
|
||||
let config = test_config(16);
|
||||
let subsystem_config = config.clone();
|
||||
|
||||
let (tx, rx) = mpsc::channel(0);
|
||||
let (tx, rx) = mpsc::channel(0);
|
||||
|
||||
// empty vec doesn't allocate on the heap, so it's ok we throw it away
|
||||
let sent_messages = Arc::new(Mutex::new(Vec::new()));
|
||||
let subsystem_sent_messages = sent_messages.clone();
|
||||
subsystem_test_harness(overseer, |mut ctx| async move {
|
||||
handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None), &tx)
|
||||
.await
|
||||
.unwrap();
|
||||
// empty vec doesn't allocate on the heap, so it's ok we throw it away
|
||||
let sent_messages = Arc::new(Mutex::new(Vec::new()));
|
||||
let subsystem_sent_messages = sent_messages.clone();
|
||||
subsystem_test_harness(overseer, |mut ctx| async move {
|
||||
handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None), &tx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
std::mem::drop(tx);
|
||||
std::mem::drop(tx);
|
||||
|
||||
// collect all sent messages
|
||||
*subsystem_sent_messages.lock().await = rx.collect().await;
|
||||
});
|
||||
// collect all sent messages
|
||||
*subsystem_sent_messages.lock().await = rx.collect().await;
|
||||
});
|
||||
|
||||
let sent_messages = Arc::try_unwrap(sent_messages)
|
||||
.expect("subsystem should have shut down by now")
|
||||
.into_inner();
|
||||
let sent_messages = Arc::try_unwrap(sent_messages)
|
||||
.expect("subsystem should have shut down by now")
|
||||
.into_inner();
|
||||
|
||||
// we expect a single message to be sent, containing a candidate receipt.
|
||||
// we don't care too much about the commitments_hash right now, but let's ensure that we've calculated the
|
||||
// correct descriptor
|
||||
let expect_pov_hash = test_collation_compressed().proof_of_validity.hash();
|
||||
let expect_validation_data_hash = test_validation_data().hash();
|
||||
let expect_relay_parent = Hash::repeat_byte(4);
|
||||
let expect_validation_code_hash = ValidationCode(vec![1, 2, 3]).hash();
|
||||
let expect_payload = collator_signature_payload(
|
||||
&expect_relay_parent,
|
||||
&config.para_id,
|
||||
&expect_validation_data_hash,
|
||||
&expect_pov_hash,
|
||||
&expect_validation_code_hash,
|
||||
);
|
||||
let expect_descriptor = CandidateDescriptor {
|
||||
signature: config.key.sign(&expect_payload),
|
||||
para_id: config.para_id,
|
||||
relay_parent: expect_relay_parent,
|
||||
collator: config.key.public(),
|
||||
persisted_validation_data_hash: expect_validation_data_hash,
|
||||
pov_hash: expect_pov_hash,
|
||||
erasure_root: Default::default(), // this isn't something we're checking right now
|
||||
para_head: test_collation().head_data.hash(),
|
||||
validation_code_hash: expect_validation_code_hash,
|
||||
};
|
||||
// we expect a single message to be sent, containing a candidate receipt.
|
||||
// we don't care too much about the commitments_hash right now, but let's ensure that we've calculated the
|
||||
// correct descriptor
|
||||
let expect_pov_hash = test_collation_compressed().proof_of_validity.hash();
|
||||
let expect_validation_data_hash = test_validation_data().hash();
|
||||
let expect_relay_parent = Hash::repeat_byte(4);
|
||||
let expect_validation_code_hash = ValidationCode(vec![1, 2, 3]).hash();
|
||||
let expect_payload = collator_signature_payload(
|
||||
&expect_relay_parent,
|
||||
&config.para_id,
|
||||
&expect_validation_data_hash,
|
||||
&expect_pov_hash,
|
||||
&expect_validation_code_hash,
|
||||
);
|
||||
let expect_descriptor = CandidateDescriptor {
|
||||
signature: config.key.sign(&expect_payload),
|
||||
para_id: config.para_id,
|
||||
relay_parent: expect_relay_parent,
|
||||
collator: config.key.public(),
|
||||
persisted_validation_data_hash: expect_validation_data_hash,
|
||||
pov_hash: expect_pov_hash,
|
||||
erasure_root: Default::default(), // this isn't something we're checking right now
|
||||
para_head: test_collation().head_data.hash(),
|
||||
validation_code_hash: expect_validation_code_hash,
|
||||
};
|
||||
|
||||
assert_eq!(sent_messages.len(), 1);
|
||||
match &sent_messages[0] {
|
||||
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
|
||||
CandidateReceipt { descriptor, .. },
|
||||
_pov,
|
||||
..
|
||||
)) => {
|
||||
// signature generation is non-deterministic, so we can't just assert that the
|
||||
// expected descriptor is correct. What we can do is validate that the produced
|
||||
// descriptor has a valid signature, then just copy in the generated signature
|
||||
// and check the rest of the fields for equality.
|
||||
assert!(CollatorPair::verify(
|
||||
&descriptor.signature,
|
||||
&collator_signature_payload(
|
||||
&descriptor.relay_parent,
|
||||
&descriptor.para_id,
|
||||
&descriptor.persisted_validation_data_hash,
|
||||
&descriptor.pov_hash,
|
||||
&descriptor.validation_code_hash,
|
||||
)
|
||||
.as_ref(),
|
||||
&descriptor.collator,
|
||||
));
|
||||
let expect_descriptor = {
|
||||
let mut expect_descriptor = expect_descriptor;
|
||||
expect_descriptor.signature = descriptor.signature.clone();
|
||||
expect_descriptor.erasure_root = descriptor.erasure_root.clone();
|
||||
expect_descriptor
|
||||
};
|
||||
assert_eq!(descriptor, &expect_descriptor);
|
||||
}
|
||||
_ => panic!("received wrong message type"),
|
||||
}
|
||||
}
|
||||
assert_eq!(sent_messages.len(), 1);
|
||||
match &sent_messages[0] {
|
||||
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
|
||||
CandidateReceipt { descriptor, .. },
|
||||
_pov,
|
||||
..
|
||||
)) => {
|
||||
// signature generation is non-deterministic, so we can't just assert that the
|
||||
// expected descriptor is correct. What we can do is validate that the produced
|
||||
// descriptor has a valid signature, then just copy in the generated signature
|
||||
// and check the rest of the fields for equality.
|
||||
assert!(CollatorPair::verify(
|
||||
&descriptor.signature,
|
||||
&collator_signature_payload(
|
||||
&descriptor.relay_parent,
|
||||
&descriptor.para_id,
|
||||
&descriptor.persisted_validation_data_hash,
|
||||
&descriptor.pov_hash,
|
||||
&descriptor.validation_code_hash,
|
||||
)
|
||||
.as_ref(),
|
||||
&descriptor.collator,
|
||||
));
|
||||
let expect_descriptor = {
|
||||
let mut expect_descriptor = expect_descriptor;
|
||||
expect_descriptor.signature = descriptor.signature.clone();
|
||||
expect_descriptor.erasure_root = descriptor.erasure_root.clone();
|
||||
expect_descriptor
|
||||
};
|
||||
assert_eq!(descriptor, &expect_descriptor);
|
||||
}
|
||||
_ => panic!("received wrong message type"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,9 @@ use std::{pin::Pin, collections::BTreeMap, sync::Arc};
|
||||
use thiserror::Error;
|
||||
use futures_timer::Delay;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// How long to wait before proposing.
|
||||
const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(2000);
|
||||
|
||||
@@ -599,6 +602,3 @@ impl metrics::Metrics for Metrics {
|
||||
|
||||
/// The provisioning subsystem.
|
||||
pub type ProvisioningSubsystem<Spawner> = JobSubsystem<ProvisioningJob, Spawner>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
@@ -44,6 +44,9 @@ use cache::{RequestResult, RequestResultCache};
|
||||
|
||||
mod cache;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
const LOG_TARGET: &str = "parachain::runtime-api";
|
||||
|
||||
/// The number of maximum runtime api requests can be executed in parallel. Further requests will be buffered.
|
||||
@@ -411,6 +414,3 @@ impl metrics::Metrics for Metrics {
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
@@ -20,10 +20,6 @@
|
||||
|
||||
#![warn(missing_docs)]
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
|
||||
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
|
||||
use futures::{channel::oneshot, FutureExt as _};
|
||||
use polkadot_primitives::v1::{
|
||||
@@ -47,6 +43,9 @@ use polkadot_node_network_protocol::{
|
||||
PeerId, View, v1 as protocol_v1, UnifiedReputationChange as Rep,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
const LOG_TARGET: &str = "parachain::approval-distribution";
|
||||
|
||||
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("Peer sent an out-of-view assignment or approval");
|
||||
|
||||
@@ -51,6 +51,7 @@ use polkadot_node_network_protocol::{
|
||||
};
|
||||
use polkadot_node_subsystem_util::request_session_info;
|
||||
use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1};
|
||||
|
||||
mod error;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -64,6 +64,8 @@ use network::{Network, send_message};
|
||||
mod multiplexer;
|
||||
pub use multiplexer::RequestMultiplexer;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// The maximum amount of heads a peer is allowed to have in their view at any time.
|
||||
///
|
||||
@@ -1131,9 +1133,3 @@ async fn dispatch_collation_events_to_all<I>(
|
||||
|
||||
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -44,6 +44,9 @@ use polkadot_node_primitives::{SignedFullStatement, Statement, PoV};
|
||||
use crate::error::{Fatal, NonFatal, log_error};
|
||||
use super::{LOG_TARGET, Result};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
@@ -866,6 +869,3 @@ pub(crate) async fn run(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
@@ -50,6 +50,9 @@ use polkadot_subsystem::{
|
||||
|
||||
use super::{modify_reputation, Result, LOG_TARGET};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
const COLLATION_FETCH_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
|
||||
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
|
||||
@@ -1244,6 +1247,3 @@ where
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -18,9 +18,6 @@
|
||||
//! and issuing a connection request to the validators relevant to
|
||||
//! the gossiping subsystems on every new session.
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
use futures::{channel::oneshot, FutureExt as _};
|
||||
use polkadot_node_subsystem::{
|
||||
@@ -38,6 +35,9 @@ use polkadot_node_network_protocol::peer_set::PeerSet;
|
||||
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
|
||||
use sp_application_crypto::{Public, AppKey};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
const LOG_TARGET: &str = "parachain::gossip-support";
|
||||
// How much time should we wait since the last
|
||||
// authority discovery resolution failure.
|
||||
|
||||
@@ -71,6 +71,9 @@ use requester::{RequesterMessage, fetch};
|
||||
mod responder;
|
||||
use responder::{ResponderMessage, respond};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement");
|
||||
const COST_FETCH_FAIL: Rep = Rep::CostMinor("Requesting `CommittedCandidateReceipt` from peer failed");
|
||||
const COST_INVALID_SIGNATURE: Rep = Rep::CostMajor("Invalid Statement Signature");
|
||||
@@ -2044,6 +2047,3 @@ impl metrics::Metrics for Metrics {
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -98,6 +98,9 @@ use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, mete
|
||||
use polkadot_node_primitives::SpawnNamed;
|
||||
use polkadot_procmacro_overseer_subsystems_gen::AllSubsystemsGen;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
// A capacity of bounded channels inside the overseer.
|
||||
const CHANNEL_CAPACITY: usize = 1024;
|
||||
// The capacity of signal channels to subsystems.
|
||||
@@ -2196,6 +2199,3 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
|
||||
instance,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
+879
-879
File diff suppressed because it is too large
Load Diff
@@ -54,8 +54,6 @@ use thiserror::Error;
|
||||
pub use metered_channel as metered;
|
||||
pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
|
||||
|
||||
mod error_handling;
|
||||
|
||||
/// Error classification.
|
||||
pub use error_handling::{Fault, unwrap_non_fatal};
|
||||
|
||||
@@ -72,6 +70,11 @@ pub mod reexports {
|
||||
/// Convenient and efficient runtime info access.
|
||||
pub mod runtime;
|
||||
|
||||
mod error_handling;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// Duration a job will wait after sending a stop signal before hard-aborting.
|
||||
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
|
||||
/// Capacity of channels to and from individual jobs
|
||||
@@ -859,6 +862,3 @@ impl futures::Stream for Metronome
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
@@ -19,8 +19,8 @@ use executor::block_on;
|
||||
use thiserror::Error;
|
||||
use polkadot_node_jaeger as jaeger;
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{AllMessages, CollatorProtocolMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
|
||||
SpawnedSubsystem, ActivatedLeaf, LeafStatus,
|
||||
messages::{AllMessages, CollatorProtocolMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
|
||||
SpawnedSubsystem, ActivatedLeaf, LeafStatus,
|
||||
};
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt};
|
||||
@@ -37,7 +37,7 @@ use std::{pin::Pin, sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration
|
||||
// job structs are constructed within JobTrait::run
|
||||
// most will want to retain the sender and receiver, as well as whatever other data they like
|
||||
struct FakeCollatorProtocolJob {
|
||||
receiver: mpsc::Receiver<CollatorProtocolMessage>,
|
||||
receiver: mpsc::Receiver<CollatorProtocolMessage>,
|
||||
}
|
||||
|
||||
// Error will mostly be a wrapper to make the try operator more convenient;
|
||||
@@ -45,215 +45,215 @@ struct FakeCollatorProtocolJob {
|
||||
// It must implement Debug for logging.
|
||||
#[derive(Debug, Error)]
|
||||
enum Error {
|
||||
#[error(transparent)]
|
||||
Sending(#[from]mpsc::SendError),
|
||||
#[error(transparent)]
|
||||
Sending(#[from]mpsc::SendError),
|
||||
}
|
||||
|
||||
impl JobTrait for FakeCollatorProtocolJob {
|
||||
type ToJob = CollatorProtocolMessage;
|
||||
type Error = Error;
|
||||
type RunArgs = bool;
|
||||
type Metrics = ();
|
||||
type ToJob = CollatorProtocolMessage;
|
||||
type Error = Error;
|
||||
type RunArgs = bool;
|
||||
type Metrics = ();
|
||||
|
||||
const NAME: &'static str = "FakeCollatorProtocolJob";
|
||||
const NAME: &'static str = "FakeCollatorProtocolJob";
|
||||
|
||||
/// Run a job for the parent block indicated
|
||||
//
|
||||
// this function is in charge of creating and executing the job's main loop
|
||||
fn run<S: SubsystemSender>(
|
||||
_: Hash,
|
||||
_: Arc<jaeger::Span>,
|
||||
run_args: Self::RunArgs,
|
||||
_metrics: Self::Metrics,
|
||||
receiver: mpsc::Receiver<CollatorProtocolMessage>,
|
||||
mut sender: JobSender<S>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||
async move {
|
||||
let job = FakeCollatorProtocolJob { receiver };
|
||||
/// Run a job for the parent block indicated
|
||||
//
|
||||
// this function is in charge of creating and executing the job's main loop
|
||||
fn run<S: SubsystemSender>(
|
||||
_: Hash,
|
||||
_: Arc<jaeger::Span>,
|
||||
run_args: Self::RunArgs,
|
||||
_metrics: Self::Metrics,
|
||||
receiver: mpsc::Receiver<CollatorProtocolMessage>,
|
||||
mut sender: JobSender<S>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||
async move {
|
||||
let job = FakeCollatorProtocolJob { receiver };
|
||||
|
||||
if run_args {
|
||||
sender.send_message(CollatorProtocolMessage::Invalid(
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
).into()).await;
|
||||
}
|
||||
if run_args {
|
||||
sender.send_message(CollatorProtocolMessage::Invalid(
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
).into()).await;
|
||||
}
|
||||
|
||||
// it isn't necessary to break run_loop into its own function,
|
||||
// but it's convenient to separate the concerns in this way
|
||||
job.run_loop().await
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
// it isn't necessary to break run_loop into its own function,
|
||||
// but it's convenient to separate the concerns in this way
|
||||
job.run_loop().await
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
impl FakeCollatorProtocolJob {
|
||||
async fn run_loop(mut self) -> Result<(), Error> {
|
||||
loop {
|
||||
match self.receiver.next().await {
|
||||
Some(_csm) => {
|
||||
unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet");
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
async fn run_loop(mut self) -> Result<(), Error> {
|
||||
loop {
|
||||
match self.receiver.next().await {
|
||||
Some(_csm) => {
|
||||
unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet");
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// with the job defined, it's straightforward to get a subsystem implementation.
|
||||
type FakeCollatorProtocolSubsystem<Spawner> =
|
||||
JobSubsystem<FakeCollatorProtocolJob, Spawner>;
|
||||
JobSubsystem<FakeCollatorProtocolJob, Spawner>;
|
||||
|
||||
// this type lets us pretend to be the overseer
|
||||
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>;
|
||||
|
||||
fn test_harness<T: Future<Output = ()>>(
|
||||
run_args: bool,
|
||||
test: impl FnOnce(OverseerHandle) -> T,
|
||||
run_args: bool,
|
||||
test: impl FnOnce(OverseerHandle) -> T,
|
||||
) {
|
||||
let _ = env_logger::builder()
|
||||
.is_test(true)
|
||||
.filter(
|
||||
None,
|
||||
log::LevelFilter::Trace,
|
||||
)
|
||||
.try_init();
|
||||
let _ = env_logger::builder()
|
||||
.is_test(true)
|
||||
.filter(
|
||||
None,
|
||||
log::LevelFilter::Trace,
|
||||
)
|
||||
.try_init();
|
||||
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, overseer_handle) = make_subsystem_context(pool.clone());
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, overseer_handle) = make_subsystem_context(pool.clone());
|
||||
|
||||
let subsystem = FakeCollatorProtocolSubsystem::new(
|
||||
pool,
|
||||
run_args,
|
||||
(),
|
||||
).run(context);
|
||||
let test_future = test(overseer_handle);
|
||||
let subsystem = FakeCollatorProtocolSubsystem::new(
|
||||
pool,
|
||||
run_args,
|
||||
(),
|
||||
).run(context);
|
||||
let test_future = test(overseer_handle);
|
||||
|
||||
futures::pin_mut!(subsystem, test_future);
|
||||
futures::pin_mut!(subsystem, test_future);
|
||||
|
||||
executor::block_on(async move {
|
||||
future::join(subsystem, test_future)
|
||||
.timeout(Duration::from_secs(2))
|
||||
.await
|
||||
.expect("test timed out instead of completing")
|
||||
});
|
||||
executor::block_on(async move {
|
||||
future::join(subsystem, test_future)
|
||||
.timeout(Duration::from_secs(2))
|
||||
.await
|
||||
.expect("test timed out instead of completing")
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn starting_and_stopping_job_works() {
|
||||
let relay_parent: Hash = [0; 32].into();
|
||||
let relay_parent: Hash = [0; 32].into();
|
||||
|
||||
test_harness(true, |mut overseer_handle| async move {
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||
hash: relay_parent,
|
||||
number: 1,
|
||||
status: LeafStatus::Fresh,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
}),
|
||||
)))
|
||||
.await;
|
||||
assert_matches!(
|
||||
overseer_handle.recv().await,
|
||||
AllMessages::CollatorProtocol(_)
|
||||
);
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(relay_parent),
|
||||
)))
|
||||
.await;
|
||||
test_harness(true, |mut overseer_handle| async move {
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||
hash: relay_parent,
|
||||
number: 1,
|
||||
status: LeafStatus::Fresh,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
}),
|
||||
)))
|
||||
.await;
|
||||
assert_matches!(
|
||||
overseer_handle.recv().await,
|
||||
AllMessages::CollatorProtocol(_)
|
||||
);
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(relay_parent),
|
||||
)))
|
||||
.await;
|
||||
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::Conclude))
|
||||
.await;
|
||||
});
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::Conclude))
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
|
||||
let relay_parent = Hash::repeat_byte(0x01);
|
||||
let relay_parent = Hash::repeat_byte(0x01);
|
||||
|
||||
test_harness(true, |mut overseer_handle| async move {
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||
hash: relay_parent,
|
||||
number: 1,
|
||||
status: LeafStatus::Fresh,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
}),
|
||||
)))
|
||||
.await;
|
||||
test_harness(true, |mut overseer_handle| async move {
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||
hash: relay_parent,
|
||||
number: 1,
|
||||
status: LeafStatus::Fresh,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
}),
|
||||
)))
|
||||
.await;
|
||||
|
||||
// send to a non running job
|
||||
overseer_handle
|
||||
.send(FromOverseer::Communication {
|
||||
msg: Default::default(),
|
||||
})
|
||||
.await;
|
||||
// send to a non running job
|
||||
overseer_handle
|
||||
.send(FromOverseer::Communication {
|
||||
msg: Default::default(),
|
||||
})
|
||||
.await;
|
||||
|
||||
// the subsystem is still alive
|
||||
assert_matches!(
|
||||
overseer_handle.recv().await,
|
||||
AllMessages::CollatorProtocol(_)
|
||||
);
|
||||
// the subsystem is still alive
|
||||
assert_matches!(
|
||||
overseer_handle.recv().await,
|
||||
AllMessages::CollatorProtocol(_)
|
||||
);
|
||||
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::Conclude))
|
||||
.await;
|
||||
});
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::Conclude))
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_subsystem_impl_and_name_derivation() {
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, _) = make_subsystem_context::<CollatorProtocolMessage, _>(pool.clone());
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, _) = make_subsystem_context::<CollatorProtocolMessage, _>(pool.clone());
|
||||
|
||||
let SpawnedSubsystem { name, .. } =
|
||||
FakeCollatorProtocolSubsystem::new(pool, false, ()).start(context);
|
||||
assert_eq!(name, "FakeCollatorProtocol");
|
||||
let SpawnedSubsystem { name, .. } =
|
||||
FakeCollatorProtocolSubsystem::new(pool, false, ()).start(context);
|
||||
assert_eq!(name, "FakeCollatorProtocol");
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn tick_tack_metronome() {
|
||||
let n = Arc::new(AtomicUsize::default());
|
||||
let n = Arc::new(AtomicUsize::default());
|
||||
|
||||
let (tick, mut block) = mpsc::unbounded();
|
||||
let (tick, mut block) = mpsc::unbounded();
|
||||
|
||||
let metronome = {
|
||||
let n = n.clone();
|
||||
let stream = Metronome::new(Duration::from_millis(137_u64));
|
||||
stream.for_each(move |_res| {
|
||||
let _ = n.fetch_add(1, Ordering::Relaxed);
|
||||
let mut tick = tick.clone();
|
||||
async move {
|
||||
tick.send(()).await.expect("Test helper channel works. qed");
|
||||
}
|
||||
}).fuse()
|
||||
};
|
||||
let metronome = {
|
||||
let n = n.clone();
|
||||
let stream = Metronome::new(Duration::from_millis(137_u64));
|
||||
stream.for_each(move |_res| {
|
||||
let _ = n.fetch_add(1, Ordering::Relaxed);
|
||||
let mut tick = tick.clone();
|
||||
async move {
|
||||
tick.send(()).await.expect("Test helper channel works. qed");
|
||||
}
|
||||
}).fuse()
|
||||
};
|
||||
|
||||
let f2 = async move {
|
||||
block.next().await;
|
||||
assert_eq!(n.load(Ordering::Relaxed), 1_usize);
|
||||
block.next().await;
|
||||
assert_eq!(n.load(Ordering::Relaxed), 2_usize);
|
||||
block.next().await;
|
||||
assert_eq!(n.load(Ordering::Relaxed), 3_usize);
|
||||
block.next().await;
|
||||
assert_eq!(n.load(Ordering::Relaxed), 4_usize);
|
||||
}.fuse();
|
||||
let f2 = async move {
|
||||
block.next().await;
|
||||
assert_eq!(n.load(Ordering::Relaxed), 1_usize);
|
||||
block.next().await;
|
||||
assert_eq!(n.load(Ordering::Relaxed), 2_usize);
|
||||
block.next().await;
|
||||
assert_eq!(n.load(Ordering::Relaxed), 3_usize);
|
||||
block.next().await;
|
||||
assert_eq!(n.load(Ordering::Relaxed), 4_usize);
|
||||
}.fuse();
|
||||
|
||||
futures::pin_mut!(f2);
|
||||
futures::pin_mut!(metronome);
|
||||
futures::pin_mut!(f2);
|
||||
futures::pin_mut!(metronome);
|
||||
|
||||
block_on(async move {
|
||||
// futures::join!(metronome, f2)
|
||||
futures::select!(
|
||||
_ = metronome => unreachable!("Metronome never stops. qed"),
|
||||
_ = f2 => (),
|
||||
)
|
||||
});
|
||||
block_on(async move {
|
||||
// futures::join!(metronome, f2)
|
||||
futures::select!(
|
||||
_ = metronome => unreachable!("Metronome never stops. qed"),
|
||||
_ = f2 => (),
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user