Fix handling of justifications (#2086)

* util: fork-tree: check predicate first while traversing tree

* core: sync: keep track of justifications sent to the import queue

* core: grandpa: verify authority set changes dependencies

* core: fork-tree: add more tests

* core: grandpa: extend enacts_standard_change tests
This commit is contained in:
André Silva
2019-03-22 19:07:24 +01:00
committed by Arkadiy Paronyan
parent 7469713dea
commit 25ec793e35
5 changed files with 204 additions and 49 deletions
@@ -358,17 +358,18 @@ impl<B: BlockT> BlockImporter<B> {
let success = self.justification_import.as_ref().map(|justification_import| {
justification_import.import_justification(hash, number, justification)
.map_err(|e| {
debug!("Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", e, hash, number, who);
debug!(target: "sync", "Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", e, hash, number, who);
e
}).is_ok()
}).unwrap_or(false);
if let Some(link) = self.link.as_ref() {
link.justification_imported(who, &hash, number, success);
}
}
fn handle_import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
trace!(target:"sync", "Scheduling {} blocks for import", blocks.len());
trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
self.worker_sender
.send(BlockImportWorkerMsg::ImportBlocks(origin, blocks))
.expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed");
@@ -423,7 +424,7 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
_ => Default::default(),
};
trace!(target:"sync", "Starting import of {} blocks {}", count, blocks_range);
trace!(target: "sync", "Starting import of {} blocks {}", count, blocks_range);
let mut results = vec![];
@@ -351,14 +351,18 @@ where
/// authority set change (without triggering it), ensuring that if there are
/// multiple changes in the same branch, finalizing this block won't
/// finalize past multiple transitions (i.e. transitions must be finalized
/// in-order). The given function `is_descendent_of` should return `true` if
/// the second hash (target) is a descendent of the first hash (base).
/// in-order). Returns `Some(true)` if the block being finalized enacts a
/// change that can be immediately applied, `Some(false)` if the block being
/// finalized enacts a change but it cannot be applied yet since there are
/// other dependent changes, and `None` if no change is enacted. The given
/// function `is_descendent_of` should return `true` if the second hash
/// (target) is a descendent of the first hash (base).
pub fn enacts_standard_change<F, E>(
&self,
finalized_hash: H,
finalized_number: N,
is_descendent_of: &F,
) -> Result<bool, fork_tree::Error<E>>
) -> Result<Option<bool>, fork_tree::Error<E>>
where F: Fn(&H, &H) -> Result<bool, E>,
E: std::error::Error,
{
@@ -659,21 +663,51 @@ mod tests {
delay_kind: DelayKind::Finalized,
};
let change_b = PendingChange {
next_authorities: set_a.clone(),
delay: 10,
canon_height: 20,
canon_hash: "hash_b",
delay_kind: DelayKind::Finalized,
};
authorities.add_pending_change(change_a.clone(), &static_is_descendent_of(false)).unwrap();
authorities.add_pending_change(change_b.clone(), &static_is_descendent_of(true)).unwrap();
let is_descendent_of = is_descendent_of(|base, hash| match (*base, *hash) {
("hash_a", "hash_b") => true,
("hash_a", "hash_d") => true,
("hash_a", "hash_e") => true,
("hash_b", "hash_d") => true,
("hash_b", "hash_e") => true,
("hash_a", "hash_c") => false,
("hash_b", "hash_c") => false,
_ => unreachable!(),
});
// "hash_c" won't finalize the existing change since it isn't a descendent
assert!(!authorities.enacts_standard_change("hash_c", 15, &is_descendent_of).unwrap());
// "hash_b" at depth 14 won't work either
assert!(!authorities.enacts_standard_change("hash_b", 14, &is_descendent_of).unwrap());
assert_eq!(
authorities.enacts_standard_change("hash_c", 15, &is_descendent_of).unwrap(),
None,
);
// "hash_d" at depth 14 won't work either
assert_eq!(
authorities.enacts_standard_change("hash_d", 14, &is_descendent_of).unwrap(),
None,
);
// but it should work at depth 15 (change height + depth)
assert!(authorities.enacts_standard_change("hash_b", 15, &is_descendent_of).unwrap());
assert_eq!(
authorities.enacts_standard_change("hash_d", 15, &is_descendent_of).unwrap(),
Some(true),
);
// finalizing "hash_e" at depth 20 will trigger change at "hash_b", but
// it can't be applied yet since "hash_a" must be applied first
assert_eq!(
authorities.enacts_standard_change("hash_e", 30, &is_descendent_of).unwrap(),
Some(false),
);
}
#[test]
@@ -709,7 +743,10 @@ mod tests {
// there's an effective change triggered at block 15 but not a standard one.
// so this should do nothing.
assert!(!authorities.enacts_standard_change("hash_c", 15, &static_is_descendent_of(true)).unwrap());
assert_eq!(
authorities.enacts_standard_change("hash_c", 15, &static_is_descendent_of(true)).unwrap(),
None,
);
// throw a standard change into the mix to prove that it's discarded
// for being on the same fork.
+34 -22
View File
@@ -116,7 +116,7 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> JustificationImport<Block>
}
enum AppliedChanges<H, N> {
Standard,
Standard(bool), // true if the change is ready to be applied (i.e. it's a root)
Forced(NewAuthoritySet<H, N>),
None,
}
@@ -124,7 +124,7 @@ enum AppliedChanges<H, N> {
impl<H, N> AppliedChanges<H, N> {
fn needs_justification(&self) -> bool {
match *self {
AppliedChanges::Standard => true,
AppliedChanges::Standard(_) => true,
AppliedChanges::Forced(_) | AppliedChanges::None => false,
}
}
@@ -345,8 +345,8 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> GrandpaBlockImport<B, E, Block, RA
.map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()))
.map_err(ConsensusError::from)?;
if did_standard {
AppliedChanges::Standard
if let Some(root) = did_standard {
AppliedChanges::Standard(root)
} else {
AppliedChanges::None
}
@@ -358,7 +358,7 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> GrandpaBlockImport<B, E, Block, RA
if let Some((_, ref authorities)) = just_in_case {
let authorities_change = match applied_changes {
AppliedChanges::Forced(ref new) => Some(new),
AppliedChanges::Standard => None, // the change isn't actually applied yet.
AppliedChanges::Standard(_) => None, // the change isn't actually applied yet.
AppliedChanges::None => None,
};
@@ -405,7 +405,7 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
let pending_changes = self.make_authorities_changes(&mut block, hash)?;
// we don't want to finalize on `inner.import_block`
let justification = block.justification.take();
let mut justification = block.justification.take();
let enacts_consensus_change = new_authorities.is_some();
let import_result = self.inner.import_block(block, new_authorities);
@@ -435,23 +435,34 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
}
let needs_justification = applied_changes.needs_justification();
if let AppliedChanges::Forced(new) = applied_changes {
// NOTE: when we do a force change we are "discrediting" the old set so we
// ignore any justifications from them. this block may contain a justification
// which should be checked and imported below against the new authority
// triggered by this forced change. the new grandpa voter will start at the
// last median finalized block (which is before the block that enacts the
// change), full nodes syncing the chain will not be able to successfully
// import justifications for those blocks since their local authority set view
// is still of the set before the forced change was enacted, still after #1867
// they should import the block and discard the justification, and they will
// then request a justification from sync if it's necessary (which they should
// then be able to successfully validate).
let _ = self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new));
// we must clear all pending justifications requests, presumably they won't be
// finalized hence why this forced changes was triggered
imported_aux.clear_justification_requests = true;
match applied_changes {
AppliedChanges::Forced(new) => {
// NOTE: when we do a force change we are "discrediting" the old set so we
// ignore any justifications from them. this block may contain a justification
// which should be checked and imported below against the new authority
// triggered by this forced change. the new grandpa voter will start at the
// last median finalized block (which is before the block that enacts the
// change), full nodes syncing the chain will not be able to successfully
// import justifications for those blocks since their local authority set view
// is still of the set before the forced change was enacted, still after #1867
// they should import the block and discard the justification, and they will
// then request a justification from sync if it's necessary (which they should
// then be able to successfully validate).
let _ = self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new));
// we must clear all pending justifications requests, presumably they won't be
// finalized hence why this forced changes was triggered
imported_aux.clear_justification_requests = true;
},
AppliedChanges::Standard(false) => {
// we can't apply this change yet since there are other dependent changes that we
// need to apply first, drop any justification that might have been provided with
// the block to make sure we request them from `sync` which will ensure they'll be
// applied in-order.
justification.take();
},
_ => {},
}
if !needs_justification && !enacts_consensus_change {
@@ -481,6 +492,7 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
if enacts_consensus_change {
self.consensus_changes.lock().note_change((number, hash));
}
imported_aux.needs_justification = true;
}
}
+20 -1
View File
@@ -98,6 +98,7 @@ struct PendingJustifications<B: BlockT> {
pending_requests: VecDeque<PendingJustification<B>>,
peer_requests: HashMap<NodeIndex, PendingJustification<B>>,
previous_requests: HashMap<PendingJustification<B>, Vec<(NodeIndex, Instant)>>,
importing_requests: HashSet<PendingJustification<B>>,
}
impl<B: BlockT> PendingJustifications<B> {
@@ -107,6 +108,7 @@ impl<B: BlockT> PendingJustifications<B> {
pending_requests: VecDeque::new(),
peer_requests: HashMap::new(),
previous_requests: HashMap::new(),
importing_requests: HashSet::new(),
}
}
@@ -243,6 +245,16 @@ impl<B: BlockT> PendingJustifications<B> {
/// Queues a retry in case the import failed.
fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
let request = (hash, number);
if !self.importing_requests.remove(&request) {
debug!(target: "sync", "Got justification import result for unknown justification {:?} {:?} request.",
request.0,
request.1,
);
return;
};
if success {
if self.justifications.finalize_root(&request.0).is_none() {
warn!(target: "sync", "Imported justification for {:?} {:?} which isn't a root in the tree: {:?}",
@@ -250,6 +262,7 @@ impl<B: BlockT> PendingJustifications<B> {
request.1,
self.justifications.roots().collect::<Vec<_>>(),
);
return;
};
@@ -278,6 +291,7 @@ impl<B: BlockT> PendingJustifications<B> {
if let Some(request) = self.peer_requests.remove(&who) {
if let Some(justification) = justification {
import_queue.import_justification(who.clone(), request.0, request.1, justification);
self.importing_requests.insert(request);
return
}
@@ -285,6 +299,7 @@ impl<B: BlockT> PendingJustifications<B> {
.entry(request)
.or_insert(Vec::new())
.push((who, Instant::now()));
self.pending_requests.push_front(request);
}
}
@@ -299,7 +314,11 @@ impl<B: BlockT> PendingJustifications<B> {
) -> Result<(), fork_tree::Error<ClientError>>
where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
{
use std::collections::HashSet;
if self.importing_requests.contains(&(*best_finalized_hash, best_finalized_number)) {
// we imported this justification ourselves, so we should get back a response
// from the import queue through `justification_import_result`
return Ok(());
}
self.justifications.finalize(best_finalized_hash, best_finalized_number, &is_descendent_of)?;
+100 -14
View File
@@ -228,17 +228,19 @@ impl<H, N, V> ForkTree<H, N, V> where
/// Checks if any node in the tree is finalized by either finalizing the
/// node itself or a child node that's not in the tree, guaranteeing that
/// the node being finalized isn't a descendent of any of the node's
/// children. The given `predicate` is checked on the prospective finalized
/// root and must pass for finalization to occur. The given function
/// `is_descendent_of` should return `true` if the second hash (target) is a
/// descendent of the first hash (base).
/// children. Returns `Some(true)` if the node being finalized is a root,
/// `Some(false)` if the node being finalized is not a root, and `None` if
/// no node in the tree is finalized. The given `predicate` is checked on
/// the prospective finalized root and must pass for finalization to occur.
/// The given function `is_descendent_of` should return `true` if the second
/// hash (target) is a descendent of the first hash (base).
pub fn finalizes_any_with_descendent_if<F, P, E>(
&self,
hash: &H,
number: N,
is_descendent_of: &F,
predicate: P,
) -> Result<bool, Error<E>>
) -> Result<Option<bool>, Error<E>>
where E: std::error::Error,
F: Fn(&H, &H) -> Result<bool, E>,
P: Fn(&V) -> bool,
@@ -253,20 +255,20 @@ impl<H, N, V> ForkTree<H, N, V> where
// tree, if we find a valid node that passes the predicate then we must
// ensure that we're not finalizing past any of its child nodes.
for node in self.node_iter() {
if node.hash == *hash || is_descendent_of(&node.hash, hash)? {
if predicate(&node.data) {
if predicate(&node.data) {
if node.hash == *hash || is_descendent_of(&node.hash, hash)? {
for node in node.children.iter() {
if node.number <= number && is_descendent_of(&node.hash, &hash)? {
return Err(Error::UnfinalizedAncestor);
}
}
return Ok(true);
return Ok(Some(self.roots.iter().any(|root| root.hash == node.hash)));
}
}
}
Ok(false)
Ok(None)
}
/// Finalize a root in the tree by either finalizing the node itself or a
@@ -298,8 +300,8 @@ impl<H, N, V> ForkTree<H, N, V> where
// we're not finalizing past any children node.
let mut position = None;
for (i, root) in self.roots.iter().enumerate() {
if root.hash == *hash || is_descendent_of(&root.hash, hash)? {
if predicate(&root.data) {
if predicate(&root.data) {
if root.hash == *hash || is_descendent_of(&root.hash, hash)? {
for node in root.children.iter() {
if node.number <= number && is_descendent_of(&node.hash, &hash)? {
return Err(Error::UnfinalizedAncestor);
@@ -692,7 +694,19 @@ mod test {
&is_descendent_of,
|c| c.effective <= 2,
),
Ok(false),
Ok(None),
);
// finalizing "D" will finalize a block from the tree, but it can't be applied yet
// since it is not a root change
assert_eq!(
tree.finalizes_any_with_descendent_if(
&"D",
10,
&is_descendent_of,
|c| c.effective == 10,
),
Ok(Some(false)),
);
// finalizing "B" doesn't finalize "A0" since the predicate doesn't pass,
@@ -720,7 +734,7 @@ mod test {
&is_descendent_of,
|c| c.effective <= 5,
),
Ok(true),
Ok(Some(true)),
);
assert_eq!(
@@ -757,7 +771,7 @@ mod test {
&is_descendent_of,
|c| c.effective <= 100,
),
Ok(true),
Ok(Some(true)),
);
assert_eq!(
@@ -788,4 +802,76 @@ mod test {
],
);
}
#[test]
fn minimizes_calls_to_is_descendent_of() {
use std::sync::atomic::{AtomicUsize, Ordering};
let n_is_descendent_of_calls = AtomicUsize::new(0);
let is_descendent_of = |_: &&str, _: &&str| -> Result<bool, TestError> {
n_is_descendent_of_calls.fetch_add(1, Ordering::SeqCst);
Ok(true)
};
{
// Deep tree where we want to call `finalizes_any_with_descendent_if`. The
// search for the node should first check the predicate (which is cheaper) and
// only then call `is_descendent_of`
let mut tree = ForkTree::new();
let letters = vec!["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"];
for (i, letter) in letters.iter().enumerate() {
tree.import::<_, TestError>(*letter, i, i, &|_, _| Ok(true)).unwrap();
}
// "L" is a descendent of "K", but the predicate will only pass for "K",
// therefore only one call to `is_descendent_of` should be made
assert_eq!(
tree.finalizes_any_with_descendent_if(
&"L",
11,
&is_descendent_of,
|i| *i == 10,
),
Ok(Some(false)),
);
assert_eq!(
n_is_descendent_of_calls.load(Ordering::SeqCst),
1,
);
}
n_is_descendent_of_calls.store(0, Ordering::SeqCst);
{
// Multiple roots in the tree where we want to call `finalize_with_descendent_if`.
// The search for the root node should first check the predicate (which is cheaper)
// and only then call `is_descendent_of`
let mut tree = ForkTree::new();
let letters = vec!["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"];
for (i, letter) in letters.iter().enumerate() {
tree.import::<_, TestError>(*letter, i, i, &|_, _| Ok(false)).unwrap();
}
// "L" is a descendent of "K", but the predicate will only pass for "K",
// therefore only one call to `is_descendent_of` should be made
assert_eq!(
tree.finalize_with_descendent_if(
&"L",
11,
&is_descendent_of,
|i| *i == 10,
),
Ok(FinalizationResult::Changed(Some(10))),
);
assert_eq!(
n_is_descendent_of_calls.load(Ordering::SeqCst),
1,
);
}
}
}