apply authority set changes

This commit is contained in:
Robert Habermeier
2018-10-26 19:19:12 +02:00
parent a422a14b52
commit 5f6dc6bc6a
2 changed files with 157 additions and 25 deletions
@@ -54,12 +54,14 @@ impl<H, N> SharedAuthoritySet<H, N> {
}
}
impl<H, N: Add<Output=N> + Ord + Clone> SharedAuthoritySet<H, N> {
impl<H: Eq, N: Add<Output=N> + Ord + Clone> SharedAuthoritySet<H, N> {
/// Note an upcoming pending transition.
pub(crate) fn add_pending_change(&self, pending: PendingChange<H, N>) {
// ordered first by effective number and then by signal-block number.
let mut inner = self.inner.write();
let key = (pending.effective_number(), pending.canon_height);
let idx = inner.pending_changes
.binary_search_by_key(&pending.effective_number(), |change| change.effective_number())
.binary_search_by_key(&key, |change| (change.effective_number(), change.canon_height))
.unwrap_or_else(|i| i);
inner.pending_changes.insert(idx, pending);
@@ -67,7 +69,17 @@ impl<H, N: Add<Output=N> + Ord + Clone> SharedAuthoritySet<H, N> {
/// Get the earliest limit-block number, if any.
pub(crate) fn current_limit(&self) -> Option<N> {
self.inner.read().pending_changes.get(0).map(|change| change.effective_number().clone())
self.inner.read().current_limit()
}
/// Get the current set ID. This is incremented every time the set changes.
pub(crate) fn set_id(&self) -> u64 {
self.inner.read().set_id
}
/// Execute a closure with the inner set mutably.
pub(crate) fn with_mut<F, U>(&self, f: F) -> U where F: FnOnce(&mut AuthoritySet<H, N>) -> U {
f(&mut *self.inner.write())
}
}
@@ -78,18 +90,81 @@ impl<H, N> From<AuthoritySet<H, N>> for SharedAuthoritySet<H, N> {
}
/// A set of authorities.
#[derive(Encode, Decode)]
#[derive(Debug, Clone, Encode, Decode)]
pub(crate) struct AuthoritySet<H, N> {
current_authorities: Vec<(AuthorityId, u64)>,
set_id: u64,
pending_changes: Vec<PendingChange<H, N>>,
}
impl<H, N> AuthoritySet<H, N> {
/// Get the earliest limit-block number, if any.
pub(crate) fn current_limit(&self) -> Option<N> {
self.pending_changes.get(0).map(|change| change.effective_number().clone())
}
/// Get the set identifier.
pub(crate) fn set_id(&self) -> u64 {
self.set_id
}
/// Get the current set id and a reference to the current authority set.
pub(crate) fn current(&self) -> (u64, &[(AuthorityId, u64)]) {
(self.set_id, &self.current_authorities[..])
}
}
impl<H: Eq, N: Ord + Debug> AuthoritySet<H, N> {
/// Apply or prune any pending transitions. Provide a closure that can be used to check for the
/// finalized block with given number.
///
/// Returns true when the set's representation has changed.
pub(crate) fn apply_changes<F, E>(&mut self, just_finalized: N, canonical: F) -> Result<bool, E>
where F: FnMut(N) -> Result<H, E>
{
let mut changed = false;
loop {
let remove_up_to = match self.pending_changes.first() {
None => break,
Some(change) => {
let effective_number = change.effective_number();
if effective_number > just_finalized { break }
// check if the block that signalled the change is canonical in
// our chain.
if canonical(change.canon_height)? == change.canon_hash {
// apply this change: make the set canonical
info!(target: "finality", "Applying authority set change scheduled at block #{:?}",
change.canon_height);
self.current_authorities = change.next_authorities.clone();
self.set_id += 1;
// discard any signalled changes
// that happened before or equal to the effective number of the change.
self.pending_changes.iter()
.take_while(|c| c.canon_height <= effective_number)
.count()
} else {
1 // prune out this entry; it's no longer relevant.
}
}
};
let remove_up_to = ::std::cmp::min(remove_up_to, self.pending_changes.len());
self.pending_changes.drain(..remove_up_to);
changed = true; // always changed because we strip at least the first change.
}
Ok(changed)
}
}
/// A pending change to the authority set.
///
/// This will be applied when the announcing block is at some depth within
/// the finalized chain.
#[derive(Encode, Decode)]
#[derive(Debug, Clone, Encode, Decode)]
pub(crate) struct PendingChange<H, N> {
/// The new authorities and weights to apply.
pub(crate) next_authorities: Vec<(AuthorityId, u64)>,
@@ -103,7 +178,7 @@ pub(crate) struct PendingChange<H, N> {
}
impl<H, N: Add<Output=N> + Clone> PendingChange<H, N> {
/// Returns the effective number.
/// Returns the effective number this change will be applied at.
fn effective_number(&self) -> N {
self.canon_height.clone() + self.finalization_depth.clone()
}
+76 -19
View File
@@ -47,11 +47,11 @@ extern crate parity_codec_derive;
use futures::prelude::*;
use futures::stream::Fuse;
use futures::sync::mpsc;
use client::{Client, ImportNotifications, backend::Backend, CallExecutor};
use client::{Client, error::Error as ClientError, ImportNotifications, backend::Backend, CallExecutor};
use codec::{Encode, Decode};
use consensus_common::BlockImport;
use runtime_primitives::traits::{
As, NumberFor, Block as BlockT, Header as HeaderT, DigestItemFor,
NumberFor, Block as BlockT, Header as HeaderT, DigestItemFor,
};
use runtime_primitives::{generic::BlockId, Justification};
use substrate_primitives::{ed25519, AuthorityId, Blake2Hasher};
@@ -108,7 +108,7 @@ pub enum Error {
/// A blockchain error.
Blockchain(String),
/// Could not complete a round on disk.
CouldNotCompleteRound(::client::error::Error),
CouldNotCompleteRound(ClientError),
/// A timer failed to fire.
Timer(::tokio::timer::Error),
}
@@ -421,6 +421,7 @@ pub struct Environment<B, E, Block: BlockT, N: Network> {
config: Config,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
network: N,
set_id: u64,
}
impl<Block: BlockT, B, E, N> grandpa::Chain<Block::Hash, NumberFor<Block>> for Environment<B, E, Block, N> where
@@ -459,13 +460,20 @@ impl<Block: BlockT, B, E, N> grandpa::Chain<Block::Hash, NumberFor<Block>> for E
}
fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, NumberFor<Block>)> {
match self.inner.best_containing(block, None) {
// we refuse to vote beyond the current limit number where transitions are scheduled to
// occur.
// once blocks are finalized that make that transition irrelevant or activate it,
// we will proceed onwards. most of the time there will be no pending transition.
let limit = self.authority_set.current_limit();
match self.inner.best_containing(block, limit) {
Ok(Some(hash)) => {
let header = self.inner.header(&BlockId::Hash(hash)).ok()?
.expect("Header known to exist after `best_containing` call; qed");
Some((hash, header.number().clone()))
}
// Ok(None) can be returned when `block` is after `limit`. That might cause issues.
// might be better to return the header itself in this (rare) case.
Ok(None) => None,
Err(e) => {
debug!(target: "afg", "Encountered error finding best chain containing {:?}: {:?}", block, e);
@@ -486,7 +494,8 @@ pub struct ScheduledChange<N> {
/// A GRANDPA-compatible DigestItem. This can describe when GRANDPA set changes
/// are scheduled.
// TODO: with specialization, do a blanket implementation so this trait
//
// With specialization, could do a blanket implementation so this trait
// doesn't have to be implemented by users.
pub trait CompatibleDigestItem<N> {
/// If this digest item notes a GRANDPA set change, return information about
@@ -494,6 +503,33 @@ pub trait CompatibleDigestItem<N> {
fn scheduled_change(&self) -> Option<ScheduledChange<N>> { None }
}
/// Signals either an early exit of a voter or an error.
#[derive(Debug)]
pub enum ExitOrError {
/// An error occurred.
Error(Error),
/// Early exit of the voter: the new set ID and the new authorities along with respective weights.
AuthoritiesChanged(u64, Vec<(AuthorityId, u64)>),
}
impl From<Error> for ExitOrError {
fn from(e: Error) -> Self {
ExitOrError::Error(e)
}
}
impl From<ClientError> for ExitOrError {
fn from(e: ClientError) -> Self {
ExitOrError::Error(Error::from(e))
}
}
impl From<grandpa::Error> for ExitOrError {
fn from(e: grandpa::Error) -> Self {
ExitOrError::Error(Error::from(e))
}
}
impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> for Environment<B, E, Block, N> where
Block: 'static,
B: Backend<Block, Blake2Hasher> + 'static,
@@ -514,7 +550,7 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
SinkItem = ::grandpa::Message<Block::Hash, NumberFor<Block>>,
SinkError = Self::Error,
>>;
type Error = Error;
type Error = ExitOrError;
#[allow(unreachable_code)]
fn round_data(
@@ -528,24 +564,21 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
let prevote_timer = Delay::new(now + self.config.gossip_duration * 2);
let precommit_timer = Delay::new(now + self.config.gossip_duration * 4);
// TODO [now]: Get from shared authority set.
let set_id = unimplemented!();
// TODO: dispatch this with `mpsc::spawn`.
let incoming = checked_message_stream::<Block, _>(
round,
set_id,
self.set_id,
self.network.messages_for(round),
self.config.genesis_voters.clone(),
);
let (out_rx, outgoing) = outgoing_messages::<Block, _>(
round,
set_id,
self.set_id,
self.config.local_key.clone(),
self.config.genesis_voters.clone(),
self.network.clone(),
);
).sink_map_err(Into::into);
// schedule incoming messages from the network to be held until
// corresponding blocks are imported.
@@ -556,7 +589,7 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
);
// join incoming network messages with locally originating ones.
let incoming = Box::new(incoming.select(out_rx));
let incoming = Box::new(incoming.select(out_rx).map_err(Into::into));
// schedule network message cleanup when sink drops.
let outgoing = Box::new(ClearOnDrop {
@@ -580,21 +613,44 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
.insert_aux(&[(LAST_COMPLETED_KEY, &encoded_state[..])], &[])
{
warn!(target: "afg", "Shutting down voter due to error bookkeeping last completed round in DB: {:?}", e);
Err(Error::CouldNotCompleteRound(e))
Err(Error::CouldNotCompleteRound(e).into())
} else {
Ok(())
}
}
fn finalize_block(&self, hash: Block::Hash, number: NumberFor<Block>) -> Result<(), Self::Error> {
// TODO: don't unconditionally notify.
// ideally some handle to a synchronization oracle would be used
// to avoid unconditionally notifying.
if let Err(e) = self.inner.finalize_block(BlockId::Hash(hash), true) {
warn!(target: "afg", "Error applying finality to block {:?}: {:?}", (hash, number), e);
// we return without error because not being able to finalize (temporarily) is
// non-fatal.
return Ok(());
}
// we return without error in all cases because not being able to finalize is
// non-fatal.
Ok(())
self.authority_set.with_mut(|authority_set| {
let client = &self.inner;
let prior_id = authority_set.set_id();
let has_changed = authority_set.apply_changes(number, |canon_number| {
client.block_hash_from_id(&BlockId::number(canon_number))
.map(|h| h.expect("given number always less than newly-finalized number; \
thus there is a block with that number finalized already; qed"))
})?;
if has_changed {
// TODO [now]: write to disk. if it fails, exit the node.
}
let (new_id, set_ref) = authority_set.current();
if new_id != prior_id {
// the authority set has changed.
return Err(ExitOrError::AuthoritiesChanged(new_id, set_ref.to_vec()));
}
Ok(())
})
}
fn prevote_equivocation(
@@ -692,7 +748,7 @@ pub fn run_grandpa<B, E, Block: BlockT, N>(
))?
};
// TODO: attempt to load from disk.
// TODO [now]: attempt to load from disk.
let authority_set = SharedAuthoritySet::genesis(
voters.iter().map(|(&id, &weight)| (id, weight)).collect(),
);
@@ -707,6 +763,7 @@ pub fn run_grandpa<B, E, Block: BlockT, N>(
config,
voters,
network,
set_id: authority_set.set_id(),
authority_set,
});