Minimal parachains part 2: Parachain statement and data routing (#173)

* dynamic inclusion threshold calculator

* collators interface

* collation helpers

* initial proposal-creation future

* create proposer when asked to propose

* remove local_availability duty

* statement table tracks includable parachain count

* beginnings of timing future

* finish proposal logic

* remove stray println

* extract shared table to separate module

* change ordering

* includability tracking

* fix doc

* initial changes to parachains module

* initialise dummy block before API calls

* give polkadot control over round proposer based on random seed

* propose only after enough candidates

* flesh out parachains module a bit more

* set_heads

* actually introduce set_heads to runtime

* update block_builder to accept parachains

* split block validity errors from real errors in evaluation

* update WASM runtimes

* polkadot-api methods for parachains additions

* delay evaluation until candidates are ready

* comments

* fix dynamic inclusion with zero initial

* test for includability tracker

* wasm validation of parachain candidates

* move primitives to primitives crate

* remove runtime-std dependency from codec

* adjust doc

* polkadot-parachain-primitives

* kill legacy polkadot-validator crate

* basic-add test chain

* test for basic_add parachain

* move to test-chains dir

* use wasm-build

* new wasm directory layout

* reorganize a bit more

* Fix for rh-minimal-parachain (#141)

* Remove extern "C"

We already encountered such behavior (bug?) in pwasm-std, I believe.

* Fix `panic_fmt` signature by adding `_col`

Wrong `panic_fmt` signature can inhibit some optimizations in LTO mode.

* Add linker flags and use wasm-gc in build script

Pass --import-memory to LLD to emit wasm binary with imported memory.

Also use wasm-gc instead of wasm-build.

* Fix effective_max.

I'm not sure why it was the way it was actually.

* Recompile wasm.

* Fix indent

* more basic_add tests

* validate parachain WASM

* produce statements on receiving statements

* tests for reactive statement production

* fix build

* add OOM lang item to runtime-io

* use dynamic_inclusion when evaluating as well

* fix update_includable_count

* remove dead code

* grumbles

* actually defer round_proposer logic

* update wasm

* address a few more grumbles

* schedule collation work as soon as BFT is started

* impl future in collator

* fix comment

* governance proposals for adding and removing parachains

* bump protocol version

* tear out polkadot-specific pieces of substrate-network

* extract out polkadot-specific stuff from substrate-network

* begin polkadot network subsystem

* grumbles

* update WASM checkins

* parse status from polkadot peer

* allow invoke of network specialization

* begin statement router implementation

* remove dependency on tokio-timer

* fix sanity check and have proposer factory create communication streams

* pull out statement routing from consensus library

* fix comments

* adjust typedefs

* extract consensus_gossip out of main network protocol handler

* port substrate-bft to new tokio

* port polkadot-consensus to new tokio

* fix typo

* start message processing task

* initial consensus network implementation

* remove known tracking from statement-table crate

* extract router into separate module

* defer statements until later

* double signature is invalid

* propagating statements

* grumbles

* request block data

* fix compilation

* embed new consensus network into service

* port demo CLI to tokio

* all test crates compile

* some tests for fetching block data

* whitespace

* adjusting some tokio stuff

* update exit-future

* remove overly noisy warning

* clean up collation work a bit

* address review grumbles

* fix lock order in protocol handler

* rebuild wasm artifacts

* tag AuthorityId::from_slice for std only

* address formatting grumbles

* rename event_loop to executor

* some more docs for polkadot-network crate
This commit is contained in:
Robert Habermeier
2018-07-06 14:17:03 +02:00
committed by GitHub
parent 5355956314
commit be5ff4e62f
62 changed files with 3424 additions and 2134 deletions
+5 -15
View File
@@ -23,7 +23,7 @@ use std::sync::Arc;
use polkadot_api::PolkadotApi;
use polkadot_primitives::{Hash, AccountId, BlockId};
use polkadot_primitives::parachain::{Id as ParaId, Chain, BlockData, Extrinsic, CandidateReceipt};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use futures::prelude::*;
@@ -55,7 +55,7 @@ pub trait Collators: Clone {
///
/// This future is fused.
pub struct CollationFetch<C: Collators, P: PolkadotApi> {
parachain: Option<ParaId>,
parachain: ParaId,
relay_parent_hash: Hash,
relay_parent: BlockId,
collators: C,
@@ -65,16 +65,13 @@ pub struct CollationFetch<C: Collators, P: PolkadotApi> {
impl<C: Collators, P: PolkadotApi> CollationFetch<C, P> {
/// Create a new collation fetcher for the given chain.
pub fn new(parachain: Chain, relay_parent: BlockId, relay_parent_hash: Hash, collators: C, client: Arc<P>) -> Self {
pub fn new(parachain: ParaId, relay_parent: BlockId, relay_parent_hash: Hash, collators: C, client: Arc<P>) -> Self {
CollationFetch {
relay_parent_hash,
relay_parent,
collators,
client,
parachain: match parachain {
Chain::Parachain(id) => Some(id),
Chain::Relay => None,
},
parachain,
live_fetch: None,
}
}
@@ -85,26 +82,19 @@ impl<C: Collators, P: PolkadotApi> Future for CollationFetch<C, P> {
type Error = C::Error;
fn poll(&mut self) -> Poll<(Collation, Extrinsic), C::Error> {
let parachain = match self.parachain.as_ref() {
Some(p) => p.clone(),
None => return Ok(Async::NotReady),
};
loop {
let x = {
let parachain = self.parachain.clone();
let (r, c) = (self.relay_parent_hash, &self.collators);
let poll = self.live_fetch
.get_or_insert_with(move || c.collate(parachain, r).into_future())
.poll();
if let Err(_) = poll { self.parachain = None }
try_ready!(poll)
};
match validate_collation(&*self.client, &self.relay_parent, &x) {
Ok(()) => {
self.parachain = None;
// TODO: generate extrinsic while verifying.
return Ok(Async::Ready((x, Extrinsic)));
}
@@ -61,7 +61,7 @@ impl DynamicInclusion {
/// would be enough, or `None` if it is sufficient now.
///
/// Panics if `now` is earlier than the `start`.
pub fn acceptable_in(&self, now: Instant, included: usize) -> Option<Duration> {
pub fn acceptable_in(&self, now: Instant, included: usize) -> Option<Instant> {
let elapsed = now.duration_since(self.start);
let elapsed = duration_to_micros(&elapsed);
@@ -70,7 +70,8 @@ impl DynamicInclusion {
if elapsed >= valid_after {
None
} else {
Some(Duration::from_millis((valid_after - elapsed) as u64 / 1000))
let until = Duration::from_millis((valid_after - elapsed) as u64 / 1000);
Some(now + until)
}
}
}
@@ -104,7 +105,7 @@ mod tests {
Duration::from_millis(4000),
);
assert_eq!(dynamic.acceptable_in(now, 5), Some(Duration::from_millis(2000)));
assert_eq!(dynamic.acceptable_in(now, 5), Some(now + Duration::from_millis(2000)));
assert!(dynamic.acceptable_in(now + Duration::from_millis(2000), 5).is_none());
assert!(dynamic.acceptable_in(now + Duration::from_millis(3000), 5).is_none());
assert!(dynamic.acceptable_in(now + Duration::from_millis(4000), 5).is_none());
+1 -1
View File
@@ -37,7 +37,7 @@ error_chain! {
description("Proposer destroyed before finishing proposing or evaluating"),
display("Proposer destroyed before finishing proposing or evaluating"),
}
Timer(e: String) {
Timer(e: ::tokio::timer::Error) {
description("Failed to register or resolve async timer."),
display("Timer failed: {}", e),
}
+192 -167
View File
@@ -44,11 +44,10 @@ extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
extern crate substrate_runtime_support as runtime_support;
extern crate substrate_runtime_primitives as runtime_primitives;
extern crate substrate_network;
extern crate substrate_client as client;
extern crate exit_future;
extern crate tokio_core;
extern crate substrate_client as client;
extern crate tokio;
#[macro_use]
extern crate error_chain;
@@ -67,33 +66,32 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use codec::Slicable;
use table::generic::Statement as GenericStatement;
use runtime_support::Hashable;
use polkadot_api::PolkadotApi;
use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, Timestamp};
use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt};
use polkadot_runtime::BareExtrinsic;
use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, Timestamp, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt, CandidateSignature};
use primitives::AuthorityId;
use transaction_pool::{TransactionPool};
use tokio_core::reactor::{Handle, Timeout, Interval};
use transaction_pool::TransactionPool;
use tokio::runtime::TaskExecutor;
use tokio::timer::{Delay, Interval};
use futures::prelude::*;
use futures::future::{self, Shared};
use futures::future;
use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion;
pub use self::collation::{Collators, Collation};
pub use self::collation::{validate_collation, Collators, Collation};
pub use self::error::{ErrorKind, Error};
pub use self::shared_table::{SharedTable, StatementSource, StatementProducer, ProducedStatements};
pub use self::shared_table::{SharedTable, StatementProducer, ProducedStatements, Statement, SignedStatement, GenericStatement};
pub use service::Service;
mod collation;
mod dynamic_inclusion;
mod evaluation;
mod error;
mod service;
mod shared_table;
pub mod collation;
// block size limit.
const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
@@ -108,8 +106,9 @@ pub trait TableRouter: Clone {
/// Future that resolves when extrinsic candidate data is fetched.
type FetchExtrinsic: IntoFuture<Item=ParachainExtrinsic,Error=Self::Error>;
/// Note local candidate data, making it available on the network to other validators.
fn local_candidate_data(&self, hash: Hash, block_data: BlockData, extrinsic: ParachainExtrinsic);
/// Call with local candidate data. This will make the data available on the network,
/// and sign, import, and broadcast a statement about the candidate.
fn local_candidate(&self, candidate: CandidateReceipt, block_data: BlockData, extrinsic: ParachainExtrinsic);
/// Fetch block data for a specific candidate.
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate;
@@ -118,23 +117,28 @@ pub trait TableRouter: Clone {
fn fetch_extrinsic_data(&self, candidate: &CandidateReceipt) -> Self::FetchExtrinsic;
}
/// A long-lived network which can create statement table routing instances.
/// A long-lived network which can create parachain statement and BFT message routing processes on demand.
pub trait Network {
/// The table router type. This should handle importing of any statements,
/// routing statements to peers, and driving completion of any `StatementProducers`.
type TableRouter: TableRouter;
/// The input stream of BFT messages. Should never logically conclude.
type Input: Stream<Item=bft::Communication<Block>,Error=Error>;
/// The output sink of BFT messages. Messages sent here should eventually pass to all
/// current authorities.
type Output: Sink<SinkItem=bft::Communication<Block>,SinkError=Error>;
/// Instantiate a table router using the given shared table.
fn table_router(&self, table: Arc<SharedTable>) -> Self::TableRouter;
/// Instantiate a table router using the given shared table and task executor.
fn communication_for(&self, validators: &[SessionKey], table: Arc<SharedTable>, task_executor: TaskExecutor) -> (Self::TableRouter, Self::Input, Self::Output);
}
/// Information about a specific group.
#[derive(Debug, Clone, Default)]
pub struct GroupInfo {
/// Authorities meant to check validity of candidates.
pub validity_guarantors: HashSet<AuthorityId>,
pub validity_guarantors: HashSet<SessionKey>,
/// Authorities meant to check availability of candidate data.
pub availability_guarantors: HashSet<AuthorityId>,
pub availability_guarantors: HashSet<SessionKey>,
/// Number of votes needed for validity.
pub needed_validity: usize,
/// Number of votes needed for availability.
@@ -144,20 +148,21 @@ pub struct GroupInfo {
/// Sign a table statement against a parent hash.
/// The actual message signed is the encoded statement concatenated with the
/// parent hash.
pub fn sign_table_statement(statement: &table::Statement, key: &ed25519::Pair, parent_hash: &Hash) -> ed25519::Signature {
use polkadot_primitives::parachain::Statement as RawStatement;
let raw = match *statement {
GenericStatement::Candidate(ref c) => RawStatement::Candidate(c.clone()),
GenericStatement::Valid(h) => RawStatement::Valid(h),
GenericStatement::Invalid(h) => RawStatement::Invalid(h),
GenericStatement::Available(h) => RawStatement::Available(h),
};
let mut encoded = raw.encode();
pub fn sign_table_statement(statement: &Statement, key: &ed25519::Pair, parent_hash: &Hash) -> CandidateSignature {
let mut encoded = statement.encode();
encoded.extend(&parent_hash.0);
key.sign(&encoded)
key.sign(&encoded).into()
}
/// Check signature on table statement.
pub fn check_statement(statement: &Statement, signature: &CandidateSignature, signer: SessionKey, parent_hash: &Hash) -> bool {
use runtime_primitives::traits::Verify;
let mut encoded = statement.encode();
encoded.extend(&parent_hash.0);
signature.verify(&encoded[..], &signer.into())
}
fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId], local_id: AuthorityId) -> Result<(HashMap<ParaId, GroupInfo>, LocalDuty), Error> {
@@ -217,41 +222,43 @@ fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId], local_id: Au
}
}
fn timer_error(e: &::std::io::Error) -> Error {
ErrorKind::Timer(format!("{}", e)).into()
}
/// Polkadot proposer factory.
pub struct ProposerFactory<C, N, P> {
/// The client instance.
pub client: Arc<C>,
pub client: Arc<P>,
/// The transaction pool.
pub transaction_pool: Arc<TransactionPool<C>>,
pub transaction_pool: Arc<TransactionPool<P>>,
/// The backing network handle.
pub network: N,
/// Parachain collators.
pub collators: P,
/// The timer used to schedule proposal intervals.
pub handle: Handle,
pub collators: C,
/// handle to remote task executor
pub handle: TaskExecutor,
/// The duration after which parachain-empty blocks will be allowed.
pub parachain_empty_duration: Duration,
}
impl<C, N, P> bft::ProposerFactory<Block> for ProposerFactory<C, N, P>
impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
where
C: PolkadotApi + Send + Sync,
C: Collators + Send + 'static,
N: Network,
P: Collators,
P: PolkadotApi + Send + Sync + 'static,
<C::Collation as IntoFuture>::Future: Send + 'static,
N::TableRouter: Send + 'static,
{
type Proposer = Proposer<C, N::TableRouter, P>;
type Proposer = Proposer<P>;
type Input = N::Input;
type Output = N::Output;
type Error = Error;
fn init(&self, parent_header: &Header, authorities: &[AuthorityId], sign_with: Arc<ed25519::Pair>) -> Result<Self::Proposer, Error> {
use std::time::Duration;
fn init(&self,
parent_header: &Header,
authorities: &[AuthorityId],
sign_with: Arc<ed25519::Pair>
) -> Result<(Self::Proposer, Self::Input, Self::Output), Error> {
const DELAY_UNTIL: Duration = Duration::from_millis(5000);
let parent_hash = parent_header.blake2_256().into();
let parent_hash = parent_header.hash().into();
let id = BlockId::hash(parent_hash);
let duty_roster = self.client.duty_roster(&id)?;
@@ -267,70 +274,116 @@ impl<C, N, P> bft::ProposerFactory<Block> for ProposerFactory<C, N, P>
let n_parachains = active_parachains.len();
let table = Arc::new(SharedTable::new(group_info, sign_with.clone(), parent_hash));
let router = self.network.table_router(table.clone());
let dynamic_inclusion = DynamicInclusion::new(
n_parachains,
Instant::now(),
self.parachain_empty_duration.clone(),
let (router, input, output) = self.network.communication_for(
authorities,
table.clone(),
self.handle.clone()
);
let timeout = Timeout::new(DELAY_UNTIL, &self.handle)
.map_err(|e| timer_error(&e))?;
let now = Instant::now();
let dynamic_inclusion = DynamicInclusion::new(
n_parachains,
now,
self.parachain_empty_duration.clone(),
);
debug!(target: "bft", "Initialising consensus proposer. Refusing to evaluate for {:?} from now.",
DELAY_UNTIL);
// TODO [PoC-2]: kick off collation process.
Ok(Proposer {
let validation_para = match local_duty.validation {
Chain::Relay => None,
Chain::Parachain(id) => Some(id),
};
let collation_work = validation_para.map(|para| CollationFetch::new(
para,
id.clone(),
parent_hash.clone(),
self.collators.clone(),
self.client.clone(),
));
let drop_signal = dispatch_collation_work(
router.clone(),
&self.handle,
collation_work,
);
let proposer = Proposer {
client: self.client.clone(),
collators: self.collators.clone(),
delay: timeout.shared(),
handle: self.handle.clone(),
dynamic_inclusion,
local_duty,
local_key: sign_with,
minimum_delay: now + DELAY_UNTIL,
parent_hash,
parent_id: id,
parent_number: parent_header.number,
random_seed,
router,
table,
transaction_pool: self.transaction_pool.clone(),
})
_drop_signal: drop_signal,
};
Ok((proposer, input, output))
}
}
// dispatch collation work to be done in the background. returns a signal object
// that should fire when the collation work is no longer necessary (e.g. when the proposer object is dropped)
fn dispatch_collation_work<R, C, P>(
router: R,
handle: &TaskExecutor,
work: Option<CollationFetch<C, P>>,
) -> exit_future::Signal where
C: Collators + Send + 'static,
P: PolkadotApi + Send + Sync + 'static,
<C::Collation as IntoFuture>::Future: Send + 'static,
R: TableRouter + Send + 'static,
{
let (signal, exit) = exit_future::signal();
let handled_work = work.then(move |result| match result {
Ok(Some((collation, extrinsic))) => {
router.local_candidate(collation.receipt, collation.block_data, extrinsic);
Ok(())
}
Ok(None) => Ok(()),
Err(_e) => {
warn!(target: "consensus", "Failed to collate candidate");
Ok(())
}
});
let cancellable_work = handled_work.select(exit).then(|_| Ok(()));
// spawn onto thread pool.
handle.spawn(cancellable_work);
signal
}
struct LocalDuty {
validation: Chain,
}
/// The Polkadot proposer logic.
pub struct Proposer<C: PolkadotApi, R, P> {
pub struct Proposer<C: PolkadotApi> {
client: Arc<C>,
collators: P,
delay: Shared<Timeout>,
dynamic_inclusion: DynamicInclusion,
handle: Handle,
local_duty: LocalDuty,
local_key: Arc<ed25519::Pair>,
minimum_delay: Instant,
parent_hash: Hash,
parent_id: BlockId,
parent_number: BlockNumber,
random_seed: Hash,
router: R,
table: Arc<SharedTable>,
transaction_pool: Arc<TransactionPool<C>>,
_drop_signal: exit_future::Signal,
}
impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
impl<C> bft::Proposer<Block> for Proposer<C>
where
C: PolkadotApi + Send + Sync,
R: TableRouter,
P: Collators,
{
type Error = Error;
type Create = future::Either<
CreateProposal<C, R, P>,
CreateProposal<C>,
future::FutureResult<Block, Error>,
>;
type Evaluate = Box<Future<Item=bool, Error=Error>>;
@@ -339,32 +392,24 @@ impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
const ATTEMPT_PROPOSE_EVERY: Duration = Duration::from_millis(100);
let initial_included = self.table.includable_count();
let now = Instant::now();
let enough_candidates = self.dynamic_inclusion.acceptable_in(
Instant::now(),
now,
initial_included,
).unwrap_or_default();
).unwrap_or_else(|| now + Duration::from_millis(1));
let timing = {
let delay = self.delay.clone();
let dynamic_inclusion = self.dynamic_inclusion.clone();
let make_timing = move |handle| -> Result<ProposalTiming, ::std::io::Error> {
let attempt_propose = Interval::new(ATTEMPT_PROPOSE_EVERY, handle)?;
let enough_candidates = Timeout::new(enough_candidates, handle)?;
Ok(ProposalTiming {
attempt_propose,
enough_candidates,
dynamic_inclusion,
minimum_delay: Some(delay),
last_included: initial_included,
})
};
let minimum_delay = if self.minimum_delay > now + ATTEMPT_PROPOSE_EVERY {
Some(Delay::new(self.minimum_delay))
} else {
None
};
match make_timing(&self.handle) {
Ok(timing) => timing,
Err(e) => {
return future::Either::B(future::err(timer_error(&e)));
}
}
let timing = ProposalTiming {
attempt_propose: Interval::new(now + ATTEMPT_PROPOSE_EVERY, ATTEMPT_PROPOSE_EVERY),
enough_candidates: Delay::new(enough_candidates),
dynamic_inclusion: self.dynamic_inclusion.clone(),
minimum_delay,
last_included: initial_included,
};
future::Either::A(CreateProposal {
@@ -373,15 +418,7 @@ impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
parent_id: self.parent_id.clone(),
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
collation: CollationFetch::new(
self.local_duty.validation,
self.parent_id.clone(),
self.parent_hash.clone(),
self.collators.clone(),
self.client.clone()
),
table: self.table.clone(),
router: self.router.clone(),
timing,
})
}
@@ -415,9 +452,7 @@ impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
};
let vote_delays = {
// delay casting vote until able (according to minimum block time)
let minimum_delay = self.delay.clone()
.map_err(|e| timer_error(&*e));
let now = Instant::now();
let included_candidate_hashes = proposal
.parachain_heads()
@@ -431,33 +466,35 @@ impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
// the duration at which the given number of parachains is acceptable.
let count_delay = self.dynamic_inclusion.acceptable_in(
Instant::now(),
now,
proposal.parachain_heads().len(),
);
// the duration until the given timestamp is current
let proposed_timestamp = proposal.timestamp();
let timestamp_delay = if proposed_timestamp > current_timestamp {
Some(Duration::from_secs(proposed_timestamp - current_timestamp))
Some(now + Duration::from_secs(proposed_timestamp - current_timestamp))
} else {
None
};
// delay casting vote until able according to minimum block time,
// timestamp delay, and count delay.
// construct a future from the maximum of the two durations.
let temporary_delay = match ::std::cmp::max(timestamp_delay, count_delay) {
Some(duration) => {
let maybe_timeout = Timeout::new(duration, &self.handle);
let max_delay = [timestamp_delay, count_delay, Some(self.minimum_delay)]
.iter()
.cloned()
.max()
.expect("iterator not empty; thus max returns `Some`; qed");
let f = future::result(maybe_timeout)
.and_then(|timeout| timeout)
.map_err(|e| timer_error(&e));
future::Either::A(f)
}
let temporary_delay = match max_delay {
Some(duration) => future::Either::A(
Delay::new(duration).map_err(|e| Error::from(ErrorKind::Timer(e)))
),
None => future::Either::B(future::ok(())),
};
minimum_delay.join3(includability_tracker, temporary_delay)
includability_tracker.join(temporary_delay)
};
// evaluate whether the block is actually valid.
@@ -497,7 +534,7 @@ impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
use bft::generic::Misbehavior as GenericMisbehavior;
use runtime_primitives::bft::{MisbehaviorKind, MisbehaviorReport};
use runtime_primitives::MaybeUnsigned;
use polkadot_runtime::{Call, Extrinsic, UncheckedExtrinsic, ConsensusCall};
use polkadot_runtime::{Call, Extrinsic, BareExtrinsic, UncheckedExtrinsic, ConsensusCall};
let local_id = self.local_key.public().0.into();
let mut next_index = {
@@ -569,71 +606,59 @@ fn current_timestamp() -> Timestamp {
struct ProposalTiming {
attempt_propose: Interval,
dynamic_inclusion: DynamicInclusion,
enough_candidates: Timeout,
minimum_delay: Option<Shared<Timeout>>,
enough_candidates: Delay,
minimum_delay: Option<Delay>,
last_included: usize,
}
impl ProposalTiming {
// whether it's time to attempt a proposal.
// shouldn't be called outside of the context of a task.
fn poll(&mut self, included: usize) -> Poll<(), Error> {
fn poll(&mut self, included: usize) -> Poll<(), ErrorKind> {
// first drain from the interval so when the minimum delay is up
// we don't have any notifications built up.
//
// this interval is just meant to produce periodic task wakeups
// that lead to the `dynamic_inclusion` getting updated as necessary.
if let Async::Ready(x) = self.attempt_propose.poll()
.map_err(|e| timer_error(&e))?
{
if let Async::Ready(x) = self.attempt_propose.poll().map_err(ErrorKind::Timer)? {
x.expect("timer still alive; intervals never end; qed");
}
if let Some(ref mut min) = self.minimum_delay {
try_ready!(min.poll().map_err(|e| timer_error(&*e)));
try_ready!(min.poll().map_err(ErrorKind::Timer));
}
self.minimum_delay = None; // after this point, the future must have completed.
if included == self.last_included {
return self.enough_candidates.poll().map_err(|e| timer_error(&e));
return self.enough_candidates.poll().map_err(ErrorKind::Timer);
}
// the amount of includable candidates has changed. schedule a wakeup
// if it's not sufficient anymore.
let now = Instant::now();
match self.dynamic_inclusion.acceptable_in(now, included) {
Some(duration) => {
match self.dynamic_inclusion.acceptable_in(Instant::now(), included) {
Some(instant) => {
self.last_included = included;
self.enough_candidates.reset(now + duration);
self.enough_candidates.poll().map_err(|e| timer_error(&e))
}
None => {
Ok(Async::Ready(()))
self.enough_candidates.reset(instant);
self.enough_candidates.poll().map_err(ErrorKind::Timer)
}
None => Ok(Async::Ready(())),
}
}
}
/// Future which resolves upon the creation of a proposal.
pub struct CreateProposal<C: PolkadotApi, R, P: Collators> {
pub struct CreateProposal<C: PolkadotApi> {
parent_hash: Hash,
parent_number: BlockNumber,
parent_id: BlockId,
client: Arc<C>,
transaction_pool: Arc<TransactionPool<C>>,
collation: CollationFetch<P, C>,
router: R,
table: Arc<SharedTable>,
timing: ProposalTiming,
}
impl<C, R, P> CreateProposal<C, R, P>
where
C: PolkadotApi,
R: TableRouter,
P: Collators,
{
impl<C> CreateProposal<C> where C: PolkadotApi {
fn propose_with(&self, candidates: Vec<CandidateReceipt>) -> Result<Block, Error> {
use polkadot_api::BlockBuilder;
use runtime_primitives::traits::{Hashing, BlakeTwo256};
@@ -702,35 +727,17 @@ impl<C, R, P> CreateProposal<C, R, P>
}
}
impl<C, R, P> Future for CreateProposal<C, R, P>
where
C: PolkadotApi,
R: TableRouter,
P: Collators,
{
impl<C> Future for CreateProposal<C> where C: PolkadotApi {
type Item = Block;
type Error = Error;
fn poll(&mut self) -> Poll<Block, Error> {
// 1. poll local collation future.
match self.collation.poll() {
Ok(Async::Ready((collation, extrinsic))) => {
let hash = collation.receipt.hash();
self.router.local_candidate_data(hash, collation.block_data, extrinsic);
// TODO: if we are an availability guarantor also, we should produce an availability statement.
self.table.sign_and_import(&self.router, GenericStatement::Candidate(collation.receipt));
}
Ok(Async::NotReady) => {},
Err(_) => {}, // TODO: handle this failure to collate.
}
// 2. try to propose if we have enough includable candidates and other
// 1. try to propose if we have enough includable candidates and other
// delays have concluded.
let included = self.table.includable_count();
try_ready!(self.timing.poll(included));
// 3. propose
// 2. propose
let proposed_candidates = self.table.with_proposal(|proposed_set| {
proposed_set.into_iter().cloned().collect()
});
@@ -738,3 +745,21 @@ impl<C, R, P> Future for CreateProposal<C, R, P>
self.propose_with(proposed_candidates).map(Async::Ready)
}
}
#[cfg(test)]
mod tests {
use super::*;
use substrate_keyring::Keyring;
#[test]
fn sign_and_check_statement() {
let statement: Statement = GenericStatement::Valid([1; 32].into());
let parent_hash = [2; 32].into();
let sig = sign_table_statement(&statement, &Keyring::Alice.pair(), &parent_hash);
assert!(check_statement(&statement, &sig, Keyring::Alice.to_raw_public().into(), &parent_hash));
assert!(!check_statement(&statement, &sig, Keyring::Alice.to_raw_public().into(), &[0xff; 32].into()));
assert!(!check_statement(&statement, &sig, Keyring::Bob.to_raw_public().into(), &parent_hash));
}
}
+40 -235
View File
@@ -18,6 +18,10 @@
/// Consensus service. A long runnung service that manages BFT agreement and parachain
/// candidate agreement over the network.
///
/// This uses a handle to an underlying thread pool to dispatch heavy work
/// such as candidate verification while performing event-driven work
/// on a local event loop.
use std::thread;
use std::time::{Duration, Instant};
@@ -27,197 +31,37 @@ use bft::{self, BftService};
use client::{BlockchainEvents, ChainHead};
use ed25519;
use futures::prelude::*;
use futures::{future, Canceled};
use polkadot_api::LocalPolkadotApi;
use polkadot_primitives::{BlockId, Block, Header, Hash, AccountId};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use primitives::AuthorityId;
use runtime_support::Hashable;
use substrate_network as net;
use tokio_core::reactor;
use polkadot_primitives::{Block, Header};
use transaction_pool::TransactionPool;
use super::{TableRouter, SharedTable, ProposerFactory};
use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle;
use tokio::runtime::TaskExecutor as ThreadPoolHandle;
use tokio::runtime::current_thread::Runtime as LocalRuntime;
use tokio::timer::Interval;
use super::{Network, Collators, ProposerFactory};
use error;
const TIMER_DELAY_MS: u64 = 5000;
const TIMER_INTERVAL_MS: u64 = 500;
struct BftSink<E> {
network: Arc<net::ConsensusService<Block>>,
parent_hash: Hash,
_e: ::std::marker::PhantomData<E>,
}
struct Messages {
network_stream: net::BftMessageStream<Block>,
local_id: AuthorityId,
authorities: Vec<AuthorityId>,
}
impl Stream for Messages {
type Item = bft::Communication<Block>;
type Error = bft::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// check the network
loop {
match self.network_stream.poll() {
Err(_) => return Err(bft::InputStreamConcluded.into()),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
Ok(Async::Ready(Some(message))) => {
match process_message(message, &self.local_id, &self.authorities) {
Ok(Some(message)) => return Ok(Async::Ready(Some(message))),
Ok(None) => {} // ignored local message.
Err(e) => {
debug!("Message validation failed: {:?}", e);
}
}
}
}
}
}
}
fn process_message(msg: net::LocalizedBftMessage<Block>, local_id: &AuthorityId, authorities: &[AuthorityId]) -> Result<Option<bft::Communication<Block>>, bft::Error> {
Ok(Some(match msg.message {
net::generic_message::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
net::generic_message::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
if &proposal.sender == local_id { return Ok(None) }
let proposal = bft::generic::LocalizedProposal {
round_number: proposal.round_number as usize,
proposal: proposal.proposal,
digest: proposal.digest,
sender: proposal.sender,
digest_signature: ed25519::LocalizedSignature {
signature: proposal.digest_signature,
signer: proposal.sender.into(),
},
full_signature: ed25519::LocalizedSignature {
signature: proposal.full_signature,
signer: proposal.sender.into(),
}
};
bft::check_proposal(authorities, &msg.parent_hash, &proposal)?;
trace!(target: "bft", "importing proposal message for round {} from {}", proposal.round_number, proposal.sender);
proposal
}),
net::generic_message::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
if &vote.sender == local_id { return Ok(None) }
let vote = bft::generic::LocalizedVote {
sender: vote.sender,
signature: ed25519::LocalizedSignature {
signature: vote.signature,
signer: vote.sender.into(),
},
vote: match vote.vote {
net::generic_message::ConsensusVote::Prepare(r, h) => bft::generic::Vote::Prepare(r as usize, h),
net::generic_message::ConsensusVote::Commit(r, h) => bft::generic::Vote::Commit(r as usize, h),
net::generic_message::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize),
}
};
bft::check_vote::<Block>(authorities, &msg.parent_hash, &vote)?;
trace!(target: "bft", "importing vote {:?} from {}", vote.vote, vote.sender);
vote
}),
}),
net::generic_message::BftMessage::Auxiliary(a) => {
let justification = bft::UncheckedJustification::<Hash>::from(a);
// TODO: get proper error
let justification: Result<_, bft::Error> = bft::check_prepare_justification::<Block>(authorities, msg.parent_hash, justification)
.map_err(|_| bft::ErrorKind::InvalidJustification.into());
bft::generic::Communication::Auxiliary(justification?)
},
}))
}
impl<E> Sink for BftSink<E> {
type SinkItem = bft::Communication<Block>;
// TODO: replace this with the ! type when that's stabilized
type SinkError = E;
fn start_send(&mut self, message: bft::Communication<Block>) -> ::futures::StartSend<bft::Communication<Block>, E> {
let network_message = net::generic_message::LocalizedBftMessage {
message: match message {
bft::generic::Communication::Consensus(c) => net::generic_message::BftMessage::Consensus(match c {
bft::generic::LocalizedMessage::Propose(proposal) => net::generic_message::SignedConsensusMessage::Propose(net::generic_message::SignedConsensusProposal {
round_number: proposal.round_number as u32,
proposal: proposal.proposal,
digest: proposal.digest,
sender: proposal.sender,
digest_signature: proposal.digest_signature.signature,
full_signature: proposal.full_signature.signature,
}),
bft::generic::LocalizedMessage::Vote(vote) => net::generic_message::SignedConsensusMessage::Vote(net::generic_message::SignedConsensusVote {
sender: vote.sender,
signature: vote.signature.signature,
vote: match vote.vote {
bft::generic::Vote::Prepare(r, h) => net::generic_message::ConsensusVote::Prepare(r as u32, h),
bft::generic::Vote::Commit(r, h) => net::generic_message::ConsensusVote::Commit(r as u32, h),
bft::generic::Vote::AdvanceRound(r) => net::generic_message::ConsensusVote::AdvanceRound(r as u32),
}
}),
}),
bft::generic::Communication::Auxiliary(justification) => net::generic_message::BftMessage::Auxiliary(justification.uncheck().into()),
},
parent_hash: self.parent_hash,
};
self.network.send_bft_message(network_message);
Ok(::futures::AsyncSink::Ready)
}
fn poll_complete(&mut self) -> ::futures::Poll<(), E> {
Ok(Async::Ready(()))
}
}
struct Network(Arc<net::ConsensusService<Block>>);
impl super::Network for Network {
type TableRouter = Router;
fn table_router(&self, _table: Arc<SharedTable>) -> Self::TableRouter {
Router {
network: self.0.clone()
}
}
}
// spin up an instance of BFT agreement on the current thread's executor.
// panics if there is no current thread executor.
fn start_bft<F, C>(
header: &Header,
handle: reactor::Handle,
client: &bft::Authorities<Block>,
network: Arc<net::ConsensusService<Block>>,
bft_service: &BftService<Block, F, C>,
) where
F: bft::ProposerFactory<Block> + 'static,
F: bft::Environment<Block> + 'static,
C: bft::BlockImport<Block> + bft::Authorities<Block> + 'static,
<F as bft::ProposerFactory<Block>>::Error: ::std::fmt::Debug,
F::Error: ::std::fmt::Debug,
<F::Proposer as bft::Proposer<Block>>::Error: ::std::fmt::Display + Into<error::Error>,
{
let parent_hash = header.hash();
if bft_service.live_agreement().map_or(false, |h| h == parent_hash) {
return;
}
let authorities = match client.authorities(&BlockId::hash(parent_hash)) {
Ok(authorities) => authorities,
Err(e) => {
debug!("Error reading authorities: {:?}", e);
return;
}
};
let input = Messages {
network_stream: network.bft_messages(parent_hash),
local_id: bft_service.local_id(),
authorities,
};
let output = BftSink { network: network, parent_hash: parent_hash, _e: Default::default() };
match bft_service.build_upon(&header, input.map_err(Into::into), output) {
Ok(Some(bft)) => handle.spawn(bft),
let mut handle = LocalThreadHandle::current();
match bft_service.build_upon(&header) {
Ok(Some(bft)) => if let Err(e) = handle.spawn_local(Box::new(bft)) {
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
},
Ok(None) => {},
Err(e) => debug!(target: "bft", "BFT agreement error: {:?}", e),
}
@@ -231,54 +75,56 @@ pub struct Service {
impl Service {
/// Create and start a new instance.
pub fn new<A, C>(
pub fn new<A, C, N>(
client: Arc<C>,
api: Arc<A>,
network: Arc<net::ConsensusService<Block>>,
network: N,
transaction_pool: Arc<TransactionPool<A>>,
thread_pool: ThreadPoolHandle,
parachain_empty_duration: Duration,
key: ed25519::Pair,
) -> Service
where
A: LocalPolkadotApi + Send + Sync + 'static,
C: BlockchainEvents<Block> + ChainHead<Block> + bft::BlockImport<Block> + bft::Authorities<Block> + Send + Sync + 'static,
N: Network + Collators + Send + 'static,
N::TableRouter: Send + 'static,
<N::Collation as IntoFuture>::Future: Send + 'static,
{
let (signal, exit) = ::exit_future::signal();
let thread = thread::spawn(move || {
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
let mut runtime = LocalRuntime::new().expect("Could not create local runtime");
let key = Arc::new(key);
let factory = ProposerFactory {
client: api.clone(),
transaction_pool: transaction_pool.clone(),
network: Network(network.clone()),
collators: NoCollators,
collators: network.clone(),
network,
parachain_empty_duration,
handle: core.handle(),
handle: thread_pool,
};
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
let notifications = {
let handle = core.handle();
let network = network.clone();
let client = client.clone();
let bft_service = bft_service.clone();
client.import_notification_stream().for_each(move |notification| {
if notification.is_new_best {
start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service);
start_bft(&notification.header, &*bft_service);
}
Ok(())
})
};
let interval = reactor::Interval::new_at(
let interval = Interval::new(
Instant::now() + Duration::from_millis(TIMER_DELAY_MS),
Duration::from_millis(TIMER_INTERVAL_MS),
&core.handle(),
).expect("it is always possible to create an interval with valid params");
);
let mut prev_best = match client.best_block_header() {
Ok(header) => header.blake2_256(),
Ok(header) => header.hash(),
Err(e) => {
warn!("Cant's start consensus service. Error reading best block header: {:?}", e);
return;
@@ -288,15 +134,13 @@ impl Service {
let timed = {
let c = client.clone();
let s = bft_service.clone();
let n = network.clone();
let handle = core.handle();
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
if let Ok(best_block) = c.best_block_header() {
let hash = best_block.blake2_256();
let hash = best_block.hash();
if hash == prev_best {
debug!("Starting consensus round after a timeout");
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s);
start_bft(&best_block, &*s);
}
prev_best = hash;
}
@@ -304,9 +148,9 @@ impl Service {
})
};
core.handle().spawn(notifications);
core.handle().spawn(timed);
if let Err(e) = core.run(exit) {
runtime.spawn(notifications);
runtime.spawn(timed);
if let Err(e) = runtime.block_on(exit) {
debug!("BFT event loop error {:?}", e);
}
});
@@ -328,42 +172,3 @@ impl Drop for Service {
}
}
}
// Collators implementation which never collates anything.
// TODO: do a real implementation.
#[derive(Clone, Copy)]
struct NoCollators;
impl ::collation::Collators for NoCollators {
type Error = ();
type Collation = future::Empty<::collation::Collation, ()>;
fn collate(&self, _parachain: ParaId, _relay_parent: Hash) -> Self::Collation {
future::empty()
}
fn note_bad_collator(&self, _collator: AccountId) { }
}
#[derive(Clone)]
struct Router {
network: Arc<net::ConsensusService<Block>>,
}
impl TableRouter for Router {
type Error = Canceled;
type FetchCandidate = future::Empty<BlockData, Self::Error>;
type FetchExtrinsic = future::FutureResult<Extrinsic, Self::Error>;
fn local_candidate_data(&self, _hash: Hash, _block_data: BlockData, _extrinsic: Extrinsic) {
// TODO
}
fn fetch_block_data(&self, _candidate: &CandidateReceipt) -> Self::FetchCandidate {
future::empty()
}
fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic {
future::ok(Extrinsic)
}
}
@@ -21,11 +21,9 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use table::{self, Table, Context as TableContextTrait};
use table::generic::Statement as GenericStatement;
use collation::Collation;
use polkadot_primitives::Hash;
use polkadot_primitives::{Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use primitives::AuthorityId;
use parking_lot::Mutex;
use futures::{future, prelude::*};
@@ -36,6 +34,8 @@ use self::includable::IncludabilitySender;
mod includable;
pub use self::includable::Includable;
pub use table::{SignedStatement, Statement};
pub use table::generic::Statement as GenericStatement;
struct TableContext {
parent_hash: Hash,
@@ -44,11 +44,11 @@ struct TableContext {
}
impl table::Context for TableContext {
fn is_member_of(&self, authority: &AuthorityId, group: &ParaId) -> bool {
fn is_member_of(&self, authority: &SessionKey, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority))
}
fn is_availability_guarantor_of(&self, authority: &AuthorityId, group: &ParaId) -> bool {
fn is_availability_guarantor_of(&self, authority: &SessionKey, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority))
}
@@ -61,7 +61,7 @@ impl table::Context for TableContext {
}
impl TableContext {
fn local_id(&self) -> AuthorityId {
fn local_id(&self) -> SessionKey {
self.key.public().into()
}
@@ -76,14 +76,6 @@ impl TableContext {
}
}
/// Source of statements
pub enum StatementSource {
/// Locally produced statement.
Local,
/// Received statement from remote source, with optional sender.
Remote(Option<AuthorityId>),
}
// A shared table object.
struct SharedTableInner {
table: Table<TableContext>,
@@ -96,28 +88,21 @@ struct SharedTableInner {
impl SharedTableInner {
// Import a single statement. Provide a handle to a table router and a function
// used to determine if a referenced candidate is valid.
fn import_statement<R: TableRouter, C: FnMut(Collation) -> bool>(
//
// the statement producer, if any, will produce only statements concerning the same candidate
// as the one just imported
fn import_remote_statement<R: TableRouter>(
&mut self,
context: &TableContext,
router: &R,
statement: table::SignedStatement,
statement_source: StatementSource,
check_candidate: C,
) -> StatementProducer<
) -> Option<StatementProducer<
<R::FetchCandidate as IntoFuture>::Future,
<R::FetchExtrinsic as IntoFuture>::Future,
C,
> {
// this blank producer does nothing until we attach some futures
// and set a candidate digest.
let received_from = match statement_source {
StatementSource::Local => return Default::default(),
StatementSource::Remote(from) => from,
};
let summary = match self.table.import_statement(context, statement, received_from) {
>> {
let summary = match self.table.import_statement(context, statement) {
Some(summary) => summary,
None => return Default::default(),
None => return None,
};
self.update_trackers(&summary.candidate, context);
@@ -159,7 +144,6 @@ impl SharedTableInner {
fetch_block_data,
fetch_extrinsic,
evaluate: checking_validity,
check_candidate,
})
}
}
@@ -167,10 +151,10 @@ impl SharedTableInner {
None
};
StatementProducer {
work.map(|work| StatementProducer {
produced_statements: Default::default(),
work,
}
work
})
}
fn update_trackers(&mut self, candidate: &Hash, context: &TableContext) {
@@ -199,71 +183,78 @@ pub struct ProducedStatements {
}
/// Future that produces statements about a specific candidate.
pub struct StatementProducer<D: Future, E: Future, C> {
pub struct StatementProducer<D: Future, E: Future> {
produced_statements: ProducedStatements,
work: Option<Work<D, E, C>>,
work: Work<D, E>,
}
struct Work<D: Future, E: Future, C> {
candidate_receipt: CandidateReceipt,
fetch_block_data: future::Fuse<D>,
fetch_extrinsic: Option<future::Fuse<E>>,
evaluate: bool,
check_candidate: C
}
impl<D: Future, E: Future, C> Default for StatementProducer<D, E, C> {
fn default() -> Self {
StatementProducer {
produced_statements: Default::default(),
work: None,
impl<D: Future, E: Future> StatementProducer<D, E> {
/// Attach a function for verifying fetched collation to the statement producer.
/// This will transform it into a future.
///
/// The collation-checking function should return `true` if known to be valid,
/// `false` if known to be invalid, and `None` if unable to determine.
pub fn prime<C: FnMut(Collation) -> Option<bool>>(self, check_candidate: C) -> PrimedStatementProducer<D, E, C> {
PrimedStatementProducer {
inner: self,
check_candidate,
}
}
}
impl<D, E, C, Err> Future for StatementProducer<D, E, C>
struct Work<D: Future, E: Future> {
candidate_receipt: CandidateReceipt,
fetch_block_data: future::Fuse<D>,
fetch_extrinsic: Option<future::Fuse<E>>,
evaluate: bool,
}
/// Primed statement producer.
pub struct PrimedStatementProducer<D: Future, E: Future, C> {
inner: StatementProducer<D, E>,
check_candidate: C,
}
impl<D, E, C, Err> Future for PrimedStatementProducer<D, E, C>
where
D: Future<Item=BlockData,Error=Err>,
E: Future<Item=Extrinsic,Error=Err>,
C: FnMut(Collation) -> bool,
C: FnMut(Collation) -> Option<bool>,
{
type Item = ProducedStatements;
type Error = Err;
fn poll(&mut self) -> Poll<ProducedStatements, Err> {
let work = match self.work {
Some(ref mut work) => work,
None => return Ok(Async::Ready(::std::mem::replace(&mut self.produced_statements, Default::default()))),
};
let work = &mut self.inner.work;
if let Async::Ready(block_data) = work.fetch_block_data.poll()? {
self.produced_statements.block_data = Some(block_data.clone());
self.inner.produced_statements.block_data = Some(block_data.clone());
if work.evaluate {
let is_good = (work.check_candidate)(Collation {
let is_good = (self.check_candidate)(Collation {
block_data,
receipt: work.candidate_receipt.clone(),
});
let hash = work.candidate_receipt.hash();
self.produced_statements.validity = Some(if is_good {
GenericStatement::Valid(hash)
} else {
GenericStatement::Invalid(hash)
});
self.inner.produced_statements.validity = match is_good {
Some(true) => Some(GenericStatement::Valid(hash)),
Some(false) => Some(GenericStatement::Invalid(hash)),
None => None,
};
}
}
if let Some(ref mut fetch_extrinsic) = work.fetch_extrinsic {
if let Async::Ready(extrinsic) = fetch_extrinsic.poll()? {
self.produced_statements.extrinsic = Some(extrinsic);
self.inner.produced_statements.extrinsic = Some(extrinsic);
}
}
let done = self.produced_statements.block_data.is_some() && {
let done = self.inner.produced_statements.block_data.is_some() && {
if work.evaluate {
true
} else if self.produced_statements.extrinsic.is_some() {
self.produced_statements.availability =
} else if self.inner.produced_statements.extrinsic.is_some() {
self.inner.produced_statements.availability =
Some(GenericStatement::Available(work.candidate_receipt.hash()));
true
@@ -273,7 +264,7 @@ impl<D, E, C, Err> Future for StatementProducer<D, E, C>
};
if done {
Ok(Async::Ready(::std::mem::replace(&mut self.produced_statements, Default::default())))
Ok(Async::Ready(::std::mem::replace(&mut self.inner.produced_statements, Default::default())))
} else {
Ok(Async::NotReady)
}
@@ -313,29 +304,60 @@ impl SharedTable {
}
}
/// Get the parent hash this table should hold statements localized to.
pub fn consensus_parent_hash(&self) -> &Hash {
&self.context.parent_hash
}
/// Get the local validator session key.
pub fn session_key(&self) -> SessionKey {
self.context.local_id()
}
/// Get group info.
pub fn group_info(&self) -> &HashMap<ParaId, GroupInfo> {
&self.context.groups
}
/// Import a single statement. Provide a handle to a table router
/// for dispatching any other requests which come up.
pub fn import_statement<R: TableRouter, C: FnMut(Collation) -> bool>(
/// Import a single statement with remote source, whose signature has already been checked.
///
/// The statement producer, if any, will produce only statements concerning the same candidate
/// as the one just imported
pub fn import_remote_statement<R: TableRouter>(
&self,
router: &R,
statement: table::SignedStatement,
received_from: StatementSource,
check_candidate: C,
) -> StatementProducer<<R::FetchCandidate as IntoFuture>::Future, <R::FetchExtrinsic as IntoFuture>::Future, C> {
self.inner.lock().import_statement(&*self.context, router, statement, received_from, check_candidate)
) -> Option<StatementProducer<
<R::FetchCandidate as IntoFuture>::Future,
<R::FetchExtrinsic as IntoFuture>::Future,
>> {
self.inner.lock().import_remote_statement(&*self.context, router, statement)
}
/// Import many statements at once.
///
/// Provide an iterator yielding remote, pre-checked statements.
///
/// The statement producer, if any, will produce only statements concerning the same candidate
/// as the one just imported
pub fn import_remote_statements<R, I, U>(&self, router: &R, iterable: I) -> U
where
R: TableRouter,
I: IntoIterator<Item=table::SignedStatement>,
U: ::std::iter::FromIterator<Option<StatementProducer<
<R::FetchCandidate as IntoFuture>::Future,
<R::FetchExtrinsic as IntoFuture>::Future,
>>>,
{
let mut inner = self.inner.lock();
iterable.into_iter().map(move |statement| {
inner.import_remote_statement(&*self.context, router, statement)
}).collect()
}
/// Sign and import a local statement.
pub fn sign_and_import<R: TableRouter>(
&self,
router: &R,
statement: table::Statement,
) {
pub fn sign_and_import(&self, statement: table::Statement) -> SignedStatement {
let proposed_digest = match statement {
GenericStatement::Candidate(ref c) => Some(c.hash()),
_ => None,
@@ -348,36 +370,8 @@ impl SharedTable {
inner.proposed_digest = proposed_digest;
}
let producer = inner.import_statement(
&*self.context,
router,
signed_statement,
StatementSource::Local,
|_| true,
);
assert!(producer.work.is_none(), "local statement import never leads to additional work; qed");
}
/// Import many statements at once.
///
/// Provide an iterator yielding pairs of (statement, statement_source).
pub fn import_statements<R, I, C, U>(&self, router: &R, iterable: I) -> U
where
R: TableRouter,
I: IntoIterator<Item=(table::SignedStatement, StatementSource, C)>,
C: FnMut(Collation) -> bool,
U: ::std::iter::FromIterator<StatementProducer<
<R::FetchCandidate as IntoFuture>::Future,
<R::FetchExtrinsic as IntoFuture>::Future,
C,
>>,
{
let mut inner = self.inner.lock();
iterable.into_iter().map(move |(statement, statement_source, check_candidate)| {
inner.import_statement(&*self.context, router, statement, statement_source, check_candidate)
}).collect()
inner.table.import_statement(&*self.context, signed_statement.clone());
signed_statement
}
/// Execute a closure using a specific candidate.
@@ -406,15 +400,10 @@ impl SharedTable {
}
/// Get all witnessed misbehavior.
pub fn get_misbehavior(&self) -> HashMap<AuthorityId, table::Misbehavior> {
pub fn get_misbehavior(&self) -> HashMap<SessionKey, table::Misbehavior> {
self.inner.lock().table.get_misbehavior().clone()
}
/// Fill a statement batch.
pub fn fill_batch<B: table::StatementBatch>(&self, batch: &mut B) {
self.inner.lock().table.fill_batch(batch);
}
/// Track includability of a given set of candidate hashes.
pub fn track_includability<I>(&self, iterable: I) -> Includable
where I: IntoIterator<Item=Hash>
@@ -446,17 +435,12 @@ mod tests {
type FetchCandidate = ::futures::future::Empty<BlockData,()>;
type FetchExtrinsic = ::futures::future::Empty<Extrinsic,()>;
/// Note local candidate data, making it available on the network to other validators.
fn local_candidate_data(&self, _hash: Hash, _block_data: BlockData, _extrinsic: Extrinsic) {
fn local_candidate(&self, _candidate: CandidateReceipt, _block_data: BlockData, _extrinsic: Extrinsic) {
}
/// Fetch block data for a specific candidate.
fn fetch_block_data(&self, _candidate: &CandidateReceipt) -> Self::FetchCandidate {
::futures::future::empty()
}
/// Fetch extrinsic data for a specific candidate.
fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic {
::futures::future::empty()
}
@@ -490,6 +474,7 @@ mod tests {
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [2; 32].into(),
};
let candidate_statement = GenericStatement::Candidate(candidate);
@@ -501,15 +486,13 @@ mod tests {
sender: validity_other,
};
let producer = shared_table.import_statement(
let producer = shared_table.import_remote_statement(
&DummyRouter,
signed_statement,
StatementSource::Remote(None),
|_| true,
);
).expect("candidate and local validity group are same");
assert!(producer.work.is_some(), "candidate and local validity group are same");
assert!(producer.work.as_ref().unwrap().evaluate, "should evaluate validity");
assert!(producer.work.evaluate, "should evaluate validity");
assert!(producer.work.fetch_extrinsic.is_none(), "should not fetch extrinsic");
}
#[test]
@@ -540,6 +523,7 @@ mod tests {
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [2; 32].into(),
};
let candidate_statement = GenericStatement::Candidate(candidate);
@@ -551,15 +535,12 @@ mod tests {
sender: validity_other,
};
let producer = shared_table.import_statement(
let producer = shared_table.import_remote_statement(
&DummyRouter,
signed_statement,
StatementSource::Remote(None),
|_| true,
);
).expect("should produce work");
assert!(producer.work.is_some(), "candidate and local availability group are same");
assert!(producer.work.as_ref().unwrap().fetch_extrinsic.is_some(), "should fetch extrinsic when guaranteeing availability");
assert!(!producer.work.as_ref().unwrap().evaluate, "should not evaluate validity");
assert!(producer.work.fetch_extrinsic.is_some(), "should fetch extrinsic when guaranteeing availability");
assert!(!producer.work.evaluate, "should not evaluate validity");
}
}