restart voter with new authority set as necessary

This commit is contained in:
Robert Habermeier
2018-10-27 17:32:42 +02:00
parent 25984d8dc8
commit 1a3fe5657d
2 changed files with 136 additions and 61 deletions
@@ -85,6 +85,15 @@ impl<H, N> From<AuthoritySet<H, N>> for SharedAuthoritySet<H, N> {
}
}
/// Status of the set after changes were applied.
pub(crate) struct Status<H, N> {
/// Whether internal changes were made.
pub(crate) changed: bool,
/// `Some` when underlying authority set has changed, containign the
/// block where that set changed.
pub(crate) new_set_block: Option<(H, N)>,
}
/// A set of authorities.
#[derive(Debug, Clone, Encode, Decode)]
pub(crate) struct AuthoritySet<H, N> {
@@ -129,12 +138,16 @@ impl<H: Eq, N> AuthoritySet<H, N>
/// Apply or prune any pending transitions. Provide a closure that can be used to check for the
/// finalized block with given number.
///
/// Returns true when the set's representation has changed.
/// When the set has changed, the return value will be `Ok(Some((H, N)))` which is the cnaonical
/// block where the set last changed.
pub(crate) fn apply_changes<F, E>(&mut self, just_finalized: N, mut canonical: F)
-> Result<bool, E>
-> Result<Status<H, N>, E>
where F: FnMut(N) -> Result<H, E>
{
let mut changed = false;
let mut status = Status {
changed: false,
new_set_block: None,
};
loop {
let remove_up_to = match self.pending_changes.first() {
None => break,
@@ -152,6 +165,11 @@ impl<H: Eq, N> AuthoritySet<H, N>
self.current_authorities = change.next_authorities.clone();
self.set_id += 1;
status.new_set_block = Some((
canonical(effective_number.clone())?,
effective_number.clone(),
));
// discard any signalled changes
// that happened before or equal to the effective number of the change.
self.pending_changes.iter()
@@ -165,10 +183,10 @@ impl<H: Eq, N> AuthoritySet<H, N>
let remove_up_to = ::std::cmp::min(remove_up_to, self.pending_changes.len());
self.pending_changes.drain(..remove_up_to);
changed = true; // always changed because we strip at least the first change.
status.changed = true; // always changed because we strip at least the first change.
}
Ok(changed)
Ok(status)
}
}
@@ -264,16 +282,18 @@ mod tests {
authorities.add_pending_change(change_a.clone());
authorities.add_pending_change(change_b.clone());
authorities.apply_changes::<_, ()>(10, |_| panic!()).unwrap();
authorities.apply_changes(10, |_| Err(())).unwrap();
assert!(authorities.current_authorities.is_empty());
authorities.apply_changes::<_, ()>(
15,
|n| if n == 5 { Ok("hash_a") } else { panic!() }
).unwrap();
authorities.apply_changes(15, |n| match n {
5 => Ok("hash_a"),
15 => Ok("hash_15_canon"),
_ => Err(()),
}).unwrap();
assert_eq!(authorities.current_authorities, set_a);
assert_eq!(authorities.set_id, 1);
assert!(authorities.pending_changes.is_empty());
}
#[test]
@@ -318,10 +338,12 @@ mod tests {
5 => Ok("hash_a"),
15 => Ok("hash_b"),
16 => Ok("hash_c"),
26 => Ok("hash_26"),
_ => Err(()),
}).unwrap();
assert_eq!(authorities.current_authorities, set_c);
assert_eq!(authorities.set_id, 2); // has been bumped only twice
assert!(authorities.pending_changes.is_empty());
}
}
+104 -51
View File
@@ -67,8 +67,8 @@ mod authorities;
const LAST_COMPLETED_KEY: &[u8] = b"grandpa_completed_round";
const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters";
/// round-number, round-state, set indicator
type LastCompleted<H, N> = (u64, RoundState<H, N>, u64);
/// round-number, round-state
type LastCompleted<H, N> = (u64, RoundState<H, N>);
/// A GRANDPA message for a substrate chain.
pub type Message<Block> = grandpa::Message<<Block as BlockT>::Hash, NumberFor<Block>>;
@@ -85,11 +85,10 @@ pub type Prevote<Block> = grandpa::Prevote<<Block as BlockT>::Hash, NumberFor<Bl
pub type Precommit<Block> = grandpa::Precommit<<Block as BlockT>::Hash, NumberFor<Block>>;
/// Configuration for the GRANDPA service.
#[derive(Clone)]
pub struct Config {
/// The expected duration for a message to be gossiped across the network.
pub gossip_duration: Duration,
/// The voters.
pub genesis_voters: Vec<AuthorityId>,
/// The local signing key.
pub local_key: Option<Arc<ed25519::Pair>>,
}
@@ -331,7 +330,7 @@ fn checked_message_stream<Block: BlockT, S>(
round: u64,
set_id: u64,
inner: S,
voters: Vec<AuthorityId>,
voters: Arc<HashMap<AuthorityId, u64>>,
)
-> impl Stream<Item=SignedMessage<Block>,Error=Error> where
S: Stream<Item=Vec<u8>,Error=()>
@@ -346,7 +345,7 @@ fn checked_message_stream<Block: BlockT, S>(
})
.and_then(move |msg| {
// check signature.
if !voters.contains(&msg.id) {
if !voters.contains_key(&msg.id) {
debug!(target: "afg", "Skipping message from unknown voter {}", msg.id);
return Ok(None);
}
@@ -368,7 +367,7 @@ fn outgoing_messages<Block: BlockT, N: Network>(
round: u64,
set_id: u64,
local_key: Option<Arc<ed25519::Pair>>,
voters: Vec<AuthorityId>,
voters: Arc<HashMap<AuthorityId, u64>>,
network: N,
) -> (
impl Stream<Item=SignedMessage<Block>,Error=Error>,
@@ -376,7 +375,12 @@ fn outgoing_messages<Block: BlockT, N: Network>(
) {
let locals = local_key.and_then(|pair| {
let public = pair.public();
voters.iter().find(|id| id.0 == public.0).map(move |id| (pair, id.clone()))
let id = AuthorityId(public.0);
if voters.contains_key(&id) {
Some((pair, id))
} else {
None
}
});
let (tx, rx) = mpsc::unbounded();
@@ -413,7 +417,7 @@ fn outgoing_messages<Block: BlockT, N: Network>(
/// The environment we run GRANDPA in.
pub struct Environment<B, E, Block: BlockT, N: Network> {
inner: Arc<Client<B, E, Block>>,
voters: HashMap<AuthorityId, u64>,
voters: Arc<HashMap<AuthorityId, u64>>,
config: Config,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
network: N,
@@ -499,28 +503,37 @@ pub trait CompatibleDigestItem<N> {
fn scheduled_change(&self) -> Option<ScheduledChange<N>> { None }
}
/// A new authority set along with the canonical block it changed at.
#[derive(Debug)]
pub struct NewAuthoritySet<H, N> {
canon_number: N,
canon_hash: H,
set_id: u64,
authorities: Vec<(AuthorityId, u64)>,
}
/// Signals either an early exit of a voter or an error.
#[derive(Debug)]
pub enum ExitOrError {
pub enum ExitOrError<H, N> {
/// An error occurred.
Error(Error),
/// Early exit of the voter: the new set ID and the new authorities along with respective weights.
AuthoritiesChanged(u64, Vec<(AuthorityId, u64)>),
AuthoritiesChanged(NewAuthoritySet<H, N>),
}
impl From<Error> for ExitOrError {
impl<H, N> From<Error> for ExitOrError<H, N> {
fn from(e: Error) -> Self {
ExitOrError::Error(e)
}
}
impl From<ClientError> for ExitOrError {
impl<H, N> From<ClientError> for ExitOrError<H, N> {
fn from(e: ClientError) -> Self {
ExitOrError::Error(Error::Client(e))
}
}
impl From<grandpa::Error> for ExitOrError {
impl<H, N> From<grandpa::Error> for ExitOrError<H, N> {
fn from(e: grandpa::Error) -> Self {
ExitOrError::Error(Error::from(e))
}
@@ -546,7 +559,7 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
SinkItem = ::grandpa::Message<Block::Hash, NumberFor<Block>>,
SinkError = Self::Error,
>>;
type Error = ExitOrError;
type Error = ExitOrError<Block::Hash, NumberFor<Block>>;
#[allow(unreachable_code)]
fn round_data(
@@ -565,14 +578,14 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
round,
self.set_id,
self.network.messages_for(round),
self.config.genesis_voters.clone(),
self.voters.clone(),
);
let (out_rx, outgoing) = outgoing_messages::<Block, _>(
round,
self.set_id,
self.config.local_key.clone(),
self.config.genesis_voters.clone(),
self.voters.clone(),
self.network.clone(),
);
@@ -597,7 +610,7 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
voter::RoundData {
prevote_timer: Box::new(prevote_timer.map_err(|e| Error::Timer(e).into())),
precommit_timer: Box::new(precommit_timer.map_err(|e| Error::Timer(e).into())),
voters: self.voters.clone(),
voters: (&*self.voters).clone(),
incoming,
outgoing,
}
@@ -628,21 +641,30 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
self.authority_set.with_mut(|authority_set| {
let client = &self.inner;
let prior_id = authority_set.set_id();
let has_changed = authority_set.apply_changes(number, |canon_number| {
let status = authority_set.apply_changes(number, |canon_number| {
client.block_hash_from_id(&BlockId::number(canon_number))
.map(|h| h.expect("given number always less than newly-finalized number; \
thus there is a block with that number finalized already; qed"))
})?;
if has_changed {
if status.changed {
// TODO [now]: write to disk. if it fails, exit the node.
// write `authorities.encode()`
if let Some((ref canon_hash, ref canon_number)) = status.new_set_block {
// write `LastFinalized` entry with `RoundState::genesis(canon)`.
}
}
let (new_id, set_ref) = authority_set.current();
if new_id != prior_id {
if let Some((canon_hash, canon_number)) = status.new_set_block {
// the authority set has changed.
return Err(ExitOrError::AuthoritiesChanged(new_id, set_ref.to_vec()));
let (new_id, set_ref) = authority_set.current();
return Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet {
canon_hash,
canon_number,
set_id: new_id,
authorities: set_ref.to_vec(),
}));
}
Ok(())
@@ -727,22 +749,12 @@ pub fn run_grandpa<B, E, Block: BlockT, N>(
NumberFor<Block>: BlockNumberOps,
DigestItemFor<Block>: CompatibleDigestItem<NumberFor<Block>>,
{
use futures::future::{self, Loop as FutureLoop};
use runtime_primitives::traits::Zero;
let chain_info = client.info()?;
let genesis_hash = chain_info.chain.genesis_hash;
let last_finalized = (
chain_info.chain.finalized_hash,
chain_info.chain.finalized_number,
);
let (last_round_number, last_state) = match client.backend().get_aux(LAST_COMPLETED_KEY)? {
None => (0, RoundState::genesis((genesis_hash, <NumberFor<Block>>::zero()))),
Some(raw) => <(u64, RoundState<Block::Hash, NumberFor<Block>>)>::decode(&mut &raw[..])
.ok_or_else(|| ::client::error::ErrorKind::Backend(
format!("Last GRANDPA round state kept in invalid format")
))?
};
// TODO [now]: attempt to load from disk.
let authority_set = SharedAuthoritySet::genesis(
@@ -754,23 +766,66 @@ pub fn run_grandpa<B, E, Block: BlockT, N>(
authority_set: authority_set.clone(),
};
let environment = Arc::new(Environment {
inner: client,
config,
voters,
network,
let (last_round_number, last_state) = match client.backend().get_aux(LAST_COMPLETED_KEY)? {
None => (0, RoundState::genesis((genesis_hash, <NumberFor<Block>>::zero()))),
Some(raw) => LastCompleted::decode(&mut &raw[..])
.ok_or_else(|| ::client::error::ErrorKind::Backend(
format!("Last GRANDPA round state kept in invalid format")
))?
};
let initial_environment = Arc::new(Environment {
inner: client.clone(),
config: config.clone(),
voters: Arc::new(voters),
network: network.clone(),
set_id: authority_set.set_id(),
authority_set,
authority_set: authority_set.clone(),
});
let voter = voter::Voter::new(
environment,
last_round_number,
last_state,
last_finalized,
);
let voters = future::loop_fn((initial_environment, last_round_number, last_state), move |params| {
let (env, last_round_number, last_state) = params;
let chain_info = match client.info() {
Ok(i) => i,
Err(e) => return future::Either::B(future::err(Error::Client(e))),
};
let work = voter.map_err(|e| warn!("GRANDPA Voter failed: {:?}", e));
let last_finalized = (
chain_info.chain.finalized_hash,
chain_info.chain.finalized_number,
);
let voter = voter::Voter::new(env, last_round_number, last_state, last_finalized);
let client = client.clone();
let config = config.clone();
let network = network.clone();
let authority_set = authority_set.clone();
future::Either::A(voter.then(move |res| match res {
// voters don't conclude naturally; this could reasonably be an error.
Ok(()) => Ok(FutureLoop::Break(())),
Err(ExitOrError::Error(e)) => Err(e),
Err(ExitOrError::AuthoritiesChanged(new)) => {
let env = Arc::new(Environment {
inner: client,
config,
voters: Arc::new(new.authorities.into_iter().collect()),
set_id: new.set_id,
network,
authority_set,
});
// start the new authority set using the block where the
// set changed (not where the signal happened!) as the base.
Ok(FutureLoop::Continue((
env,
0, // always start at round 0 when changing sets.
RoundState::genesis((new.canon_hash, new.canon_number)),
)))
}
}))
});
let work = voters.map_err(|e| warn!("GRANDPA Voter failed: {:?}", e));
Ok((work, block_import))
}
@@ -870,7 +925,6 @@ mod tests {
let (voter, _) = run_grandpa(
Config {
gossip_duration: TEST_GOSSIP_DURATION,
genesis_voters: voters.clone(),
local_key: Some(Arc::new(key.clone().into())),
},
client,
@@ -929,7 +983,6 @@ mod tests {
let (voter, _) = run_grandpa(
Config {
gossip_duration: TEST_GOSSIP_DURATION,
genesis_voters: voters.keys().cloned().collect(),
local_key,
},
client,