Sync block justifications (#1410)

* core: sync protocol for justifications

* core: basic test for justification sync

* core: pass block number with justification

* grandpa: request justifications when importing change blocks

* core: pass finality notifications to chain sync

* core: require justifications for pending change blocks on start

* core: avoid requesting justifications from previous failed peers

* core: timeout block justification requests

* core: add some docs

* core: fix unused variables warning

* core: tick pending justifications fetch periodically

* grandpa: add test for syncing justifications

* core: early exit dispatch of pending justifications

* core: style fix

* core: grandpa: change logging level

* core: sync: add missing docs

* core: network: report peer on bad justification

* core: replace mem::replace with Option::take

* core: revert authority set changes on failed block finalization

* core: grandpa: add docs to import_justification

* core: warn on re-finalization of last finalized block

* core: only notify sync with last finality notification

* core: style fix

* core: add docs for PendingJustifications

* core: network: use BlockRequest messages for justification requests

* core: reference issues in todo comments

* core: grandpa: revert authority set changes on db

* core: grandpa: remove inconsistent state warning
This commit is contained in:
André Silva
2019-01-21 06:04:01 +00:00
committed by Gav Wood
parent 3ea681998a
commit 399cee310a
13 changed files with 747 additions and 140 deletions
+19 -2
View File
@@ -728,7 +728,11 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
// find tree route from last finalized to given block. // find tree route from last finalized to given block.
let last_finalized = self.backend.blockchain().last_finalized()?; let last_finalized = self.backend.blockchain().last_finalized()?;
if block == last_finalized { return Ok(()) } if block == last_finalized {
warn!("Possible safety violation: attempted to re-finalize last finalized block {:?} ", last_finalized);
return Ok(());
}
let route_from_finalized = crate::blockchain::tree_route( let route_from_finalized = crate::blockchain::tree_route(
self.backend.blockchain(), self.backend.blockchain(),
BlockId::Hash(last_finalized), BlockId::Hash(last_finalized),
@@ -1093,7 +1097,7 @@ impl<B, E, Block, RA> consensus::BlockImport<Block> for Client<B, E, Block, RA>
match self.backend.blockchain().status(BlockId::Hash(parent_hash)) { match self.backend.blockchain().status(BlockId::Hash(parent_hash)) {
Ok(blockchain::BlockStatus::InChain) => {}, Ok(blockchain::BlockStatus::InChain) => {},
Ok(blockchain::BlockStatus::Unknown) => return Ok(ImportResult::UnknownParent), Ok(blockchain::BlockStatus::Unknown) => return Ok(ImportResult::UnknownParent),
Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()) Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()),
} }
let import_headers = if post_digests.is_empty() { let import_headers = if post_digests.is_empty() {
@@ -1131,6 +1135,19 @@ impl<B, E, Block, RA> consensus::BlockImport<Block> for Client<B, E, Block, RA>
); );
result.map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()).into()) result.map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()).into())
} }
/// Import a block justification and finalize the block. The justification
/// isn't interpreted by the client and is assumed to have been validated
/// previously. The block is finalized unconditionally.
fn import_justification(
&self,
hash: Block::Hash,
_number: NumberFor<Block>,
justification: Justification,
) -> Result<(), Self::Error> {
self.finalize_block(BlockId::Hash(hash), Some(justification), true)
.map_err(|_| ConsensusErrorKind::InvalidJustification.into())
}
} }
impl<B, E, Block, RA> consensus::Authorities<Block> for Client<B, E, Block, RA> where impl<B, E, Block, RA> consensus::Authorities<Block> for Client<B, E, Block, RA> where
@@ -16,7 +16,7 @@
//! Block import helpers. //! Block import helpers.
use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Header as HeaderT, DigestItemFor}; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, DigestItemFor, Header as HeaderT, NumberFor};
use runtime_primitives::Justification; use runtime_primitives::Justification;
use std::borrow::Cow; use std::borrow::Cow;
@@ -33,6 +33,9 @@ pub enum ImportResult {
KnownBad, KnownBad,
/// Block parent is not in the chain. /// Block parent is not in the chain.
UnknownParent, UnknownParent,
/// Added to the import queue but must be justified
/// (usually required to safely enact consensus changes).
NeedsJustification,
} }
/// Block data origin. /// Block data origin.
@@ -140,9 +143,22 @@ impl<Block: BlockT> ImportBlock<Block> {
/// Block import trait. /// Block import trait.
pub trait BlockImport<B: BlockT> { pub trait BlockImport<B: BlockT> {
type Error: ::std::error::Error + Send + 'static; type Error: ::std::error::Error + Send + 'static;
/// Import a Block alongside the new authorities valid form this block forward
fn import_block(&self, /// Called by the import queue when it is started.
fn on_start(&self, _link: &::import_queue::Link<B>) { }
/// Import a Block alongside the new authorities valid from this block forward
fn import_block(
&self,
block: ImportBlock<B>, block: ImportBlock<B>,
new_authorities: Option<Vec<AuthorityIdFor<B>>> new_authorities: Option<Vec<AuthorityIdFor<B>>>,
) -> Result<ImportResult, Self::Error>; ) -> Result<ImportResult, Self::Error>;
/// Import a Block justification and finalize the given block.
fn import_justification(
&self,
hash: B::Hash,
number: NumberFor<B>,
justification: Justification,
) -> Result<(), Self::Error>;
} }
@@ -92,6 +92,8 @@ pub trait ImportQueue<B: BlockT>: Send + Sync {
fn is_importing(&self, hash: &B::Hash) -> bool; fn is_importing(&self, hash: &B::Hash) -> bool;
/// Import bunch of blocks. /// Import bunch of blocks.
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>); fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
/// Import a block justification.
fn import_justification(&self, hash: B::Hash, number: NumberFor<B>, justification: Justification) -> bool;
} }
/// Import queue status. It isn't completely accurate. /// Import queue status. It isn't completely accurate.
@@ -161,6 +163,7 @@ impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> {
let verifier = self.verifier.clone(); let verifier = self.verifier.clone();
let block_import = self.block_import.clone(); let block_import = self.block_import.clone();
*self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || { *self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || {
block_import.on_start(&link);
import_thread(block_import, link, qdata, verifier) import_thread(block_import, link, qdata, verifier)
})?); })?);
Ok(()) Ok(())
@@ -218,6 +221,10 @@ impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> {
queue.push_back((origin, blocks)); queue.push_back((origin, blocks));
self.data.signal.notify_one(); self.data.signal.notify_one();
} }
fn import_justification(&self, hash: B::Hash, number: NumberFor<B>, justification: Justification) -> bool {
self.block_import.import_justification(hash, number, justification).is_ok()
}
} }
impl<B: BlockT, V: 'static + Verifier<B>> Drop for BasicQueue<B, V> { impl<B: BlockT, V: 'static + Verifier<B>> Drop for BasicQueue<B, V> {
@@ -279,6 +286,8 @@ fn import_thread<B: BlockT, L: Link<B>, V: Verifier<B>>(
pub trait Link<B: BlockT>: Send { pub trait Link<B: BlockT>: Send {
/// Block imported. /// Block imported.
fn block_imported(&self, _hash: &B::Hash, _number: NumberFor<B>) { } fn block_imported(&self, _hash: &B::Hash, _number: NumberFor<B>) { }
/// Request a justification for the given block.
fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) { }
/// Maintain sync. /// Maintain sync.
fn maintain_sync(&self) { } fn maintain_sync(&self) { }
/// Disconnect from peer. /// Disconnect from peer.
@@ -296,6 +305,8 @@ pub enum BlockImportResult<H: ::std::fmt::Debug + PartialEq, N: ::std::fmt::Debu
ImportedKnown(H, N), ImportedKnown(H, N),
/// Imported unknown block. /// Imported unknown block.
ImportedUnknown(H, N), ImportedUnknown(H, N),
/// Imported unjustified block that requires one.
ImportedUnjustified(H, N),
} }
/// Block import error. /// Block import error.
@@ -409,6 +420,10 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>(
trace!(target: "sync", "Block queued {}: {:?}", number, hash); trace!(target: "sync", "Block queued {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedUnknown(hash, number)) Ok(BlockImportResult::ImportedUnknown(hash, number))
}, },
Ok(ImportResult::NeedsJustification) => {
trace!(target: "sync", "Block queued but requires justification {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedUnjustified(hash, number))
},
Ok(ImportResult::UnknownParent) => { Ok(ImportResult::UnknownParent) => {
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent); debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent);
Err(BlockImportError::UnknownParent) Err(BlockImportError::UnknownParent)
@@ -416,7 +431,7 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>(
Ok(ImportResult::KnownBad) => { Ok(ImportResult::KnownBad) => {
debug!(target: "sync", "Peer gave us a bad block {}: {:?}", number, hash); debug!(target: "sync", "Peer gave us a bad block {}: {:?}", number, hash);
Err(BlockImportError::BadBlock(peer)) //TODO: use persistent ID Err(BlockImportError::BadBlock(peer)) //TODO: use persistent ID
} },
Err(e) => { Err(e) => {
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
Err(BlockImportError::Error) Err(BlockImportError::Error)
@@ -439,6 +454,11 @@ pub fn process_import_result<B: BlockT>(
link.block_imported(&hash, number); link.block_imported(&hash, number);
1 1
}, },
Ok(BlockImportResult::ImportedUnjustified(hash, number)) => {
link.block_imported(&hash, number);
link.request_justification(&hash, number);
1
},
Err(BlockImportError::IncompleteHeader(who)) => { Err(BlockImportError::IncompleteHeader(who)) => {
if let Some(peer) = who { if let Some(peer) = who {
link.useless_peer(peer, "Sent block with incomplete header to import"); link.useless_peer(peer, "Sent block with incomplete header to import");
@@ -146,7 +146,6 @@ where
} }
/// Inspect pending changes. /// Inspect pending changes.
#[cfg(test)]
pub(crate) fn pending_changes(&self) -> &[PendingChange<H, N>] { pub(crate) fn pending_changes(&self) -> &[PendingChange<H, N>] {
&self.pending_changes &self.pending_changes
} }
@@ -261,7 +260,7 @@ pub(crate) struct PendingChange<H, N> {
impl<H, N: Add<Output=N> + Clone> PendingChange<H, N> { impl<H, N: Add<Output=N> + Clone> PendingChange<H, N> {
/// Returns the effective number this change will be applied at. /// Returns the effective number this change will be applied at.
fn effective_number(&self) -> N { pub fn effective_number(&self) -> N {
self.canon_height.clone() + self.finalization_depth.clone() self.canon_height.clone() + self.finalization_depth.clone()
} }
} }
+219 -105
View File
@@ -92,6 +92,7 @@ use client::{
use client::blockchain::HeaderBackend; use client::blockchain::HeaderBackend;
use codec::{Encode, Decode}; use codec::{Encode, Decode};
use consensus_common::{BlockImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult, Authorities}; use consensus_common::{BlockImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult, Authorities};
use runtime_primitives::Justification;
use runtime_primitives::traits::{ use runtime_primitives::traits::{
NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT, NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT,
DigestItemFor, DigestItem, As, Zero, DigestItemFor, DigestItem, As, Zero,
@@ -315,7 +316,7 @@ impl<B, E, Block: BlockT<Hash=H256>, RA> BlockStatus<Block> for Arc<Client<B, E,
} }
/// Consensus-related data changes tracker. /// Consensus-related data changes tracker.
#[derive(Debug, Encode, Decode)] #[derive(Clone, Debug, Encode, Decode)]
struct ConsensusChanges<H, N> { struct ConsensusChanges<H, N> {
pending_changes: Vec<(N, H)>, pending_changes: Vec<(N, H)>,
} }
@@ -796,6 +797,13 @@ fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
{ {
// lock must be held through writing to DB to avoid race // lock must be held through writing to DB to avoid race
let mut authority_set = authority_set.inner().write(); let mut authority_set = authority_set.inner().write();
// TODO [andre]: clone only when changed (#1483)
let old_authority_set = authority_set.clone();
// needed in case there is an authority set change, used for reverting in
// case of error
let mut old_last_completed = None;
let mut consensus_changes = consensus_changes.lock(); let mut consensus_changes = consensus_changes.lock();
let status = authority_set.apply_changes(number, |canon_number| { let status = authority_set.apply_changes(number, |canon_number| {
canonical_at_height(client, (hash, number), canon_number) canonical_at_height(client, (hash, number), canon_number)
@@ -813,6 +821,8 @@ fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
let last_completed: LastCompleted<_, _> = (0, round_state); let last_completed: LastCompleted<_, _> = (0, round_state);
let encoded = last_completed.encode(); let encoded = last_completed.encode();
old_last_completed = Backend::get_aux(&**client.backend(), LAST_COMPLETED_KEY)?;
Backend::insert_aux( Backend::insert_aux(
&**client.backend(), &**client.backend(),
&[ &[
@@ -836,7 +846,13 @@ fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
// check if this is this is the first finalization of some consensus changes // check if this is this is the first finalization of some consensus changes
let (alters_consensus_changes, finalizes_consensus_changes) = consensus_changes let (alters_consensus_changes, finalizes_consensus_changes) = consensus_changes
.finalize((number, hash), |at_height| canonical_at_height(client, (hash, number), at_height))?; .finalize((number, hash), |at_height| canonical_at_height(client, (hash, number), at_height))?;
// holds the old consensus changes in case it is changed below, needed for
// reverting in case of failure
let mut old_consensus_changes = None;
if alters_consensus_changes { if alters_consensus_changes {
old_consensus_changes = Some(consensus_changes.clone());
let encoded = consensus_changes.encode(); let encoded = consensus_changes.encode();
let write_result = Backend::insert_aux(&**client.backend(), &[(CONSENSUS_CHANGES_KEY, &encoded[..])], &[]); let write_result = Backend::insert_aux(&**client.backend(), &[(CONSENSUS_CHANGES_KEY, &encoded[..])], &[]);
if let Err(e) = write_result { if let Err(e) = write_result {
@@ -847,75 +863,115 @@ fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
} }
} }
// NOTE: this code assumes that honest voters will never vote past a let aux = |authority_set: &authorities::AuthoritySet<Block::Hash, NumberFor<Block>>| {
// transition block, thus we don't have to worry about the case where // NOTE: this code assumes that honest voters will never vote past a
// we have a transition with `effective_block = N`, but we finalize // transition block, thus we don't have to worry about the case where
// `N+1`. this assumption is required to make sure we store // we have a transition with `effective_block = N`, but we finalize
// justifications for transition blocks which will be requested by // `N+1`. this assumption is required to make sure we store
// syncing clients. // justifications for transition blocks which will be requested by
let justification = match justification_or_commit { // syncing clients.
JustificationOrCommit::Justification(justification) => Some(justification.encode()), let justification = match justification_or_commit {
JustificationOrCommit::Commit((round_number, commit)) => { JustificationOrCommit::Justification(justification) => Some(justification.encode()),
let mut justification_required = JustificationOrCommit::Commit((round_number, commit)) => {
// justification is always required when block that enacts new authorities let mut justification_required =
// set is finalized // justification is always required when block that enacts new authorities
status.new_set_block.is_some() || // set is finalized
// justification is required when consensus changes are finalized status.new_set_block.is_some() ||
finalizes_consensus_changes; // justification is required when consensus changes are finalized
finalizes_consensus_changes;
// justification is required every N blocks to be able to prove blocks // justification is required every N blocks to be able to prove blocks
// finalization to remote nodes // finalization to remote nodes
if !justification_required { if !justification_required {
if let Some(justification_period) = justification_period { if let Some(justification_period) = justification_period {
let last_finalized_number = client.info()?.chain.finalized_number; let last_finalized_number = client.info()?.chain.finalized_number;
justification_required = (!last_finalized_number.is_zero() || justification_required =
number - last_finalized_number == justification_period) && (!last_finalized_number.is_zero() || number - last_finalized_number == justification_period) &&
(last_finalized_number / justification_period != number / justification_period); (last_finalized_number / justification_period != number / justification_period);
}
} }
}
if justification_required { if justification_required {
let justification = GrandpaJustification::from_commit( let justification = GrandpaJustification::from_commit(
client, client,
round_number, round_number,
commit, commit,
)?; )?;
Some(justification.encode()) Some(justification.encode())
} else {
None
}
},
};
debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash);
// ideally some handle to a synchronization oracle would be used
// to avoid unconditionally notifying.
client.finalize_block(BlockId::Hash(hash), justification, true).map_err(|e| {
warn!(target: "finality", "Error applying finality to block {:?}: {:?}", (hash, number), e);
e
})?;
if let Some((canon_hash, canon_number)) = status.new_set_block {
// the authority set has changed.
let (new_id, set_ref) = authority_set.current();
if set_ref.len() > 16 {
info!("Applying GRANDPA set change to new set with {} authorities", set_ref.len());
} else { } else {
None info!("Applying GRANDPA set change to new set {:?}", set_ref);
} }
},
Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet {
canon_hash,
canon_number,
set_id: new_id,
authorities: set_ref.to_vec(),
}))
} else {
Ok(())
}
}; };
debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash); match aux(&authority_set) {
Err(ExitOrError::Error(err)) => {
debug!(target: "afg", "Reverting authority set and/or consensus changes after block finalization error: {:?}", err);
// ideally some handle to a synchronization oracle would be used let mut revert_aux = Vec::new();
// to avoid unconditionally notifying.
client.finalize_block(BlockId::Hash(hash), justification, true).map_err(|e| {
warn!(target: "finality", "Error applying finality to block {:?}: {:?}", (hash, number), e);
warn!(target: "finality", "Node is in a potentially inconsistent state.");
e
})?;
if let Some((canon_hash, canon_number)) = status.new_set_block { if status.changed {
// the authority set has changed. revert_aux.push((AUTHORITY_SET_KEY, old_authority_set.encode()));
let (new_id, set_ref) = authority_set.current(); if let Some(old_last_completed) = old_last_completed {
revert_aux.push((LAST_COMPLETED_KEY, old_last_completed));
}
if set_ref.len() > 16 { *authority_set = old_authority_set.clone();
info!("Applying GRANDPA set change to new set with {} authorities", set_ref.len()); }
} else {
info!("Applying GRANDPA set change to new set {:?}", set_ref);
}
Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet { if let Some(old_consensus_changes) = old_consensus_changes {
canon_hash, revert_aux.push((CONSENSUS_CHANGES_KEY, old_consensus_changes.encode()));
canon_number,
set_id: new_id, *consensus_changes = old_consensus_changes;
authorities: set_ref.to_vec(), }
}))
} else { let write_result = Backend::insert_aux(
Ok(()) &**client.backend(),
revert_aux.iter().map(|(k, v)| (*k, &**v)).collect::<Vec<_>>().iter(),
&[],
);
if let Err(e) = write_result {
warn!(target: "finality", "Failed to revert consensus changes to disk. Bailing.");
warn!(target: "finality", "Node is in a potentially inconsistent state.");
return Err(e.into());
}
Err(ExitOrError::Error(err))
},
res => res,
} }
} }
@@ -949,6 +1005,33 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
{ {
type Error = ConsensusError; type Error = ConsensusError;
fn on_start(&self, link: &::consensus_common::import_queue::Link<Block>) {
let chain_info = match self.inner.info() {
Ok(info) => info.chain,
_ => return,
};
// request justifications for all pending changes for which change blocks have already been imported
for pending_change in self.authority_set.inner().read().pending_changes() {
if pending_change.effective_number() > chain_info.finalized_number &&
pending_change.effective_number() <= chain_info.best_number
{
let effective_block_hash = self.inner.best_containing(
pending_change.canon_hash,
Some(pending_change.effective_number()),
);
if let Ok(Some(hash)) = effective_block_hash {
if let Ok(Some(header)) = self.inner.header(&BlockId::Hash(hash)) {
if *header.number() == pending_change.effective_number() {
link.request_justification(&header.hash(), *header.number());
}
}
}
}
}
}
fn import_block(&self, mut block: ImportBlock<Block>, new_authorities: Option<Vec<Ed25519AuthorityId>>) fn import_block(&self, mut block: ImportBlock<Block>, new_authorities: Option<Vec<Ed25519AuthorityId>>)
-> Result<ImportResult, Self::Error> -> Result<ImportResult, Self::Error>
{ {
@@ -1045,52 +1128,7 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
match justification { match justification {
Some(justification) => { Some(justification) => {
let justification = GrandpaJustification::decode_and_verify( self.import_justification(hash, number, justification, enacts_change)?;
justification,
self.authority_set.set_id(),
&self.authority_set.current_authorities(),
);
let justification = match justification {
Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()),
Ok(justification) => justification,
};
let result = finalize_block(
&*self.inner,
&self.authority_set,
&self.consensus_changes,
None,
hash,
number,
justification.into(),
);
match result {
Ok(_) => {
assert!(!enacts_change, "returns Ok when no authority set change should be enacted; qed;");
},
Err(ExitOrError::AuthoritiesChanged(new)) => {
assert!(
enacts_change,
"returns AuthoritiesChanged when authority set change should be enacted; qed;"
);
debug!(target: "finality", "Imported justified block #{} that enacts authority set change, signalling voter.", number);
if let Err(e) = self.authority_set_change.unbounded_send(new) {
return Err(ConsensusErrorKind::ClientImport(e.to_string()).into());
}
},
Err(ExitOrError::Error(e)) => {
match e {
Error::Grandpa(error) => return Err(ConsensusErrorKind::ClientImport(error.to_string()).into()),
Error::Network(error) => return Err(ConsensusErrorKind::ClientImport(error).into()),
Error::Blockchain(error) => return Err(ConsensusErrorKind::ClientImport(error).into()),
Error::Client(error) => return Err(ConsensusErrorKind::ClientImport(error.to_string()).into()),
Error::Timer(error) => return Err(ConsensusErrorKind::ClientImport(error.to_string()).into()),
}
},
}
}, },
None => { None => {
if enacts_change { if enacts_change {
@@ -1106,11 +1144,87 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
if enacts_consensus_change { if enacts_consensus_change {
self.consensus_changes.lock().note_change((number, hash)); self.consensus_changes.lock().note_change((number, hash));
} }
},
return Ok(ImportResult::NeedsJustification);
}
} }
Ok(import_result) Ok(import_result)
} }
fn import_justification(
&self,
hash: Block::Hash,
number: NumberFor<Block>,
justification: Justification,
) -> Result<(), Self::Error> {
self.import_justification(hash, number, justification, false)
}
}
impl<B, E, Block: BlockT<Hash=H256>, RA, PRA>
GrandpaBlockImport<B, E, Block, RA, PRA> where
NumberFor<Block>: grandpa::BlockNumberOps,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync,
RA: Send + Sync,
{
/// Import a block justification and finalize the block.
///
/// If `enacts_change` is set to true, then finalizing this block *must*
/// enact an authority set change, the function will panic otherwise.
fn import_justification(
&self,
hash: Block::Hash,
number: NumberFor<Block>,
justification: Justification,
enacts_change: bool,
) -> Result<(), ConsensusError> {
let justification = GrandpaJustification::decode_and_verify(
justification,
self.authority_set.set_id(),
&self.authority_set.current_authorities(),
);
let justification = match justification {
Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()),
Ok(justification) => justification,
};
let result = finalize_block(
&*self.inner,
&self.authority_set,
&self.consensus_changes,
None,
hash,
number,
justification.into(),
);
match result {
Err(ExitOrError::AuthoritiesChanged(new)) => {
info!(target: "finality", "Imported justification for block #{} that enacts authority set change, signalling voter.", number);
if let Err(e) = self.authority_set_change.unbounded_send(new) {
return Err(ConsensusErrorKind::ClientImport(e.to_string()).into());
}
},
Err(ExitOrError::Error(e)) => {
return Err(match e {
Error::Grandpa(error) => ConsensusErrorKind::ClientImport(error.to_string()),
Error::Network(error) => ConsensusErrorKind::ClientImport(error),
Error::Blockchain(error) => ConsensusErrorKind::ClientImport(error),
Error::Client(error) => ConsensusErrorKind::ClientImport(error.to_string()),
Error::Timer(error) => ConsensusErrorKind::ClientImport(error.to_string()),
}.into());
},
Ok(_) => {
assert!(!enacts_change, "returns Ok when no authority set change should be enacted; qed;");
},
}
Ok(())
}
} }
/// Using the given base get the block at the given height on this chain. The /// Using the given base get the block at the given height on this chain. The
+54 -2
View File
@@ -494,7 +494,7 @@ fn transition_3_voters_twice_1_observer() {
let transitions = api.scheduled_changes.clone(); let transitions = api.scheduled_changes.clone();
let net = Arc::new(Mutex::new(GrandpaTestNet::new(api, 8))); let net = Arc::new(Mutex::new(GrandpaTestNet::new(api, 8)));
let mut runtime = tokio::runtime::Runtime::new().unwrap(); let mut runtime = current_thread::Runtime::new().unwrap();
net.lock().peer(0).push_blocks(1, false); net.lock().peer(0).push_blocks(1, false);
net.lock().sync(); net.lock().sync();
@@ -619,6 +619,7 @@ fn transition_3_voters_twice_1_observer() {
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| { .for_each(move |_| {
net.lock().send_import_notifications(); net.lock().send_import_notifications();
net.lock().send_finality_notifications();
net.lock().sync(); net.lock().sync();
Ok(()) Ok(())
}) })
@@ -682,4 +683,55 @@ fn consensus_changes_works() {
changes.note_change((1, 1.into())); changes.note_change((1, 1.into()));
changes.note_change((1, 101.into())); changes.note_change((1, 101.into()));
assert_eq!(changes.finalize((10, 10.into()), |_| Ok(Some(1.into()))).unwrap(), (true, true)); assert_eq!(changes.finalize((10, 10.into()), |_| Ok(Some(1.into()))).unwrap(), (true, true));
} }
#[test]
fn sync_justifications_on_change_blocks() {
::env_logger::init();
let peers_a = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie];
let peers_b = &[Keyring::Alice, Keyring::Bob];
let voters = make_ids(peers_b);
// 4 peers, 3 of them are authorities and participate in grandpa
let api = TestApi::new(voters);
let transitions = api.scheduled_changes.clone();
let mut net = GrandpaTestNet::new(api, 4);
// add 20 blocks
net.peer(0).push_blocks(20, false);
// at block 21 we do add a transition which is instant
net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| {
let block = builder.bake().unwrap();
transitions.lock().insert(*block.header.parent_hash(), ScheduledChange {
next_authorities: make_ids(peers_b),
delay: 0,
});
block
});
// add more blocks on top of it (until we have 25)
net.peer(0).push_blocks(4, false);
net.sync();
for i in 0..4 {
assert_eq!(net.peer(i).client().info().unwrap().chain.best_number, 25,
"Peer #{} failed to sync", i);
}
let net = Arc::new(Mutex::new(net));
run_to_completion(25, net.clone(), peers_a);
// the first 3 peers are grandpa voters and therefore have already finalized
// block 21 and stored a justification
for i in 0..3 {
assert!(net.lock().peer(i).client().justification(&BlockId::Number(21)).unwrap().is_some());
}
// the last peer should get the justification by syncing from other peers
assert!(net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none());
while net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() {
net.lock().sync_steps(100);
}
}
-1
View File
@@ -48,7 +48,6 @@ pub type BlockRequest<B> = generic::BlockRequest<
<<B as BlockT>::Header as HeaderT>::Number, <<B as BlockT>::Header as HeaderT>::Number,
>; >;
/// Type alias for using the BlockData type using block type parameters. /// Type alias for using the BlockData type using block type parameters.
pub type BlockData<B> = generic::BlockData< pub type BlockData<B> = generic::BlockData<
<B as BlockT>::Header, <B as BlockT>::Header,
+42 -17
View File
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>. // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashMap, HashSet, BTreeMap}; use std::collections::{HashMap, HashSet, BTreeMap};
use std::{mem, cmp}; use std::cmp;
use std::sync::Arc; use std::sync::Arc;
use std::time; use std::time;
use parking_lot::RwLock; use parking_lot::RwLock;
@@ -273,7 +273,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let mut peers = self.context_data.peers.write(); let mut peers = self.context_data.peers.write();
if let Some(ref mut peer) = peers.get_mut(&who) { if let Some(ref mut peer) = peers.get_mut(&who) {
peer.request_timestamp = None; peer.request_timestamp = None;
match mem::replace(&mut peer.block_request, None) { match peer.block_request.take() {
Some(r) => r, Some(r) => r,
None => { None => {
io.report_peer(who, Severity::Bad("Unexpected response packet received from peer")); io.report_peer(who, Severity::Bad("Unexpected response packet received from peer"));
@@ -285,10 +285,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
return; return;
} }
}; };
if request.id != r.id { if request.id != r.id {
trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", who, request.id, r.id); trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", who, request.id, r.id);
return; return;
} }
self.on_block_response(io, who, request, r); self.on_block_response(io, who, request, r);
}, },
GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, who, announce), GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, who, announce),
@@ -330,7 +332,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: NodeIndex) { pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: NodeIndex) {
trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_debug_info(peer)); trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_debug_info(peer));
// lock all the the peer lists so that add/remove peer events are in order // lock all the the peer lists so that add/remove peer events are in order
let mut sync = self.sync.write(); let mut sync = self.sync.write();
let mut spec = self.specialization.write(); let mut spec = self.specialization.write();
@@ -351,7 +352,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
} }
fn on_block_request(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest<B>) { fn on_block_request(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest<B>) {
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max); trace!(target: "sync", "BlockRequest {} from {} with fields {:?}: from {:?} to {:?} max {:?}",
request.id,
peer,
request.fields,
request.from,
request.to,
request.max,
);
let mut blocks = Vec::new(); let mut blocks = Vec::new();
let mut id = match request.from { let mut id = match request.from {
message::FromBlock::Hash(h) => BlockId::Hash(h), message::FromBlock::Hash(h) => BlockId::Hash(h),
@@ -409,25 +418,36 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
trace!(target: "sync", "BlockResponse {} from {} with {} blocks{}", trace!(target: "sync", "BlockResponse {} from {} with {} blocks{}",
response.id, peer, response.blocks.len(), blocks_range); response.id, peer, response.blocks.len(), blocks_range);
// import_queue.import_blocks also acquires sync.write(); // TODO [andre]: move this logic to the import queue so that
// Break the cycle by doing these separately from the outside; // justifications are imported asynchronously (#1482)
let new_blocks = { if request.fields == message::BlockAttributes::JUSTIFICATION {
let mut sync = self.sync.write(); let mut sync = self.sync.write();
sync.on_block_data(&mut ProtocolContext::new(&self.context_data, io), peer, request, response) sync.on_block_justification_data(
}; &mut ProtocolContext::new(&self.context_data, io),
peer,
request,
response,
);
} else {
// import_queue.import_blocks also acquires sync.write();
// Break the cycle by doing these separately from the outside;
let new_blocks = {
let mut sync = self.sync.write();
sync.on_block_data(&mut ProtocolContext::new(&self.context_data, io), peer, request, response)
};
if let Some((origin, new_blocks)) = new_blocks { if let Some((origin, new_blocks)) = new_blocks {
let import_queue = self.sync.read().import_queue(); let import_queue = self.sync.read().import_queue();
import_queue.import_blocks(origin, new_blocks); import_queue.import_blocks(origin, new_blocks);
}
} }
} }
/// Perform time based maintenance. /// Perform time based maintenance.
pub fn tick(&self, io: &mut SyncIo) { pub fn tick(&self, io: &mut SyncIo) {
self.consensus_gossip.write().collect_garbage(|_| true); self.consensus_gossip.write().collect_garbage(|_| true);
self.maintain_peers(io); self.maintain_peers(io);
self.sync.write().tick(&mut ProtocolContext::new(&self.context_data, io));
self.on_demand.as_ref().map(|s| s.maintain_peers(io)); self.on_demand.as_ref().map(|s| s.maintain_peers(io));
} }
@@ -439,7 +459,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let handshaking_peers = self.handshaking_peers.read(); let handshaking_peers = self.handshaking_peers.read();
for (who, timestamp) in peers.iter() for (who, timestamp) in peers.iter()
.filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r))) .filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r)))
.chain(handshaking_peers.iter()) { .chain(handshaking_peers.iter())
{
if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC { if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC {
trace!(target: "sync", "Timeout {}", who); trace!(target: "sync", "Timeout {}", who);
aborting.push(*who); aborting.push(*who);
@@ -648,6 +669,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
} }
} }
pub fn on_block_finalized(&self, _io: &mut SyncIo, hash: B::Hash, header: &B::Header) {
self.sync.write().block_finalized(&hash, *header.number());
}
fn on_remote_call_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteCallRequest<B::Hash>) { fn on_remote_call_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteCallRequest<B::Hash>) {
trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, who, request.method, request.block); trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, who, request.method, request.block);
let proof = match self.context_data.chain.execution_proof(&request.block, &request.method, &request.data) { let proof = match self.context_data.chain.execution_proof(&request.block, &request.method, &request.data) {
@@ -752,8 +777,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
} }
fn send_message<B: BlockT, H: ExHashT>(peers: &RwLock<HashMap<NodeIndex, Peer<B, H>>>, io: &mut SyncIo, who: NodeIndex, mut message: Message<B>) { fn send_message<B: BlockT, H: ExHashT>(peers: &RwLock<HashMap<NodeIndex, Peer<B, H>>>, io: &mut SyncIo, who: NodeIndex, mut message: Message<B>) {
match &mut message { match message {
&mut GenericMessage::BlockRequest(ref mut r) => { GenericMessage::BlockRequest(ref mut r) => {
let mut peers = peers.write(); let mut peers = peers.write();
if let Some(ref mut peer) = peers.get_mut(&who) { if let Some(ref mut peer) = peers.get_mut(&who) {
r.id = peer.next_request_id; r.id = peer.next_request_id;
+9
View File
@@ -94,6 +94,10 @@ impl<B: BlockT, E: ExecuteInContext<B>> Link<B> for NetworkLink<B, E> {
self.with_sync(|sync, _| sync.block_imported(&hash, number)) self.with_sync(|sync, _| sync.block_imported(&hash, number))
} }
fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
self.with_sync(|sync, protocol| sync.request_justification(hash, number, protocol))
}
fn maintain_sync(&self) { fn maintain_sync(&self) {
self.with_sync(|sync, protocol| sync.maintain_sync(protocol)) self.with_sync(|sync, protocol| sync.maintain_sync(protocol))
} }
@@ -174,6 +178,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Service<B, S,
self.handler.on_block_imported(&mut NetSyncIo::new(&self.network, self.protocol_id), hash, header) self.handler.on_block_imported(&mut NetSyncIo::new(&self.network, self.protocol_id), hash, header)
} }
/// Called when a new block is finalized by the client.
pub fn on_block_finalized(&self, hash: B::Hash, header: &B::Header) {
self.handler.on_block_finalized(&mut NetSyncIo::new(&self.network, self.protocol_id), hash, header)
}
/// Called when new transactons are imported by the client. /// Called when new transactons are imported by the client.
pub fn trigger_repropagate(&self) { pub fn trigger_repropagate(&self) {
self.handler.propagate_extrinsics(&mut NetSyncIo::new(&self.network, self.protocol_id)); self.handler.propagate_extrinsics(&mut NetSyncIo::new(&self.network, self.protocol_id));
+259 -3
View File
@@ -14,8 +14,9 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>. // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap; use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant};
use protocol::Context; use protocol::Context;
use network_libp2p::{Severity, NodeIndex}; use network_libp2p::{Severity, NodeIndex};
use client::{BlockStatus, ClientInfo}; use client::{BlockStatus, ClientInfo};
@@ -23,6 +24,7 @@ use consensus::BlockOrigin;
use consensus::import_queue::{ImportQueue, IncomingBlock}; use consensus::import_queue::{ImportQueue, IncomingBlock};
use client::error::Error as ClientError; use client::error::Error as ClientError;
use blocks::BlockCollection; use blocks::BlockCollection;
use runtime_primitives::Justification;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero}; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero};
use runtime_primitives::generic::BlockId; use runtime_primitives::generic::BlockId;
use message::{self, generic::Message as GenericMessage}; use message::{self, generic::Message as GenericMessage};
@@ -34,6 +36,8 @@ const MAX_BLOCKS_TO_REQUEST: usize = 128;
const MAX_IMPORTING_BLOCKS: usize = 2048; const MAX_IMPORTING_BLOCKS: usize = 2048;
// Number of blocks in the queue that prevents ancestry search. // Number of blocks in the queue that prevents ancestry search.
const MAJOR_SYNC_BLOCKS: usize = 5; const MAJOR_SYNC_BLOCKS: usize = 5;
// Time to wait before trying to get a justification from the same peer.
const JUSTIFICATION_RETRY_WAIT: Duration = Duration::from_secs(10);
struct PeerSync<B: BlockT> { struct PeerSync<B: BlockT> {
pub common_number: NumberFor<B>, pub common_number: NumberFor<B>,
@@ -48,6 +52,180 @@ enum PeerSyncState<B: BlockT> {
Available, Available,
DownloadingNew(NumberFor<B>), DownloadingNew(NumberFor<B>),
DownloadingStale(B::Hash), DownloadingStale(B::Hash),
DownloadingJustification(B::Hash),
}
/// Pending justification request for the given block (hash and number).
type PendingJustification<B> = (<B as BlockT>::Hash, NumberFor<B>);
/// Manages pending block justification requests.
struct PendingJustifications<B: BlockT> {
justifications: HashSet<PendingJustification<B>>,
pending_requests: VecDeque<PendingJustification<B>>,
peer_requests: HashMap<NodeIndex, PendingJustification<B>>,
previous_requests: HashMap<PendingJustification<B>, Vec<(NodeIndex, Instant)>>,
}
impl<B: BlockT> PendingJustifications<B> {
fn new() -> PendingJustifications<B> {
PendingJustifications {
justifications: HashSet::new(),
pending_requests: VecDeque::new(),
peer_requests: HashMap::new(),
previous_requests: HashMap::new(),
}
}
/// Dispatches all possible pending requests to the given peers. Peers are
/// filtered according to the current known best block (i.e. we won't send a
/// justification request for block #10 to a peer at block #2), and we also
/// throttle requests to the same peer if a previous justification request
/// yielded no results.
fn dispatch(&mut self, peers: &mut HashMap<NodeIndex, PeerSync<B>>, protocol: &mut Context<B>) {
if self.pending_requests.is_empty() {
return;
}
// clean up previous failed requests so we can retry again
for (_, requests) in self.previous_requests.iter_mut() {
requests.retain(|(_, instant)| instant.elapsed() < JUSTIFICATION_RETRY_WAIT);
}
let mut available_peers = peers.iter().filter_map(|(peer, sync)| {
// don't request to any peers that already have pending requests
if let PeerSyncState::Available = sync.state {
Some((*peer, sync.best_number))
} else {
None
}
}).collect::<VecDeque<_>>();
let mut last_peer = available_peers.back().map(|p| p.0);
let mut unhandled_requests = VecDeque::new();
loop {
let (peer, peer_best_number) = match available_peers.pop_front() {
Some(p) => p,
_ => break,
};
// only ask peers that have synced past the block number that we're
// asking the justification for and to whom we haven't already made
// the same request recently
let peer_eligible = {
let request = match self.pending_requests.front() {
Some(r) => r.clone(),
_ => break,
};
peer_best_number >= request.1 &&
!self.previous_requests
.get(&request)
.map(|requests| requests.iter().any(|i| i.0 == peer))
.unwrap_or(false)
};
if !peer_eligible {
available_peers.push_back((peer, peer_best_number));
// we tried all peers and none can answer this request
if Some(peer) == last_peer {
last_peer = available_peers.back().map(|p| p.0);
let request = self.pending_requests.pop_front()
.expect("verified to be Some in the beginning of the loop; qed");
unhandled_requests.push_back(request);
}
continue;
}
last_peer = available_peers.back().map(|p| p.0);
let request = self.pending_requests.pop_front()
.expect("verified to be Some in the beginning of the loop; qed");
self.peer_requests.insert(peer, request);
peers.get_mut(&peer)
.expect("peer was is taken from available_peers; available_peers is a subset of peers; qed")
.state = PeerSyncState::DownloadingJustification(request.0);
let request = message::generic::BlockRequest {
id: 0,
fields: message::BlockAttributes::JUSTIFICATION,
from: message::FromBlock::Hash(request.0),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
};
protocol.send_message(peer, GenericMessage::BlockRequest(request));
}
self.pending_requests.append(&mut unhandled_requests);
}
/// Queue a justification request (without dispatching it).
fn queue_request(&mut self, justification: &PendingJustification<B>) {
if !self.justifications.insert(*justification) {
return;
}
self.pending_requests.push_back(*justification);
}
/// Retry any pending request if a peer disconnected.
fn peer_disconnected(&mut self, who: NodeIndex) {
if let Some(request) = self.peer_requests.remove(&who) {
self.pending_requests.push_front(request);
}
}
/// Processes the response for the request previously sent to the given
/// peer. Queues a retry in case the import fails or the given justification
/// was `None`.
fn on_response(
&mut self,
who: NodeIndex,
justification: Option<Justification>,
protocol: &mut Context<B>,
import_queue: &ImportQueue<B>,
) {
// we assume that the request maps to the given response, this is
// currently enforced by the outer network protocol before passing on
// messages to chain sync.
if let Some(request) = self.peer_requests.remove(&who) {
if let Some(justification) = justification {
if import_queue.import_justification(request.0, request.1, justification) {
self.justifications.remove(&request);
self.previous_requests.remove(&request);
return;
} else {
protocol.report_peer(
who,
Severity::Bad(&format!("Invalid justification provided for #{}", request.0)),
);
}
} else {
self.previous_requests
.entry(request)
.or_insert(Vec::new())
.push((who, Instant::now()));
}
self.pending_requests.push_front(request);
}
}
/// Removes any pending justification requests for blocks lower than the
/// given best finalized.
fn collect_garbage(&mut self, best_finalized: NumberFor<B>) {
self.justifications.retain(|(_, n)| *n > best_finalized);
self.pending_requests.retain(|(_, n)| *n > best_finalized);
self.peer_requests.retain(|_, (_, n)| *n > best_finalized);
self.previous_requests.retain(|(_, n), _| *n > best_finalized);
}
} }
/// Relay chain sync strategy. /// Relay chain sync strategy.
@@ -59,6 +237,7 @@ pub struct ChainSync<B: BlockT> {
best_queued_hash: B::Hash, best_queued_hash: B::Hash,
required_block_attributes: message::BlockAttributes, required_block_attributes: message::BlockAttributes,
import_queue: Arc<ImportQueue<B>>, import_queue: Arc<ImportQueue<B>>,
justifications: PendingJustifications<B>,
} }
/// Reported sync state. /// Reported sync state.
@@ -104,6 +283,7 @@ impl<B: BlockT> ChainSync<B> {
blocks: BlockCollection::new(), blocks: BlockCollection::new(),
best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash), best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash),
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number), best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
justifications: PendingJustifications::new(),
required_block_attributes, required_block_attributes,
import_queue, import_queue,
} }
@@ -192,6 +372,7 @@ impl<B: BlockT> ChainSync<B> {
} }
} }
/// Handle new block data.
pub(crate) fn on_block_data( pub(crate) fn on_block_data(
&mut self, &mut self,
protocol: &mut Context<B>, protocol: &mut Context<B>,
@@ -269,10 +450,10 @@ impl<B: BlockT> ChainSync<B> {
} }
} }
}, },
PeerSyncState::Available => Vec::new(), PeerSyncState::Available | PeerSyncState::DownloadingJustification(..) => Vec::new(),
} }
} else { } else {
vec![] Vec::new()
}; };
let best_seen = self.best_seen_block(); let best_seen = self.best_seen_block();
@@ -288,17 +469,87 @@ impl<B: BlockT> ChainSync<B> {
Some((origin, new_blocks)) Some((origin, new_blocks))
} }
/// Handle new justification data.
pub(crate) fn on_block_justification_data(
&mut self,
protocol: &mut Context<B>,
who: NodeIndex,
_request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingJustification(hash) = peer.state {
peer.state = PeerSyncState::Available;
// we only request one justification at a time
match response.blocks.into_iter().next() {
Some(response) => {
if hash != response.hash {
let msg = format!(
"Invalid block justification provided: requested: {:?} got: {:?}",
hash,
response.hash,
);
protocol.report_peer(who, Severity::Bad(&msg));
return;
}
self.justifications.on_response(
who,
response.justification,
protocol,
&*self.import_queue,
);
},
None => {
let msg = format!(
"Provided empty response for justification request {:?}",
hash,
);
protocol.report_peer(who, Severity::Useless(&msg));
return;
},
}
}
}
self.maintain_sync(protocol);
}
/// Maintain the sync process (download new blocks, fetch justifications).
pub fn maintain_sync(&mut self, protocol: &mut Context<B>) { pub fn maintain_sync(&mut self, protocol: &mut Context<B>) {
let peers: Vec<NodeIndex> = self.peers.keys().map(|p| *p).collect(); let peers: Vec<NodeIndex> = self.peers.keys().map(|p| *p).collect();
for peer in peers { for peer in peers {
self.download_new(protocol, peer); self.download_new(protocol, peer);
} }
self.justifications.dispatch(&mut self.peers, protocol);
} }
/// Called periodically to perform any time-based actions.
pub fn tick(&mut self, protocol: &mut Context<B>) {
self.justifications.dispatch(&mut self.peers, protocol);
}
/// Request a justification for the given block.
///
/// Queues a new justification request and tries to dispatch all pending requests.
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut Context<B>) {
self.justifications.queue_request(&(*hash, number));
self.justifications.dispatch(&mut self.peers, protocol);
}
/// Notify about successful import of the given block.
pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) { pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
trace!(target: "sync", "Block imported successfully {} ({})", number, hash); trace!(target: "sync", "Block imported successfully {} ({})", number, hash);
} }
/// Notify about finalization of the given block.
pub fn block_finalized(&mut self, _hash: &B::Hash, number: NumberFor<B>) {
self.justifications.collect_garbage(number);
}
fn block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) { fn block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
if number > self.best_queued_number { if number > self.best_queued_number {
self.best_queued_number = number; self.best_queued_number = number;
@@ -324,6 +575,7 @@ impl<B: BlockT> ChainSync<B> {
self.block_queued(&hash, best_header.number().clone()) self.block_queued(&hash, best_header.number().clone())
} }
/// Handle new block announcement.
pub(crate) fn on_block_announce(&mut self, protocol: &mut Context<B>, who: NodeIndex, hash: B::Hash, header: &B::Header) { pub(crate) fn on_block_announce(&mut self, protocol: &mut Context<B>, who: NodeIndex, hash: B::Hash, header: &B::Header) {
let number = *header.number(); let number = *header.number();
if number <= As::sa(0) { if number <= As::sa(0) {
@@ -376,12 +628,15 @@ impl<B: BlockT> ChainSync<B> {
block_status(&*protocol.client(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown) block_status(&*protocol.client(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
} }
/// Handle disconnected peer.
pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: NodeIndex) { pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
self.blocks.clear_peer_download(who); self.blocks.clear_peer_download(who);
self.peers.remove(&who); self.peers.remove(&who);
self.justifications.peer_disconnected(who);
self.maintain_sync(protocol); self.maintain_sync(protocol);
} }
/// Restart the sync process.
pub(crate) fn restart(&mut self, protocol: &mut Context<B>) { pub(crate) fn restart(&mut self, protocol: &mut Context<B>) {
self.import_queue.clear(); self.import_queue.clear();
self.blocks.clear(); self.blocks.clear();
@@ -403,6 +658,7 @@ impl<B: BlockT> ChainSync<B> {
} }
} }
/// Clear all sync data.
pub(crate) fn clear(&mut self) { pub(crate) fn clear(&mut self) {
self.blocks.clear(); self.blocks.clear();
self.peers.clear(); self.peers.clear();
+34 -1
View File
@@ -30,7 +30,7 @@ use client::block_builder::BlockBuilder;
use primitives::Ed25519AuthorityId; use primitives::Ed25519AuthorityId;
use runtime_primitives::Justification; use runtime_primitives::Justification;
use runtime_primitives::generic::BlockId; use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, Zero, Header, Digest, DigestItem, AuthorityIdFor}; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor, Zero};
use io::SyncIo; use io::SyncIo;
use protocol::{Context, Protocol, ProtocolContext}; use protocol::{Context, Protocol, ProtocolContext};
use config::ProtocolConfig; use config::ProtocolConfig;
@@ -190,6 +190,15 @@ impl<B: 'static + BlockT, V: 'static + Verifier<B>> ImportQueue<B> for SyncImpor
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) { fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
self.link.call(origin, blocks); self.link.call(origin, blocks);
} }
fn import_justification(
&self,
hash: B::Hash,
number: NumberFor<B>,
justification: Justification,
) -> bool {
self.block_import.import_justification(hash, number, justification).is_ok()
}
} }
struct DummyContextExecutor(Arc<Protocol<Block, DummySpecialization, Hash>>, Arc<RwLock<VecDeque<TestPacket>>>); struct DummyContextExecutor(Arc<Protocol<Block, DummySpecialization, Hash>>, Arc<RwLock<VecDeque<TestPacket>>>);
@@ -366,6 +375,13 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header); self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header);
} }
/// Send block finalization notifications.
pub fn send_finality_notifications(&self) {
let info = self.client.info().expect("In-mem client does not fail");
let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap();
self.sync.on_block_finalized(&mut TestIo::new(&self.queue, None), info.chain.finalized_hash, &header);
}
/// Restart sync for a peer. /// Restart sync for a peer.
fn restart_sync(&self) { fn restart_sync(&self) {
self.sync.abort(); self.sync.abort();
@@ -380,6 +396,14 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data, broadcast); self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data, broadcast);
} }
/// Request a justification for the given block.
#[cfg(test)]
fn request_justification(&self, hash: &::primitives::H256, number: NumberFor<Block>) {
self.executor.execute_in_context(|context| {
self.sync.sync().write().request_justification(hash, number, context);
})
}
/// Add blocks to the peer -- edit the block before adding /// Add blocks to the peer -- edit the block before adding
pub fn generate_blocks<F>(&self, count: usize, origin: BlockOrigin, mut edit_block: F) pub fn generate_blocks<F>(&self, count: usize, origin: BlockOrigin, mut edit_block: F)
where F: FnMut(BlockBuilder<Block, (), PeersClient>) -> Block where F: FnMut(BlockBuilder<Block, (), PeersClient>) -> Block
@@ -601,6 +625,15 @@ pub trait TestNetFactory: Sized {
}) })
} }
/// Send block finalization notifications for all peers.
fn send_finality_notifications(&mut self) {
self.mut_peers(|peers| {
for peer in peers {
peer.send_finality_notifications();
}
})
}
/// Restart sync for a peer. /// Restart sync for a peer.
fn restart_peer(&mut self, i: usize) { fn restart_peer(&mut self, i: usize) {
self.peers()[i].restart_sync(); self.peers()[i].restart_sync();
+21
View File
@@ -65,6 +65,27 @@ fn sync_no_common_longer_chain_fails() {
assert!(!net.peer(0).client.backend().blockchain().canon_equals_to(net.peer(1).client.backend().blockchain())); assert!(!net.peer(0).client.backend().blockchain().canon_equals_to(net.peer(1).client.backend().blockchain()));
} }
#[test]
fn sync_justifications() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(20, false);
net.sync();
// there's currently no justification for block #10
assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), None);
// we finalize block #10 for peer 0 with a justification
net.peer(0).client().finalize_block(BlockId::Number(10), Some(Vec::new()), true).unwrap();
let header = net.peer(1).client().header(&BlockId::Number(10)).unwrap().unwrap();
net.peer(1).request_justification(&header.hash().into(), 10);
net.sync();
assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), Some(Vec::new()));
assert_eq!(net.peer(1).client().justification(&BlockId::Number(10)).unwrap(), Some(Vec::new()));
}
#[test] #[test]
fn sync_after_fork_works() { fn sync_after_fork_works() {
::env_logger::init().ok(); ::env_logger::init().ok();
+48 -2
View File
@@ -78,7 +78,7 @@ pub use self::error::{ErrorKind, Error};
pub use config::{Configuration, Roles, PruningMode}; pub use config::{Configuration, Roles, PruningMode};
pub use chain_spec::{ChainSpec, Properties}; pub use chain_spec::{ChainSpec, Properties};
pub use transaction_pool::txpool::{self, Pool as TransactionPool, Options as TransactionPoolOptions, ChainApi, IntoPoolError}; pub use transaction_pool::txpool::{self, Pool as TransactionPool, Options as TransactionPoolOptions, ChainApi, IntoPoolError};
pub use client::ExecutionStrategy; pub use client::{ExecutionStrategy, FinalityNotifications};
pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend, pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend,
LightExecutor, Components, PoolApi, ComponentClient, LightExecutor, Components, PoolApi, ComponentClient,
@@ -222,6 +222,52 @@ impl<Components: components::Components> Service<Components> {
task_executor.spawn(events); task_executor.spawn(events);
} }
{
// finality notifications
let network = Arc::downgrade(&network);
// A utility stream that drops all ready items and only returns the last one.
// This is used to only keep the last finality notification and avoid
// overloading the sync module with notifications.
struct MostRecentNotification<B: network::BlockT>(futures::stream::Fuse<FinalityNotifications<B>>);
impl<B: network::BlockT> Stream for MostRecentNotification<B> {
type Item = <FinalityNotifications<B> as Stream>::Item;
type Error = <FinalityNotifications<B> as Stream>::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut last = None;
let last = loop {
match self.0.poll()? {
Async::Ready(Some(item)) => { last = Some(item) }
Async::Ready(None) => match last {
None => return Ok(Async::Ready(None)),
Some(last) => break last,
},
Async::NotReady => match last {
None => return Ok(Async::NotReady),
Some(last) => break last,
},
}
};
Ok(Async::Ready(Some(last)))
}
}
let events = MostRecentNotification(client.finality_notification_stream().fuse())
.for_each(move |notification| {
if let Some(network) = network.upgrade() {
network.on_block_finalized(notification.hash, &notification.header);
}
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()));
task_executor.spawn(events);
}
{ {
// extrinsic notifications // extrinsic notifications
let network = Arc::downgrade(&network); let network = Arc::downgrade(&network);
@@ -554,7 +600,7 @@ macro_rules! construct_service_factory {
fn new_full( fn new_full(
config: $crate::FactoryFullConfiguration<Self>, config: $crate::FactoryFullConfiguration<Self>,
executor: $crate::TaskExecutor executor: $crate::TaskExecutor,
) -> Result<Self::FullService, $crate::Error> ) -> Result<Self::FullService, $crate::Error>
{ {
( $( $full_service_init )* ) (config, executor.clone()).and_then(|service| { ( $( $full_service_init )* ) (config, executor.clone()).and_then(|service| {