Collator node workflow (#280)

* arbitrary application logic in CLI

* collation work

* split up exit and work futures in application

* collation node workflow

* typo

* indentation fix

* doc grumbles

* rename Application to Worker

* refactor Worker::exit to exit_only
This commit is contained in:
Robert Habermeier
2018-07-06 15:13:31 +01:00
committed by Sergey Pepyakin
parent 6bfcbd6d59
commit 24f7b548dc
13 changed files with 285 additions and 94 deletions
+151 -19
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Collation Logic.
//! Collation node logic.
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
@@ -28,7 +28,7 @@
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
//! subset of all the other parachains.
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
@@ -45,25 +45,41 @@
//! to be performed, as the collation logic itself.
extern crate futures;
extern crate substrate_client as client;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
extern crate ed25519;
extern crate polkadot_api;
extern crate polkadot_cli;
extern crate polkadot_runtime;
extern crate polkadot_primitives;
use std::collections::{BTreeSet, BTreeMap};
#[macro_use]
extern crate log;
use futures::{stream, Stream, Future, IntoFuture};
use polkadot_primitives::parachain::{self, CandidateSignature, ConsolidatedIngress, Message, Id as ParaId};
use std::collections::{BTreeSet, BTreeMap};
use std::sync::Arc;
use futures::{future, stream, Stream, Future, IntoFuture};
use client::BlockchainEvents;
use polkadot_api::PolkadotApi;
use polkadot_primitives::BlockId;
use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId};
use polkadot_cli::{ClientError, ServiceComponents, ClientBackend, PolkadotBlock, StateMachineBackend, Service};
use polkadot_cli::Worker;
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
pub trait ParachainContext {
/// Produce a candidate, given the latest ingress queue information.
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
/// Produce a candidate, given the latest ingress queue information and the last parachain head.
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
&self,
last_head: HeadData,
ingress: I,
) -> (parachain::BlockData, polkadot_primitives::AccountId, CandidateSignature);
) -> (BlockData, HeadData);
}
/// Relay chain context needed to collate.
@@ -120,29 +136,145 @@ pub fn collate_ingress<'a, R>(relay_context: R)
.map(ConsolidatedIngress)
}
/// Produce a candidate for the parachain.
pub fn collate<'a, R: 'a, P>(local_id: ParaId, relay_context: R, para_context: P)
-> impl Future<Item=parachain::Candidate, Error=R::Error> + 'a
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
pub fn collate<'a, R, P>(
local_id: ParaId,
last_head: HeadData,
relay_context: R,
para_context: P,
key: Arc<ed25519::Pair>,
)
-> impl Future<Item=parachain::Collation, Error=R::Error> + 'a
where
R: RelayChainContext,
R::Error: 'a,
R: RelayChainContext + 'a,
R::Error: 'a,
R::FutureEgress: 'a,
P: ParachainContext + 'a,
{
collate_ingress(relay_context).map(move |ingress| {
let (block_data, _, signature) = para_context.produce_candidate(
let (block_data, head_data) = para_context.produce_candidate(
last_head,
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
);
parachain::Candidate {
let signature = key.sign(&block_data.0[..]).into();
let pubkey_bytes: [u8; 32] = key.public().into();
let receipt = parachain::CandidateReceipt {
parachain_index: local_id,
collator_signature: signature,
block: block_data,
unprocessed_ingress: ingress,
collator: pubkey_bytes.into(),
signature,
head_data,
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 0,
block_data_hash: block_data.hash(),
};
parachain::Collation {
receipt,
block_data,
}
})
}
/// Polkadot-api context.
struct ApiContext;
impl RelayChainContext for ApiContext {
type Error = ();
type FutureEgress = Result<Vec<Vec<Message>>, Self::Error>;
fn routing_parachains(&self) -> BTreeSet<ParaId> {
BTreeSet::new()
}
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
Ok(Vec::new())
}
}
struct CollationNode<P, E> {
parachain_context: P,
exit: E,
para_id: ParaId,
key: Arc<ed25519::Pair>,
}
impl<P, E> Worker for CollationNode<P, E> where
P: ParachainContext + 'static,
E: Future<Item=(),Error=()> + Send + 'static
{
type Work = Box<Future<Item=(),Error=()>>;
type Exit = E;
fn exit_only(self) -> Self::Exit {
self.exit
}
fn work<C: ServiceComponents>(self, service: &Service<C>) -> Self::Work
where ClientError: From<<<<C as ServiceComponents>::Backend as ClientBackend<PolkadotBlock>>::State as StateMachineBackend>::Error>,
{
let CollationNode { parachain_context, exit, para_id, key } = self;
let client = service.client();
let api = service.api();
let work = client.import_notification_stream()
.and_then(move |notification| {
let id = BlockId::hash(notification.hash);
match api.parachain_head(&id, para_id) {
Ok(Some(last_head)) => {
let collation_work = collate(
para_id,
HeadData(last_head),
ApiContext,
parachain_context.clone(),
key.clone(),
).map(Some);
future::Either::A(collation_work)
}
Ok(None) => {
info!("Parachain {:?} appears to be inactive. Cannot collate.", id);
future::Either::B(future::ok(None))
}
Err(e) => {
warn!("Could not collate for parachain {:?}: {:?}", id, e);
future::Either::B(future::ok(None)) // returning error would shut down the collation node
}
}
})
.for_each(|_collation: Option<Collation>| {
// TODO: import into network.
Ok(())
});
let work_and_exit = work.select(exit).then(|_| Ok(()));
Box::new(work_and_exit) as Box<_>
}
}
/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
/// arguments to the underlying polkadot node.
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
pub fn run_collator<P, E>(
parachain_context: P,
para_id: ParaId,
exit: E,
key: Arc<ed25519::Pair>,
args: Vec<::std::ffi::OsString>
) -> polkadot_cli::error::Result<()> where
P: ParachainContext + 'static,
E: IntoFuture<Item=(),Error=()>,
E::Future: Send + 'static,
{
let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id, key };
polkadot_cli::run(args, node_logic)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -170,7 +302,7 @@ mod tests {
}
}
#[test]
#[test]
fn collates_ingress() {
let route_from = |x: &[ParaId]| {
let mut set = BTreeSet::new();