mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 11:41:02 +00:00
Collator-side of collator protocol (#351)
* skeleton of collators object * awaiting and handling collations. rename `collators` to CollationPool * add some tests * add tests * implement Collators trait for ConsensusNetwork * plug collators into main polkadot-network * ignore collator role message * add a couple more tests * garbage collection for collations * extract session-key tracking from consensus * add local_collations.rs * finish polish of local_collations * integrate local_collations into network layer * introduce API for adding local collations * mostly finish collator implementation pending service fix * Specialized network() * push collations to the network * grumbles * substrate-service has custom configuration * initialize network in collator mode as necessary
This commit is contained in:
committed by
GitHub
parent
c28dd30461
commit
5a09802e57
@@ -49,6 +49,7 @@ extern crate substrate_client as client;
|
||||
extern crate substrate_codec as codec;
|
||||
extern crate substrate_primitives as primitives;
|
||||
extern crate ed25519;
|
||||
extern crate tokio;
|
||||
|
||||
extern crate polkadot_api;
|
||||
extern crate polkadot_cli;
|
||||
@@ -58,16 +59,20 @@ extern crate polkadot_primitives;
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
use std::collections::{BTreeSet, BTreeMap};
|
||||
use std::collections::{BTreeSet, BTreeMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures::{future, stream, Stream, Future, IntoFuture};
|
||||
use client::BlockchainEvents;
|
||||
use polkadot_api::PolkadotApi;
|
||||
use polkadot_primitives::BlockId;
|
||||
use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId};
|
||||
use polkadot_cli::{ServiceComponents, Service};
|
||||
use polkadot_primitives::{AccountId, BlockId, SessionKey};
|
||||
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId};
|
||||
use polkadot_cli::{ServiceComponents, Service, CustomConfiguration};
|
||||
use polkadot_cli::Worker;
|
||||
use tokio::timer::Deadline;
|
||||
|
||||
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Parachain context needed for collation.
|
||||
///
|
||||
@@ -99,6 +104,11 @@ pub trait RelayChainContext {
|
||||
fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress;
|
||||
}
|
||||
|
||||
fn key_to_account_id(key: &ed25519::Pair) -> AccountId {
|
||||
let pubkey_bytes: [u8; 32] = key.public().into();
|
||||
pubkey_bytes.into()
|
||||
}
|
||||
|
||||
/// Collate the necessary ingress queue using the given context.
|
||||
pub fn collate_ingress<'a, R>(relay_context: R)
|
||||
-> impl Future<Item=ConsolidatedIngress, Error=R::Error> + 'a
|
||||
@@ -159,11 +169,10 @@ pub fn collate<'a, R, P>(
|
||||
|
||||
let block_data_hash = block_data.hash();
|
||||
let signature = key.sign(&block_data_hash.0[..]).into();
|
||||
let pubkey_bytes: [u8; 32] = key.public().into();
|
||||
|
||||
let receipt = parachain::CandidateReceipt {
|
||||
parachain_index: local_id,
|
||||
collator: pubkey_bytes.into(),
|
||||
collator: key_to_account_id(&*key),
|
||||
signature,
|
||||
head_data,
|
||||
balance_uploads: Vec::new(),
|
||||
@@ -183,7 +192,7 @@ pub fn collate<'a, R, P>(
|
||||
struct ApiContext;
|
||||
|
||||
impl RelayChainContext for ApiContext {
|
||||
type Error = ();
|
||||
type Error = ::polkadot_api::Error;
|
||||
type FutureEgress = Result<Vec<Vec<Message>>, Self::Error>;
|
||||
|
||||
fn routing_parachains(&self) -> BTreeSet<ParaId> {
|
||||
@@ -203,12 +212,21 @@ struct CollationNode<P, E> {
|
||||
}
|
||||
|
||||
impl<P, E> Worker for CollationNode<P, E> where
|
||||
P: ParachainContext + 'static,
|
||||
P: ParachainContext + Send + 'static,
|
||||
E: Future<Item=(),Error=()> + Send + 'static
|
||||
{
|
||||
type Work = Box<Future<Item=(),Error=()>>;
|
||||
type Work = Box<Future<Item=(),Error=()> + Send>;
|
||||
type Exit = E;
|
||||
|
||||
fn configuration(&self) -> CustomConfiguration {
|
||||
let mut config = CustomConfiguration::default();
|
||||
config.collating_for = Some((
|
||||
key_to_account_id(&*self.key),
|
||||
self.para_id.clone(),
|
||||
));
|
||||
config
|
||||
}
|
||||
|
||||
fn exit_only(self) -> Self::Exit {
|
||||
self.exit
|
||||
}
|
||||
@@ -217,35 +235,66 @@ impl<P, E> Worker for CollationNode<P, E> where
|
||||
let CollationNode { parachain_context, exit, para_id, key } = self;
|
||||
let client = service.client();
|
||||
let api = service.api();
|
||||
let network = service.network();
|
||||
|
||||
let work = client.import_notification_stream()
|
||||
.and_then(move |notification| {
|
||||
let id = BlockId::hash(notification.hash);
|
||||
|
||||
match api.parachain_head(&id, para_id) {
|
||||
Ok(Some(last_head)) => {
|
||||
let collation_work = collate(
|
||||
para_id,
|
||||
HeadData(last_head),
|
||||
ApiContext,
|
||||
parachain_context.clone(),
|
||||
key.clone(),
|
||||
).map(Some);
|
||||
|
||||
future::Either::A(collation_work)
|
||||
}
|
||||
Ok(None) => {
|
||||
info!("Parachain {:?} appears to be inactive. Cannot collate.", id);
|
||||
future::Either::B(future::ok(None))
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Could not collate for parachain {:?}: {:?}", id, e);
|
||||
future::Either::B(future::ok(None)) // returning error would shut down the collation node
|
||||
.for_each(move |notification| {
|
||||
macro_rules! try_fr {
|
||||
($e:expr) => {
|
||||
match $e {
|
||||
Ok(x) => x,
|
||||
Err(e) => return future::Either::A(future::err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.for_each(|_collation: Option<Collation>| {
|
||||
// TODO: import into network.
|
||||
|
||||
let relay_parent = notification.hash;
|
||||
let id = BlockId::hash(relay_parent);
|
||||
|
||||
let network = network.clone();
|
||||
let api = api.clone();
|
||||
let key = key.clone();
|
||||
let parachain_context = parachain_context.clone();
|
||||
|
||||
let work = future::lazy(move || {
|
||||
let last_head = match try_fr!(api.parachain_head(&id, para_id)) {
|
||||
Some(last_head) => last_head,
|
||||
None => return future::Either::A(future::ok(())),
|
||||
};
|
||||
|
||||
let targets = compute_targets(
|
||||
para_id,
|
||||
try_fr!(api.session_keys(&id)).as_slice(),
|
||||
try_fr!(api.duty_roster(&id)),
|
||||
);
|
||||
|
||||
let collation_work = collate(
|
||||
para_id,
|
||||
HeadData(last_head),
|
||||
ApiContext,
|
||||
parachain_context,
|
||||
key,
|
||||
).map(move |collation| {
|
||||
network.with_spec(|spec, ctx| spec.add_local_collation(
|
||||
ctx,
|
||||
relay_parent,
|
||||
targets,
|
||||
collation,
|
||||
));
|
||||
});
|
||||
|
||||
future::Either::B(collation_work)
|
||||
});
|
||||
let deadlined = Deadline::new(work, Instant::now() + COLLATION_TIMEOUT);
|
||||
let silenced = deadlined.then(|res| match res {
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => {
|
||||
warn!("Collation failure: {}", e);
|
||||
Ok(())
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(silenced);
|
||||
Ok(())
|
||||
});
|
||||
|
||||
@@ -254,6 +303,16 @@ impl<P, E> Worker for CollationNode<P, E> where
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet<SessionKey> {
|
||||
use polkadot_primitives::parachain::Chain;
|
||||
|
||||
roster.validator_duty.iter().enumerate()
|
||||
.filter(|&(_, c)| c == &Chain::Parachain(para_id))
|
||||
.filter_map(|(i, _)| session_keys.get(i))
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
|
||||
/// arguments to the underlying polkadot node.
|
||||
///
|
||||
@@ -266,7 +325,7 @@ pub fn run_collator<P, E>(
|
||||
key: Arc<ed25519::Pair>,
|
||||
args: Vec<::std::ffi::OsString>
|
||||
) -> polkadot_cli::error::Result<()> where
|
||||
P: ParachainContext + 'static,
|
||||
P: ParachainContext + Send + 'static,
|
||||
E: IntoFuture<Item=(),Error=()>,
|
||||
E::Future: Send + 'static,
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user