[forward port] continue rounds (#583)

* keep rounds consistent when encountering bad block

* fix interval logic

* Fixed indentation
This commit is contained in:
Robert Habermeier
2018-08-17 22:12:37 +02:00
committed by Arkadiy Paronyan
parent e482958705
commit 3ef4463c3a
2 changed files with 93 additions and 19 deletions
+6 -6
View File
@@ -1918,7 +1918,7 @@ dependencies = [
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1992,7 +1992,7 @@ dependencies = [
[[package]]
name = "rhododendron"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2345,7 +2345,7 @@ dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rhododendron 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-codec 0.1.0",
"substrate-executor 0.1.0",
"substrate-keyring 0.1.0",
@@ -2526,7 +2526,7 @@ dependencies = [
name = "substrate-misbehavior-check"
version = "0.1.0"
dependencies = [
"rhododendron 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-bft 0.1.0",
"substrate-codec 0.1.0",
"substrate-keyring 0.1.0",
@@ -2998,7 +2998,7 @@ name = "substrate-test-client"
version = "0.1.0"
dependencies = [
"hashdb 0.2.1 (git+https://github.com/paritytech/parity-common)",
"rhododendron 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-bft 0.1.0",
"substrate-client 0.1.0",
"substrate-codec 0.1.0",
@@ -3893,7 +3893,7 @@ dependencies = [
"checksum regex-syntax 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "747ba3b235651f6e2f67dfa8bcdcd073ddb7c243cb21c442fc12395dfcac212d"
"checksum relay 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1576e382688d7e9deecea24417e350d3062d97e32e45d70b1cde65994ff1489a"
"checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5"
"checksum rhododendron 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3d2b4c928dfb981e491432f0809e93c99857112b0a348a93eee6b13e0bf0f0f3"
"checksum rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "289a6395497f70b8076bf5b9c223e1dc5c0a77619d0a75124f7d4c728d09d2d8"
"checksum ring 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6f7d28b30a72c01b458428e0ae988d4149c20d902346902be881e3edc4bb325c"
"checksum rlp 0.2.1 (git+https://github.com/paritytech/parity-common)" = "<none>"
"checksum rlp 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "89db7f8dfdd5eb7ab3ac3ece7a07fd273a680b4b224cb231181280e8996f9f0b"
+87 -13
View File
@@ -167,7 +167,7 @@ pub trait Environment<B: Block> {
/// block.
pub trait Proposer<B: Block> {
/// Error type which can occur when proposing or evaluating.
type Error: From<Error> + From<InputStreamConcluded> + 'static;
type Error: From<Error> + From<InputStreamConcluded> + ::std::fmt::Debug + 'static;
/// Future that resolves to a committed proposal.
type Create: IntoFuture<Item=B,Error=Self::Error>;
/// Future that resolves when a proposal is evaluated.
@@ -202,12 +202,24 @@ pub trait Authorities<B: Block> {
fn authorities(&self, at: &BlockId<B>) -> Result<Vec<AuthorityId>, Error>;
}
// caches the round number to start at if we end up with BFT consensus on the same
// parent hash more than once (happens if block is bad).
//
// this will force a committed but locally-bad block to be considered analogous to
// a round advancement vote.
#[derive(Debug)]
struct RoundCache<H> {
hash: Option<H>,
start_round: usize,
}
/// Instance of BFT agreement.
struct BftInstance<B: Block, P> {
key: Arc<ed25519::Pair>,
authorities: Vec<AuthorityId>,
parent_hash: B::Hash,
round_timeout_multiplier: u64,
cache: Arc<Mutex<RoundCache<B::Hash>>>,
proposer: P,
}
@@ -229,6 +241,13 @@ impl<B: Block, P: Proposer<B>> BftInstance<B, P>
Duration::from_secs(timeout)
}
fn update_round_cache(&self, current_round: usize) {
let mut cache = self.cache.lock();
if cache.hash.as_ref() == Some(&self.parent_hash) {
cache.start_round = current_round + 1;
}
}
}
impl<B: Block, P: Proposer<B>> rhododendron::Context for BftInstance<B, P>
@@ -299,6 +318,8 @@ impl<B: Block, P: Proposer<B>> rhododendron::Context for BftInstance<B, P>
collect_pubkeys(accumulator.voters()));
debug!(target: "bft", "Round {} should end in at most {} seconds from now", next_round, round_timeout.as_secs());
self.update_round_cache(next_round);
if let AdvanceRoundReason::Timeout = reason {
self.proposer.on_round_end(round, accumulator.proposal().is_some());
}
@@ -359,6 +380,8 @@ impl<B, P, I, InStream, OutSink> Future for BftFuture<B, P, I, InStream, OutSink
&self.inner.context().authorities)
}
self.inner.context().update_round_cache(committed.round_number);
Ok(Async::Ready(()))
}
}
@@ -374,6 +397,7 @@ impl<B, P, I, InStream, OutSink> Drop for BftFuture<B, P, I, InStream, OutSink>
// TODO: have a trait member to pass misbehavior reports into.
let misbehavior = self.inner.drain_misbehavior().collect::<Vec<_>>();
self.inner.context().proposer.import_misbehavior(misbehavior);
self.cancel.store(true, Ordering::Release);
}
}
@@ -382,6 +406,12 @@ struct AgreementHandle {
task: Option<oneshot::Receiver<task::Task>>,
}
impl AgreementHandle {
fn is_live(&self) -> bool {
!self.cancel.load(Ordering::Acquire)
}
}
impl Drop for AgreementHandle {
fn drop(&mut self) {
let task = match self.task.take() {
@@ -404,6 +434,7 @@ impl Drop for AgreementHandle {
pub struct BftService<B: Block, P, I> {
client: Arc<I>,
live_agreement: Mutex<Option<(B::Hash, AgreementHandle)>>,
round_cache: Arc<Mutex<RoundCache<B::Hash>>>,
round_timeout_multiplier: u64,
key: Arc<ed25519::Pair>, // TODO: key changing over time.
factory: P,
@@ -422,9 +453,13 @@ impl<B, P, I> BftService<B, P, I>
BftService {
client: client,
live_agreement: Mutex::new(None),
round_cache: Arc::new(Mutex::new(RoundCache {
hash: None,
start_round: 0,
})),
round_timeout_multiplier: 4,
key: key, // TODO: key changing over time.
factory: factory,
factory,
}
}
@@ -451,8 +486,8 @@ impl<B, P, I> BftService<B, P, I>
where
{
let hash = header.hash();
if self.live_agreement.lock().as_ref().map_or(false, |&(ref h, _)| h == &hash) {
return Ok(None);
if self.last_agreement().map_or(false, |last| last.parent_hash == hash) {
return Ok(None)
}
let authorities = self.client.authorities(&BlockId::Hash(hash.clone()))?;
@@ -474,12 +509,13 @@ impl<B, P, I> BftService<B, P, I>
let bft_instance = BftInstance {
proposer,
parent_hash: hash.clone(),
cache: self.round_cache.clone(),
round_timeout_multiplier: self.round_timeout_multiplier,
key: self.key.clone(),
authorities: authorities,
};
let agreement = rhododendron::agree(
let mut agreement = rhododendron::agree(
bft_instance,
n,
max_faulty,
@@ -487,6 +523,22 @@ impl<B, P, I> BftService<B, P, I>
output,
);
// fast forward round number if necessary.
{
let mut cache = self.round_cache.lock();
trace!(target: "bft", "Round cache: {:?}", &*cache);
if cache.hash.as_ref() == Some(&hash) {
trace!(target: "bft", "Fast-forwarding to round {}", cache.start_round);
agreement.fast_forward(cache.start_round);
cache.start_round += 1;
} else {
*cache = RoundCache {
hash: Some(hash.clone()),
start_round: 1,
};
}
}
let cancel = Arc::new(AtomicBool::new(false));
let (tx, rx) = oneshot::channel();
@@ -513,10 +565,20 @@ impl<B, P, I> BftService<B, P, I>
}
/// Get current agreement parent hash if any.
pub fn live_agreement(&self) -> Option<B::Hash> {
self.live_agreement.lock().as_ref().map(|&(ref h, _)| h.clone())
pub fn last_agreement(&self) -> Option<LastAgreement<B::Hash>> {
self.live_agreement.lock()
.as_ref()
.map(|&(ref h, ref handle)| LastAgreement { parent_hash: h.clone(), live: handle.is_live() })
}
}
/// Struct representing the last agreement the service has processed.
pub struct LastAgreement<H> {
/// The parent hash that agreement was building on.
pub parent_hash: H,
/// Whether that agreement was live.
pub live: bool,
}
/// Given a total number of authorities, yield the maximum faulty that would be allowed.
@@ -684,7 +746,6 @@ mod tests {
use runtime_primitives::testing::{Block as GenericTestBlock, Header as TestHeader};
use primitives::H256;
use self::keyring::Keyring;
use futures::stream;
use tokio::executor::current_thread;
extern crate substrate_keyring as keyring;
@@ -709,9 +770,9 @@ mod tests {
}
// "black hole" output sink.
struct Output<E>(::std::marker::PhantomData<E>);
struct Comms<E>(::std::marker::PhantomData<E>);
impl<E> Sink for Output<E> {
impl<E> Sink for Comms<E> {
type SinkItem = Communication<TestBlock>;
type SinkError = E;
@@ -724,19 +785,28 @@ mod tests {
}
}
impl<E> Stream for Comms<E> {
type Item = Communication<TestBlock>;
type Error = E;
fn poll(&mut self) -> ::futures::Poll<Option<Self::Item>, Self::Error> {
Ok(::futures::Async::NotReady)
}
}
struct DummyFactory;
struct DummyProposer(u64);
impl Environment<TestBlock> for DummyFactory {
type Proposer = DummyProposer;
type Input = stream::Empty<Communication<TestBlock>, Error>;
type Output = Output<Error>;
type Input = Comms<Error>;
type Output = Comms<Error>;
type Error = Error;
fn init(&self, parent_header: &TestHeader, _authorities: &[AuthorityId], _sign_with: Arc<ed25519::Pair>)
-> Result<(DummyProposer, Self::Input, Self::Output), Error>
{
Ok((DummyProposer(parent_header.number + 1), stream::empty(), Output(::std::marker::PhantomData)))
Ok((DummyProposer(parent_header.number + 1), Comms(::std::marker::PhantomData), Comms(::std::marker::PhantomData)))
}
}
@@ -770,6 +840,10 @@ mod tests {
BftService {
client: Arc::new(client),
live_agreement: Mutex::new(None),
round_cache: Arc::new(Mutex::new(RoundCache {
hash: None,
start_round: 0,
})),
round_timeout_multiplier: 4,
key: Arc::new(Keyring::One.into()),
factory: DummyFactory