implement collation generation subsystem (#1557)

* start sketching out a collation generation subsystem

* invent a basic strategy for double initialization

* clean up warnings

* impl util requests from runtime assuming a context instead of a FromJob sender

* implement collation generation algorithm from guide

* update AllMessages in tests

* fix trivial review comments

* remove another redundant declaration from merge

* filter availability cores by para_id

* handle new activations each in their own async task

* update guide according to the actual current implementation

* add initialization to guide

* add general-purpose subsystem_test_harness helper

* write first handle_new_activations test

* add test that handle_new_activations filters local_validation_data requests

* add (failing) test of collation distribution message sending

* rustfmt

* broken: work on fixing sender test

Unfortunately, for reasons that are not yet clear, despite the public key
and checked data being identical, the signer is not producing an identical
signature. This commit produces this output (among more):

signing with  Public(c4733ab0bbe3ba4c096685d1737a7f498cdbdd167a767d04a21dc7df12b8c858 (5GWHUNm5...))
checking with Public(c4733ab0bbe3ba4c096685d1737a7f498cdbdd167a767d04a21dc7df12b8c858 (5GWHUNm5...))
signed payload:  [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 10, 0, 0, 0, c7, e5, c0, 64, 7a, db, fe, 44, 81, e5, 51, 11, 79, 9f, a5, 63, 93, 94, 3c, c4, 36, c6, 30, 36, c2, c5, 44, a2, 1b, db, b7, 82, 3, 17, a, 2e, 75, 97, b7, b7, e3, d8, 4c, 5, 39, 1d, 13, 9a, 62, b1, 57, e7, 87, 86, d8, c0, 82, f2, 9d, cf, 4c, 11, 13, 14]
checked payload: [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 10, 0, 0, 0, c7, e5, c0, 64, 7a, db, fe, 44, 81, e5, 51, 11, 79, 9f, a5, 63, 93, 94, 3c, c4, 36, c6, 30, 36, c2, c5, 44, a2, 1b, db, b7, 82, 3, 17, a, 2e, 75, 97, b7, b7, e3, d8, 4c, 5, 39, 1d, 13, 9a, 62, b1, 57, e7, 87, 86, d8, c0, 82, f2, 9d, cf, 4c, 11, 13, 14]

* fix broken test

* collation function returns commitments hash

It doesn't look like we use the actual commitments data anywhere, and
it's not obvious if there are any fields of `CandidateCommitments`
not available to the collator, so this commit just assigns them the
entire responsibility of generating the hash.

* add missing overseer impls

* calculating erasure coding is polkadot's responsibility, not cumulus

* concurrentize per-relay_parent requests
This commit is contained in:
Peter Goodspeed-Niklaus
2020-08-17 14:27:37 +02:00
committed by GitHub
parent ab1a513265
commit 54bec3bfc0
13 changed files with 1031 additions and 108 deletions
+89 -40
View File
@@ -16,19 +16,21 @@
//! Utilities for testing subsystems.
use polkadot_node_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem::{FromOverseer, SubsystemContext, SubsystemError, SubsystemResult};
use futures::prelude::*;
use futures::channel::mpsc;
use futures::poll;
use futures::prelude::*;
use futures_timer::Delay;
use parking_lot::Mutex;
use sp_core::traits::SpawnNamed;
use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::Duration;
enum SinkState<T> {
Empty {
@@ -50,24 +52,21 @@ pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
impl<T> Sink<T> for SingleItemSink<T> {
type Error = Infallible;
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Infallible>> {
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
let mut state = self.0.lock();
match *state {
SinkState::Empty { .. } => Poll::Ready(Ok(())),
SinkState::Item { ref mut ready_waker, .. } => {
SinkState::Item {
ref mut ready_waker,
..
} => {
*ready_waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
fn start_send(
self: Pin<&mut Self>,
item: T,
) -> Result<(), Infallible> {
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
let mut state = self.0.lock();
match *state {
@@ -88,24 +87,21 @@ impl<T> Sink<T> for SingleItemSink<T> {
Ok(())
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Infallible>> {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
let mut state = self.0.lock();
match *state {
SinkState::Empty { .. } => Poll::Ready(Ok(())),
SinkState::Item { ref mut flush_waker, .. } => {
SinkState::Item {
ref mut flush_waker,
..
} => {
*flush_waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Infallible>> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
self.poll_flush(cx)
}
}
@@ -120,7 +116,11 @@ impl<T> Stream for SingleItemStream<T> {
match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
SinkState::Empty { .. } => Poll::Pending,
SinkState::Item { item, ready_waker, flush_waker } => {
SinkState::Item {
item,
ready_waker,
flush_waker,
} => {
if let Some(waker) = ready_waker {
waker.wake();
}
@@ -141,10 +141,7 @@ impl<T> Stream for SingleItemStream<T> {
/// not when the item is buffered.
pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
(
SingleItemSink(inner.clone()),
SingleItemStream(inner),
)
(SingleItemSink(inner.clone()), SingleItemStream(inner))
}
/// A test subsystem context.
@@ -155,7 +152,9 @@ pub struct TestSubsystemContext<M, S> {
}
#[async_trait::async_trait]
impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for TestSubsystemContext<M, S> {
impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
for TestSubsystemContext<M, S>
{
type Message = M;
async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
@@ -170,9 +169,11 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes
self.rx.next().await.ok_or(SubsystemError)
}
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-> SubsystemResult<()>
{
async fn spawn(
&mut self,
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> SubsystemResult<()> {
self.spawn.spawn(name, s);
Ok(())
}
@@ -185,15 +186,23 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes
}
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(msg).await.expect("test overseer no longer live");
self.tx
.send(msg)
.await
.expect("test overseer no longer live");
Ok(())
}
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
where
T: IntoIterator<Item = AllMessages> + Send,
T::IntoIter: Send,
{
let mut iter = stream::iter(msgs.into_iter().map(Ok));
self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
self.tx
.send_all(&mut iter)
.await
.expect("test overseer no longer live");
Ok(())
}
@@ -209,19 +218,27 @@ impl<M> TestSubsystemContextHandle<M> {
/// Send a message or signal to the subsystem. This resolves at the point in time where the
/// subsystem has _read_ the message.
pub async fn send(&mut self, from_overseer: FromOverseer<M>) {
self.tx.send(from_overseer).await.expect("Test subsystem no longer live");
self.tx
.send(from_overseer)
.await
.expect("Test subsystem no longer live");
}
/// Receive the next message from the subsystem.
pub async fn recv(&mut self) -> AllMessages {
self.rx.next().await.expect("Test subsystem no longer live")
self.try_recv().await.expect("Test subsystem no longer live")
}
/// Receive the next message from the subsystem, or `None` if the channel has been closed.
pub async fn try_recv(&mut self) -> Option<AllMessages> {
self.rx.next().await
}
}
/// Make a test subsystem context.
pub fn make_subsystem_context<M, S>(spawn: S)
-> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>)
{
pub fn make_subsystem_context<M, S>(
spawn: S,
) -> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>) {
let (overseer_tx, overseer_rx) = single_item_sink();
let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
@@ -233,7 +250,39 @@ pub fn make_subsystem_context<M, S>(spawn: S)
},
TestSubsystemContextHandle {
tx: overseer_tx,
rx: all_messages_rx
rx: all_messages_rx,
},
)
}
}
/// Test a subsystem, mocking the overseer
///
/// Pass in two async closures: one mocks the overseer, the other runs the test from the perspective of a subsystem.
///
/// Times out in two seconds.
pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
overseer_factory: OverseerFactory,
test_factory: TestFactory,
) where
OverseerFactory: FnOnce(TestSubsystemContextHandle<M>) -> Overseer,
Overseer: Future<Output = ()>,
TestFactory: FnOnce(TestSubsystemContext<M, TaskExecutor>) -> Test,
Test: Future<Output = ()>,
{
let pool = TaskExecutor::new();
let (context, handle) = make_subsystem_context(pool);
let overseer = overseer_factory(handle);
let test = test_factory(context);
let timeout = Delay::new(Duration::from_secs(2));
futures::pin_mut!(overseer, test, timeout);
futures::executor::block_on(async move {
futures::select! {
_ = overseer.fuse() => (),
_ = test.fuse() => (),
_ = timeout.fuse() => panic!("test timed out instead of completing"),
}
});
}