mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 22:11:06 +00:00
Rewrite Inherent data (#1488)
* Implement new inherent data * Fixes compilation on wasm * Fixes after rebase * Switch back to generate inherent stuff by macro * Update after rebase * Apply suggestions from code review Co-Authored-By: bkchr <bkchr@users.noreply.github.com> * Fix compilation after rebase * Address grumbles * Remove `InherentDataProviders` from `Client` * Update wasm files after rebase * Address grumbles * Fixes compilation after latest merge * Last fix
This commit is contained in:
@@ -33,6 +33,8 @@ extern crate srml_support as runtime_support;
|
||||
extern crate sr_io as runtime_io;
|
||||
extern crate sr_primitives as runtime_primitives;
|
||||
extern crate substrate_consensus_aura_primitives as aura_primitives;
|
||||
extern crate srml_aura;
|
||||
extern crate substrate_inherents as inherents;
|
||||
|
||||
extern crate substrate_consensus_common as consensus_common;
|
||||
extern crate tokio;
|
||||
@@ -57,24 +59,33 @@ extern crate env_logger;
|
||||
|
||||
mod slots;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{sync::{Arc, mpsc}, time::Duration, thread};
|
||||
|
||||
use codec::Encode;
|
||||
use consensus_common::{Authorities, BlockImport, Environment, Error as ConsensusError, Proposer, ForkChoiceStrategy};
|
||||
use consensus_common::{
|
||||
Authorities, BlockImport, Environment, Error as ConsensusError, Proposer, ForkChoiceStrategy
|
||||
};
|
||||
use consensus_common::import_queue::{Verifier, BasicQueue};
|
||||
use client::ChainHead;
|
||||
use client::block_builder::api::BlockBuilder as BlockBuilderApi;
|
||||
use consensus_common::{ImportBlock, BlockOrigin};
|
||||
use runtime_primitives::{generic, generic::BlockId, Justification, BasicInherentData};
|
||||
use runtime_primitives::traits::{Block, Header, Digest, DigestItemFor, DigestItem, ProvideRuntimeApi};
|
||||
use runtime_primitives::{generic, generic::BlockId, Justification};
|
||||
use runtime_primitives::traits::{
|
||||
Block, Header, Digest, DigestItemFor, DigestItem, ProvideRuntimeApi
|
||||
};
|
||||
use primitives::{Ed25519AuthorityId, ed25519};
|
||||
use inherents::{InherentDataProviders, InherentData, RuntimeString};
|
||||
|
||||
use futures::{Stream, Future, IntoFuture, future::{self, Either}};
|
||||
use tokio::timer::Timeout;
|
||||
use api::AuraApi;
|
||||
use slots::Slots;
|
||||
|
||||
use srml_aura::{
|
||||
InherentType as AuraInherent, AuraInherentData,
|
||||
timestamp::{TimestampInherentData, InherentType as TimestampInherent, InherentError as TIError}
|
||||
};
|
||||
|
||||
pub use aura_primitives::*;
|
||||
pub use consensus_common::SyncOracle;
|
||||
|
||||
@@ -114,18 +125,23 @@ fn duration_now() -> Option<Duration> {
|
||||
}).ok()
|
||||
}
|
||||
|
||||
fn timestamp_and_slot_now(slot_duration: u64) -> Option<(u64, u64)> {
|
||||
duration_now().map(|s| {
|
||||
let s = s.as_secs();
|
||||
(s, s / slot_duration)
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the slot for now.
|
||||
fn slot_now(slot_duration: u64) -> Option<u64> {
|
||||
duration_now().map(|s| s.as_secs() / slot_duration)
|
||||
}
|
||||
|
||||
fn extract_timestamp_and_slot(
|
||||
data: &InherentData
|
||||
) -> Result<(TimestampInherent, AuraInherent), consensus_common::Error> {
|
||||
data.timestamp_inherent_data()
|
||||
.and_then(|t| data.aura_inherent_data().map(|a| (t, a)))
|
||||
.map_err(inherent_to_common_error)
|
||||
}
|
||||
|
||||
fn inherent_to_common_error(err: RuntimeString) -> consensus_common::Error {
|
||||
consensus_common::ErrorKind::InherentData(err.into()).into()
|
||||
}
|
||||
|
||||
/// A digest item which is usable with aura consensus.
|
||||
pub trait CompatibleDigestItem: Sized {
|
||||
/// Construct a digest item which is a slot number and a signature on the
|
||||
@@ -160,11 +176,12 @@ pub fn start_aura_thread<B, C, E, I, SO, Error>(
|
||||
env: Arc<E>,
|
||||
sync_oracle: SO,
|
||||
on_exit: impl Future<Item=(),Error=()> + Send + 'static,
|
||||
) where
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
) -> Result<(), consensus_common::Error> where
|
||||
B: Block + 'static,
|
||||
C: Authorities<B> + ChainHead<B> + Send + Sync + 'static,
|
||||
E: Environment<B, AuraConsensusData, Error=Error> + Send + Sync + 'static,
|
||||
E::Proposer: Proposer<B, AuraConsensusData, Error=Error> + 'static,
|
||||
E: Environment<B, Error=Error> + Send + Sync + 'static,
|
||||
E::Proposer: Proposer<B, Error=Error> + 'static,
|
||||
I: BlockImport<B> + Send + Sync + 'static,
|
||||
Error: From<C::Error> + From<I::Error> + 'static,
|
||||
SO: SyncOracle + Send + Clone + 'static,
|
||||
@@ -173,7 +190,9 @@ pub fn start_aura_thread<B, C, E, I, SO, Error>(
|
||||
{
|
||||
use tokio::runtime::current_thread::Runtime;
|
||||
|
||||
::std::thread::spawn(move || {
|
||||
let (result_sender, result_recv) = mpsc::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut runtime = match Runtime::new() {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
@@ -182,7 +201,7 @@ pub fn start_aura_thread<B, C, E, I, SO, Error>(
|
||||
}
|
||||
};
|
||||
|
||||
let _ = runtime.block_on(start_aura(
|
||||
let aura_future = match start_aura(
|
||||
slot_duration,
|
||||
local_key,
|
||||
client,
|
||||
@@ -190,8 +209,26 @@ pub fn start_aura_thread<B, C, E, I, SO, Error>(
|
||||
env,
|
||||
sync_oracle,
|
||||
on_exit,
|
||||
));
|
||||
inherent_data_providers,
|
||||
) {
|
||||
Ok(aura_future) => {
|
||||
result_sender
|
||||
.send(Ok(()))
|
||||
.expect("Receive is not dropped before receiving a result; qed");
|
||||
aura_future
|
||||
},
|
||||
Err(e) => {
|
||||
result_sender
|
||||
.send(Err(e))
|
||||
.expect("Receive is not dropped before receiving a result; qed");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let _ = runtime.block_on(aura_future);
|
||||
});
|
||||
|
||||
result_recv.recv().expect("Aura start thread result sender dropped")
|
||||
}
|
||||
|
||||
/// Start the aura worker. The returned future should be run in a tokio runtime.
|
||||
@@ -203,17 +240,20 @@ pub fn start_aura<B, C, E, I, SO, Error>(
|
||||
env: Arc<E>,
|
||||
sync_oracle: SO,
|
||||
on_exit: impl Future<Item=(),Error=()>,
|
||||
) -> impl Future<Item=(),Error=()> where
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
) -> Result<impl Future<Item=(), Error=()>, consensus_common::Error> where
|
||||
B: Block,
|
||||
C: Authorities<B> + ChainHead<B>,
|
||||
E: Environment<B, AuraConsensusData, Error=Error>,
|
||||
E::Proposer: Proposer<B, AuraConsensusData, Error=Error>,
|
||||
E: Environment<B, Error=Error>,
|
||||
E::Proposer: Proposer<B, Error=Error>,
|
||||
I: BlockImport<B>,
|
||||
Error: From<C::Error> + From<I::Error>,
|
||||
SO: SyncOracle + Send + Clone,
|
||||
DigestItemFor<B>: CompatibleDigestItem + DigestItem<AuthorityId=Ed25519AuthorityId>,
|
||||
Error: ::std::error::Error + Send + 'static + From<::consensus_common::Error>,
|
||||
{
|
||||
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.0)?;
|
||||
|
||||
let make_authorship = move || {
|
||||
|
||||
let client = client.clone();
|
||||
@@ -222,9 +262,10 @@ pub fn start_aura<B, C, E, I, SO, Error>(
|
||||
let env = env.clone();
|
||||
let sync_oracle = sync_oracle.clone();
|
||||
let SlotDuration(slot_duration) = slot_duration;
|
||||
let inherent_data_providers = inherent_data_providers.clone();
|
||||
|
||||
// rather than use a timer interval, we schedule our waits ourselves
|
||||
Slots::new(slot_duration)
|
||||
Slots::new(slot_duration, inherent_data_providers)
|
||||
.map_err(|e| debug!(target: "aura", "Faulty timer: {:?}", e))
|
||||
.for_each(move |slot_info| {
|
||||
let client = client.clone();
|
||||
@@ -244,7 +285,7 @@ pub fn start_aura<B, C, E, I, SO, Error>(
|
||||
let chain_head = match client.best_block_header() {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
warn!(target:"aura", "Unable to author block in slot {}. \
|
||||
warn!(target: "aura", "Unable to author block in slot {}. \
|
||||
no best block header: {:?}", slot_num, e);
|
||||
return Either::B(future::ok(()))
|
||||
}
|
||||
@@ -253,8 +294,11 @@ pub fn start_aura<B, C, E, I, SO, Error>(
|
||||
let authorities = match client.authorities(&BlockId::Hash(chain_head.hash())) {
|
||||
Ok(authorities) => authorities,
|
||||
Err(e) => {
|
||||
warn!("Unable to fetch authorities at\
|
||||
block {:?}: {:?}", chain_head.hash(), e);
|
||||
warn!(
|
||||
"Unable to fetch authorities at block {:?}: {:?}",
|
||||
chain_head.hash(),
|
||||
e
|
||||
);
|
||||
return Either::B(future::ok(()));
|
||||
}
|
||||
};
|
||||
@@ -262,8 +306,11 @@ pub fn start_aura<B, C, E, I, SO, Error>(
|
||||
let proposal_work = match slot_author(slot_num, &authorities) {
|
||||
None => return Either::B(future::ok(())),
|
||||
Some(author) => if author.0 == public_key.0 {
|
||||
debug!(target: "aura", "Starting authorship at slot {}; timestamp = {}",
|
||||
slot_num, timestamp);
|
||||
debug!(
|
||||
target: "aura", "Starting authorship at slot {}; timestamp = {}",
|
||||
slot_num,
|
||||
timestamp
|
||||
);
|
||||
|
||||
// we are the slot author. make a block and sign it.
|
||||
let proposer = match env.init(&chain_head, &authorities) {
|
||||
@@ -274,17 +321,12 @@ pub fn start_aura<B, C, E, I, SO, Error>(
|
||||
}
|
||||
};
|
||||
|
||||
let consensus_data = AuraConsensusData {
|
||||
timestamp,
|
||||
slot: slot_num,
|
||||
slot_duration,
|
||||
};
|
||||
|
||||
let remaining_duration = slot_info.remaining_duration();
|
||||
// deadline our production to approx. the end of the
|
||||
// slot
|
||||
Timeout::new(
|
||||
proposer.propose(consensus_data).into_future(),
|
||||
slot_info.remaining_duration(),
|
||||
proposer.propose(slot_info.inherent_data).into_future(),
|
||||
remaining_duration,
|
||||
)
|
||||
} else {
|
||||
return Either::B(future::ok(()));
|
||||
@@ -298,8 +340,10 @@ pub fn start_aura<B, C, E, I, SO, Error>(
|
||||
// that is actually set by the proposer.
|
||||
let slot_after_building = slot_now(slot_duration);
|
||||
if slot_after_building != Some(slot_num) {
|
||||
info!("Discarding proposal for slot {}; block production took too long",
|
||||
slot_num);
|
||||
info!(
|
||||
"Discarding proposal for slot {}; block production took too long",
|
||||
slot_num
|
||||
);
|
||||
return
|
||||
}
|
||||
|
||||
@@ -363,7 +407,7 @@ pub fn start_aura<B, C, E, I, SO, Error>(
|
||||
})
|
||||
});
|
||||
|
||||
work.select(on_exit).then(|_| Ok(()))
|
||||
Ok(work.select(on_exit).then(|_| Ok(())))
|
||||
}
|
||||
|
||||
// a header which has been checked
|
||||
@@ -431,11 +475,58 @@ pub trait ExtraVerification<B: Block>: Send + Sync {
|
||||
}
|
||||
|
||||
/// A verifier for Aura blocks.
|
||||
pub struct AuraVerifier<C, E, MakeInherent> {
|
||||
slot_duration: SlotDuration,
|
||||
pub struct AuraVerifier<C, E> {
|
||||
client: Arc<C>,
|
||||
make_inherent: MakeInherent,
|
||||
extra: E,
|
||||
inherent_data_providers: inherents::InherentDataProviders,
|
||||
}
|
||||
|
||||
impl<C, E> AuraVerifier<C, E>
|
||||
{
|
||||
fn check_inherents<B: Block>(
|
||||
&self,
|
||||
block: B,
|
||||
block_id: BlockId<B>,
|
||||
inherent_data: InherentData,
|
||||
timestamp_now: u64,
|
||||
) -> Result<(), String>
|
||||
where C: ProvideRuntimeApi, C::Api: BlockBuilderApi<B>
|
||||
{
|
||||
const MAX_TIMESTAMP_DRIFT_SECS: u64 = 60;
|
||||
|
||||
let inherent_res = self.client.runtime_api().check_inherents(
|
||||
&block_id,
|
||||
block,
|
||||
inherent_data,
|
||||
).map_err(|e| format!("{:?}", e))?;
|
||||
|
||||
if !inherent_res.ok() {
|
||||
inherent_res
|
||||
.into_errors()
|
||||
.try_for_each(|(i, e)| match TIError::try_from(&i, &e) {
|
||||
Some(TIError::ValidAtTimestamp(timestamp)) => {
|
||||
// halt import until timestamp is valid.
|
||||
// reject when too far ahead.
|
||||
if timestamp > timestamp_now + MAX_TIMESTAMP_DRIFT_SECS {
|
||||
return Err("Rejecting block too far in future".into());
|
||||
}
|
||||
|
||||
let diff = timestamp.saturating_sub(timestamp_now);
|
||||
info!(
|
||||
target: "aura",
|
||||
"halting for block {} seconds in the future",
|
||||
diff
|
||||
);
|
||||
thread::sleep(Duration::from_secs(diff));
|
||||
Ok(())
|
||||
},
|
||||
Some(TIError::Other(e)) => Err(e.into()),
|
||||
None => Err(self.inherent_data_providers.error_to_string(&i, &e)),
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// No-op extra verification.
|
||||
@@ -450,12 +541,11 @@ impl<B: Block> ExtraVerification<B> for NothingExtra {
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: Block, C, E, MakeInherent, Inherent> Verifier<B> for AuraVerifier<C, E, MakeInherent> where
|
||||
impl<B: Block, C, E> Verifier<B> for AuraVerifier<C, E> where
|
||||
C: Authorities<B> + BlockImport<B> + ProvideRuntimeApi + Send + Sync,
|
||||
C::Api: BlockBuilderApi<B, Inherent>,
|
||||
C::Api: BlockBuilderApi<B>,
|
||||
DigestItemFor<B>: CompatibleDigestItem + DigestItem<AuthorityId=Ed25519AuthorityId>,
|
||||
E: ExtraVerification<B>,
|
||||
MakeInherent: Fn(u64, u64) -> Inherent + Send + Sync,
|
||||
{
|
||||
fn verify(
|
||||
&self,
|
||||
@@ -464,11 +554,9 @@ impl<B: Block, C, E, MakeInherent, Inherent> Verifier<B> for AuraVerifier<C, E,
|
||||
justification: Option<Justification>,
|
||||
mut body: Option<Vec<B::Extrinsic>>,
|
||||
) -> Result<(ImportBlock<B>, Option<Vec<Ed25519AuthorityId>>), String> {
|
||||
use runtime_primitives::CheckInherentError;
|
||||
const MAX_TIMESTAMP_DRIFT_SECS: u64 = 60;
|
||||
|
||||
let (timestamp_now, slot_now) = timestamp_and_slot_now(self.slot_duration.0)
|
||||
.ok_or("System time is before UnixTime?".to_owned())?;
|
||||
let mut inherent_data = self.inherent_data_providers.create_inherent_data().map_err(String::from)?;
|
||||
let (timestamp_now, slot_now) = extract_timestamp_and_slot(&inherent_data)
|
||||
.map_err(|e| format!("Could not extract timestamp and slot: {:?}", e))?;
|
||||
let hash = header.hash();
|
||||
let parent_hash = *header.parent_hash();
|
||||
let authorities = self.client.authorities(&BlockId::Hash(parent_hash))
|
||||
@@ -491,30 +579,15 @@ impl<B: Block, C, E, MakeInherent, Inherent> Verifier<B> for AuraVerifier<C, E,
|
||||
// to check that the internally-set timestamp in the inherents
|
||||
// actually matches the slot set in the seal.
|
||||
if let Some(inner_body) = body.take() {
|
||||
let inherent = (self.make_inherent)(timestamp_now, slot_num);
|
||||
let block: B = Block::new(pre_header.clone(), inner_body);
|
||||
inherent_data.aura_replace_inherent_data(slot_num);
|
||||
let block = B::new(pre_header.clone(), inner_body);
|
||||
|
||||
let inherent_res = self.client.runtime_api().check_inherents(
|
||||
&BlockId::Hash(parent_hash),
|
||||
self.check_inherents(
|
||||
block.clone(),
|
||||
inherent,
|
||||
).map_err(|e| format!("{:?}", e))?;
|
||||
|
||||
match inherent_res {
|
||||
Ok(()) => {}
|
||||
Err(CheckInherentError::ValidAtTimestamp(timestamp)) => {
|
||||
// halt import until timestamp is valid.
|
||||
// reject when too far ahead.
|
||||
if timestamp > timestamp_now + MAX_TIMESTAMP_DRIFT_SECS {
|
||||
return Err("Rejecting block too far in future".into());
|
||||
}
|
||||
|
||||
let diff = timestamp.saturating_sub(timestamp_now);
|
||||
info!(target: "aura", "halting for block {} seconds in the future", diff);
|
||||
::std::thread::sleep(Duration::from_secs(diff));
|
||||
},
|
||||
Err(CheckInherentError::Other(s)) => return Err(s.into_owned()),
|
||||
}
|
||||
BlockId::Hash(parent_hash),
|
||||
inherent_data,
|
||||
timestamp_now,
|
||||
)?;
|
||||
|
||||
let (_, inner_body) = block.deconstruct();
|
||||
body = Some(inner_body);
|
||||
@@ -546,16 +619,8 @@ impl<B: Block, C, E, MakeInherent, Inherent> Verifier<B> for AuraVerifier<C, E,
|
||||
}
|
||||
}
|
||||
|
||||
/// A utility for making the basic-inherent data.
|
||||
pub fn make_basic_inherent(timestamp: u64, slot_now: u64) -> BasicInherentData {
|
||||
BasicInherentData::new(timestamp, slot_now)
|
||||
}
|
||||
|
||||
/// A type for a function which produces inherent.
|
||||
pub type InherentProducingFn<I> = fn(u64, u64) -> I;
|
||||
|
||||
/// The Aura import queue type.
|
||||
pub type AuraImportQueue<B, C, E, MakeInherent> = BasicQueue<B, AuraVerifier<C, E, MakeInherent>>;
|
||||
pub type AuraImportQueue<B, C, E> = BasicQueue<B, AuraVerifier<C, E>>;
|
||||
|
||||
/// A slot duration. Create with `get_or_compute`.
|
||||
// The internal member should stay private here.
|
||||
@@ -584,8 +649,10 @@ impl SlotDuration {
|
||||
let genesis_slot_duration = client.runtime_api()
|
||||
.slot_duration(&BlockId::number(Zero::zero()))?;
|
||||
|
||||
info!("Loaded block-time = {:?} seconds from genesis on first-launch",
|
||||
genesis_slot_duration);
|
||||
info!(
|
||||
"Loaded block-time = {:?} seconds from genesis on first-launch",
|
||||
genesis_slot_duration
|
||||
);
|
||||
|
||||
genesis_slot_duration.using_encoded(|s| {
|
||||
client.insert_aux(&[(SLOT_KEY, &s[..])], &[])
|
||||
@@ -597,22 +664,39 @@ impl SlotDuration {
|
||||
}
|
||||
}
|
||||
|
||||
/// Register the aura inherent data provider, if not registered already.
|
||||
fn register_aura_inherent_data_provider(
|
||||
inherent_data_providers: &InherentDataProviders,
|
||||
slot_duration: u64,
|
||||
) -> Result<(), consensus_common::Error> {
|
||||
if !inherent_data_providers.has_provider(&srml_aura::INHERENT_IDENTIFIER) {
|
||||
inherent_data_providers
|
||||
.register_provider(srml_aura::InherentDataProvider::new(slot_duration))
|
||||
.map_err(inherent_to_common_error)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Start an import queue for the Aura consensus algorithm.
|
||||
pub fn import_queue<B, C, E, MakeInherent, Inherent>(
|
||||
pub fn import_queue<B, C, E>(
|
||||
slot_duration: SlotDuration,
|
||||
client: Arc<C>,
|
||||
extra: E,
|
||||
make_inherent: MakeInherent,
|
||||
) -> AuraImportQueue<B, C, E, MakeInherent> where
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
) -> Result<AuraImportQueue<B, C, E>, consensus_common::Error> where
|
||||
B: Block,
|
||||
C: Authorities<B> + BlockImport<B,Error=ConsensusError> + ProvideRuntimeApi + Send + Sync,
|
||||
C::Api: BlockBuilderApi<B, Inherent>,
|
||||
C: Authorities<B> + BlockImport<B, Error=ConsensusError> + ProvideRuntimeApi + Send + Sync,
|
||||
C::Api: BlockBuilderApi<B>,
|
||||
DigestItemFor<B>: CompatibleDigestItem + DigestItem<AuthorityId=Ed25519AuthorityId>,
|
||||
E: ExtraVerification<B>,
|
||||
MakeInherent: Fn(u64, u64) -> Inherent + Send + Sync,
|
||||
{
|
||||
let verifier = Arc::new(AuraVerifier { slot_duration, client: client.clone(), extra, make_inherent });
|
||||
BasicQueue::new(verifier, client)
|
||||
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.0)?;
|
||||
|
||||
let verifier = Arc::new(
|
||||
AuraVerifier { client: client.clone(), extra, inherent_data_providers }
|
||||
);
|
||||
Ok(BasicQueue::new(verifier, client))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -636,7 +720,7 @@ mod tests {
|
||||
struct DummyFactory(Arc<TestClient>);
|
||||
struct DummyProposer(u64, Arc<TestClient>);
|
||||
|
||||
impl Environment<TestBlock, AuraConsensusData> for DummyFactory {
|
||||
impl Environment<TestBlock> for DummyFactory {
|
||||
type Proposer = DummyProposer;
|
||||
type Error = Error;
|
||||
|
||||
@@ -647,11 +731,11 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
impl Proposer<TestBlock, AuraConsensusData> for DummyProposer {
|
||||
impl Proposer<TestBlock> for DummyProposer {
|
||||
type Error = Error;
|
||||
type Create = Result<TestBlock, Error>;
|
||||
|
||||
fn propose(&self, _consensus_data: AuraConsensusData) -> Result<TestBlock, Error> {
|
||||
fn propose(&self, _: InherentData) -> Result<TestBlock, Error> {
|
||||
self.1.new_block().unwrap().bake().map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
@@ -663,36 +747,38 @@ mod tests {
|
||||
peers: Vec<Arc<Peer<AuraVerifier<
|
||||
PeersClient,
|
||||
NothingExtra,
|
||||
InherentProducingFn<()>,
|
||||
>, ()>>>,
|
||||
started: bool
|
||||
started: bool,
|
||||
}
|
||||
|
||||
impl TestNetFactory for AuraTestNet {
|
||||
type Verifier = AuraVerifier<PeersClient, NothingExtra, InherentProducingFn<()>>;
|
||||
type Verifier = AuraVerifier<PeersClient, NothingExtra>;
|
||||
type PeerData = ();
|
||||
|
||||
/// Create new test network with peers and given config.
|
||||
fn from_config(_config: &ProtocolConfig) -> Self {
|
||||
AuraTestNet {
|
||||
peers: Vec::new(),
|
||||
started: false
|
||||
started: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn make_verifier(&self, client: Arc<PeersClient>, _cfg: &ProtocolConfig)
|
||||
-> Arc<Self::Verifier>
|
||||
{
|
||||
fn make_inherent(_: u64, _: u64) { () }
|
||||
let slot_duration = SlotDuration::get_or_compute(&*client)
|
||||
.expect("slot duration available");
|
||||
let inherent_data_providers = InherentDataProviders::new();
|
||||
register_aura_inherent_data_provider(
|
||||
&inherent_data_providers,
|
||||
slot_duration.0
|
||||
).expect("Registers aura inherent data provider");
|
||||
|
||||
assert_eq!(slot_duration.0, SLOT_DURATION);
|
||||
Arc::new(AuraVerifier {
|
||||
client,
|
||||
slot_duration,
|
||||
extra: NothingExtra,
|
||||
make_inherent: make_inherent as _,
|
||||
inherent_data_providers,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -748,6 +834,11 @@ mod tests {
|
||||
let slot_duration = SlotDuration::get_or_compute(&*client)
|
||||
.expect("slot duration available");
|
||||
|
||||
let inherent_data_providers = InherentDataProviders::new();
|
||||
register_aura_inherent_data_provider(
|
||||
&inherent_data_providers, slot_duration.0
|
||||
).expect("Registers aura inherent data provider");
|
||||
|
||||
let aura = start_aura(
|
||||
slot_duration,
|
||||
Arc::new(key.clone().into()),
|
||||
@@ -756,7 +847,8 @@ mod tests {
|
||||
environ.clone(),
|
||||
DummyOracle,
|
||||
futures::empty(),
|
||||
);
|
||||
inherent_data_providers,
|
||||
).expect("Starts aura");
|
||||
|
||||
runtime.spawn(aura);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,10 @@ use std::time::{Instant, Duration};
|
||||
use tokio::timer::Delay;
|
||||
use futures::prelude::*;
|
||||
|
||||
use inherents::{InherentDataProviders, InherentData};
|
||||
|
||||
use consensus_common::{Error, ErrorKind};
|
||||
|
||||
/// Returns the duration until the next slot, based on current duration since
|
||||
pub(crate) fn time_until_next(now: Duration, slot_duration: u64) -> Duration {
|
||||
let remaining_full_secs = slot_duration - (now.as_secs() % slot_duration) - 1;
|
||||
@@ -30,7 +34,6 @@ pub(crate) fn time_until_next(now: Duration, slot_duration: u64) -> Duration {
|
||||
}
|
||||
|
||||
/// Information about a slot.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct SlotInfo {
|
||||
/// The slot number.
|
||||
pub(crate) number: u64,
|
||||
@@ -38,6 +41,8 @@ pub(crate) struct SlotInfo {
|
||||
pub(crate) timestamp: u64,
|
||||
/// The instant at which the slot ends.
|
||||
pub(crate) ends_at: Instant,
|
||||
/// The inherent data.
|
||||
pub(crate) inherent_data: InherentData,
|
||||
}
|
||||
|
||||
impl SlotInfo {
|
||||
@@ -57,22 +62,24 @@ pub(crate) struct Slots {
|
||||
last_slot: u64,
|
||||
slot_duration: u64,
|
||||
inner_delay: Option<Delay>,
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
}
|
||||
|
||||
impl Slots {
|
||||
/// Create a new `slots` stream.
|
||||
pub(crate) fn new(slot_duration: u64) -> Self {
|
||||
/// Create a new `Slots` stream.
|
||||
pub(crate) fn new(slot_duration: u64, inherent_data_providers: InherentDataProviders) -> Self {
|
||||
Slots {
|
||||
last_slot: 0,
|
||||
slot_duration,
|
||||
inner_delay: None,
|
||||
inherent_data_providers,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Slots {
|
||||
type Item = SlotInfo;
|
||||
type Error = tokio::timer::Error;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<SlotInfo>, Self::Error> {
|
||||
let slot_duration = self.slot_duration;
|
||||
@@ -90,30 +97,33 @@ impl Stream for Slots {
|
||||
};
|
||||
|
||||
if let Some(ref mut inner_delay) = self.inner_delay {
|
||||
try_ready!(inner_delay.poll());
|
||||
try_ready!(inner_delay.poll().map_err(|e| Error::from(ErrorKind::FaultyTimer(e))));
|
||||
}
|
||||
|
||||
// timeout has fired.
|
||||
|
||||
let (timestamp, slot_num) = match ::timestamp_and_slot_now(slot_duration) {
|
||||
None => return Ok(Async::Ready(None)),
|
||||
Some(x) => x,
|
||||
};
|
||||
let inherent_data = self.inherent_data_providers.create_inherent_data()
|
||||
.map_err(::inherent_to_common_error)?;
|
||||
let (timestamp, slot_num) = ::extract_timestamp_and_slot(&inherent_data)?;
|
||||
|
||||
// reschedule delay for next slot.
|
||||
let ends_at = Instant::now()
|
||||
+ time_until_next(Duration::from_secs(timestamp), slot_duration);
|
||||
let ends_at = Instant::now() + time_until_next(Duration::from_secs(timestamp), slot_duration);
|
||||
self.inner_delay = Some(Delay::new(ends_at));
|
||||
|
||||
// never yield the same slot twice.
|
||||
if slot_num > self.last_slot {
|
||||
self.last_slot = slot_num;
|
||||
|
||||
Ok(Async::Ready(Some(SlotInfo {
|
||||
number: slot_num,
|
||||
timestamp,
|
||||
ends_at,
|
||||
})))
|
||||
Ok(
|
||||
Async::Ready(
|
||||
Some(SlotInfo {
|
||||
number: slot_num,
|
||||
timestamp,
|
||||
ends_at,
|
||||
inherent_data,
|
||||
})
|
||||
)
|
||||
)
|
||||
} else {
|
||||
// re-poll until we get a new slot.
|
||||
self.poll()
|
||||
|
||||
Reference in New Issue
Block a user