diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 78f1add8c1..69a1d2480a 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -3531,6 +3531,7 @@ dependencies = [ "srml-support 0.1.0", "substrate-client 0.1.0", "substrate-consensus-aura-primitives 0.1.0", + "substrate-consensus-aura-slots 0.1.0", "substrate-consensus-common 0.1.0", "substrate-executor 0.1.0", "substrate-inherents 0.1.0", @@ -3555,6 +3556,24 @@ dependencies = [ "substrate-primitives 0.1.0", ] +[[package]] +name = "substrate-consensus-aura-slots" +version = "0.1.0" +dependencies = [ + "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 0.1.0", + "substrate-client 0.1.0", + "substrate-consensus-aura-primitives 0.1.0", + "substrate-consensus-common 0.1.0", + "substrate-inherents 0.1.0", + "substrate-primitives 0.1.0", + "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "substrate-consensus-common" version = "0.1.0" diff --git a/substrate/core/consensus/aura/Cargo.toml b/substrate/core/consensus/aura/Cargo.toml index e58ec6f0ef..1b7f44cd38 100644 --- a/substrate/core/consensus/aura/Cargo.toml +++ b/substrate/core/consensus/aura/Cargo.toml @@ -13,6 +13,7 @@ runtime_support = { package = "srml-support", path = "../../../srml/support" } runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } runtime_version = { package = "sr-version", path = "../../sr-version" } runtime_io = { package = "sr-io", path = "../../sr-io" } +aura_slots = { package = "substrate-consensus-aura-slots", path = "slots" } aura_primitives = { package = "substrate-consensus-aura-primitives", path = "primitives" } inherents = { package = "substrate-inherents", path = "../../inherents" } srml-consensus = { path = "../../../srml/consensus" } @@ -30,4 +31,4 @@ substrate-executor = { path = "../../executor" } network = { package = "substrate-network", path = "../../network", features = ["test-helpers"]} service = { package = "substrate-service", path = "../../service" } test_client = { package = "substrate-test-client", path = "../../test-client" } -env_logger = "0.6" \ No newline at end of file +env_logger = "0.6" diff --git a/substrate/core/consensus/aura/slots/Cargo.toml b/substrate/core/consensus/aura/slots/Cargo.toml new file mode 100644 index 0000000000..b8cb306c30 --- /dev/null +++ b/substrate/core/consensus/aura/slots/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "substrate-consensus-aura-slots" +version = "0.1.0" +authors = ["Parity Technologies "] +description = "Generic slots-based utilities for consensus" +edition = "2018" + +[dependencies] +parity-codec = "3.0" +client = { package = "substrate-client", path = "../../../client" } +primitives = { package = "substrate-primitives", path = "../../../primitives" } +runtime_primitives = { package = "sr-primitives", path = "../../../sr-primitives" } +aura_primitives = { package = "substrate-consensus-aura-primitives", path = "../primitives" } +consensus_common = { package = "substrate-consensus-common", path = "../../common" } +inherents = { package = "substrate-inherents", path = "../../../inherents" } +futures = "0.1.17" +tokio = "0.1.7" +parking_lot = "0.7.1" +error-chain = "0.12" +log = "0.4" diff --git a/substrate/core/consensus/aura/slots/src/lib.rs b/substrate/core/consensus/aura/slots/src/lib.rs new file mode 100644 index 0000000000..b77fb1266c --- /dev/null +++ b/substrate/core/consensus/aura/slots/src/lib.rs @@ -0,0 +1,255 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +extern crate parity_codec as codec; + +mod slots; + +pub use slots::{Slots, SlotInfo}; + +use std::sync::{mpsc, Arc}; +use std::thread; +use futures::prelude::*; +use futures::{Future, IntoFuture, future::{self, Either}}; +use log::{warn, debug, info}; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{ProvideRuntimeApi, Block}; +use consensus_common::SyncOracle; +use inherents::{InherentData, InherentDataProviders}; +use aura_primitives::AuraApi; +use client::ChainHead; +use codec::Encode; + +/// A worker that should be invoked at every new slot. +pub trait SlotWorker { + type OnSlot: IntoFuture; + + /// Called when the proposer starts. + fn on_start( + &self, + slot_duration: u64 + ) -> Result<(), consensus_common::Error>; + + /// Called when a new slot is triggered. + fn on_slot( + &self, + chain_head: B::Header, + slot_info: SlotInfo, + ) -> Self::OnSlot; +} + +/// Slot compatible inherent data. +pub trait SlotCompatible { + /// Extract timestamp and slot from inherent data. + fn extract_timestamp_and_slot(inherent: &InherentData) -> Result<(u64, u64), consensus_common::Error>; +} + +/// Convert an inherent error to common error. +pub fn inherent_to_common_error(err: inherents::RuntimeString) -> consensus_common::Error { + consensus_common::ErrorKind::InherentData(err.into()).into() +} + +/// Start a new slot worker in a separate thread. +pub fn start_slot_worker_thread( + slot_duration: SlotDuration, + client: Arc, + worker: Arc, + sync_oracle: SO, + on_exit: OnExit, + inherent_data_providers: InherentDataProviders, +) -> Result<(), consensus_common::Error> where + B: Block + 'static, + C: ChainHead + Send + Sync + 'static, + W: SlotWorker + Send + Sync + 'static, + SO: SyncOracle + Send + Clone + 'static, + SC: SlotCompatible + 'static, + OnExit: Future + Send + 'static +{ + use tokio::runtime::current_thread::Runtime; + + let (result_sender, result_recv) = mpsc::channel(); + + thread::spawn(move || { + let mut runtime = match Runtime::new() { + Ok(r) => r, + Err(e) => { + warn!("Unable to start authorship: {:?}", e); + return; + } + }; + + let slot_worker_future = match start_slot_worker::<_, _, _, _, SC, _>( + slot_duration, + client, + worker, + sync_oracle, + on_exit, + inherent_data_providers, + ) { + Ok(slot_worker_future) => { + result_sender + .send(Ok(())) + .expect("Receive is not dropped before receiving a result; qed"); + slot_worker_future + }, + Err(e) => { + result_sender + .send(Err(e)) + .expect("Receive is not dropped before receiving a result; qed"); + return; + } + }; + + let _ = runtime.block_on(slot_worker_future); + }); + + result_recv.recv().expect("Aura start thread result sender dropped") +} + +/// Start a new slot worker. +pub fn start_slot_worker( + slot_duration: SlotDuration, + client: Arc, + worker: Arc, + sync_oracle: SO, + on_exit: OnExit, + inherent_data_providers: InherentDataProviders, +) -> Result, consensus_common::Error> where + B: Block, + C: ChainHead, + W: SlotWorker, + SO: SyncOracle + Send + Clone, + SC: SlotCompatible, + OnExit: Future, +{ + worker.on_start(slot_duration.0)?; + + let make_authorship = move || { + let client = client.clone(); + let worker = worker.clone(); + let sync_oracle = sync_oracle.clone(); + let SlotDuration(slot_duration) = slot_duration; + let inherent_data_providers = inherent_data_providers.clone(); + + // rather than use a timer interval, we schedule our waits ourselves + Slots::::new(slot_duration, inherent_data_providers) + .map_err(|e| debug!(target: "aura", "Faulty timer: {:?}", e)) + .for_each(move |slot_info| { + let client = client.clone(); + let worker = worker.clone(); + let sync_oracle = sync_oracle.clone(); + + // only propose when we are not syncing. + if sync_oracle.is_major_syncing() { + debug!(target: "aura", "Skipping proposal slot due to sync."); + return Either::B(future::ok(())); + } + + let slot_num = slot_info.number; + let chain_head = match client.best_block_header() { + Ok(x) => x, + Err(e) => { + warn!(target: "aura", "Unable to author block in slot {}. \ + no best block header: {:?}", slot_num, e); + return Either::B(future::ok(())) + } + }; + + Either::A( + worker.on_slot(chain_head, slot_info).into_future() + .map_err(|e| debug!(target: "aura", "Encountered aura error: {:?}", e)) + ) + }) + }; + + let work = future::loop_fn((), move |()| { + let authorship_task = ::std::panic::AssertUnwindSafe(make_authorship()); + authorship_task.catch_unwind().then(|res| { + match res { + Ok(Ok(())) => (), + Ok(Err(())) => warn!("Aura authorship task terminated unexpectedly. Restarting"), + Err(e) => { + if let Some(s) = e.downcast_ref::<&'static str>() { + warn!("Aura authorship task panicked at {:?}", s); + } + + warn!("Restarting Aura authorship task"); + } + } + + Ok(future::Loop::Continue(())) + }) + }); + + Ok(work.select(on_exit).then(|_| Ok(()))) +} + +/// A header which has been checked +pub enum CheckedHeader { + /// A header which has slot in the future. this is the full header (not stripped) + /// and the slot in which it should be processed. + Deferred(H, u64), + /// A header which is fully checked, including signature. This is the pre-header + /// accompanied by the seal components. + Checked(H, u64, S), +} + +/// A slot duration. Create with `get_or_compute`. +// The internal member should stay private here. +#[derive(Clone, Copy, Debug)] +pub struct SlotDuration(u64); + +impl SlotDuration { + /// Either fetch the slot duration from disk or compute it from the genesis + /// state. + pub fn get_or_compute(client: &C) -> ::client::error::Result where + C: client::backend::AuxStore, + C: ProvideRuntimeApi, + C::Api: AuraApi, + { + use parity_codec::Decode; + const SLOT_KEY: &[u8] = b"aura_slot_duration"; + + match client.get_aux(SLOT_KEY)? { + Some(v) => u64::decode(&mut &v[..]) + .map(SlotDuration) + .ok_or_else(|| ::client::error::ErrorKind::Backend( + format!("Aura slot duration kept in invalid format"), + ).into()), + None => { + use runtime_primitives::traits::Zero; + let genesis_slot_duration = client.runtime_api() + .slot_duration(&BlockId::number(Zero::zero()))?; + + info!( + "Loaded block-time = {:?} seconds from genesis on first-launch", + genesis_slot_duration + ); + + genesis_slot_duration.using_encoded(|s| { + client.insert_aux(&[(SLOT_KEY, &s[..])], &[]) + })?; + + Ok(SlotDuration(genesis_slot_duration)) + } + } + } + + /// Returns slot duration value. + pub fn get(&self) -> u64 { + self.0 + } +} diff --git a/substrate/core/consensus/aura/src/slots.rs b/substrate/core/consensus/aura/slots/src/slots.rs similarity index 74% rename from substrate/core/consensus/aura/src/slots.rs rename to substrate/core/consensus/aura/slots/src/slots.rs index 05ce5b1726..9b665ce0d2 100644 --- a/substrate/core/consensus/aura/src/slots.rs +++ b/substrate/core/consensus/aura/slots/src/slots.rs @@ -1,4 +1,4 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. +// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify @@ -19,36 +19,49 @@ //! This is used instead of `tokio_timer::Interval` because it was unreliable. use std::time::{Instant, Duration}; +use std::marker::PhantomData; use tokio::timer::Delay; use futures::prelude::*; use futures::try_ready; - +use log::warn; use inherents::{InherentDataProviders, InherentData}; - use consensus_common::{Error, ErrorKind}; +use crate::SlotCompatible; + +/// Returns current duration since unix epoch. +pub fn duration_now() -> Option { + use std::time::SystemTime; + + let now = SystemTime::now(); + now.duration_since(SystemTime::UNIX_EPOCH).map_err(|e| { + warn!("Current time {:?} is before unix epoch. Something is wrong: {:?}", now, e); + }).ok() +} /// Returns the duration until the next slot, based on current duration since -pub(crate) fn time_until_next(now: Duration, slot_duration: u64) -> Duration { +pub fn time_until_next(now: Duration, slot_duration: u64) -> Duration { let remaining_full_secs = slot_duration - (now.as_secs() % slot_duration) - 1; let remaining_nanos = 1_000_000_000 - now.subsec_nanos(); Duration::new(remaining_full_secs, remaining_nanos) } /// Information about a slot. -pub(crate) struct SlotInfo { +pub struct SlotInfo { /// The slot number. - pub(crate) number: u64, + pub number: u64, /// Current timestamp. - pub(crate) timestamp: u64, + pub timestamp: u64, /// The instant at which the slot ends. - pub(crate) ends_at: Instant, + pub ends_at: Instant, /// The inherent data. - pub(crate) inherent_data: InherentData, + pub inherent_data: InherentData, + /// Slot duration. + pub duration: u64, } impl SlotInfo { /// Yields the remaining duration in the slot. - pub(crate) fn remaining_duration(&self) -> Duration { + pub fn remaining_duration(&self) -> Duration { let now = Instant::now(); if now < self.ends_at { self.ends_at.duration_since(now) @@ -59,26 +72,28 @@ impl SlotInfo { } /// A stream that returns every time there is a new slot. -pub(crate) struct Slots { +pub struct Slots { last_slot: u64, slot_duration: u64, inner_delay: Option, inherent_data_providers: InherentDataProviders, + _marker: PhantomData, } -impl Slots { +impl Slots { /// Create a new `Slots` stream. - pub(crate) fn new(slot_duration: u64, inherent_data_providers: InherentDataProviders) -> Self { + pub fn new(slot_duration: u64, inherent_data_providers: InherentDataProviders) -> Self { Slots { last_slot: 0, slot_duration, inner_delay: None, inherent_data_providers, + _marker: PhantomData, } } } -impl Stream for Slots { +impl Stream for Slots { type Item = SlotInfo; type Error = Error; @@ -87,7 +102,7 @@ impl Stream for Slots { self.inner_delay = match self.inner_delay.take() { None => { // schedule wait. - let wait_until = match crate::duration_now() { + let wait_until = match duration_now() { None => return Ok(Async::Ready(None)), Some(now) => Instant::now() + time_until_next(now, slot_duration), }; @@ -105,7 +120,7 @@ impl Stream for Slots { let inherent_data = self.inherent_data_providers.create_inherent_data() .map_err(crate::inherent_to_common_error)?; - let (timestamp, slot_num) = crate::extract_timestamp_and_slot(&inherent_data)?; + let (timestamp, slot_num) = SC::extract_timestamp_and_slot(&inherent_data)?; // reschedule delay for next slot. let ends_at = Instant::now() + time_until_next(Duration::from_secs(timestamp), slot_duration); @@ -119,6 +134,7 @@ impl Stream for Slots { Async::Ready( Some(SlotInfo { number: slot_num, + duration: self.slot_duration, timestamp, ends_at, inherent_data, diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index 154bc214f6..81c257671d 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -26,9 +26,7 @@ //! Blocks from future steps will be either deferred or rejected depending on how //! far in the future they are. -mod slots; - -use std::{sync::{Arc, mpsc}, time::Duration, thread}; +use std::{sync::Arc, time::Duration, thread}; use parity_codec::Encode; use consensus_common::{ @@ -45,16 +43,18 @@ use runtime_primitives::traits::{ use primitives::{Ed25519AuthorityId, ed25519}; use inherents::{InherentDataProviders, InherentData, RuntimeString}; -use futures::{Stream, Future, IntoFuture, future::{self, Either}}; +use futures::{Stream, Future, IntoFuture, future}; use tokio::timer::Timeout; -use slots::Slots; -use ::log::{warn, debug, info, trace}; +use log::{warn, debug, info, trace}; use srml_aura::{ InherentType as AuraInherent, AuraInherentData, timestamp::{TimestampInherentData, InherentType as TimestampInherent, InherentError as TIError} }; +use aura_slots::{CheckedHeader, SlotWorker, SlotInfo, SlotCompatible}; + +pub use aura_slots::SlotDuration; pub use aura_primitives::*; pub use consensus_common::SyncOracle; @@ -99,14 +99,6 @@ fn slot_now(slot_duration: u64) -> Option { duration_now().map(|s| s.as_secs() / slot_duration) } -fn extract_timestamp_and_slot( - data: &InherentData -) -> Result<(TimestampInherent, AuraInherent), consensus_common::Error> { - data.timestamp_inherent_data() - .and_then(|t| data.aura_inherent_data().map(|a| (t, a))) - .map_err(inherent_to_common_error) -} - fn inherent_to_common_error(err: RuntimeString) -> consensus_common::Error { consensus_common::ErrorKind::InherentData(err.into()).into() } @@ -136,261 +128,234 @@ impl CompatibleDigestItem for generic::DigestItem Result<(TimestampInherent, AuraInherent), consensus_common::Error> { + data.timestamp_inherent_data() + .and_then(|t| data.aura_inherent_data().map(|a| (t, a))) + .map_err(inherent_to_common_error) + } +} + /// Start the aura worker in a separate thread. -pub fn start_aura_thread( +pub fn start_aura_thread( slot_duration: SlotDuration, local_key: Arc, client: Arc, block_import: Arc, env: Arc, sync_oracle: SO, - on_exit: impl Future + Send + 'static, + on_exit: OnExit, inherent_data_providers: InherentDataProviders, ) -> Result<(), consensus_common::Error> where B: Block + 'static, C: Authorities + ChainHead + Send + Sync + 'static, E: Environment + Send + Sync + 'static, - E::Proposer: Proposer + 'static, + E::Proposer: Proposer + Send + 'static, + <>::Create as IntoFuture>::Future: Send + 'static, I: BlockImport + Send + Sync + 'static, Error: From + From + 'static, - SO: SyncOracle + Send + Clone + 'static, + SO: SyncOracle + Send + Sync + Clone + 'static, + OnExit: Future + Send + 'static, DigestItemFor: CompatibleDigestItem + DigestItem + 'static, Error: ::std::error::Error + Send + From<::consensus_common::Error> + 'static, { - use tokio::runtime::current_thread::Runtime; + let worker = AuraWorker { + client: client.clone(), block_import, env, local_key, inherent_data_providers: inherent_data_providers.clone(), sync_oracle: sync_oracle.clone(), + }; - let (result_sender, result_recv) = mpsc::channel(); - - thread::spawn(move || { - let mut runtime = match Runtime::new() { - Ok(r) => r, - Err(e) => { - warn!("Unable to start authorship: {:?}", e); - return; - } - }; - - let aura_future = match start_aura( - slot_duration, - local_key, - client, - block_import, - env, - sync_oracle, - on_exit, - inherent_data_providers, - ) { - Ok(aura_future) => { - result_sender - .send(Ok(())) - .expect("Receive is not dropped before receiving a result; qed"); - aura_future - }, - Err(e) => { - result_sender - .send(Err(e)) - .expect("Receive is not dropped before receiving a result; qed"); - return; - } - }; - - let _ = runtime.block_on(aura_future); - }); - - result_recv.recv().expect("Aura start thread result sender dropped") + aura_slots::start_slot_worker_thread::<_, _, _, _, AuraSlotCompatible, _>( + slot_duration, + client, + Arc::new(worker), + sync_oracle, + on_exit, + inherent_data_providers + ) } /// Start the aura worker. The returned future should be run in a tokio runtime. -pub fn start_aura( +pub fn start_aura( slot_duration: SlotDuration, local_key: Arc, client: Arc, block_import: Arc, env: Arc, sync_oracle: SO, - on_exit: impl Future, + on_exit: OnExit, inherent_data_providers: InherentDataProviders, ) -> Result, consensus_common::Error> where B: Block, C: Authorities + ChainHead, E: Environment, E::Proposer: Proposer, - I: BlockImport, + <>::Create as IntoFuture>::Future: Send + 'static, + I: BlockImport + Send + Sync + 'static, + Error: From + From, + SO: SyncOracle + Send + Sync + Clone, + DigestItemFor: CompatibleDigestItem + DigestItem, + Error: ::std::error::Error + Send + 'static + From<::consensus_common::Error>, + OnExit: Future, +{ + let worker = AuraWorker { + client: client.clone(), block_import, env, local_key, inherent_data_providers: inherent_data_providers.clone(), sync_oracle: sync_oracle.clone(), + }; + aura_slots::start_slot_worker::<_, _, _, _, AuraSlotCompatible, _>( + slot_duration, + client, + Arc::new(worker), + sync_oracle, + on_exit, + inherent_data_providers + ) +} + +struct AuraWorker { + client: Arc, + block_import: Arc, + env: Arc, + local_key: Arc, + sync_oracle: SO, + inherent_data_providers: InherentDataProviders, +} + +impl SlotWorker for AuraWorker where + C: Authorities, + E: Environment, + E::Proposer: Proposer, + <>::Create as IntoFuture>::Future: Send + 'static, + I: BlockImport + Send + Sync + 'static, Error: From + From, SO: SyncOracle + Send + Clone, DigestItemFor: CompatibleDigestItem + DigestItem, Error: ::std::error::Error + Send + 'static + From<::consensus_common::Error>, { - register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.0)?; + type OnSlot = Box + Send>; - let make_authorship = move || { + fn on_start( + &self, + slot_duration: u64 + ) -> Result<(), consensus_common::Error> { + register_aura_inherent_data_provider(&self.inherent_data_providers, slot_duration) + } - let client = client.clone(); - let pair = local_key.clone(); - let block_import = block_import.clone(); - let env = env.clone(); - let sync_oracle = sync_oracle.clone(); - let SlotDuration(slot_duration) = slot_duration; - let inherent_data_providers = inherent_data_providers.clone(); + fn on_slot( + &self, + chain_head: B::Header, + slot_info: SlotInfo, + ) -> Self::OnSlot { + let pair = self.local_key.clone(); + let public_key = self.local_key.public(); + let client = self.client.clone(); + let block_import = self.block_import.clone(); + let env = self.env.clone(); - // rather than use a timer interval, we schedule our waits ourselves - Slots::new(slot_duration, inherent_data_providers) - .map_err(|e| debug!(target: "aura", "Faulty timer: {:?}", e)) - .for_each(move |slot_info| { - let client = client.clone(); - let pair = pair.clone(); - let block_import = block_import.clone(); - let env = env.clone(); - let sync_oracle = sync_oracle.clone(); - let public_key = pair.public(); + let (timestamp, slot_num, slot_duration) = + (slot_info.timestamp, slot_info.number, slot_info.duration); - // only propose when we are not syncing. - if sync_oracle.is_major_syncing() { - debug!(target: "aura", "Skipping proposal slot due to sync."); - return Either::B(future::ok(())); - } - - let (timestamp, slot_num) = (slot_info.timestamp, slot_info.number); - let chain_head = match client.best_block_header() { - Ok(x) => x, - Err(e) => { - warn!(target: "aura", "Unable to author block in slot {}. \ - no best block header: {:?}", slot_num, e); - return Either::B(future::ok(())) - } - }; - - let authorities = match client.authorities(&BlockId::Hash(chain_head.hash())) { - Ok(authorities) => authorities, - Err(e) => { - warn!( - "Unable to fetch authorities at block {:?}: {:?}", - chain_head.hash(), - e - ); - return Either::B(future::ok(())); - } - }; - - if sync_oracle.is_offline() && authorities.len() > 1 { - debug!(target: "aura", "Skipping proposal slot. Waiting for the netork."); - return Either::B(future::ok(())); - } - - let proposal_work = match slot_author(slot_num, &authorities) { - None => return Either::B(future::ok(())), - Some(author) => if author.0 == public_key.0 { - debug!( - target: "aura", "Starting authorship at slot {}; timestamp = {}", - slot_num, - timestamp - ); - - // we are the slot author. make a block and sign it. - let proposer = match env.init(&chain_head, &authorities) { - Ok(p) => p, - Err(e) => { - warn!("Unable to author block in slot {:?}: {:?}", slot_num, e); - return Either::B(future::ok(())) - } - }; - - let remaining_duration = slot_info.remaining_duration(); - // deadline our production to approx. the end of the slot - Timeout::new( - proposer.propose(slot_info.inherent_data, remaining_duration).into_future(), - remaining_duration, - ) - } else { - return Either::B(future::ok(())); - } - }; - - let block_import = block_import.clone(); - Either::A(proposal_work - .map(move |b| { - // minor hack since we don't have access to the timestamp - // that is actually set by the proposer. - let slot_after_building = slot_now(slot_duration); - if slot_after_building != Some(slot_num) { - info!( - "Discarding proposal for slot {}; block production took too long", - slot_num - ); - return - } - - let (header, body) = b.deconstruct(); - let header_num = header.number().clone(); - let pre_hash = header.hash(); - let parent_hash = header.parent_hash().clone(); - - // sign the pre-sealed hash of the block and then - // add it to a digest item. - let to_sign = (slot_num, pre_hash).encode(); - let signature = pair.sign(&to_sign[..]); - let item = as CompatibleDigestItem>::aura_seal( - slot_num, - signature, - ); - - let import_block: ImportBlock = ImportBlock { - origin: BlockOrigin::Own, - header, - justification: None, - post_digests: vec![item], - body: Some(body), - finalized: false, - auxiliary: Vec::new(), - fork_choice: ForkChoiceStrategy::LongestChain, - }; - - info!("Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.", - header_num, - import_block.post_header().hash(), - pre_hash - ); - - if let Err(e) = block_import.import_block(import_block, None) { - warn!(target: "aura", "Error with block built on {:?}: {:?}", - parent_hash, e); - } - }) - .map_err(|e| warn!("Failed to construct block: {:?}", e)) - ) - }) - }; - - let work = future::loop_fn((), move |()| { - let authorship_task = ::std::panic::AssertUnwindSafe(make_authorship()); - authorship_task.catch_unwind().then(|res| { - match res { - Ok(Ok(())) => (), - Ok(Err(())) => warn!("Aura authorship task terminated unexpectedly. Restarting"), - Err(e) => { - if let Some(s) = e.downcast_ref::<&'static str>() { - warn!("Aura authorship task panicked at {:?}", s); - } - - warn!("Restarting Aura authorship task"); - } + let authorities = match client.authorities(&BlockId::Hash(chain_head.hash())) { + Ok(authorities) => authorities, + Err(e) => { + warn!( + "Unable to fetch authorities at block {:?}: {:?}", + chain_head.hash(), + e + ); + return Box::new(future::ok(())); } + }; - Ok(future::Loop::Continue(())) - }) - }); + if self.sync_oracle.is_offline() && authorities.len() > 1 { + debug!(target: "aura", "Skipping proposal slot. Waiting for the netork."); + return Box::new(future::ok(())); + } - Ok(work.select(on_exit).then(|_| Ok(()))) -} + let proposal_work = match slot_author(slot_num, &authorities) { + None => return Box::new(future::ok(())), + Some(author) => if author.0 == public_key.0 { + debug!( + target: "aura", "Starting authorship at slot {}; timestamp = {}", + slot_num, + timestamp + ); -// a header which has been checked -enum CheckedHeader { - // a header which has slot in the future. this is the full header (not stripped) - // and the slot in which it should be processed. - Deferred(H, u64), - // a header which is fully checked, including signature. This is the pre-header - // accompanied by the seal components. - Checked(H, u64, ed25519::Signature), + // we are the slot author. make a block and sign it. + let proposer = match env.init(&chain_head, &authorities) { + Ok(p) => p, + Err(e) => { + warn!("Unable to author block in slot {:?}: {:?}", slot_num, e); + return Box::new(future::ok(())) + } + }; + + let remaining_duration = slot_info.remaining_duration(); + // deadline our production to approx. the end of the + // slot + Timeout::new( + proposer.propose(slot_info.inherent_data, remaining_duration).into_future(), + remaining_duration, + ) + } else { + return Box::new(future::ok(())); + } + }; + + Box::new( + proposal_work + .map(move |b| { + // minor hack since we don't have access to the timestamp + // that is actually set by the proposer. + let slot_after_building = slot_now(slot_duration); + if slot_after_building != Some(slot_num) { + info!( + "Discarding proposal for slot {}; block production took too long", + slot_num + ); + return + } + + let (header, body) = b.deconstruct(); + let header_num = header.number().clone(); + let pre_hash = header.hash(); + let parent_hash = header.parent_hash().clone(); + + // sign the pre-sealed hash of the block and then + // add it to a digest item. + let to_sign = (slot_num, pre_hash).encode(); + let signature = pair.sign(&to_sign[..]); + let item = as CompatibleDigestItem>::aura_seal( + slot_num, + signature, + ); + + let import_block: ImportBlock = ImportBlock { + origin: BlockOrigin::Own, + header, + justification: None, + post_digests: vec![item], + body: Some(body), + finalized: false, + auxiliary: Vec::new(), + fork_choice: ForkChoiceStrategy::LongestChain, + }; + + info!("Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.", + header_num, + import_block.post_header().hash(), + pre_hash + ); + + if let Err(e) = block_import.import_block(import_block, None) { + warn!(target: "aura", "Error with block built on {:?}: {:?}", + parent_hash, e); + } + }) + .map_err(|e| consensus_common::ErrorKind::ClientImport(format!("{:?}", e)).into()) + ) + } } /// check a header has been signed by the right key. If the slot is too far in the future, an error will be returned. @@ -398,7 +363,7 @@ enum CheckedHeader { // // FIXME #1018 needs misbehavior types fn check_header(slot_now: u64, mut header: B::Header, hash: B::Hash, authorities: &[Ed25519AuthorityId]) - -> Result, String> + -> Result, String> where DigestItemFor: CompatibleDigestItem { let digest_item = match header.digest_mut().pop() { @@ -528,7 +493,7 @@ impl Verifier for AuraVerifier where mut body: Option>, ) -> Result<(ImportBlock, Option>), String> { let mut inherent_data = self.inherent_data_providers.create_inherent_data().map_err(String::from)?; - let (timestamp_now, slot_now) = extract_timestamp_and_slot(&inherent_data) + let (timestamp_now, slot_now) = AuraSlotCompatible::extract_timestamp_and_slot(&inherent_data) .map_err(|e| format!("Could not extract timestamp and slot: {:?}", e))?; let hash = header.hash(); let parent_hash = *header.parent_hash(); @@ -594,53 +559,6 @@ impl Verifier for AuraVerifier where /// The Aura import queue type. pub type AuraImportQueue = BasicQueue>; -/// A slot duration. Create with `get_or_compute`. -// The internal member should stay private here. -#[derive(Clone, Copy, Debug)] -pub struct SlotDuration(u64); - -impl SlotDuration { - /// Either fetch the slot duration from disk or compute it from the genesis - /// state. - pub fn get_or_compute(client: &C) -> ::client::error::Result where - C: ::client::backend::AuxStore, - C: ProvideRuntimeApi, - C::Api: AuraApi, - { - use parity_codec::Decode; - const SLOT_KEY: &[u8] = b"aura_slot_duration"; - - match client.get_aux(SLOT_KEY)? { - Some(v) => u64::decode(&mut &v[..]) - .map(SlotDuration) - .ok_or_else(|| ::client::error::ErrorKind::Backend( - format!("Aura slot duration kept in invalid format"), - ).into()), - None => { - use runtime_primitives::traits::Zero; - let genesis_slot_duration = client.runtime_api() - .slot_duration(&BlockId::number(Zero::zero()))?; - - info!( - "Loaded block-time = {:?} seconds from genesis on first-launch", - genesis_slot_duration - ); - - genesis_slot_duration.using_encoded(|s| { - client.insert_aux(&[(SLOT_KEY, &s[..])], &[]) - })?; - - Ok(SlotDuration(genesis_slot_duration)) - } - } - } - - /// Returns slot duration value. - pub fn get(&self) -> u64 { - self.0 - } -} - /// Register the aura inherent data provider, if not registered already. fn register_aura_inherent_data_provider( inherent_data_providers: &InherentDataProviders, @@ -670,7 +588,7 @@ pub fn import_queue( DigestItemFor: CompatibleDigestItem + DigestItem, E: ExtraVerification, { - register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.0)?; + register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?; let verifier = Arc::new( AuraVerifier { client: client.clone(), extra, inherent_data_providers } @@ -750,10 +668,10 @@ mod tests { let inherent_data_providers = InherentDataProviders::new(); register_aura_inherent_data_provider( &inherent_data_providers, - slot_duration.0 + slot_duration.get() ).expect("Registers aura inherent data provider"); - assert_eq!(slot_duration.0, SLOT_DURATION); + assert_eq!(slot_duration.get(), SLOT_DURATION); Arc::new(AuraVerifier { client, extra: NothingExtra, @@ -800,7 +718,7 @@ mod tests { let mut runtime = current_thread::Runtime::new().unwrap(); for (peer_id, key) in peers { - let mut client = net.lock().peer(*peer_id).client().clone(); + let client = net.lock().peer(*peer_id).client().clone(); let environ = Arc::new(DummyFactory(client.clone())); import_notifications.push( client.import_notification_stream() @@ -815,7 +733,7 @@ mod tests { let inherent_data_providers = InherentDataProviders::new(); register_aura_inherent_data_provider( - &inherent_data_providers, slot_duration.0 + &inherent_data_providers, slot_duration.get() ).expect("Registers aura inherent data provider"); let aura = start_aura(