Merge remote-tracking branch 'substrate-disk/polkadot-testnet-fixes-fp'

This commit is contained in:
Robert Habermeier
2018-08-22 18:45:14 +02:00
9 changed files with 117 additions and 70 deletions
+56 -26
View File
@@ -21,14 +21,18 @@ use polkadot_primitives::AccountId;
use std::collections::HashMap;
use std::time::{Instant, Duration};
// time before we report a validator.
const REPORT_TIME: Duration = Duration::from_secs(60 * 5);
struct Observed {
last_round_end: Instant,
offline_since: Instant,
}
#[derive(Eq, PartialEq)]
enum Activity {
Offline,
StillOffline(Duration),
Online,
}
impl Observed {
fn new() -> Observed {
let now = Instant::now();
@@ -38,31 +42,32 @@ impl Observed {
}
}
fn note_round_end(&mut self, was_online: bool) {
let now = Instant::now();
fn note_round_end(&mut self, now: Instant, was_online: Option<bool>) {
self.last_round_end = now;
if was_online {
if let Some(false) = was_online {
self.offline_since = now;
}
}
fn is_active(&self) -> bool {
/// Returns what we have observed about the online/offline state of the validator.
fn activity(&self) -> Activity {
// can happen if clocks are not monotonic
if self.offline_since > self.last_round_end { return true }
self.last_round_end.duration_since(self.offline_since) < REPORT_TIME
if self.offline_since > self.last_round_end { return Activity::Online }
if self.offline_since == self.last_round_end { return Activity::Offline }
Activity::StillOffline(self.last_round_end.duration_since(self.offline_since))
}
}
/// Tracks offline validators and can issue a report for those offline.
pub struct OfflineTracker {
observed: HashMap<AccountId, Observed>,
block_instant: Instant,
}
impl OfflineTracker {
/// Create a new tracker.
pub fn new() -> Self {
OfflineTracker { observed: HashMap::new() }
OfflineTracker { observed: HashMap::new(), block_instant: Instant::now() }
}
/// Note new consensus is starting with the given set of validators.
@@ -71,23 +76,33 @@ impl OfflineTracker {
let set: HashSet<_> = validators.iter().cloned().collect();
self.observed.retain(|k, _| set.contains(k));
self.block_instant = Instant::now();
}
/// Note that a round has ended.
pub fn note_round_end(&mut self, validator: AccountId, was_online: bool) {
self.observed.entry(validator)
.or_insert_with(Observed::new)
.note_round_end(was_online);
self.observed.entry(validator).or_insert_with(Observed::new);
for (val, obs) in self.observed.iter_mut() {
obs.note_round_end(
self.block_instant,
if val == &validator {
Some(was_online)
} else {
None
}
)
}
}
/// Generate a vector of indices for offline account IDs.
pub fn reports(&self, validators: &[AccountId]) -> Vec<u32> {
validators.iter()
.enumerate()
.filter_map(|(i, v)| if self.is_online(v) {
None
} else {
.filter_map(|(i, v)| if self.is_known_offline_now(v) {
Some(i as u32)
} else {
None
})
.collect()
}
@@ -101,13 +116,15 @@ impl OfflineTracker {
};
// we must think all validators reported externally are offline.
let thinks_online = self.is_online(v);
!thinks_online
self.is_known_offline_now(v)
})
}
fn is_online(&self, v: &AccountId) -> bool {
self.observed.get(v).map(Observed::is_active).unwrap_or(true)
/// Rwturns true only if we have seen the validator miss the last round. For further
/// rounds where we can't say for sure that they're still offline, we give them the
/// benefit of the doubt.
fn is_known_offline_now(&self, v: &AccountId) -> bool {
self.observed.get(v).map(|o| o.activity() == Activity::Offline).unwrap_or(false)
}
}
@@ -121,17 +138,30 @@ mod tests {
let v = [0; 32].into();
let v2 = [1; 32].into();
let v3 = [2; 32].into();
tracker.note_new_block(&[v, v2, v3]);
tracker.note_round_end(v, true);
tracker.note_round_end(v2, true);
tracker.note_round_end(v3, true);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![0u32; 0]);
let slash_time = REPORT_TIME + Duration::from_secs(5);
tracker.observed.get_mut(&v).unwrap().offline_since -= slash_time;
tracker.observed.get_mut(&v2).unwrap().offline_since -= slash_time;
tracker.note_new_block(&[v, v2, v3]);
tracker.note_round_end(v, true);
tracker.note_round_end(v2, false);
tracker.note_round_end(v3, true);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![1]);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![0, 1]);
tracker.note_new_block(&[v, v2, v3]);
tracker.note_round_end(v, false);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![0]);
tracker.note_new_block(&[v, v3]);
tracker.note_new_block(&[v, v2, v3]);
tracker.note_round_end(v, false);
tracker.note_round_end(v2, true);
tracker.note_round_end(v3, false);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![0, 2]);
tracker.note_new_block(&[v, v2]);
tracker.note_round_end(v, false);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![0]);
}
}
+22 -30
View File
@@ -62,32 +62,26 @@ fn start_bft<F, C>(
const DELAY_UNTIL: Duration = Duration::from_millis(5000);
let mut handle = LocalThreadHandle::current();
let work = Delay::new(Instant::now() + DELAY_UNTIL)
.then(move |res| {
if let Err(e) = res {
warn!(target: "bft", "Failed to force delay of consensus: {:?}", e);
}
match bft_service.build_upon(&header) {
Ok(maybe_bft_work) => {
if maybe_bft_work.is_some() {
debug!(target: "bft", "Starting agreement. After forced delay for {:?}",
DELAY_UNTIL);
}
maybe_bft_work
match bft_service.build_upon(&header) {
Ok(Some(bft_work)) => {
// do not poll work for some amount of time.
let work = Delay::new(Instant::now() + DELAY_UNTIL).then(move |res| {
if let Err(e) = res {
warn!(target: "bft", "Failed to force delay of consensus: {:?}", e);
}
Err(e) => {
warn!(target: "bft", "BFT agreement error: {}", e);
None
}
}
})
.map(|_| ());
if let Err(e) = handle.spawn_local(Box::new(work)) {
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
}
debug!(target: "bft", "Starting agreement. After forced delay for {:?}",
DELAY_UNTIL);
bft_work
});
if let Err(e) = handle.spawn_local(Box::new(work)) {
warn!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
}
}
Ok(None) => trace!(target: "bft", "Could not start agreement on top of {}", header.hash()),
Err(e) => warn!(target: "bft", "BFT agreement error: {}", e),
}
}
// creates a task to prune redundant entries in availability store upon block finalization
@@ -198,6 +192,7 @@ impl Service {
client.import_notification_stream().for_each(move |notification| {
if notification.is_new_best {
trace!(target: "bft", "Attempting to start new consensus round after import notification of {:?}", notification.hash);
start_bft(notification.header, bft_service.clone());
}
Ok(())
@@ -221,15 +216,12 @@ impl Service {
let c = client.clone();
let s = bft_service.clone();
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
interval.map_err(|e| debug!(target: "bft", "Timer error: {:?}", e)).for_each(move |_| {
if let Ok(best_block) = c.best_block_header() {
let hash = best_block.hash();
let last_agreement = s.last_agreement();
let can_build_upon = last_agreement
.map_or(true, |x| !x.live || x.parent_hash != hash);
if hash == prev_best && can_build_upon {
debug!("Starting consensus round after a timeout");
if hash == prev_best {
debug!(target: "bft", "Starting consensus round after a timeout");
start_bft(best_block, s.clone());
}
prev_best = hash;