// Copyright (C) Parity Technologies (UK) Ltd. // This file is part of Pezkuwi. // Pezkuwi is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Pezkuwi is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Pezkuwi. If not, see . //! Utilities for testing subsystems. #![warn(missing_docs)] use pezkuwi_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks}; use pezkuwi_pez_node_primitives::{AvailableData, ErasureChunk, Proof}; use pezkuwi_node_subsystem::{ messages::AllMessages, overseer, FromOrchestra, OverseerSignal, SpawnGlue, SpawnedSubsystem, SubsystemError, SubsystemResult, TrySendError, }; use pezkuwi_node_subsystem_util::TimeoutExt; use pezkuwi_primitives::{ChunkIndex, Hash}; use futures::{channel::mpsc, poll, prelude::*}; use parking_lot::Mutex; use pezsp_core::testing::TaskExecutor; use std::{ collections::VecDeque, convert::Infallible, future::Future, pin::Pin, sync::{atomic::AtomicUsize, Arc}, task::{Context, Poll, Waker}, time::Duration, }; /// Generally useful mock data providers for unit tests. pub mod mock; enum SinkState { Empty { read_waker: Option }, Item { item: T, ready_waker: Option, flush_waker: Option }, } /// The sink half of a single-item sink that does not resolve until the item has been read. pub struct SingleItemSink(Arc>>); // Derive clone not possible, as it puts `Clone` constraint on `T` which is not sensible here. impl Clone for SingleItemSink { fn clone(&self) -> Self { Self(self.0.clone()) } } /// The stream half of a single-item sink. pub struct SingleItemStream(Arc>>); impl Sink for SingleItemSink { type Error = Infallible; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut state = self.0.lock(); match *state { SinkState::Empty { .. } => Poll::Ready(Ok(())), SinkState::Item { ref mut ready_waker, .. } => { *ready_waker = Some(cx.waker().clone()); Poll::Pending }, } } fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> { let mut state = self.0.lock(); match *state { SinkState::Empty { ref mut read_waker } => if let Some(waker) = read_waker.take() { waker.wake(); }, _ => panic!("start_send called outside of empty sink state ensured by poll_ready"), } *state = SinkState::Item { item, ready_waker: None, flush_waker: None }; Ok(()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut state = self.0.lock(); match *state { SinkState::Empty { .. } => Poll::Ready(Ok(())), SinkState::Item { ref mut flush_waker, .. } => { *flush_waker = Some(cx.waker().clone()); Poll::Pending }, } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.poll_flush(cx) } } impl Stream for SingleItemStream { type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut state = self.0.lock(); let read_waker = Some(cx.waker().clone()); match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) { SinkState::Empty { .. } => Poll::Pending, SinkState::Item { item, ready_waker, flush_waker } => { if let Some(waker) = ready_waker { waker.wake(); } if let Some(waker) = flush_waker { waker.wake(); } Poll::Ready(Some(item)) }, } } } /// Create a single-item Sink/Stream pair. /// /// The sink's send methods resolve at the point which the stream reads the item, /// not when the item is buffered. pub fn single_item_sink() -> (SingleItemSink, SingleItemStream) { let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None })); (SingleItemSink(inner.clone()), SingleItemStream(inner)) } /// A test subsystem sender. #[derive(Clone)] pub struct TestSubsystemSender { tx: mpsc::UnboundedSender, message_counter: MessageCounter, } /// Construct a sender/receiver pair. pub fn sender_receiver() -> (TestSubsystemSender, mpsc::UnboundedReceiver) { let (tx, rx) = mpsc::unbounded(); (TestSubsystemSender { tx, message_counter: MessageCounter::default() }, rx) } #[async_trait::async_trait] impl overseer::SubsystemSender for TestSubsystemSender where AllMessages: From, OutgoingMessage: Send + 'static, { async fn send_message(&mut self, msg: OutgoingMessage) { self.send_message_with_priority::(msg).await; } async fn send_message_with_priority(&mut self, msg: OutgoingMessage) { self.message_counter.increment(P::priority()); self.tx.send(msg.into()).await.expect("test overseer no longer live"); } fn try_send_message( &mut self, msg: OutgoingMessage, ) -> Result<(), TrySendError> { self.try_send_message_with_priority::(msg) } fn try_send_message_with_priority( &mut self, msg: OutgoingMessage, ) -> Result<(), TrySendError> { self.message_counter.increment(P::priority()); self.tx.unbounded_send(msg.into()).expect("test overseer no longer live"); Ok(()) } async fn send_messages(&mut self, msgs: I) where I: IntoIterator + Send, I::IntoIter: Send, { let mut iter = stream::iter(msgs.into_iter().map(|msg| Ok(msg.into()))); self.tx.send_all(&mut iter).await.expect("test overseer no longer live"); } fn send_unbounded_message(&mut self, msg: OutgoingMessage) { self.tx.unbounded_send(msg.into()).expect("test overseer no longer live"); } } /// A test subsystem context. pub struct TestSubsystemContext { tx: TestSubsystemSender, rx: mpsc::Receiver>, spawn: S, message_buffer: VecDeque>, } #[async_trait::async_trait] impl overseer::SubsystemContext for TestSubsystemContext where M: overseer::AssociateOutgoing + std::fmt::Debug + Send + 'static, AllMessages: From<::OutgoingMessages>, AllMessages: From, Spawner: overseer::gen::Spawner + Send + 'static, { type Message = M; type Sender = TestSubsystemSender; type Signal = OverseerSignal; type OutgoingMessages = ::OutgoingMessages; type Error = SubsystemError; async fn try_recv(&mut self) -> Result>, ()> { if let Some(msg) = self.message_buffer.pop_front() { return Ok(Some(msg)); } match poll!(self.rx.next()) { Poll::Ready(Some(msg)) => Ok(Some(msg)), Poll::Ready(None) => Err(()), Poll::Pending => Ok(None), } } async fn recv(&mut self) -> SubsystemResult> { if let Some(msg) = self.message_buffer.pop_front() { return Ok(msg); } self.rx .next() .await .ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned())) } async fn recv_signal(&mut self) -> SubsystemResult { loop { let msg = self .rx .next() .await .ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned()))?; if let FromOrchestra::Signal(sig) = msg { return Ok(sig); } else { self.message_buffer.push_back(msg) } } } fn spawn( &mut self, name: &'static str, s: Pin + Send>>, ) -> SubsystemResult<()> { self.spawn.spawn(name, None, s); Ok(()) } fn spawn_blocking( &mut self, name: &'static str, s: Pin + Send>>, ) -> SubsystemResult<()> { self.spawn.spawn_blocking(name, None, s); Ok(()) } fn sender(&mut self) -> &mut TestSubsystemSender { &mut self.tx } } /// A handle for interacting with the subsystem context. pub struct TestSubsystemContextHandle { /// Direct access to sender of messages. /// /// Useful for shared ownership situations (one can have multiple senders, but only one /// receiver. pub tx: mpsc::Sender>, /// Direct access to the receiver. pub rx: mpsc::UnboundedReceiver, /// Message counter over subsystems. pub message_counter: MessageCounter, /// Intermediate buffer for a message when using `peek`. message_buffer: Option, } impl TestSubsystemContextHandle { /// Fallback timeout value used to never block test execution /// indefinitely. pub const TIMEOUT: Duration = Duration::from_secs(120); /// Send a message or signal to the subsystem. This resolves at the point in time when the /// subsystem has _read_ the message. pub async fn send(&mut self, from_overseer: FromOrchestra) { self.tx .send(from_overseer) .timeout(Self::TIMEOUT) .await .expect("`fn send` does not timeout") .expect("Test subsystem no longer live"); } /// Receive the next message from the subsystem. pub async fn recv(&mut self) -> AllMessages { self.try_recv() .timeout(Self::TIMEOUT) .await .expect("`fn recv` does not timeout") .expect("Test subsystem no longer live") } /// Receive the next message from the subsystem, or `None` if the channel has been closed. pub async fn try_recv(&mut self) -> Option { if let Some(msg) = self.message_buffer.take() { return Some(msg); } self.rx .next() .timeout(Self::TIMEOUT) .await .expect("`try_recv` does not timeout") } /// Peek into the next message from the subsystem or `None` if the channel has been closed. pub async fn peek(&mut self) -> Option<&AllMessages> { if self.message_buffer.is_none() { self.message_buffer = self .rx .next() .timeout(Self::TIMEOUT) .await .expect("`try_recv` does not timeout"); } self.message_buffer.as_ref() } } /// Make a test subsystem context with `buffer_size == 0`. This is used by most /// of the tests. pub fn make_subsystem_context( spawner: S, ) -> (TestSubsystemContext>, TestSubsystemContextHandle) { make_buffered_subsystem_context(spawner, 0) } /// Message counter over subsystems. #[derive(Default, Clone)] pub struct MessageCounter { total: Arc, with_high_priority: Arc, } impl MessageCounter { /// Increment the message counter. pub fn increment(&mut self, priority_level: overseer::PriorityLevel) { self.total.fetch_add(1, std::sync::atomic::Ordering::SeqCst); if matches!(priority_level, overseer::PriorityLevel::High) { self.with_high_priority.fetch_add(1, std::sync::atomic::Ordering::SeqCst); } } /// Reset the message counter. pub fn reset(&mut self) { self.total.store(0, std::sync::atomic::Ordering::SeqCst); self.with_high_priority.store(0, std::sync::atomic::Ordering::SeqCst); } /// Get the messages with high priority count. pub fn with_high_priority(&self) -> usize { self.with_high_priority.load(std::sync::atomic::Ordering::SeqCst) } } /// Make a test subsystem context with buffered overseer channel. Some tests (e.g. /// `dispute-coordinator`) create too many parallel operations and deadlock unless /// the channel is buffered. Usually `buffer_size=1` is enough. pub fn make_buffered_subsystem_context( spawner: S, buffer_size: usize, ) -> (TestSubsystemContext>, TestSubsystemContextHandle) { let (overseer_tx, overseer_rx) = mpsc::channel(buffer_size); let (all_messages_tx, all_messages_rx) = mpsc::unbounded(); let message_counter = MessageCounter::default(); ( TestSubsystemContext { tx: TestSubsystemSender { tx: all_messages_tx, message_counter: message_counter.clone(), }, rx: overseer_rx, spawn: SpawnGlue(spawner), message_buffer: VecDeque::new(), }, TestSubsystemContextHandle { tx: overseer_tx, rx: all_messages_rx, message_counter: message_counter.clone(), message_buffer: None, }, ) } /// Test a subsystem, mocking the overseer /// /// Pass in two async closures: one mocks the overseer, the other runs the test from the perspective /// of a subsystem. /// /// Times out in 5 seconds. pub fn subsystem_test_harness( overseer_factory: OverseerFactory, test_factory: TestFactory, ) where OverseerFactory: FnOnce(TestSubsystemContextHandle) -> Overseer, Overseer: Future, TestFactory: FnOnce(TestSubsystemContext>) -> Test, Test: Future, { let pool = TaskExecutor::new(); let (context, handle) = make_subsystem_context(pool); let overseer = overseer_factory(handle); let test = test_factory(context); futures::pin_mut!(overseer, test); futures::executor::block_on(async move { future::join(overseer, test) .timeout(Duration::from_secs(10)) .await .expect("test timed out instead of completing") }); } /// A forward subsystem that implements [`Subsystem`](overseer::Subsystem). /// /// It forwards all communication from the overseer to the internal message /// channel. /// /// This subsystem is useful for testing functionality that interacts with the overseer. pub struct ForwardSubsystem(pub mpsc::Sender); impl overseer::Subsystem for ForwardSubsystem where M: overseer::AssociateOutgoing + std::fmt::Debug + Send + 'static, Context: overseer::SubsystemContext< Message = M, Signal = OverseerSignal, Error = SubsystemError, OutgoingMessages = ::OutgoingMessages, >, { fn start(mut self, mut ctx: Context) -> SpawnedSubsystem { let future = Box::pin(async move { loop { match ctx.recv().await { Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()), Ok(FromOrchestra::Communication { msg }) => { let _ = self.0.send(msg).await; }, Err(_) => return Ok(()), _ => (), } } }); SpawnedSubsystem { name: "forward-subsystem", future } } } /// Asserts that two patterns match, yet only one #[macro_export] macro_rules! arbitrary_order { ($rx:expr; $p1:pat => $e1:expr; $p2:pat => $e2:expr) => { // If i.e. a enum has only two variants, `_` is unreachable. match $rx { $p1 => { let __ret1 = { $e1 }; let __ret2 = match $rx { $p2 => $e2, #[allow(unreachable_patterns)] _ => unreachable!("first pattern matched, second pattern did not"), }; (__ret1, __ret2) }, $p2 => { let __ret2 = { $e2 }; let __ret1 = match $rx { $p1 => $e1, #[allow(unreachable_patterns)] _ => unreachable!("second pattern matched, first pattern did not"), }; (__ret1, __ret2) }, #[allow(unreachable_patterns)] _ => unreachable!("neither first nor second pattern matched"), } }; } /// Future that yields the execution once and resolves /// immediately after. /// /// Useful when one wants to poll the background task to completion /// before sending messages to it in order to avoid races. pub struct Yield(bool); impl Yield { /// Returns new `Yield` future. pub fn new() -> Self { Self(false) } } impl Future for Yield { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if !self.0 { self.0 = true; cx.waker().wake_by_ref(); Poll::Pending } else { Poll::Ready(()) } } } /// Helper for chunking available data. pub fn derive_erasure_chunks_with_proofs_and_root( n_validators: usize, available_data: &AvailableData, alter_chunk: impl Fn(usize, &mut Vec), ) -> (Vec, Hash) { let mut chunks: Vec> = obtain_chunks(n_validators, available_data).unwrap(); for (i, chunk) in chunks.iter_mut().enumerate() { alter_chunk(i, chunk) } // create proofs for each erasure chunk let branches = branches(chunks.as_ref()); let root = branches.root(); let erasure_chunks = branches .enumerate() .map(|(index, (proof, chunk))| ErasureChunk { chunk: chunk.to_vec(), index: ChunkIndex(index as _), proof: Proof::try_from(proof).unwrap(), }) .collect::>(); (erasure_chunks, root) } #[cfg(test)] mod tests { use super::*; #[test] fn macro_arbitrary_order() { let mut vals = vec![Some(15_usize), None]; let (first, second) = arbitrary_order!(vals.pop().unwrap(); Some(fx) => fx; None => 0); assert_eq!(first, 15_usize); assert_eq!(second, 0_usize); } #[test] fn macro_arbitrary_order_swapped() { let mut vals = vec![None, Some(11_usize)]; let (first, second) = arbitrary_order!(vals.pop().unwrap(); Some(fx) => fx; None => 0); assert_eq!(first, 11_usize); assert_eq!(second, 0); } }