mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 00:37:57 +00:00
force BFT delay in consensus service, not in proposer logic (#477)
* move forced delay to consensus service * fiddle with logging
This commit is contained in:
committed by
Arkadiy Paronyan
parent
f4cd995558
commit
a7c7bd49d9
@@ -258,8 +258,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
|
||||
) -> Result<(Self::Proposer, Self::Input, Self::Output), Error> {
|
||||
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
|
||||
|
||||
const DELAY_UNTIL: Duration = Duration::from_millis(5000);
|
||||
|
||||
let parent_hash = parent_header.hash().into();
|
||||
|
||||
let id = BlockId::hash(parent_hash);
|
||||
@@ -295,9 +293,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
|
||||
self.parachain_empty_duration.clone(),
|
||||
);
|
||||
|
||||
debug!(target: "bft", "Initialising consensus proposer. Refusing to evaluate for {:?} from now.",
|
||||
DELAY_UNTIL);
|
||||
|
||||
let validation_para = match local_duty.validation {
|
||||
Chain::Relay => None,
|
||||
Chain::Parachain(id) => Some(id),
|
||||
@@ -320,7 +315,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
|
||||
client: self.client.clone(),
|
||||
dynamic_inclusion,
|
||||
local_key: sign_with,
|
||||
minimum_delay: now + DELAY_UNTIL,
|
||||
parent_hash,
|
||||
parent_id: id,
|
||||
parent_number: parent_header.number,
|
||||
@@ -375,7 +369,6 @@ pub struct Proposer<C: PolkadotApi> {
|
||||
client: Arc<C>,
|
||||
dynamic_inclusion: DynamicInclusion,
|
||||
local_key: Arc<ed25519::Pair>,
|
||||
minimum_delay: Instant,
|
||||
parent_hash: Hash,
|
||||
parent_id: BlockId,
|
||||
parent_number: BlockNumber,
|
||||
@@ -406,17 +399,10 @@ impl<C> bft::Proposer<Block> for Proposer<C>
|
||||
initial_included,
|
||||
).unwrap_or_else(|| now + Duration::from_millis(1));
|
||||
|
||||
let minimum_delay = if self.minimum_delay > now + ATTEMPT_PROPOSE_EVERY {
|
||||
Some(Delay::new(self.minimum_delay))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let timing = ProposalTiming {
|
||||
attempt_propose: Interval::new(now + ATTEMPT_PROPOSE_EVERY, ATTEMPT_PROPOSE_EVERY),
|
||||
enough_candidates: Delay::new(enough_candidates),
|
||||
dynamic_inclusion: self.dynamic_inclusion.clone(),
|
||||
minimum_delay,
|
||||
last_included: initial_included,
|
||||
};
|
||||
|
||||
@@ -489,11 +475,7 @@ impl<C> bft::Proposer<Block> for Proposer<C>
|
||||
// delay casting vote until able according to minimum block time,
|
||||
// timestamp delay, and count delay.
|
||||
// construct a future from the maximum of the two durations.
|
||||
let max_delay = [timestamp_delay, count_delay, Some(self.minimum_delay)]
|
||||
.iter()
|
||||
.cloned()
|
||||
.max()
|
||||
.expect("iterator not empty; thus max returns `Some`; qed");
|
||||
let max_delay = ::std::cmp::max(timestamp_delay, count_delay);
|
||||
|
||||
let temporary_delay = match max_delay {
|
||||
Some(duration) => future::Either::A(
|
||||
@@ -615,7 +597,6 @@ struct ProposalTiming {
|
||||
attempt_propose: Interval,
|
||||
dynamic_inclusion: DynamicInclusion,
|
||||
enough_candidates: Delay,
|
||||
minimum_delay: Option<Delay>,
|
||||
last_included: usize,
|
||||
}
|
||||
|
||||
@@ -632,12 +613,6 @@ impl ProposalTiming {
|
||||
x.expect("timer still alive; intervals never end; qed");
|
||||
}
|
||||
|
||||
if let Some(ref mut min) = self.minimum_delay {
|
||||
try_ready!(min.poll().map_err(ErrorKind::Timer));
|
||||
}
|
||||
|
||||
self.minimum_delay = None; // after this point, the future must have completed.
|
||||
|
||||
if included == self.last_included {
|
||||
return self.enough_candidates.poll().map_err(ErrorKind::Timer);
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ use transaction_pool::TransactionPool;
|
||||
use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle;
|
||||
use tokio::runtime::TaskExecutor as ThreadPoolHandle;
|
||||
use tokio::runtime::current_thread::Runtime as LocalRuntime;
|
||||
use tokio::timer::Interval;
|
||||
use tokio::timer::{Delay, Interval};
|
||||
|
||||
use super::{Network, Collators, ProposerFactory};
|
||||
use error;
|
||||
@@ -49,8 +49,8 @@ const TIMER_INTERVAL_MS: u64 = 500;
|
||||
// spin up an instance of BFT agreement on the current thread's executor.
|
||||
// panics if there is no current thread executor.
|
||||
fn start_bft<F, C>(
|
||||
header: &Header,
|
||||
bft_service: &BftService<Block, F, C>,
|
||||
header: Header,
|
||||
bft_service: Arc<BftService<Block, F, C>>,
|
||||
) where
|
||||
F: bft::Environment<Block> + 'static,
|
||||
C: bft::BlockImport<Block> + bft::Authorities<Block> + 'static,
|
||||
@@ -58,14 +58,35 @@ fn start_bft<F, C>(
|
||||
<F::Proposer as bft::Proposer<Block>>::Error: ::std::fmt::Display + Into<error::Error>,
|
||||
<F as bft::Environment<Block>>::Error: ::std::fmt::Display
|
||||
{
|
||||
const DELAY_UNTIL: Duration = Duration::from_millis(5000);
|
||||
|
||||
let mut handle = LocalThreadHandle::current();
|
||||
match bft_service.build_upon(&header) {
|
||||
Ok(Some(bft)) => if let Err(e) = handle.spawn_local(Box::new(bft)) {
|
||||
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
|
||||
},
|
||||
Ok(None) => {},
|
||||
Err(e) => warn!(target: "bft", "BFT agreement error: {}", e),
|
||||
}
|
||||
let work = Delay::new(Instant::now() + DELAY_UNTIL)
|
||||
.then(move |res| {
|
||||
if let Err(e) = res {
|
||||
warn!(target: "bft", "Failed to force delay of consensus: {:?}", e);
|
||||
}
|
||||
|
||||
match bft_service.build_upon(&header) {
|
||||
Ok(maybe_bft_work) => {
|
||||
if maybe_bft_work.is_some() {
|
||||
debug!(target: "bft", "Starting agreement. After forced delay for {:?}",
|
||||
DELAY_UNTIL);
|
||||
}
|
||||
|
||||
maybe_bft_work
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(target: "bft", "BFT agreement error: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.map(|_| ());
|
||||
|
||||
if let Err(e) = handle.spawn_local(Box::new(work)) {
|
||||
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
/// Consensus service. Starts working when created.
|
||||
@@ -113,7 +134,7 @@ impl Service {
|
||||
|
||||
client.import_notification_stream().for_each(move |notification| {
|
||||
if notification.is_new_best {
|
||||
start_bft(¬ification.header, &*bft_service);
|
||||
start_bft(notification.header, bft_service.clone());
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
@@ -139,9 +160,9 @@ impl Service {
|
||||
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
|
||||
if let Ok(best_block) = c.best_block_header() {
|
||||
let hash = best_block.hash();
|
||||
if hash == prev_best {
|
||||
if hash == prev_best && s.live_agreement() != Some(hash) {
|
||||
debug!("Starting consensus round after a timeout");
|
||||
start_bft(&best_block, &*s);
|
||||
start_bft(best_block, s.clone());
|
||||
}
|
||||
prev_best = hash;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user