From 192907811ba66da3c35d46f456e28d7d5051e792 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Fri, 24 Aug 2018 14:45:06 +0200 Subject: [PATCH] BFT delay adjustments (#593) * force delay only on votes * set proposal timestamp forward * Adjusted timeout formula --- polkadot/consensus/src/lib.rs | 19 +++++++++++++------ polkadot/consensus/src/service.rs | 21 +++------------------ 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index 944f3f85d5..92d97d4444 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -64,7 +64,7 @@ extern crate substrate_keyring; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{self, Duration, Instant}; use codec::{Decode, Encode}; use extrinsic_store::Store as ExtrinsicStore; @@ -274,6 +274,9 @@ impl bft::Environment for ProposerFactory ) -> Result<(Self::Proposer, Self::Input, Self::Output), Error> { use runtime_primitives::traits::{Hash as HashT, BlakeTwo256}; + // force delay in evaluation this long. + const FORCE_DELAY: Timestamp = 5; + let parent_hash = parent_header.hash().into(); let id = BlockId::hash(parent_hash); @@ -343,6 +346,7 @@ impl bft::Environment for ProposerFactory transaction_pool: self.transaction_pool.clone(), offline: self.offline.clone(), validators, + minimum_timestamp: current_timestamp() + FORCE_DELAY, _drop_signal: drop_signal, }; @@ -422,6 +426,7 @@ pub struct Proposer { transaction_pool: Arc>, offline: SharedOfflineTracker, validators: Vec, + minimum_timestamp: u64, _drop_signal: exit_future::Signal, } @@ -473,6 +478,7 @@ impl bft::Proposer for Proposer table: self.table.clone(), offline: self.offline.clone(), validators: self.validators.clone(), + minimum_timestamp: self.minimum_timestamp, timing, }) } @@ -525,9 +531,11 @@ impl bft::Proposer for Proposer ); // the duration until the given timestamp is current - let proposed_timestamp = proposal.timestamp(); + let proposed_timestamp = ::std::cmp::max(self.minimum_timestamp, proposal.timestamp()); let timestamp_delay = if proposed_timestamp > current_timestamp { - Some(now + Duration::from_secs(proposed_timestamp - current_timestamp)) + let delay_s = proposed_timestamp - current_timestamp; + debug!(target: "bft", "Delaying evaluation of proposal for {} seconds", delay_s); + Some(now + Duration::from_secs(delay_s)) } else { None }; @@ -677,8 +685,6 @@ impl bft::Proposer for Proposer } fn current_timestamp() -> Timestamp { - use std::time; - time::SystemTime::now().duration_since(time::UNIX_EPOCH) .expect("now always later than unix epoch; qed") .as_secs() @@ -732,6 +738,7 @@ pub struct CreateProposal { timing: ProposalTiming, validators: Vec, offline: SharedOfflineTracker, + minimum_timestamp: Timestamp, } impl CreateProposal where C: PolkadotApi + Send + Sync { @@ -743,7 +750,7 @@ impl CreateProposal where C: PolkadotApi + Send + Sync { const MAX_VOTE_OFFLINE_SECONDS: Duration = Duration::from_secs(60); // TODO: handle case when current timestamp behind that in state. - let timestamp = current_timestamp(); + let timestamp = ::std::cmp::max(self.minimum_timestamp, current_timestamp()); let elapsed_since_start = self.timing.dynamic_inclusion.started_at().elapsed(); let offline_indices = if elapsed_since_start > MAX_VOTE_OFFLINE_SECONDS { diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index ea84209ea8..8cf6838801 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -39,7 +39,7 @@ use extrinsic_store::Store as ExtrinsicStore; use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle; use tokio::runtime::TaskExecutor as ThreadPoolHandle; use tokio::runtime::current_thread::Runtime as LocalRuntime; -use tokio::timer::{Delay, Interval}; +use tokio::timer::Interval; use super::{Network, Collators, ProposerFactory}; use error; @@ -59,25 +59,10 @@ fn start_bft( >::Error: ::std::fmt::Display + Into, >::Error: ::std::fmt::Display { - const DELAY_UNTIL: Duration = Duration::from_millis(5000); - let mut handle = LocalThreadHandle::current(); match bft_service.build_upon(&header) { - Ok(Some(bft_work)) => { - // do not poll work for some amount of time. - let work = Delay::new(Instant::now() + DELAY_UNTIL).then(move |res| { - if let Err(e) = res { - warn!(target: "bft", "Failed to force delay of consensus: {:?}", e); - } - - debug!(target: "bft", "Starting agreement. After forced delay for {:?}", - DELAY_UNTIL); - - bft_work - }); - if let Err(e) = handle.spawn_local(Box::new(work)) { - warn!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e); - } + Ok(Some(bft_work)) => if let Err(e) = handle.spawn_local(Box::new(bft_work)) { + warn!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e); } Ok(None) => trace!(target: "bft", "Could not start agreement on top of {}", header.hash()), Err(e) => warn!(target: "bft", "BFT agreement error: {}", e),