Introduce BuildParachainContext trait (#302)

* Introduce `BuildParachainContext` trait

* Change the structure and hide the actual network implementation behind a
trait

* Add functions to collator `Network` trait
This commit is contained in:
Bastian Köcher
2019-06-27 23:55:40 +02:00
committed by GitHub
parent 6eb3f92a8e
commit b92dd81300
5 changed files with 93 additions and 31 deletions
+73 -26
View File
@@ -53,14 +53,18 @@ use futures::{future, Stream, Future, IntoFuture};
use log::{info, warn};
use client::BlockchainEvents;
use primitives::{ed25519, Pair};
use polkadot_primitives::{BlockId, SessionKey, Hash, Block};
use polkadot_primitives::parachain::{
self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic,
PoVBlock, Status as ParachainStatus,
use polkadot_primitives::{
BlockId, SessionKey, Hash, Block,
parachain::{
self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic,
PoVBlock, Status as ParachainStatus,
}
};
use polkadot_cli::{PolkadotService, CustomConfiguration, ParachainHost};
use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi, TaskExecutor};
use polkadot_network::validation::{ValidationNetwork, SessionParams};
use polkadot_cli::{
Worker, IntoExit, ProvideRuntimeApi, TaskExecutor, PolkadotService, CustomConfiguration,
ParachainHost,
};
use polkadot_network::validation::{SessionParams, ValidationNetwork};
use polkadot_network::NetworkService;
use tokio::timer::Timeout;
use consensus_common::SelectChain;
@@ -68,9 +72,41 @@ use aura::AuraApi;
pub use polkadot_cli::VersionInfo;
pub use polkadot_network::validation::Incoming;
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
pub use substrate_network::PeerId;
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
/// An abstraction over the `Network` with useful functions for a `Collator`.
pub trait Network {
/// Convert the given `CollatorId` to a `PeerId`.
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Item=Option<PeerId>, Error=()> + Send>;
/// Create a `Stream` of checked statements for the given `relay_parent`.
///
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement, Error=()>>;
}
impl<P, E> Network for ValidationNetwork<P, E, NetworkService, TaskExecutor> where
P: 'static,
E: 'static,
{
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Item=Option<PeerId>, Error=()> + Send>
{
Box::new(Self::collator_id_to_peer_id(self, collator_id))
}
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement, Error=()>> {
Box::new(Self::checked_statements(self, relay_parent))
}
}
/// Error to return when the head data was invalid.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;
@@ -93,6 +129,15 @@ impl<R: fmt::Display> fmt::Display for Error<R> {
}
}
/// Something that can build a `ParachainContext`.
pub trait BuildParachainContext {
/// The parachain context produced by the `build` function.
type ParachainContext: self::ParachainContext;
/// Build the `ParachainContext`.
fn build(self, network: Arc<dyn Network>) -> Result<Self::ParachainContext, ()>;
}
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
@@ -114,7 +159,7 @@ pub trait ParachainContext: Clone {
/// This encapsulates a network and local database which may store
/// some of the input.
pub trait RelayChainContext {
type Error: ::std::fmt::Debug;
type Error: std::fmt::Debug;
/// Future that resolves to the un-routed egress queues of a parachain.
/// The first item is the oldest.
@@ -182,7 +227,7 @@ pub fn collate<'a, R, P>(
/// Polkadot-api context.
struct ApiContext<P, E> {
network: ValidationNetwork<P, E, NetworkService, TaskExecutor>,
network: Arc<ValidationNetwork<P, E, NetworkService, TaskExecutor>>,
parent_hash: Hash,
authorities: Vec<SessionKey>,
}
@@ -210,14 +255,13 @@ impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
}
struct CollationNode<P, E> {
parachain_context: P,
build_parachain_context: P,
exit: E,
para_id: ParaId,
key: Arc<ed25519::Pair>,
}
impl<P, E> IntoExit for CollationNode<P, E> where
P: ParachainContext + Send + 'static,
E: Future<Item=(),Error=()> + Send + 'static
{
type Exit = E;
@@ -227,11 +271,12 @@ impl<P, E> IntoExit for CollationNode<P, E> where
}
impl<P, E> Worker for CollationNode<P, E> where
P: ParachainContext + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
<P::ProduceCandidate as IntoFuture>::Future: Send + 'static,
P: BuildParachainContext + Send + 'static,
P::ParachainContext: Send + 'static,
<<P::ParachainContext as ParachainContext>::ProduceCandidate as IntoFuture>::Future: Send + 'static,
E: Future<Item=(), Error=()> + Clone + Send + Sync + 'static,
{
type Work = Box<dyn Future<Item=(),Error=()> + Send>;
type Work = Box<dyn Future<Item=(), Error=()> + Send>;
fn configuration(&self) -> CustomConfiguration {
let mut config = CustomConfiguration::default();
@@ -242,10 +287,10 @@ impl<P, E> Worker for CollationNode<P, E> where
config
}
fn work<S>(self, service: &S, task_executor: TaskExecutor) -> Self::Work
where S: PolkadotService,
fn work<S>(self, service: &S, task_executor: TaskExecutor) -> Self::Work where
S: PolkadotService,
{
let CollationNode { parachain_context, exit, para_id, key } = self;
let CollationNode { build_parachain_context, exit, para_id, key } = self;
let client = service.client();
let network = service.network();
let known_oracle = client.clone();
@@ -278,14 +323,15 @@ impl<P, E> Worker for CollationNode<P, E> where
},
);
let validation_network = ValidationNetwork::new(
let validation_network = Arc::new(ValidationNetwork::new(
network.clone(),
exit.clone(),
message_validator,
client.clone(),
task_executor,
);
));
let parachain_context = build_parachain_context.build(validation_network.clone()).unwrap();
let inner_exit = exit.clone();
let work = client.import_notification_stream()
.for_each(move |notification| {
@@ -376,27 +422,28 @@ fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRos
.collect()
}
/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
/// arguments to the underlying polkadot node.
/// Run a collator node with the given `RelayChainContext` and `ParachainContext`
/// build by the given `BuildParachainContext` and arguments to the underlying polkadot node.
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
pub fn run_collator<P, E, I, ArgT>(
parachain_context: P,
build_parachain_context: P,
para_id: ParaId,
exit: E,
key: Arc<ed25519::Pair>,
args: I,
version: VersionInfo,
) -> polkadot_cli::error::Result<()> where
P: ParachainContext + Send + 'static,
<P::ProduceCandidate as IntoFuture>::Future: Send + 'static,
P: BuildParachainContext + Send + 'static,
P::ParachainContext: Send + 'static,
<<P::ParachainContext as ParachainContext>::ProduceCandidate as IntoFuture>::Future: Send + 'static,
E: IntoFuture<Item=(),Error=()>,
E::Future: Send + Clone + Sync + 'static,
I: IntoIterator<Item=ArgT>,
ArgT: Into<std::ffi::OsString> + Clone,
{
let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id, key };
let node_logic = CollationNode { build_parachain_context, exit: exit.into_future(), para_id, key };
polkadot_cli::run(args, node_logic, version)
}