Grandpa validator set handoff justification (#1190)

* core: make block justification optional

* runtime: update wasm binaries

* core: optionally pass justification on finalize_block

* finality-grandpa: add channel to trigger authority set changes

this will allow the `BlockImport` to trigger an authority set change when
importing a change block that provides a justification (when syncing)

* finality-grandpa: move finalize_block to free function

* finality-grandpa: add GrandpaOracle for auth set liveness checking

this will be used by `BlockImport` to check whether the authority set for a
given block is still live, if the authority set isn't live then importing a
change block requires a justification.

* finality-grandpa: store justification on finalized transition blocks

* finality-grandpa: check justification on authority set change blocks

* finality-grandpa: poll grandpa liveness oracle every 10 seconds

* finality-grandpa: spawn grandpa oracle in service setup

* core: support multiple subscriptions per consensus gossip topic

* finality-grandpa: create and verify justifications

* finality-grandpa: update to local branch of grandpa

* finality-grandpa: update to finality-grandpa v0.5.0

* finality-grandpa: move grandpa oracle code

* finality-grandpa: fix canonality check

* finality-grandpa: clean up error handling

* finality-grandpa: fix canonical_at_height

* finality-grandpa: fix tests

* runtime: update wasm binaries

* core: add tests for finalizing block with justification

* finality-grandpa: improve validation of justifications

* core: remove unused IncompleteJustification block import error

* core: test multiple subscribers for same consensus gossip topic

* Revert "finality-grandpa: improve validation of justifications"

This reverts commit 51eb2c58c2219801e876af6d6c9371bdd9ff2477.

* finality-grandpa: fix commit validation

* finality-grandpa: fix commit ancestry validation

* finality-grandpa: use grandpa v0.5.1

* finality-grandpa: add docs

* finality-grandpa: fix failing test

* finality-grandpa: only allow a pending authority set change per fork

* finality-grandpa: fix validator set transition test
This commit is contained in:
André Silva
2018-12-08 05:34:59 +00:00
committed by Gav Wood
parent da822276dd
commit e779eeb2ec
29 changed files with 1115 additions and 389 deletions
@@ -20,6 +20,7 @@ use parking_lot::RwLock;
use substrate_primitives::AuthorityId;
use std::cmp::Ord;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Add;
use std::sync::Arc;
@@ -63,6 +64,11 @@ where
pub(crate) fn set_id(&self) -> u64 {
self.inner.read().set_id
}
/// Get the current authorities and their weights (for the current set ID).
pub(crate) fn current_authorities(&self) -> HashMap<AuthorityId, u64> {
self.inner.read().current_authorities.iter().cloned().collect()
}
}
impl<H, N> From<AuthoritySet<H, N>> for SharedAuthoritySet<H, N> {
@@ -109,8 +115,22 @@ where
N: Add<Output=N> + Ord + Clone + Debug,
H: Debug
{
/// Note an upcoming pending transition.
pub(crate) fn add_pending_change(&mut self, pending: PendingChange<H, N>) {
/// Note an upcoming pending transition. This makes sure that there isn't
/// already any pending change for the same chain. Multiple pending changes
/// are allowed but they must be signalled in different forks. The closure
/// should return an error if the pending change block is equal to or a
/// descendent of the given block.
pub(crate) fn add_pending_change<F, E: Debug>(
&mut self,
pending: PendingChange<H, N>,
is_equal_or_descendent_of: F,
) -> Result<(), E> where
F: Fn(&H) -> Result<(), E>,
{
for change in self.pending_changes.iter() {
is_equal_or_descendent_of(&change.canon_hash)?;
}
// ordered first by effective number and then by signal-block number.
let key = (pending.effective_number(), pending.canon_height.clone());
let idx = self.pending_changes
@@ -121,6 +141,8 @@ where
.unwrap_or_else(|i| i);
self.pending_changes.insert(idx, pending);
Ok(())
}
/// Inspect pending changes.
@@ -141,7 +163,7 @@ where
/// block where the set last changed.
pub(crate) fn apply_changes<F, E>(&mut self, just_finalized: N, mut canonical: F)
-> Result<Status<H, N>, E>
where F: FnMut(N) -> Result<H, E>
where F: FnMut(N) -> Result<Option<H>, E>
{
let mut status = Status {
changed: false,
@@ -156,30 +178,35 @@ where
// check if the block that signalled the change is canonical in
// our chain.
let canonical_at_height = canonical(change.canon_height.clone())?;
let canonical_hash = canonical(change.canon_height.clone())?;
let effective_hash = canonical(effective_number.clone())?;
debug!(target: "afg", "Evaluating potential set change at block {:?}. Our canonical hash is {:?}",
(&change.canon_height, &change.canon_hash), canonical_at_height);
(&change.canon_height, &change.canon_hash), canonical_hash);
if canonical_at_height == change.canon_hash {
// apply this change: make the set canonical
info!(target: "finality", "Applying authority set change scheduled at block #{:?}",
change.canon_height);
match (canonical_hash, effective_hash) {
(Some(canonical_hash), Some(effective_hash)) => {
if canonical_hash == change.canon_hash {
// apply this change: make the set canonical
info!(target: "finality", "Applying authority set change scheduled at block #{:?}",
change.canon_height);
self.current_authorities = change.next_authorities.clone();
self.set_id += 1;
self.current_authorities = change.next_authorities.clone();
self.set_id += 1;
status.new_set_block = Some((
canonical(effective_number.clone())?,
effective_number.clone(),
));
status.new_set_block = Some((
effective_hash,
effective_number.clone(),
));
// discard any signalled changes
// that happened before or equal to the effective number of the change.
self.pending_changes.iter()
.take_while(|c| c.canon_height <= effective_number)
.count()
} else {
1 // prune out this entry; it's no longer relevant.
// discard all signalled changes since they're
// necessarily from other forks
self.pending_changes.len()
} else {
1 // prune out this entry; it's no longer relevant.
}
},
_ => 1, // prune out this entry; it's no longer relevant.
}
}
};
@@ -191,6 +218,28 @@ where
Ok(status)
}
/// Check whether the given finalized block number enacts any authority set
/// change (without triggering it). Provide a closure that can be used to
/// check for the canonical block with a given number.
pub fn enacts_change<F, E>(&self, just_finalized: N, mut canonical: F)
-> Result<bool, E>
where F: FnMut(N) -> Result<Option<H>, E>
{
for change in self.pending_changes.iter() {
if change.effective_number() > just_finalized { break };
// check if the block that signalled the change is canonical in
// our chain.
match canonical(change.canon_height.clone())? {
Some(ref canonical_hash) if *canonical_hash == change.canon_hash =>
return Ok(true),
_ => (),
}
}
Ok(false)
}
}
/// A pending change to the authority set.
@@ -221,6 +270,10 @@ impl<H, N: Add<Output=N> + Clone> PendingChange<H, N> {
mod tests {
use super::*;
fn ignore_existing_changes<A>(_a: &A) -> Result<(), ::Error> {
Ok(())
}
#[test]
fn changes_sorted_in_correct_order() {
let mut authorities = AuthoritySet {
@@ -250,9 +303,9 @@ mod tests {
canon_hash: "hash_c",
};
authorities.add_pending_change(change_a.clone());
authorities.add_pending_change(change_b.clone());
authorities.add_pending_change(change_c.clone());
authorities.add_pending_change(change_a.clone(), ignore_existing_changes).unwrap();
authorities.add_pending_change(change_b.clone(), ignore_existing_changes).unwrap();
authorities.add_pending_change(change_c.clone(), ignore_existing_changes).unwrap();
assert_eq!(authorities.pending_changes, vec![change_a, change_c, change_b]);
}
@@ -282,15 +335,15 @@ mod tests {
canon_hash: "hash_b",
};
authorities.add_pending_change(change_a.clone());
authorities.add_pending_change(change_b.clone());
authorities.add_pending_change(change_a.clone(), ignore_existing_changes).unwrap();
authorities.add_pending_change(change_b.clone(), ignore_existing_changes).unwrap();
authorities.apply_changes(10, |_| Err(())).unwrap();
assert!(authorities.current_authorities.is_empty());
authorities.apply_changes(15, |n| match n {
5 => Ok("hash_a"),
15 => Ok("hash_15_canon"),
5 => Ok(Some("hash_a")),
15 => Ok(Some("hash_15_canon")),
_ => Err(()),
}).unwrap();
@@ -300,7 +353,7 @@ mod tests {
}
#[test]
fn apply_many_changes_at_once() {
fn disallow_multiple_changes_on_same_fork() {
let mut authorities = AuthoritySet {
current_authorities: Vec::new(),
set_id: 0,
@@ -318,11 +371,10 @@ mod tests {
canon_hash: "hash_a",
};
// will be ignored because it was signalled when change_a still pending.
let change_b = PendingChange {
next_authorities: set_b.clone(),
finalization_depth: 10,
canon_height: 15,
canon_height: 16,
canon_hash: "hash_b",
};
@@ -333,20 +385,50 @@ mod tests {
canon_hash: "hash_c",
};
authorities.add_pending_change(change_a.clone());
authorities.add_pending_change(change_b.clone());
authorities.add_pending_change(change_c.clone());
let is_equal_or_descendent_of = |base, block| -> Result<(), ()> {
match (base, block) {
("hash_a", "hash_b") => return Err(()),
("hash_a", "hash_c") => return Ok(()),
("hash_c", "hash_b") => return Ok(()),
_ => unreachable!(),
}
};
authorities.apply_changes(26, |n| match n {
5 => Ok("hash_a"),
15 => Ok("hash_b"),
16 => Ok("hash_c"),
26 => Ok("hash_26"),
authorities.add_pending_change(
change_a.clone(),
|base| is_equal_or_descendent_of(base, change_a.canon_hash),
).unwrap();
// change b is on the same chain has the unfinalized change a so it should error
assert!(
authorities.add_pending_change(
change_b.clone(),
|base| is_equal_or_descendent_of(base, change_b.canon_hash),
).is_err()
);
// change c is accepted because it's on a different fork
authorities.add_pending_change(
change_c.clone(),
|base| is_equal_or_descendent_of(base, change_c.canon_hash)
).unwrap();
authorities.apply_changes(15, |n| match n {
5 => Ok(Some("hash_a")),
15 => Ok(Some("hash_a15")),
_ => Err(()),
}).unwrap();
assert_eq!(authorities.current_authorities, set_c);
assert_eq!(authorities.set_id, 2); // has been bumped only twice
assert_eq!(authorities.current_authorities, set_a);
// pending change c has been removed since it was on a different fork
// and can no longer be enacted
assert!(authorities.pending_changes.is_empty());
// pending change b can now be added
authorities.add_pending_change(
change_b.clone(),
|base| is_equal_or_descendent_of(base, change_b.canon_hash),
).unwrap();
}
}
@@ -32,7 +32,7 @@ fn localized_payload<E: Encode>(round: u64, set_id: u64, message: &E) -> Vec<u8>
}
// check a message.
fn check_message_sig<Block: BlockT>(
pub(crate) fn check_message_sig<Block: BlockT>(
message: &Message<Block>,
id: &AuthorityId,
signature: &ed25519::Signature,
+614 -145
View File
@@ -84,7 +84,8 @@ extern crate parity_codec_derive;
use futures::prelude::*;
use futures::sync::mpsc;
use client::{
Client, error::Error as ClientError, backend::Backend, CallExecutor, BlockchainEvents
BlockchainEvents, CallExecutor, Client, backend::Backend,
error::Error as ClientError, error::ErrorKind as ClientErrorKind,
};
use client::blockchain::HeaderBackend;
use client::runtime_api::TaggedTransactionQueue;
@@ -96,14 +97,15 @@ use runtime_primitives::traits::{
use fg_primitives::GrandpaApi;
use runtime_primitives::generic::BlockId;
use substrate_primitives::{ed25519, H256, AuthorityId, Blake2Hasher};
use tokio::timer::Delay;
use tokio::timer::{Delay, Interval};
use grandpa::Error as GrandpaError;
use grandpa::{voter, round::State as RoundState, Equivocation, BlockNumberOps};
use network::{Service as NetworkService, ExHashT};
use network::consensus_gossip::{ConsensusMessage};
use std::collections::HashMap;
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::sync::Arc;
use std::time::{Instant, Duration};
@@ -197,6 +199,12 @@ impl From<GrandpaError> for Error {
}
}
impl From<ClientError> for Error {
fn from(e: ClientError) -> Self {
Error::Client(e)
}
}
/// A handle to the network. This is generally implemented by providing some
/// handle to a gossip service or similar.
///
@@ -508,78 +516,8 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
}
}
fn finalize_block(&self, hash: Block::Hash, number: NumberFor<Block>, _commit: Commit<Block>) -> Result<(), Self::Error> {
// ideally some handle to a synchronization oracle would be used
// to avoid unconditionally notifying.
if let Err(e) = self.inner.finalize_block(BlockId::Hash(hash), true) {
warn!(target: "afg", "Error applying finality to block {:?}: {:?}", (hash, number), e);
// we return without error because not being able to finalize (temporarily) is
// non-fatal.
return Ok(());
}
debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash);
// lock must be held through writing to DB to avoid race
let mut authority_set = self.authority_set.inner().write();
let client = &self.inner;
let status = authority_set.apply_changes(number, |canon_number| {
client.block_hash_from_id(&BlockId::number(canon_number))
.map(|h| h.expect("given number always less than newly-finalized number; \
thus there is a block with that number finalized already; qed"))
})?;
if status.changed {
// write new authority set state to disk.
let encoded_set = authority_set.encode();
let write_result = if let Some((ref canon_hash, ref canon_number)) = status.new_set_block {
// we also overwrite the "last completed round" entry with a blank slate
// because from the perspective of the finality gadget, the chain has
// reset.
let round_state = RoundState::genesis((*canon_hash, *canon_number));
let last_completed: LastCompleted<_, _> = (0, round_state);
let encoded = last_completed.encode();
client.backend().insert_aux(
&[
(AUTHORITY_SET_KEY, &encoded_set[..]),
(LAST_COMPLETED_KEY, &encoded[..]),
],
&[]
)
} else {
client.backend().insert_aux(&[(AUTHORITY_SET_KEY, &encoded_set[..])], &[])
};
if let Err(e) = write_result {
warn!(target: "finality", "Failed to write updated authority set to disk. Bailing.");
warn!(target: "finality", "Node is in a potentially inconsistent state.");
return Err(e.into());
}
}
if let Some((canon_hash, canon_number)) = status.new_set_block {
// the authority set has changed.
let (new_id, set_ref) = authority_set.current();
if set_ref.len() > 16 {
info!("Applying GRANDPA set change to new set with {} authorities", set_ref.len());
} else {
info!("Applying GRANDPA set change to new set {:?}", set_ref);
}
Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet {
canon_hash,
canon_number,
set_id: new_id,
authorities: set_ref.to_vec(),
}))
} else {
Ok(())
}
fn finalize_block(&self, hash: Block::Hash, number: NumberFor<Block>, round: u64, commit: Commit<Block>) -> Result<(), Self::Error> {
finalize_block(&*self.inner, &self.authority_set, hash, number, (round, commit).into())
}
fn round_commit_timer(&self) -> Self::Timer {
@@ -611,19 +549,344 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
}
}
/// A GRANDPA justification for block finality, it includes a commit message and
/// an ancestry proof including all headers routing all precommit target blocks
/// to the commit target block. Due to the current voting strategy the precommit
/// targets should be the same as the commit target, since honest voters don't
/// vote past authority set change blocks.
///
/// This is meant to be stored in the db and passed around the network to other
/// nodes, and are used by syncing nodes to prove authority set handoffs.
#[derive(Encode, Decode)]
struct GrandpaJustification<Block: BlockT> {
round: u64,
commit: Commit<Block>,
votes_ancestries: Vec<Block::Header>,
}
impl<Block: BlockT<Hash=H256>> GrandpaJustification<Block> {
/// Create a GRANDPA justification from the given commit. This method
/// assumes the commit is valid and well-formed.
fn from_commit<B, E, RA>(
client: &Client<B, E, Block, RA>,
round: u64,
commit: Commit<Block>,
) -> Result<GrandpaJustification<Block>, Error> where
B: Backend<Block, Blake2Hasher>,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync,
RA: Send + Sync,
{
let mut votes_ancestries_hashes = HashSet::new();
let mut votes_ancestries = Vec::new();
let error = || {
let msg = "invalid precommits for target commit".to_string();
Err(Error::Client(ClientErrorKind::BadJustification(msg).into()))
};
for signed in commit.precommits.iter() {
let mut current_hash = signed.precommit.target_hash.clone();
loop {
if current_hash == commit.target_hash { break; }
match client.backend().blockchain().header(BlockId::Hash(current_hash))? {
Some(current_header) => {
if *current_header.number() <= commit.target_number {
return error();
}
let parent_hash = current_header.parent_hash().clone();
if votes_ancestries_hashes.insert(current_hash) {
votes_ancestries.push(current_header);
}
current_hash = parent_hash;
},
_ => return error(),
}
}
}
Ok(GrandpaJustification { round, commit, votes_ancestries })
}
/// Decode a GRANDPA justification and validate the commit and the votes'
/// ancestry proofs.
fn decode_and_verify(
encoded: Vec<u8>,
set_id: u64,
voters: &HashMap<AuthorityId, u64>,
) -> Result<GrandpaJustification<Block>, ClientError> where
NumberFor<Block>: grandpa::BlockNumberOps,
{
use grandpa::Chain;
let justification = match GrandpaJustification::decode(&mut &*encoded) {
Some(justification) => justification,
_ => {
let msg = "failed to decode grandpa justification".to_string();
return Err(ClientErrorKind::BadJustification(msg).into());
}
};
let ancestry_chain = AncestryChain::<Block>::new(&justification.votes_ancestries);
match grandpa::validate_commit(
&justification.commit,
voters,
None,
&ancestry_chain,
) {
Ok(Some(_)) => {},
_ => {
let msg = "invalid commit in grandpa justification".to_string();
return Err(ClientErrorKind::BadJustification(msg).into());
}
}
let mut visited_hashes = HashSet::new();
for signed in justification.commit.precommits.iter() {
if let Err(_) = communication::check_message_sig::<Block>(
&grandpa::Message::Precommit(signed.precommit.clone()),
&signed.id,
&signed.signature,
justification.round,
set_id,
) {
return Err(ClientErrorKind::BadJustification(
"invalid signature for precommit in grandpa justification".to_string()).into());
}
if justification.commit.target_hash == signed.precommit.target_hash {
continue;
}
match ancestry_chain.ancestry(justification.commit.target_hash, signed.precommit.target_hash) {
Ok(route) => {
// ancestry starts from parent hash but the precommit target hash has been visited
visited_hashes.insert(signed.precommit.target_hash);
for hash in route {
visited_hashes.insert(hash);
}
},
_ => {
return Err(ClientErrorKind::BadJustification(
"invalid precommit ancestry proof in grandpa justification".to_string()).into());
},
}
}
let ancestry_hashes = justification.votes_ancestries
.iter()
.map(|h: &Block::Header| h.hash())
.collect();
if visited_hashes != ancestry_hashes {
return Err(ClientErrorKind::BadJustification(
"invalid precommit ancestries in grandpa justification with unused headers".to_string()).into());
}
Ok(justification)
}
}
enum JustificationOrCommit<Block: BlockT> {
Justification(GrandpaJustification<Block>),
Commit((u64, Commit<Block>)),
}
impl<Block: BlockT> From<(u64, Commit<Block>)> for JustificationOrCommit<Block> {
fn from(commit: (u64, Commit<Block>)) -> JustificationOrCommit<Block> {
JustificationOrCommit::Commit(commit)
}
}
impl<Block: BlockT> From<GrandpaJustification<Block>> for JustificationOrCommit<Block> {
fn from(justification: GrandpaJustification<Block>) -> JustificationOrCommit<Block> {
JustificationOrCommit::Justification(justification)
}
}
/// Finalize the given block and apply any authority set changes. If an
/// authority set change is enacted then a justification is created (if not
/// given) and stored with the block when finalizing it.
fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
client: &Client<B, E, Block, RA>,
authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
hash: Block::Hash,
number: NumberFor<Block>,
justification_or_commit: JustificationOrCommit<Block>,
) -> Result<(), ExitOrError<Block::Hash, NumberFor<Block>>> where
B: Backend<Block, Blake2Hasher>,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync,
RA: Send + Sync,
{
// lock must be held through writing to DB to avoid race
let mut authority_set = authority_set.inner().write();
let status = authority_set.apply_changes(number, |canon_number| {
canonical_at_height(client, (hash, number), canon_number)
})?;
if status.changed {
// write new authority set state to disk.
let encoded_set = authority_set.encode();
let write_result = if let Some((ref canon_hash, ref canon_number)) = status.new_set_block {
// we also overwrite the "last completed round" entry with a blank slate
// because from the perspective of the finality gadget, the chain has
// reset.
let round_state = RoundState::genesis((*canon_hash, *canon_number));
let last_completed: LastCompleted<_, _> = (0, round_state);
let encoded = last_completed.encode();
client.backend().insert_aux(
&[
(AUTHORITY_SET_KEY, &encoded_set[..]),
(LAST_COMPLETED_KEY, &encoded[..]),
],
&[]
)
} else {
client.backend().insert_aux(&[(AUTHORITY_SET_KEY, &encoded_set[..])], &[])
};
if let Err(e) = write_result {
warn!(target: "finality", "Failed to write updated authority set to disk. Bailing.");
warn!(target: "finality", "Node is in a potentially inconsistent state.");
return Err(e.into());
}
}
// NOTE: this code assumes that honest voters will never vote past a
// transition block, thus we don't have to worry about the case where
// we have a transition with `effective_block = N`, but we finalize
// `N+1`. this assumption is required to make sure we store
// justifications for transition blocks which will be requested by
// syncing clients.
let justification = match justification_or_commit {
JustificationOrCommit::Justification(justification) => Some(justification.encode()),
JustificationOrCommit::Commit((round_number, commit)) =>
if status.new_set_block.is_some() {
let justification = GrandpaJustification::from_commit(
client,
round_number,
commit,
)?;
Some(justification.encode())
} else {
None
},
};
debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash);
// ideally some handle to a synchronization oracle would be used
// to avoid unconditionally notifying.
client.finalize_block(BlockId::Hash(hash), justification, true).map_err(|e| {
warn!(target: "finality", "Error applying finality to block {:?}: {:?}", (hash, number), e);
warn!(target: "finality", "Node is in a potentially inconsistent state.");
e
})?;
if let Some((canon_hash, canon_number)) = status.new_set_block {
// the authority set has changed.
let (new_id, set_ref) = authority_set.current();
if set_ref.len() > 16 {
info!("Applying GRANDPA set change to new set with {} authorities", set_ref.len());
} else {
info!("Applying GRANDPA set change to new set {:?}", set_ref);
}
Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet {
canon_hash,
canon_number,
set_id: new_id,
authorities: set_ref.to_vec(),
}))
} else {
Ok(())
}
}
/// An oracle for liveness checking of a GRANDPA authority set. This is used
/// when importing blocks, if the block enacts an authority set change then
/// either it must provide a justification or if the GRANDPA authority set is
/// still live then the block can be imported unjustified since the block will
/// still be finalized by GRANDPA in a future round. The current heuristic for
/// deciding whether an authority set is live is to check if there were any
/// recent commit messages on an unfiltered stream).
struct GrandpaOracle<Block: BlockT> {
unfiltered_commits_stream: Box<dyn Stream<Item=(u64, CompactCommit<Block>), Error=Error> + Send>,
last_commit_target: Option<(Instant, Block::Hash, NumberFor<Block>)>,
}
impl<Block: BlockT> GrandpaOracle<Block> {
fn new(stream: Box<dyn Stream<Item=(u64, CompactCommit<Block>), Error=Error> + Send>) -> GrandpaOracle<Block> {
GrandpaOracle {
unfiltered_commits_stream: stream,
last_commit_target: None,
}
}
fn poll(&mut self) {
while let Ok(Async::Ready(Some((_, commit)))) = self.unfiltered_commits_stream.poll() {
self.last_commit_target = Some((Instant::now(), commit.target_hash, commit.target_number));
}
}
fn is_live(&self) -> bool {
self.last_commit_target.map(|(instant, _, _)| {
instant.elapsed() < Duration::from_secs(30)
}).unwrap_or(false)
}
}
#[derive(Clone)]
struct SharedGrandpaOracle<Block: BlockT> {
inner: Arc<Mutex<Option<GrandpaOracle<Block>>>>,
}
impl<Block: BlockT> SharedGrandpaOracle<Block> {
fn empty() -> SharedGrandpaOracle<Block> {
SharedGrandpaOracle { inner: Arc::new(Mutex::new(None)) }
}
fn poll(&self) {
if let Some(inner) = self.inner.lock().as_mut() {
inner.poll();
}
}
fn is_live(&self) -> bool {
self.inner.lock()
.as_ref()
.map(|inner| inner.is_live())
.unwrap_or(false)
}
}
/// A block-import handler for GRANDPA.
///
/// This scans each imported block for signals of changing authority set.
/// If the block being imported enacts an authority set change then:
/// - If the current authority set is still live: we import the block
/// - Otherwise, the block must include a valid justification.
///
/// When using GRANDPA, the block import worker should be using this block import
/// object.
pub struct GrandpaBlockImport<B, E, Block: BlockT<Hash=H256>, RA, PRA> {
inner: Arc<Client<B, E, Block, RA>>,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
authority_set_change: mpsc::UnboundedSender<NewAuthoritySet<Block::Hash, NumberFor<Block>>>,
authority_set_oracle: SharedGrandpaOracle<Block>,
api: Arc<PRA>,
}
impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
for GrandpaBlockImport<B, E, Block, RA, PRA> where
NumberFor<Block>: grandpa::BlockNumberOps,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync,
DigestFor<Block>: Encode,
@@ -638,43 +901,160 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
{
use authorities::PendingChange;
let maybe_change = self.api.runtime_api().grandpa_pending_change(
&BlockId::hash(*block.header.parent_hash()),
&block.header.digest().clone(),
)?;
// we don't want to finalize on `inner.import_block`
let justification = block.justification.take();
let number = block.header.number().clone();
let hash = block.post_header().hash();
let parent_hash = *block.header.parent_hash();
let digest = block.header.digest().clone();
let is_live = self.authority_set_oracle.is_live();
// when we update the authorities, we need to hold the lock
// until the block is written to prevent a race if we need to restore
// the old authority set on error.
let just_in_case = maybe_change.map(|change| {
let hash = block.post_header().hash();
let number = block.header.number().clone();
let mut authorities = self.authority_set.inner().write();
let old_set = authorities.clone();
authorities.add_pending_change(PendingChange {
next_authorities: change.next_authorities,
finalization_depth: change.delay,
canon_height: number,
canon_hash: hash,
});
block.auxiliary.push((AUTHORITY_SET_KEY.to_vec(), Some(authorities.encode())));
(old_set, authorities)
});
let result = self.inner.import_block(block, new_authorities);
if let Err(ref e) = result {
if let Some((old_set, mut authorities)) = just_in_case {
debug!(target: "afg", "Restoring old set after block import error: {:?}", e);
*authorities = old_set;
}
let import_result = self.inner.import_block(block, new_authorities)?;
if import_result != ImportResult::Queued {
return Ok(import_result);
}
result
let maybe_change = self.api.runtime_api().grandpa_pending_change(
&BlockId::hash(parent_hash),
&digest,
)?;
let is_equal_or_descendent_of = |base: &Block::Hash| -> Result<(), ClientError> {
let error = || {
Err(ClientErrorKind::Backend(
"invalid authority set change: multiple pending changes on the same chain".to_string()
).into())
};
if *base == hash { return error(); }
if *base == parent_hash { return error(); }
let tree_route = ::client::blockchain::tree_route(
self.inner.backend().blockchain(),
BlockId::Hash(parent_hash),
BlockId::Hash(*base),
)?;
if tree_route.common_block().hash == *base {
return error();
}
Ok(())
};
if let Some(change) = maybe_change {
let mut authorities = self.authority_set.inner().write();
authorities.add_pending_change(
PendingChange {
next_authorities: change.next_authorities,
finalization_depth: change.delay,
canon_height: number,
canon_hash: hash,
},
is_equal_or_descendent_of,
)?;
let encoded = authorities.encode();
self.inner.backend().insert_aux(&[(AUTHORITY_SET_KEY, &encoded[..])], &[])?;
};
let enacts_change = self.authority_set.inner().read().enacts_change(number, |canon_number| {
canonical_at_height(&self.inner, (hash, number), canon_number)
})?;
// a pending change is enacted by the given block, if the current
// grandpa authority set isn't live anymore the provided `ImportBlock`
// should include a justification for finalizing the block.
match justification {
Some(justification) => {
if enacts_change && !is_live {
let justification = GrandpaJustification::decode_and_verify(
justification,
self.authority_set.set_id(),
&self.authority_set.current_authorities(),
)?;
let result = finalize_block(
&*self.inner,
&self.authority_set,
hash,
number,
justification.into(),
);
match result {
Ok(_) => {
unreachable!("returns Ok when no authority set change should be enacted; \
verified previously that finalizing the current block enacts a change; \
qed;");
},
Err(ExitOrError::AuthoritiesChanged(new)) => {
debug!(target: "finality", "Imported justified block #{} that enacts authority set change, signalling voter.", number);
if let Err(_) = self.authority_set_change.unbounded_send(new) {
return Err(ClientErrorKind::Backend(
"imported and finalized change block but grandpa voter is no longer running".to_string()
).into());
}
},
Err(ExitOrError::Error(_)) => {
return Err(ClientErrorKind::Backend(
"imported change block but failed to finalize it, node may be in an inconsistent state".to_string()
).into());
},
}
}
},
None if enacts_change && !is_live => {
return Err(ClientErrorKind::BadJustification(
"missing justification for block that enacts authority set change".to_string()
).into());
},
_ => {}
}
Ok(import_result)
}
}
/// Using the given base get the block at the given height on this chain. The
/// target block must be an ancestor of base, therefore `height <= base.height`.
fn canonical_at_height<B, E, Block: BlockT<Hash=H256>, RA>(
client: &Client<B, E, Block, RA>,
base: (Block::Hash, NumberFor<Block>),
height: NumberFor<Block>,
) -> Result<Option<Block::Hash>, ClientError> where
B: Backend<Block, Blake2Hasher>,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync,
{
use runtime_primitives::traits::{One, Zero};
if height > base.1 {
return Ok(None);
}
if height == base.1 {
return Ok(Some(base.0));
}
let mut current = match client.header(&BlockId::Hash(base.0))? {
Some(header) => header,
_ => return Ok(None),
};
let mut steps = base.1 - height;
while steps > NumberFor::<Block>::zero() {
current = match client.header(&BlockId::Hash(*current.parent_hash()))? {
Some(header) => header,
_ => return Ok(None),
};
steps -= NumberFor::<Block>::one();
}
Ok(Some(current.hash()))
}
impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> Authorities<Block> for GrandpaBlockImport<B, E, Block, RA, PRA>
where
B: Backend<Block, Blake2Hasher> + 'static,
@@ -693,18 +1073,49 @@ where
pub struct LinkHalf<B, E, Block: BlockT<Hash=H256>, RA> {
client: Arc<Client<B, E, Block, RA>>,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
authority_set_change: mpsc::UnboundedReceiver<NewAuthoritySet<Block::Hash, NumberFor<Block>>>,
authority_set_oracle: SharedGrandpaOracle<Block>,
}
impl<B, E, Block: BlockT<Hash=H256>, RA> Clone for LinkHalf<B, E, Block, RA>
where
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync,
RA: TaggedTransactionQueue<Block>, // necessary for client to import `BlockImport`.
struct AncestryChain<Block: BlockT> {
ancestry: HashMap<Block::Hash, Block::Header>,
}
impl<Block: BlockT> AncestryChain<Block> {
fn new(ancestry: &[Block::Header]) -> AncestryChain<Block> {
let ancestry: HashMap<_, _> = ancestry
.iter()
.cloned()
.map(|h: Block::Header| (h.hash(), h))
.collect();
AncestryChain { ancestry }
}
}
impl<Block: BlockT> grandpa::Chain<Block::Hash, NumberFor<Block>> for AncestryChain<Block> where
NumberFor<Block>: grandpa::BlockNumberOps
{
fn clone(&self) -> Self {
LinkHalf {
client: self.client.clone(),
authority_set: self.authority_set.clone()
fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result<Vec<Block::Hash>, GrandpaError> {
let mut route = Vec::new();
let mut current_hash = block;
loop {
if current_hash == base { break; }
match self.ancestry.get(&current_hash) {
Some(current_header) => {
current_hash = *current_header.parent_hash();
route.push(current_hash);
},
_ => return Err(GrandpaError::NotDescendent),
}
}
route.pop(); // remove the base
Ok(route)
}
fn best_chain_containing(&self, _block: Block::Hash) -> Option<(Block::Hash, NumberFor<Block>)> {
None
}
}
@@ -746,13 +1157,24 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
.into(),
};
let (authority_set_change_tx, authority_set_change_rx) = mpsc::unbounded();
let authority_set_oracle = SharedGrandpaOracle::empty();
Ok((
GrandpaBlockImport {
inner: client.clone(),
authority_set: authority_set.clone(),
authority_set_change: authority_set_change_tx,
authority_set_oracle: authority_set_oracle.clone(),
api
},
LinkHalf { client, authority_set },
LinkHalf {
client,
authority_set,
authority_set_change: authority_set_change_rx,
authority_set_oracle,
},
))
}
@@ -808,7 +1230,10 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
config: Config,
link: LinkHalf<B, E, Block, RA>,
network: N,
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
) -> ::client::error::Result<(
impl Future<Item=(),Error=()> + Send + 'static,
impl Future<Item=(),Error=()> + Send + 'static,
)> where
Block::Hash: Ord,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
@@ -821,7 +1246,20 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
use futures::future::{self, Loop as FutureLoop};
use runtime_primitives::traits::Zero;
let LinkHalf { client, authority_set } = link;
let LinkHalf {
client,
authority_set,
authority_set_change,
authority_set_oracle
} = link;
let oracle_work = {
let authority_set_oracle = authority_set_oracle.clone();
Interval::new(Instant::now(), Duration::from_secs(1))
.for_each(move |_| Ok(authority_set_oracle.poll()))
.map_err(|_| ())
};
let chain_info = client.info()?;
let genesis_hash = chain_info.chain.genesis_hash;
@@ -833,9 +1271,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
))?
};
let voters = authority_set.inner().read().current().1.iter()
.cloned()
.collect();
let voters = authority_set.current_authorities();
let initial_environment = Arc::new(Environment {
inner: client.clone(),
@@ -846,8 +1282,9 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
authority_set: authority_set.clone(),
});
let work = future::loop_fn((initial_environment, last_round_number, last_state), move |params| {
let (env, last_round_number, last_state) = params;
let initial_state = (initial_environment, last_round_number, last_state, authority_set_change.into_future());
let voter_work = future::loop_fn(initial_state, move |params| {
let (env, last_round_number, last_state, authority_set_change) = params;
debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id);
let chain_info = match client.info() {
@@ -867,6 +1304,14 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
&network,
);
let unfiltered_commits_stream = Box::new(::communication::checked_commit_stream::<Block, _>(
env.set_id,
network.commit_messages(env.set_id),
env.voters.clone(),
));
*authority_set_oracle.inner.lock() = Some(GrandpaOracle::new(unfiltered_commits_stream));
let voters = (*env.voters).clone();
let voter = voter::Voter::new(
@@ -881,30 +1326,54 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
let config = config.clone();
let network = network.clone();
let authority_set = authority_set.clone();
future::Either::A(voter.then(move |res| match res {
// voters don't conclude naturally; this could reasonably be an error.
Ok(()) => Ok(FutureLoop::Break(())),
Err(ExitOrError::Error(e)) => Err(e),
Err(ExitOrError::AuthoritiesChanged(new)) => {
let env = Arc::new(Environment {
inner: client,
config,
voters: Arc::new(new.authorities.into_iter().collect()),
set_id: new.set_id,
network,
authority_set,
});
// start the new authority set using the block where the
// set changed (not where the signal happened!) as the base.
Ok(FutureLoop::Continue((
env,
0, // always start at round 0 when changing sets.
RoundState::genesis((new.canon_hash, new.canon_number)),
)))
let trigger_authority_set_change = |new: NewAuthoritySet<_, _>, authority_set_change| {
let env = Arc::new(Environment {
inner: client,
config,
voters: Arc::new(new.authorities.into_iter().collect()),
set_id: new.set_id,
network,
authority_set,
});
// start the new authority set using the block where the
// set changed (not where the signal happened!) as the base.
Ok(FutureLoop::Continue((
env,
0, // always start at round 0 when changing sets.
RoundState::genesis((new.canon_hash, new.canon_number)),
authority_set_change,
)))
};
future::Either::A(voter.select2(authority_set_change).then(move |res| match res {
Ok(future::Either::A(((), _))) => {
// voters don't conclude naturally; this could reasonably be an error.
Ok(FutureLoop::Break(()))
},
Err(future::Either::B(_)) => {
// the `authority_set_change` stream should not fail.
Ok(FutureLoop::Break(()))
},
Ok(future::Either::B(((None, _), _))) => {
// the `authority_set_change` stream should never conclude since it's never closed.
Ok(FutureLoop::Break(()))
},
Err(future::Either::A((ExitOrError::Error(e), _))) => {
// return inner voter error
Err(e)
}
Ok(future::Either::B(((Some(new), authority_set_change), _))) => {
// authority set change triggered externally through the channel
trigger_authority_set_change(new, authority_set_change.into_future())
}
Err(future::Either::A((ExitOrError::AuthoritiesChanged(new), authority_set_change))) => {
// authority set change triggered internally by finalizing a change block
trigger_authority_set_change(new, authority_set_change)
},
}))
});
}).map_err(|e| warn!("GRANDPA Voter failed: {:?}", e));
Ok(work.map_err(|e| warn!("GRANDPA Voter failed: {:?}", e)))
Ok((voter_work, oracle_work))
}
+90 -46
View File
@@ -364,7 +364,7 @@ fn finalize_3_voters_no_observers() {
);
fn assert_send<T: Send>(_: &T) { }
let voter = run_grandpa(
let (voter, oracle) = run_grandpa(
Config {
gossip_duration: TEST_GOSSIP_DURATION,
local_key: Some(Arc::new(key.clone().into())),
@@ -376,6 +376,7 @@ fn finalize_3_voters_no_observers() {
assert_send(&voter);
runtime.spawn(oracle);
runtime.spawn(voter);
}
@@ -424,7 +425,7 @@ fn finalize_3_voters_1_observer() {
.take_while(|n| Ok(n.header.number() < &20))
.for_each(move |_| Ok(()))
);
let voter = run_grandpa(
let (voter, oracle) = run_grandpa(
Config {
gossip_duration: TEST_GOSSIP_DURATION,
local_key,
@@ -434,6 +435,7 @@ fn finalize_3_voters_1_observer() {
MessageRouting::new(net.clone(), peer_id),
).expect("all in order with client and network");
runtime.spawn(oracle);
runtime.spawn(voter);
}
@@ -476,59 +478,96 @@ fn transition_3_voters_twice_1_observer() {
let api = TestApi::new(genesis_voters);
let transitions = api.scheduled_changes.clone();
let add_transition = move |parent_hash, change| {
transitions.lock().insert(parent_hash, change);
};
let net = Arc::new(Mutex::new(GrandpaTestNet::new(api, 8)));
let mut net = GrandpaTestNet::new(api, 9);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
// first 20 blocks: transition at 15, applied at 20.
{
net.peer(0).push_blocks(14, false);
net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| {
let block = builder.bake().unwrap();
add_transition(*block.header.parent_hash(), ScheduledChange {
next_authorities: make_ids(peers_b),
delay: 4,
});
net.lock().peer(0).push_blocks(1, false);
net.lock().sync();
block
});
net.peer(0).push_blocks(5, false);
}
// at block 21 we do another transition, but this time instant.
// add more until we have 30.
{
net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| {
let block = builder.bake().unwrap();
add_transition(*block.header.parent_hash(), ScheduledChange {
next_authorities: make_ids(peers_c),
delay: 0,
});
block
});
net.peer(0).push_blocks(9, false);
}
net.sync();
for (i, peer) in net.peers().iter().enumerate() {
assert_eq!(peer.client().info().unwrap().chain.best_number, 30,
"Peer #{} failed to sync", i);
for (i, peer) in net.lock().peers().iter().enumerate() {
assert_eq!(peer.client().info().unwrap().chain.best_number, 1,
"Peer #{} failed to sync", i);
let set_raw = peer.client().backend().get_aux(::AUTHORITY_SET_KEY).unwrap().unwrap();
let set = AuthoritySet::<Hash, BlockNumber>::decode(&mut &set_raw[..]).unwrap();
assert_eq!(set.current(), (0, make_ids(peers_a).as_slice()));
assert_eq!(set.pending_changes().len(), 2);
assert_eq!(set.pending_changes().len(), 0);
}
let net = Arc::new(Mutex::new(net));
let mut finality_notifications = Vec::new();
{
let net = net.clone();
let client = net.lock().peers[0].client().clone();
let transitions = transitions.clone();
let add_transition = move |parent_hash, change| {
transitions.lock().insert(parent_hash, change);
};
let peers_c = peers_c.clone();
let executor = runtime.executor().clone();
let mut runtime = current_thread::Runtime::new().unwrap();
// wait for blocks to be finalized before generating new ones
let block_production = client.finality_notification_stream()
.take_while(|n| Ok(n.header.number() < &30))
.for_each(move |n| {
match n.header.number() {
1 => {
// first 14 blocks.
net.lock().peer(0).push_blocks(13, false);
},
14 => {
// generate transition at block 15, applied at 20.
net.lock().peer(0).generate_blocks(1, BlockOrigin::File, |builder| {
let block = builder.bake().unwrap();
add_transition(*block.header.parent_hash(), ScheduledChange {
next_authorities: make_ids(peers_b),
delay: 4,
});
block
});
net.lock().peer(0).push_blocks(5, false);
},
20 => {
let net = net.clone();
let add_transition = add_transition.clone();
// at block 21 we do another transition, but this time instant.
// add more until we have 30.
let generate_blocks = move || {
net.lock().peer(0).generate_blocks(1, BlockOrigin::File, |builder| {
let block = builder.bake().unwrap();
add_transition(*block.header.parent_hash(), ScheduledChange {
next_authorities: make_ids(&peers_c),
delay: 0,
});
block
});
net.lock().peer(0).push_blocks(9, false);
};
// delay block generation for a bit for the liveness tracker to be
// able to update due to the authority set change
let delay_generate = Delay::new(Instant::now() + Duration::from_millis(5000))
.and_then(move |_| {
generate_blocks();
Ok(())
})
.map_err(|_| ());
executor.spawn(delay_generate);
},
_ => {},
}
Ok(())
});
runtime.spawn(block_production);
}
let mut finality_notifications = Vec::new();
let all_peers = peers_a.iter()
.chain(peers_b)
.chain(peers_c)
@@ -560,7 +599,7 @@ fn transition_3_voters_twice_1_observer() {
assert!(set.pending_changes().is_empty());
})
);
let voter = run_grandpa(
let (voter, oracle) = run_grandpa(
Config {
gossip_duration: TEST_GOSSIP_DURATION,
local_key,
@@ -570,6 +609,7 @@ fn transition_3_voters_twice_1_observer() {
MessageRouting::new(net.clone(), peer_id),
).expect("all in order with client and network");
runtime.spawn(oracle);
runtime.spawn(voter);
}
@@ -579,7 +619,11 @@ fn transition_3_voters_twice_1_observer() {
.map_err(|_| ());
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| { net.lock().route_until_complete(); Ok(()) })
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().sync();
Ok(())
})
.map(|_| ())
.map_err(|_| ());