Collator protocol subsystem (#1659)

* WIP

* The initial implementation of the collator side.

* Improve comments

* Multiple collation requests

* Add more tests and comments to validator side

* Add comments, remove dead code

* Apply suggestions from code review

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Fix build after suggested changes

* Also connect to the next validator group

* Remove a Future impl and move TimeoutExt to util

* Minor nits

* Fix build

* Change FetchCollations back to FetchCollation

* Try this

* Final fixes

* Fix build

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
This commit is contained in:
Fedor Sakharov
2020-09-10 16:54:59 +03:00
committed by GitHub
parent 5338b23ca3
commit 98660cbd94
16 changed files with 2553 additions and 89 deletions
@@ -18,13 +18,12 @@
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem::{FromOverseer, SubsystemContext, SubsystemError, SubsystemResult};
use polkadot_node_subsystem_util::TimeoutExt;
use futures::channel::mpsc;
use futures::poll;
use futures::prelude::*;
use futures_timer::Delay;
use parking_lot::Mutex;
use pin_project::pin_project;
use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
use std::convert::Infallible;
@@ -284,45 +283,3 @@ pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
.expect("test timed out instead of completing")
});
}
/// A future that wraps another future with a `Delay` allowing for time-limited futures.
#[pin_project]
pub struct Timeout<F: Future> {
#[pin]
future: F,
#[pin]
delay: Delay,
}
/// Extends `Future` to allow time-limited futures.
pub trait TimeoutExt: Future {
fn timeout(self, duration: Duration) -> Timeout<Self>
where
Self: Sized,
{
Timeout {
future: self,
delay: Delay::new(duration),
}
}
}
impl<F: Future> TimeoutExt for F {}
impl<F: Future> Future for Timeout<F> {
type Output = Option<F::Output>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
if this.delay.poll(ctx).is_ready() {
return Poll::Ready(None);
}
if let Poll::Ready(output) = this.future.poll(ctx) {
return Poll::Ready(Some(output));
}
Poll::Pending
}
}