Forward-port BFT fixes from v0.2 and restructure agreement cancelling (#619)

* more accurate consensus superseding logic

* mild revision to `can_build_on` logic

* fix a deadlock when spawning agreement as non-authority

* dropping BFT future before poll doesn't lead to service deadlock

* push cancel to BFTfuture rather than waiting for task
This commit is contained in:
Robert Habermeier
2018-08-28 13:28:28 +02:00
committed by Gav Wood
parent eb10c392ce
commit a0069f5fe2
3 changed files with 124 additions and 80 deletions
+6 -6
View File
@@ -1920,7 +1920,7 @@ dependencies = [
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1994,7 +1994,7 @@ dependencies = [
[[package]]
name = "rhododendron"
version = "0.3.2"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2347,7 +2347,7 @@ dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-codec 0.1.0",
"substrate-executor 0.1.0",
"substrate-keyring 0.1.0",
@@ -2531,7 +2531,7 @@ dependencies = [
name = "substrate-misbehavior-check"
version = "0.1.0"
dependencies = [
"rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-bft 0.1.0",
"substrate-codec 0.1.0",
"substrate-keyring 0.1.0",
@@ -3009,7 +3009,7 @@ name = "substrate-test-client"
version = "0.1.0"
dependencies = [
"hashdb 0.2.1 (git+https://github.com/paritytech/parity-common)",
"rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-bft 0.1.0",
"substrate-client 0.1.0",
"substrate-codec 0.1.0",
@@ -3904,7 +3904,7 @@ dependencies = [
"checksum regex-syntax 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "747ba3b235651f6e2f67dfa8bcdcd073ddb7c243cb21c442fc12395dfcac212d"
"checksum relay 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1576e382688d7e9deecea24417e350d3062d97e32e45d70b1cde65994ff1489a"
"checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5"
"checksum rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "289a6395497f70b8076bf5b9c223e1dc5c0a77619d0a75124f7d4c728d09d2d8"
"checksum rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e20523445e693f394c0e487113ae656071311c9ee4c1e914441bece8c929b21d"
"checksum ring 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6f7d28b30a72c01b458428e0ae988d4149c20d902346902be881e3edc4bb325c"
"checksum rlp 0.2.1 (git+https://github.com/paritytech/parity-common)" = "<none>"
"checksum rlp 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "89db7f8dfdd5eb7ab3ac3ece7a07fd273a680b4b224cb231181280e8996f9f0b"
+116 -72
View File
@@ -47,15 +47,13 @@ extern crate rhododendron;
#[macro_use]
extern crate log;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate error_chain;
use std::mem;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Instant, Duration};
use codec::Encode;
@@ -65,7 +63,7 @@ use runtime_primitives::traits::{Block, Header};
use runtime_primitives::bft::{Message as PrimitiveMessage, Action as PrimitiveAction, Justification as PrimitiveJustification};
use primitives::AuthorityId;
use futures::{task, Async, Stream, Sink, Future, IntoFuture};
use futures::{Async, Stream, Sink, Future, IntoFuture};
use futures::sync::oneshot;
use tokio::timer::Delay;
use parking_lot::Mutex;
@@ -73,6 +71,13 @@ use parking_lot::Mutex;
pub use rhododendron::{InputStreamConcluded, AdvanceRoundReason};
pub use error::{Error, ErrorKind};
// statuses for an agreement
mod status {
pub const LIVE: usize = 0;
pub const BAD: usize = 1;
pub const GOOD: usize = 2;
}
/// Messages over the proposal.
/// Each message carries an associated round number.
pub type Message<B> = rhododendron::Message<B, <B as Block>::Hash>;
@@ -193,7 +198,7 @@ pub trait Proposer<B: Block> {
/// Block import trait.
pub trait BlockImport<B: Block> {
/// Import a block alongside its corresponding justification.
fn import_block(&self, block: B, justification: Justification<B::Hash>, authorities: &[AuthorityId]);
fn import_block(&self, block: B, justification: Justification<B::Hash>, authorities: &[AuthorityId]) -> bool;
}
/// Trait for getting the authorities at a given block.
@@ -336,8 +341,8 @@ pub struct BftFuture<B, P, I, InStream, OutSink> where
OutSink: Sink<SinkItem=Communication<B>, SinkError=P::Error>,
{
inner: rhododendron::Agreement<BftInstance<B, P>, InStream, OutSink>,
cancel: Arc<AtomicBool>,
send_task: Option<oneshot::Sender<task::Task>>,
status: Arc<AtomicUsize>,
cancel: oneshot::Receiver<()>,
import: Arc<I>,
}
@@ -354,18 +359,19 @@ impl<B, P, I, InStream, OutSink> Future for BftFuture<B, P, I, InStream, OutSink
type Error = ();
fn poll(&mut self) -> ::futures::Poll<(), ()> {
// send the task to the bft service so this can be cancelled.
if let Some(sender) = self.send_task.take() {
let _ = sender.send(task::current());
}
// service has canceled the future. bail
if self.cancel.load(Ordering::Acquire) {
return Ok(Async::Ready(()))
}
let cancel = match self.cancel.poll() {
Ok(Async::Ready(())) | Err(_) => true,
Ok(Async::NotReady) => false,
};
// TODO: handle and log this error in a way which isn't noisy on exit.
let committed = try_ready!(self.inner.poll().map_err(|_| ()));
let committed = match self.inner.poll().map_err(|_| ()) {
Ok(Async::Ready(x)) => x,
Ok(Async::NotReady) =>
return Ok(if cancel { Async::Ready(()) } else { Async::NotReady }),
Err(()) => return Err(()),
};
// if something was committed, the round leader must have proposed.
self.inner.context().proposer.on_round_end(committed.round_number, true);
@@ -373,11 +379,26 @@ impl<B, P, I, InStream, OutSink> Future for BftFuture<B, P, I, InStream, OutSink
// If we didn't see the proposal (very unlikely),
// we will get the block from the network later.
if let Some(justified_block) = committed.candidate {
let hash = justified_block.hash();
info!(target: "bft", "Importing block #{} ({}) directly from BFT consensus",
justified_block.header().number(), justified_block.hash());
justified_block.header().number(), hash);
self.import.import_block(justified_block, committed.justification,
&self.inner.context().authorities)
let import_ok = self.import.import_block(
justified_block,
committed.justification,
&self.inner.context().authorities
);
if !import_ok {
warn!(target: "bft", "{:?} was bad block agreed on in round #{}",
hash, committed.round_number);
self.status.store(status::BAD, Ordering::Release);
} else {
self.status.store(status::GOOD, Ordering::Release);
}
} else {
// assume good unless we received the proposal.
self.status.store(status::GOOD, Ordering::Release);
}
self.inner.context().update_round_cache(committed.round_number);
@@ -397,32 +418,24 @@ impl<B, P, I, InStream, OutSink> Drop for BftFuture<B, P, I, InStream, OutSink>
// TODO: have a trait member to pass misbehavior reports into.
let misbehavior = self.inner.drain_misbehavior().collect::<Vec<_>>();
self.inner.context().proposer.import_misbehavior(misbehavior);
self.cancel.store(true, Ordering::Release);
}
}
struct AgreementHandle {
cancel: Arc<AtomicBool>,
task: Option<oneshot::Receiver<task::Task>>,
status: Arc<AtomicUsize>,
send_cancel: Option<oneshot::Sender<()>>,
}
impl AgreementHandle {
fn is_live(&self) -> bool {
!self.cancel.load(Ordering::Acquire)
fn status(&self) -> usize {
self.status.load(Ordering::Acquire)
}
}
impl Drop for AgreementHandle {
fn drop(&mut self) {
let task = match self.task.take() {
Some(t) => t,
None => return,
};
// if this fails, the task is definitely not live anyway.
if let Ok(task) = task.wait() {
self.cancel.store(true, Ordering::Release);
task.notify();
if let Some(sender) = self.send_cancel.take() {
let _ = sender.send(());
}
}
}
@@ -486,7 +499,12 @@ impl<B, P, I> BftService<B, P, I>
where
{
let hash = header.hash();
if self.last_agreement().map_or(false, |last| last.parent_hash == hash) {
let mut live_agreement = self.live_agreement.lock();
let can_build = live_agreement.as_ref()
.map_or(true, |x| self.can_build_on_inner(header, x));
if !can_build {
return Ok(None)
}
@@ -494,14 +512,15 @@ impl<B, P, I> BftService<B, P, I>
let n = authorities.len();
let max_faulty = max_faulty_of(n);
trace!(target: "bft", "Initiating agreement on top of #{}, {:?}", header.number(), hash);
trace!(target: "bft", "max_faulty_of({})={}", n, max_faulty);
let local_id = self.local_id();
if !authorities.contains(&local_id) {
// cancel current agreement
self.live_agreement.lock().take();
Err(From::from(ErrorKind::InvalidAuthority(local_id)))?;
live_agreement.take();
Err(ErrorKind::InvalidAuthority(local_id).into())?;
}
let (proposer, input, output) = self.factory.init(header, &authorities, self.key.clone())?;
@@ -529,8 +548,11 @@ impl<B, P, I> BftService<B, P, I>
trace!(target: "bft", "Round cache: {:?}", &*cache);
if cache.hash.as_ref() == Some(&hash) {
trace!(target: "bft", "Fast-forwarding to round {}", cache.start_round);
agreement.fast_forward(cache.start_round);
let start_round = cache.start_round;
cache.start_round += 1;
drop(cache);
agreement.fast_forward(start_round);
} else {
*cache = RoundCache {
hash: Some(hash.clone()),
@@ -539,22 +561,19 @@ impl<B, P, I> BftService<B, P, I>
}
}
let cancel = Arc::new(AtomicBool::new(false));
let status = Arc::new(AtomicUsize::new(status::LIVE));
let (tx, rx) = oneshot::channel();
// cancel current agreement.
// defers drop of live to the end.
let _preempted_consensus = {
mem::replace(&mut *self.live_agreement.lock(), Some((hash, AgreementHandle {
task: Some(rx),
cancel: cancel.clone(),
})))
};
*live_agreement = Some((hash, AgreementHandle {
send_cancel: Some(tx),
status: status.clone(),
}));
Ok(Some(BftFuture {
inner: agreement,
cancel: cancel,
send_task: Some(tx),
status: status,
cancel: rx,
import: self.client.clone(),
}))
}
@@ -564,21 +583,24 @@ impl<B, P, I> BftService<B, P, I>
self.live_agreement.lock().take();
}
/// Get current agreement parent hash if any.
pub fn last_agreement(&self) -> Option<LastAgreement<B::Hash>> {
self.live_agreement.lock()
.as_ref()
.map(|&(ref h, ref handle)| LastAgreement { parent_hash: h.clone(), live: handle.is_live() })
/// Whether we can build using the given header.
pub fn can_build_on(&self, header: &B::Header) -> bool {
self.live_agreement.lock().as_ref()
.map_or(true, |x| self.can_build_on_inner(header, x))
}
}
/// Get a reference to the underyling client.
pub fn client(&self) -> &I { &*self.client }
/// Struct representing the last agreement the service has processed.
pub struct LastAgreement<H> {
/// The parent hash that agreement was building on.
pub parent_hash: H,
/// Whether that agreement was live.
pub live: bool,
fn can_build_on_inner(&self, header: &B::Header, live: &(B::Hash, AgreementHandle)) -> bool {
let hash = header.hash();
let &(ref live_hash, ref handle) = live;
match handle.status() {
_ if *header.parent_hash() == *live_hash => true, // can always follow with next block.
status::BAD => hash == *live_hash, // bad block can be re-agreed on.
_ => false, // canceled won't appear since we overwrite the handle before returning.
}
}
}
/// Given a total number of authorities, yield the maximum faulty that would be allowed.
@@ -746,7 +768,6 @@ mod tests {
use runtime_primitives::testing::{Block as GenericTestBlock, Header as TestHeader};
use primitives::H256;
use self::keyring::Keyring;
use tokio::executor::current_thread;
extern crate substrate_keyring as keyring;
@@ -758,8 +779,9 @@ mod tests {
}
impl BlockImport<TestBlock> for FakeClient {
fn import_block(&self, block: TestBlock, _justification: Justification<H256>, _authorities: &[AuthorityId]) {
assert!(self.imported_heights.lock().insert(block.header.number))
fn import_block(&self, block: TestBlock, _justification: Justification<H256>, _authorities: &[AuthorityId]) -> bool {
assert!(self.imported_heights.lock().insert(block.header.number));
true
}
}
@@ -888,21 +910,18 @@ mod tests {
second.parent_hash = first_hash;
let second_hash = second.hash();
let bft = service.build_upon(&first).unwrap();
let mut first_bft = service.build_upon(&first).unwrap().unwrap();
assert!(service.live_agreement.lock().as_ref().unwrap().0 == first_hash);
let mut core = current_thread::CurrentThread::new();
// turn the core so the future gets polled and sends its task to the
// service. otherwise it deadlocks.
core.spawn(bft.unwrap());
core.run_timeout(::std::time::Duration::from_millis(100)).unwrap();
let bft = service.build_upon(&second).unwrap();
let _second_bft = service.build_upon(&second).unwrap();
assert!(service.live_agreement.lock().as_ref().unwrap().0 != first_hash);
assert!(service.live_agreement.lock().as_ref().unwrap().0 == second_hash);
core.spawn(bft.unwrap());
core.run_timeout(::std::time::Duration::from_millis(100)).unwrap();
// first_bft has been cancelled. need to swap out so we can check it.
let (_tx, mut rx) = oneshot::channel();
::std::mem::swap(&mut rx, &mut first_bft.cancel);
assert!(rx.wait().is_ok());
}
#[test]
@@ -1041,4 +1060,29 @@ mod tests {
assert!(false);
}
}
#[test]
fn drop_bft_future_does_not_deadlock() {
let client = FakeClient {
authorities: vec![
Keyring::One.to_raw_public().into(),
Keyring::Two.to_raw_public().into(),
Keyring::Alice.to_raw_public().into(),
Keyring::Eve.to_raw_public().into(),
],
imported_heights: Mutex::new(HashSet::new()),
};
let service = make_service(client);
let first = from_block_number(2);
let first_hash = first.hash();
let mut second = from_block_number(3);
second.parent_hash = first_hash;
let _ = service.build_upon(&first).unwrap();
assert!(service.live_agreement.lock().as_ref().unwrap().0 == first_hash);
service.live_agreement.lock().take();
}
}
+2 -2
View File
@@ -518,7 +518,7 @@ impl<B, E, Block> bft::BlockImport<Block> for Client<B, E, Block>
block: Block,
justification: ::bft::Justification<Block::Hash>,
authorities: &[AuthorityId]
) {
) -> bool {
let (header, extrinsics) = block.deconstruct();
let justified_header = JustifiedHeader {
header: header,
@@ -526,7 +526,7 @@ impl<B, E, Block> bft::BlockImport<Block> for Client<B, E, Block>
authorities: authorities.to_vec(),
};
let _ = self.import_block(BlockOrigin::ConsensusBroadcast, justified_header, Some(extrinsics));
self.import_block(BlockOrigin::ConsensusBroadcast, justified_header, Some(extrinsics)).is_ok()
}
}