diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 010912e75f..10e1570934 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -728,7 +728,11 @@ impl Client where // find tree route from last finalized to given block. let last_finalized = self.backend.blockchain().last_finalized()?; - if block == last_finalized { return Ok(()) } + if block == last_finalized { + warn!("Possible safety violation: attempted to re-finalize last finalized block {:?} ", last_finalized); + return Ok(()); + } + let route_from_finalized = crate::blockchain::tree_route( self.backend.blockchain(), BlockId::Hash(last_finalized), @@ -1093,7 +1097,7 @@ impl consensus::BlockImport for Client match self.backend.blockchain().status(BlockId::Hash(parent_hash)) { Ok(blockchain::BlockStatus::InChain) => {}, Ok(blockchain::BlockStatus::Unknown) => return Ok(ImportResult::UnknownParent), - Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()) + Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), } let import_headers = if post_digests.is_empty() { @@ -1131,6 +1135,19 @@ impl consensus::BlockImport for Client ); result.map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()).into()) } + + /// Import a block justification and finalize the block. The justification + /// isn't interpreted by the client and is assumed to have been validated + /// previously. The block is finalized unconditionally. + fn import_justification( + &self, + hash: Block::Hash, + _number: NumberFor, + justification: Justification, + ) -> Result<(), Self::Error> { + self.finalize_block(BlockId::Hash(hash), Some(justification), true) + .map_err(|_| ConsensusErrorKind::InvalidJustification.into()) + } } impl consensus::Authorities for Client where diff --git a/substrate/core/consensus/common/src/block_import.rs b/substrate/core/consensus/common/src/block_import.rs index a683bc3019..4170662721 100644 --- a/substrate/core/consensus/common/src/block_import.rs +++ b/substrate/core/consensus/common/src/block_import.rs @@ -16,7 +16,7 @@ //! Block import helpers. -use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Header as HeaderT, DigestItemFor}; +use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, DigestItemFor, Header as HeaderT, NumberFor}; use runtime_primitives::Justification; use std::borrow::Cow; @@ -33,6 +33,9 @@ pub enum ImportResult { KnownBad, /// Block parent is not in the chain. UnknownParent, + /// Added to the import queue but must be justified + /// (usually required to safely enact consensus changes). + NeedsJustification, } /// Block data origin. @@ -140,9 +143,22 @@ impl ImportBlock { /// Block import trait. pub trait BlockImport { type Error: ::std::error::Error + Send + 'static; - /// Import a Block alongside the new authorities valid form this block forward - fn import_block(&self, + + /// Called by the import queue when it is started. + fn on_start(&self, _link: &::import_queue::Link) { } + + /// Import a Block alongside the new authorities valid from this block forward + fn import_block( + &self, block: ImportBlock, - new_authorities: Option>> + new_authorities: Option>>, ) -> Result; + + /// Import a Block justification and finalize the given block. + fn import_justification( + &self, + hash: B::Hash, + number: NumberFor, + justification: Justification, + ) -> Result<(), Self::Error>; } diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 5264b29cef..3660a59b03 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -92,6 +92,8 @@ pub trait ImportQueue: Send + Sync { fn is_importing(&self, hash: &B::Hash) -> bool; /// Import bunch of blocks. fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>); + /// Import a block justification. + fn import_justification(&self, hash: B::Hash, number: NumberFor, justification: Justification) -> bool; } /// Import queue status. It isn't completely accurate. @@ -161,6 +163,7 @@ impl> ImportQueue for BasicQueue { let verifier = self.verifier.clone(); let block_import = self.block_import.clone(); *self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || { + block_import.on_start(&link); import_thread(block_import, link, qdata, verifier) })?); Ok(()) @@ -218,6 +221,10 @@ impl> ImportQueue for BasicQueue { queue.push_back((origin, blocks)); self.data.signal.notify_one(); } + + fn import_justification(&self, hash: B::Hash, number: NumberFor, justification: Justification) -> bool { + self.block_import.import_justification(hash, number, justification).is_ok() + } } impl> Drop for BasicQueue { @@ -279,6 +286,8 @@ fn import_thread, V: Verifier>( pub trait Link: Send { /// Block imported. fn block_imported(&self, _hash: &B::Hash, _number: NumberFor) { } + /// Request a justification for the given block. + fn request_justification(&self, _hash: &B::Hash, _number: NumberFor) { } /// Maintain sync. fn maintain_sync(&self) { } /// Disconnect from peer. @@ -296,6 +305,8 @@ pub enum BlockImportResult>( trace!(target: "sync", "Block queued {}: {:?}", number, hash); Ok(BlockImportResult::ImportedUnknown(hash, number)) }, + Ok(ImportResult::NeedsJustification) => { + trace!(target: "sync", "Block queued but requires justification {}: {:?}", number, hash); + Ok(BlockImportResult::ImportedUnjustified(hash, number)) + }, Ok(ImportResult::UnknownParent) => { debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent); Err(BlockImportError::UnknownParent) @@ -416,7 +431,7 @@ pub fn import_single_block>( Ok(ImportResult::KnownBad) => { debug!(target: "sync", "Peer gave us a bad block {}: {:?}", number, hash); Err(BlockImportError::BadBlock(peer)) //TODO: use persistent ID - } + }, Err(e) => { debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); Err(BlockImportError::Error) @@ -439,6 +454,11 @@ pub fn process_import_result( link.block_imported(&hash, number); 1 }, + Ok(BlockImportResult::ImportedUnjustified(hash, number)) => { + link.block_imported(&hash, number); + link.request_justification(&hash, number); + 1 + }, Err(BlockImportError::IncompleteHeader(who)) => { if let Some(peer) = who { link.useless_peer(peer, "Sent block with incomplete header to import"); diff --git a/substrate/core/finality-grandpa/src/authorities.rs b/substrate/core/finality-grandpa/src/authorities.rs index ca9bd61a88..d470ee82e3 100644 --- a/substrate/core/finality-grandpa/src/authorities.rs +++ b/substrate/core/finality-grandpa/src/authorities.rs @@ -146,7 +146,6 @@ where } /// Inspect pending changes. - #[cfg(test)] pub(crate) fn pending_changes(&self) -> &[PendingChange] { &self.pending_changes } @@ -261,7 +260,7 @@ pub(crate) struct PendingChange { impl + Clone> PendingChange { /// Returns the effective number this change will be applied at. - fn effective_number(&self) -> N { + pub fn effective_number(&self) -> N { self.canon_height.clone() + self.finalization_depth.clone() } } diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index 54833f2880..1e3c307cb5 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -92,6 +92,7 @@ use client::{ use client::blockchain::HeaderBackend; use codec::{Encode, Decode}; use consensus_common::{BlockImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult, Authorities}; +use runtime_primitives::Justification; use runtime_primitives::traits::{ NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT, DigestItemFor, DigestItem, As, Zero, @@ -315,7 +316,7 @@ impl, RA> BlockStatus for Arc { pending_changes: Vec<(N, H)>, } @@ -796,6 +797,13 @@ fn finalize_block, E, RA>( { // lock must be held through writing to DB to avoid race let mut authority_set = authority_set.inner().write(); + + // TODO [andre]: clone only when changed (#1483) + let old_authority_set = authority_set.clone(); + // needed in case there is an authority set change, used for reverting in + // case of error + let mut old_last_completed = None; + let mut consensus_changes = consensus_changes.lock(); let status = authority_set.apply_changes(number, |canon_number| { canonical_at_height(client, (hash, number), canon_number) @@ -813,6 +821,8 @@ fn finalize_block, E, RA>( let last_completed: LastCompleted<_, _> = (0, round_state); let encoded = last_completed.encode(); + old_last_completed = Backend::get_aux(&**client.backend(), LAST_COMPLETED_KEY)?; + Backend::insert_aux( &**client.backend(), &[ @@ -836,7 +846,13 @@ fn finalize_block, E, RA>( // check if this is this is the first finalization of some consensus changes let (alters_consensus_changes, finalizes_consensus_changes) = consensus_changes .finalize((number, hash), |at_height| canonical_at_height(client, (hash, number), at_height))?; + + // holds the old consensus changes in case it is changed below, needed for + // reverting in case of failure + let mut old_consensus_changes = None; if alters_consensus_changes { + old_consensus_changes = Some(consensus_changes.clone()); + let encoded = consensus_changes.encode(); let write_result = Backend::insert_aux(&**client.backend(), &[(CONSENSUS_CHANGES_KEY, &encoded[..])], &[]); if let Err(e) = write_result { @@ -847,75 +863,115 @@ fn finalize_block, E, RA>( } } - // NOTE: this code assumes that honest voters will never vote past a - // transition block, thus we don't have to worry about the case where - // we have a transition with `effective_block = N`, but we finalize - // `N+1`. this assumption is required to make sure we store - // justifications for transition blocks which will be requested by - // syncing clients. - let justification = match justification_or_commit { - JustificationOrCommit::Justification(justification) => Some(justification.encode()), - JustificationOrCommit::Commit((round_number, commit)) => { - let mut justification_required = - // justification is always required when block that enacts new authorities - // set is finalized - status.new_set_block.is_some() || - // justification is required when consensus changes are finalized - finalizes_consensus_changes; + let aux = |authority_set: &authorities::AuthoritySet>| { + // NOTE: this code assumes that honest voters will never vote past a + // transition block, thus we don't have to worry about the case where + // we have a transition with `effective_block = N`, but we finalize + // `N+1`. this assumption is required to make sure we store + // justifications for transition blocks which will be requested by + // syncing clients. + let justification = match justification_or_commit { + JustificationOrCommit::Justification(justification) => Some(justification.encode()), + JustificationOrCommit::Commit((round_number, commit)) => { + let mut justification_required = + // justification is always required when block that enacts new authorities + // set is finalized + status.new_set_block.is_some() || + // justification is required when consensus changes are finalized + finalizes_consensus_changes; - // justification is required every N blocks to be able to prove blocks - // finalization to remote nodes - if !justification_required { - if let Some(justification_period) = justification_period { - let last_finalized_number = client.info()?.chain.finalized_number; - justification_required = (!last_finalized_number.is_zero() || - number - last_finalized_number == justification_period) && - (last_finalized_number / justification_period != number / justification_period); + // justification is required every N blocks to be able to prove blocks + // finalization to remote nodes + if !justification_required { + if let Some(justification_period) = justification_period { + let last_finalized_number = client.info()?.chain.finalized_number; + justification_required = + (!last_finalized_number.is_zero() || number - last_finalized_number == justification_period) && + (last_finalized_number / justification_period != number / justification_period); + } } - } - if justification_required { - let justification = GrandpaJustification::from_commit( - client, - round_number, - commit, - )?; + if justification_required { + let justification = GrandpaJustification::from_commit( + client, + round_number, + commit, + )?; - Some(justification.encode()) + Some(justification.encode()) + } else { + None + } + }, + }; + + debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash); + + // ideally some handle to a synchronization oracle would be used + // to avoid unconditionally notifying. + client.finalize_block(BlockId::Hash(hash), justification, true).map_err(|e| { + warn!(target: "finality", "Error applying finality to block {:?}: {:?}", (hash, number), e); + e + })?; + + if let Some((canon_hash, canon_number)) = status.new_set_block { + // the authority set has changed. + let (new_id, set_ref) = authority_set.current(); + + if set_ref.len() > 16 { + info!("Applying GRANDPA set change to new set with {} authorities", set_ref.len()); } else { - None + info!("Applying GRANDPA set change to new set {:?}", set_ref); } - }, + + Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet { + canon_hash, + canon_number, + set_id: new_id, + authorities: set_ref.to_vec(), + })) + } else { + Ok(()) + } }; - debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash); + match aux(&authority_set) { + Err(ExitOrError::Error(err)) => { + debug!(target: "afg", "Reverting authority set and/or consensus changes after block finalization error: {:?}", err); - // ideally some handle to a synchronization oracle would be used - // to avoid unconditionally notifying. - client.finalize_block(BlockId::Hash(hash), justification, true).map_err(|e| { - warn!(target: "finality", "Error applying finality to block {:?}: {:?}", (hash, number), e); - warn!(target: "finality", "Node is in a potentially inconsistent state."); - e - })?; + let mut revert_aux = Vec::new(); - if let Some((canon_hash, canon_number)) = status.new_set_block { - // the authority set has changed. - let (new_id, set_ref) = authority_set.current(); + if status.changed { + revert_aux.push((AUTHORITY_SET_KEY, old_authority_set.encode())); + if let Some(old_last_completed) = old_last_completed { + revert_aux.push((LAST_COMPLETED_KEY, old_last_completed)); + } - if set_ref.len() > 16 { - info!("Applying GRANDPA set change to new set with {} authorities", set_ref.len()); - } else { - info!("Applying GRANDPA set change to new set {:?}", set_ref); - } + *authority_set = old_authority_set.clone(); + } - Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet { - canon_hash, - canon_number, - set_id: new_id, - authorities: set_ref.to_vec(), - })) - } else { - Ok(()) + if let Some(old_consensus_changes) = old_consensus_changes { + revert_aux.push((CONSENSUS_CHANGES_KEY, old_consensus_changes.encode())); + + *consensus_changes = old_consensus_changes; + } + + let write_result = Backend::insert_aux( + &**client.backend(), + revert_aux.iter().map(|(k, v)| (*k, &**v)).collect::>().iter(), + &[], + ); + + if let Err(e) = write_result { + warn!(target: "finality", "Failed to revert consensus changes to disk. Bailing."); + warn!(target: "finality", "Node is in a potentially inconsistent state."); + + return Err(e.into()); + } + + Err(ExitOrError::Error(err)) + }, + res => res, } } @@ -949,6 +1005,33 @@ impl, RA, PRA> BlockImport { type Error = ConsensusError; + fn on_start(&self, link: &::consensus_common::import_queue::Link) { + let chain_info = match self.inner.info() { + Ok(info) => info.chain, + _ => return, + }; + + // request justifications for all pending changes for which change blocks have already been imported + for pending_change in self.authority_set.inner().read().pending_changes() { + if pending_change.effective_number() > chain_info.finalized_number && + pending_change.effective_number() <= chain_info.best_number + { + let effective_block_hash = self.inner.best_containing( + pending_change.canon_hash, + Some(pending_change.effective_number()), + ); + + if let Ok(Some(hash)) = effective_block_hash { + if let Ok(Some(header)) = self.inner.header(&BlockId::Hash(hash)) { + if *header.number() == pending_change.effective_number() { + link.request_justification(&header.hash(), *header.number()); + } + } + } + } + } + } + fn import_block(&self, mut block: ImportBlock, new_authorities: Option>) -> Result { @@ -1045,52 +1128,7 @@ impl, RA, PRA> BlockImport match justification { Some(justification) => { - let justification = GrandpaJustification::decode_and_verify( - justification, - self.authority_set.set_id(), - &self.authority_set.current_authorities(), - ); - - let justification = match justification { - Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), - Ok(justification) => justification, - }; - - let result = finalize_block( - &*self.inner, - &self.authority_set, - &self.consensus_changes, - None, - hash, - number, - justification.into(), - ); - - match result { - Ok(_) => { - assert!(!enacts_change, "returns Ok when no authority set change should be enacted; qed;"); - }, - Err(ExitOrError::AuthoritiesChanged(new)) => { - assert!( - enacts_change, - "returns AuthoritiesChanged when authority set change should be enacted; qed;" - ); - - debug!(target: "finality", "Imported justified block #{} that enacts authority set change, signalling voter.", number); - if let Err(e) = self.authority_set_change.unbounded_send(new) { - return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()); - } - }, - Err(ExitOrError::Error(e)) => { - match e { - Error::Grandpa(error) => return Err(ConsensusErrorKind::ClientImport(error.to_string()).into()), - Error::Network(error) => return Err(ConsensusErrorKind::ClientImport(error).into()), - Error::Blockchain(error) => return Err(ConsensusErrorKind::ClientImport(error).into()), - Error::Client(error) => return Err(ConsensusErrorKind::ClientImport(error.to_string()).into()), - Error::Timer(error) => return Err(ConsensusErrorKind::ClientImport(error.to_string()).into()), - } - }, - } + self.import_justification(hash, number, justification, enacts_change)?; }, None => { if enacts_change { @@ -1106,11 +1144,87 @@ impl, RA, PRA> BlockImport if enacts_consensus_change { self.consensus_changes.lock().note_change((number, hash)); } - }, + + return Ok(ImportResult::NeedsJustification); + } } Ok(import_result) } + + fn import_justification( + &self, + hash: Block::Hash, + number: NumberFor, + justification: Justification, + ) -> Result<(), Self::Error> { + self.import_justification(hash, number, justification, false) + } +} + +impl, RA, PRA> + GrandpaBlockImport where + NumberFor: grandpa::BlockNumberOps, + B: Backend + 'static, + E: CallExecutor + 'static + Clone + Send + Sync, + RA: Send + Sync, +{ + + /// Import a block justification and finalize the block. + /// + /// If `enacts_change` is set to true, then finalizing this block *must* + /// enact an authority set change, the function will panic otherwise. + fn import_justification( + &self, + hash: Block::Hash, + number: NumberFor, + justification: Justification, + enacts_change: bool, + ) -> Result<(), ConsensusError> { + let justification = GrandpaJustification::decode_and_verify( + justification, + self.authority_set.set_id(), + &self.authority_set.current_authorities(), + ); + + let justification = match justification { + Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), + Ok(justification) => justification, + }; + + let result = finalize_block( + &*self.inner, + &self.authority_set, + &self.consensus_changes, + None, + hash, + number, + justification.into(), + ); + + match result { + Err(ExitOrError::AuthoritiesChanged(new)) => { + info!(target: "finality", "Imported justification for block #{} that enacts authority set change, signalling voter.", number); + if let Err(e) = self.authority_set_change.unbounded_send(new) { + return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()); + } + }, + Err(ExitOrError::Error(e)) => { + return Err(match e { + Error::Grandpa(error) => ConsensusErrorKind::ClientImport(error.to_string()), + Error::Network(error) => ConsensusErrorKind::ClientImport(error), + Error::Blockchain(error) => ConsensusErrorKind::ClientImport(error), + Error::Client(error) => ConsensusErrorKind::ClientImport(error.to_string()), + Error::Timer(error) => ConsensusErrorKind::ClientImport(error.to_string()), + }.into()); + }, + Ok(_) => { + assert!(!enacts_change, "returns Ok when no authority set change should be enacted; qed;"); + }, + } + + Ok(()) + } } /// Using the given base get the block at the given height on this chain. The diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 39924263ad..344958c044 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -494,7 +494,7 @@ fn transition_3_voters_twice_1_observer() { let transitions = api.scheduled_changes.clone(); let net = Arc::new(Mutex::new(GrandpaTestNet::new(api, 8))); - let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let mut runtime = current_thread::Runtime::new().unwrap(); net.lock().peer(0).push_blocks(1, false); net.lock().sync(); @@ -619,6 +619,7 @@ fn transition_3_voters_twice_1_observer() { let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) .for_each(move |_| { net.lock().send_import_notifications(); + net.lock().send_finality_notifications(); net.lock().sync(); Ok(()) }) @@ -682,4 +683,55 @@ fn consensus_changes_works() { changes.note_change((1, 1.into())); changes.note_change((1, 101.into())); assert_eq!(changes.finalize((10, 10.into()), |_| Ok(Some(1.into()))).unwrap(), (true, true)); -} \ No newline at end of file +} + +#[test] +fn sync_justifications_on_change_blocks() { + ::env_logger::init(); + + let peers_a = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; + let peers_b = &[Keyring::Alice, Keyring::Bob]; + let voters = make_ids(peers_b); + + // 4 peers, 3 of them are authorities and participate in grandpa + let api = TestApi::new(voters); + let transitions = api.scheduled_changes.clone(); + let mut net = GrandpaTestNet::new(api, 4); + + // add 20 blocks + net.peer(0).push_blocks(20, false); + + // at block 21 we do add a transition which is instant + net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| { + let block = builder.bake().unwrap(); + transitions.lock().insert(*block.header.parent_hash(), ScheduledChange { + next_authorities: make_ids(peers_b), + delay: 0, + }); + block + }); + + // add more blocks on top of it (until we have 25) + net.peer(0).push_blocks(4, false); + net.sync(); + + for i in 0..4 { + assert_eq!(net.peer(i).client().info().unwrap().chain.best_number, 25, + "Peer #{} failed to sync", i); + } + + let net = Arc::new(Mutex::new(net)); + run_to_completion(25, net.clone(), peers_a); + + // the first 3 peers are grandpa voters and therefore have already finalized + // block 21 and stored a justification + for i in 0..3 { + assert!(net.lock().peer(i).client().justification(&BlockId::Number(21)).unwrap().is_some()); + } + + // the last peer should get the justification by syncing from other peers + assert!(net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none()); + while net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() { + net.lock().sync_steps(100); + } +} diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/message.rs index e422c77418..3a7238f9b5 100644 --- a/substrate/core/network/src/message.rs +++ b/substrate/core/network/src/message.rs @@ -48,7 +48,6 @@ pub type BlockRequest = generic::BlockRequest< <::Header as HeaderT>::Number, >; - /// Type alias for using the BlockData type using block type parameters. pub type BlockData = generic::BlockData< ::Header, diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 31891450e1..7184bd6d4e 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . use std::collections::{HashMap, HashSet, BTreeMap}; -use std::{mem, cmp}; +use std::cmp; use std::sync::Arc; use std::time; use parking_lot::RwLock; @@ -273,7 +273,7 @@ impl, H: ExHashT> Protocol { let mut peers = self.context_data.peers.write(); if let Some(ref mut peer) = peers.get_mut(&who) { peer.request_timestamp = None; - match mem::replace(&mut peer.block_request, None) { + match peer.block_request.take() { Some(r) => r, None => { io.report_peer(who, Severity::Bad("Unexpected response packet received from peer")); @@ -285,10 +285,12 @@ impl, H: ExHashT> Protocol { return; } }; + if request.id != r.id { trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", who, request.id, r.id); return; } + self.on_block_response(io, who, request, r); }, GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, who, announce), @@ -330,7 +332,6 @@ impl, H: ExHashT> Protocol { pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: NodeIndex) { trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_debug_info(peer)); - // lock all the the peer lists so that add/remove peer events are in order let mut sync = self.sync.write(); let mut spec = self.specialization.write(); @@ -351,7 +352,15 @@ impl, H: ExHashT> Protocol { } fn on_block_request(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest) { - trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max); + trace!(target: "sync", "BlockRequest {} from {} with fields {:?}: from {:?} to {:?} max {:?}", + request.id, + peer, + request.fields, + request.from, + request.to, + request.max, + ); + let mut blocks = Vec::new(); let mut id = match request.from { message::FromBlock::Hash(h) => BlockId::Hash(h), @@ -409,25 +418,36 @@ impl, H: ExHashT> Protocol { trace!(target: "sync", "BlockResponse {} from {} with {} blocks{}", response.id, peer, response.blocks.len(), blocks_range); - // import_queue.import_blocks also acquires sync.write(); - // Break the cycle by doing these separately from the outside; - let new_blocks = { + // TODO [andre]: move this logic to the import queue so that + // justifications are imported asynchronously (#1482) + if request.fields == message::BlockAttributes::JUSTIFICATION { let mut sync = self.sync.write(); - sync.on_block_data(&mut ProtocolContext::new(&self.context_data, io), peer, request, response) - }; + sync.on_block_justification_data( + &mut ProtocolContext::new(&self.context_data, io), + peer, + request, + response, + ); + } else { + // import_queue.import_blocks also acquires sync.write(); + // Break the cycle by doing these separately from the outside; + let new_blocks = { + let mut sync = self.sync.write(); + sync.on_block_data(&mut ProtocolContext::new(&self.context_data, io), peer, request, response) + }; - if let Some((origin, new_blocks)) = new_blocks { - let import_queue = self.sync.read().import_queue(); - import_queue.import_blocks(origin, new_blocks); + if let Some((origin, new_blocks)) = new_blocks { + let import_queue = self.sync.read().import_queue(); + import_queue.import_blocks(origin, new_blocks); + } } - - } /// Perform time based maintenance. pub fn tick(&self, io: &mut SyncIo) { self.consensus_gossip.write().collect_garbage(|_| true); self.maintain_peers(io); + self.sync.write().tick(&mut ProtocolContext::new(&self.context_data, io)); self.on_demand.as_ref().map(|s| s.maintain_peers(io)); } @@ -439,7 +459,8 @@ impl, H: ExHashT> Protocol { let handshaking_peers = self.handshaking_peers.read(); for (who, timestamp) in peers.iter() .filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r))) - .chain(handshaking_peers.iter()) { + .chain(handshaking_peers.iter()) + { if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC { trace!(target: "sync", "Timeout {}", who); aborting.push(*who); @@ -648,6 +669,10 @@ impl, H: ExHashT> Protocol { } } + pub fn on_block_finalized(&self, _io: &mut SyncIo, hash: B::Hash, header: &B::Header) { + self.sync.write().block_finalized(&hash, *header.number()); + } + fn on_remote_call_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteCallRequest) { trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, who, request.method, request.block); let proof = match self.context_data.chain.execution_proof(&request.block, &request.method, &request.data) { @@ -752,8 +777,8 @@ impl, H: ExHashT> Protocol { } fn send_message(peers: &RwLock>>, io: &mut SyncIo, who: NodeIndex, mut message: Message) { - match &mut message { - &mut GenericMessage::BlockRequest(ref mut r) => { + match message { + GenericMessage::BlockRequest(ref mut r) => { let mut peers = peers.write(); if let Some(ref mut peer) = peers.get_mut(&who) { r.id = peer.next_request_id; diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 635b31566f..4c531da1c9 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -94,6 +94,10 @@ impl> Link for NetworkLink { self.with_sync(|sync, _| sync.block_imported(&hash, number)) } + fn request_justification(&self, hash: &B::Hash, number: NumberFor) { + self.with_sync(|sync, protocol| sync.request_justification(hash, number, protocol)) + } + fn maintain_sync(&self) { self.with_sync(|sync, protocol| sync.maintain_sync(protocol)) } @@ -174,6 +178,11 @@ impl, H: ExHashT> Service. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; +use std::time::{Duration, Instant}; use protocol::Context; use network_libp2p::{Severity, NodeIndex}; use client::{BlockStatus, ClientInfo}; @@ -23,6 +24,7 @@ use consensus::BlockOrigin; use consensus::import_queue::{ImportQueue, IncomingBlock}; use client::error::Error as ClientError; use blocks::BlockCollection; +use runtime_primitives::Justification; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero}; use runtime_primitives::generic::BlockId; use message::{self, generic::Message as GenericMessage}; @@ -34,6 +36,8 @@ const MAX_BLOCKS_TO_REQUEST: usize = 128; const MAX_IMPORTING_BLOCKS: usize = 2048; // Number of blocks in the queue that prevents ancestry search. const MAJOR_SYNC_BLOCKS: usize = 5; +// Time to wait before trying to get a justification from the same peer. +const JUSTIFICATION_RETRY_WAIT: Duration = Duration::from_secs(10); struct PeerSync { pub common_number: NumberFor, @@ -48,6 +52,180 @@ enum PeerSyncState { Available, DownloadingNew(NumberFor), DownloadingStale(B::Hash), + DownloadingJustification(B::Hash), +} + +/// Pending justification request for the given block (hash and number). +type PendingJustification = (::Hash, NumberFor); + +/// Manages pending block justification requests. +struct PendingJustifications { + justifications: HashSet>, + pending_requests: VecDeque>, + peer_requests: HashMap>, + previous_requests: HashMap, Vec<(NodeIndex, Instant)>>, +} + +impl PendingJustifications { + fn new() -> PendingJustifications { + PendingJustifications { + justifications: HashSet::new(), + pending_requests: VecDeque::new(), + peer_requests: HashMap::new(), + previous_requests: HashMap::new(), + } + } + + /// Dispatches all possible pending requests to the given peers. Peers are + /// filtered according to the current known best block (i.e. we won't send a + /// justification request for block #10 to a peer at block #2), and we also + /// throttle requests to the same peer if a previous justification request + /// yielded no results. + fn dispatch(&mut self, peers: &mut HashMap>, protocol: &mut Context) { + if self.pending_requests.is_empty() { + return; + } + + // clean up previous failed requests so we can retry again + for (_, requests) in self.previous_requests.iter_mut() { + requests.retain(|(_, instant)| instant.elapsed() < JUSTIFICATION_RETRY_WAIT); + } + + let mut available_peers = peers.iter().filter_map(|(peer, sync)| { + // don't request to any peers that already have pending requests + if let PeerSyncState::Available = sync.state { + Some((*peer, sync.best_number)) + } else { + None + } + }).collect::>(); + + let mut last_peer = available_peers.back().map(|p| p.0); + let mut unhandled_requests = VecDeque::new(); + + loop { + let (peer, peer_best_number) = match available_peers.pop_front() { + Some(p) => p, + _ => break, + }; + + // only ask peers that have synced past the block number that we're + // asking the justification for and to whom we haven't already made + // the same request recently + let peer_eligible = { + let request = match self.pending_requests.front() { + Some(r) => r.clone(), + _ => break, + }; + + peer_best_number >= request.1 && + !self.previous_requests + .get(&request) + .map(|requests| requests.iter().any(|i| i.0 == peer)) + .unwrap_or(false) + }; + + if !peer_eligible { + available_peers.push_back((peer, peer_best_number)); + + // we tried all peers and none can answer this request + if Some(peer) == last_peer { + last_peer = available_peers.back().map(|p| p.0); + + let request = self.pending_requests.pop_front() + .expect("verified to be Some in the beginning of the loop; qed"); + + unhandled_requests.push_back(request); + } + + continue; + } + + last_peer = available_peers.back().map(|p| p.0); + + let request = self.pending_requests.pop_front() + .expect("verified to be Some in the beginning of the loop; qed"); + + self.peer_requests.insert(peer, request); + + peers.get_mut(&peer) + .expect("peer was is taken from available_peers; available_peers is a subset of peers; qed") + .state = PeerSyncState::DownloadingJustification(request.0); + + let request = message::generic::BlockRequest { + id: 0, + fields: message::BlockAttributes::JUSTIFICATION, + from: message::FromBlock::Hash(request.0), + to: None, + direction: message::Direction::Ascending, + max: Some(1), + }; + + protocol.send_message(peer, GenericMessage::BlockRequest(request)); + } + + self.pending_requests.append(&mut unhandled_requests); + } + + /// Queue a justification request (without dispatching it). + fn queue_request(&mut self, justification: &PendingJustification) { + if !self.justifications.insert(*justification) { + return; + } + self.pending_requests.push_back(*justification); + } + + /// Retry any pending request if a peer disconnected. + fn peer_disconnected(&mut self, who: NodeIndex) { + if let Some(request) = self.peer_requests.remove(&who) { + self.pending_requests.push_front(request); + } + } + + /// Processes the response for the request previously sent to the given + /// peer. Queues a retry in case the import fails or the given justification + /// was `None`. + fn on_response( + &mut self, + who: NodeIndex, + justification: Option, + protocol: &mut Context, + import_queue: &ImportQueue, + ) { + // we assume that the request maps to the given response, this is + // currently enforced by the outer network protocol before passing on + // messages to chain sync. + if let Some(request) = self.peer_requests.remove(&who) { + if let Some(justification) = justification { + if import_queue.import_justification(request.0, request.1, justification) { + self.justifications.remove(&request); + self.previous_requests.remove(&request); + return; + } else { + protocol.report_peer( + who, + Severity::Bad(&format!("Invalid justification provided for #{}", request.0)), + ); + } + } else { + self.previous_requests + .entry(request) + .or_insert(Vec::new()) + .push((who, Instant::now())); + } + + self.pending_requests.push_front(request); + } + } + + /// Removes any pending justification requests for blocks lower than the + /// given best finalized. + fn collect_garbage(&mut self, best_finalized: NumberFor) { + self.justifications.retain(|(_, n)| *n > best_finalized); + self.pending_requests.retain(|(_, n)| *n > best_finalized); + self.peer_requests.retain(|_, (_, n)| *n > best_finalized); + self.previous_requests.retain(|(_, n), _| *n > best_finalized); + } } /// Relay chain sync strategy. @@ -59,6 +237,7 @@ pub struct ChainSync { best_queued_hash: B::Hash, required_block_attributes: message::BlockAttributes, import_queue: Arc>, + justifications: PendingJustifications, } /// Reported sync state. @@ -104,6 +283,7 @@ impl ChainSync { blocks: BlockCollection::new(), best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash), best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number), + justifications: PendingJustifications::new(), required_block_attributes, import_queue, } @@ -192,6 +372,7 @@ impl ChainSync { } } + /// Handle new block data. pub(crate) fn on_block_data( &mut self, protocol: &mut Context, @@ -269,10 +450,10 @@ impl ChainSync { } } }, - PeerSyncState::Available => Vec::new(), + PeerSyncState::Available | PeerSyncState::DownloadingJustification(..) => Vec::new(), } } else { - vec![] + Vec::new() }; let best_seen = self.best_seen_block(); @@ -288,17 +469,87 @@ impl ChainSync { Some((origin, new_blocks)) } + /// Handle new justification data. + pub(crate) fn on_block_justification_data( + &mut self, + protocol: &mut Context, + who: NodeIndex, + _request: message::BlockRequest, + response: message::BlockResponse, + ) { + if let Some(ref mut peer) = self.peers.get_mut(&who) { + if let PeerSyncState::DownloadingJustification(hash) = peer.state { + peer.state = PeerSyncState::Available; + + // we only request one justification at a time + match response.blocks.into_iter().next() { + Some(response) => { + if hash != response.hash { + let msg = format!( + "Invalid block justification provided: requested: {:?} got: {:?}", + hash, + response.hash, + ); + + protocol.report_peer(who, Severity::Bad(&msg)); + return; + } + + self.justifications.on_response( + who, + response.justification, + protocol, + &*self.import_queue, + ); + }, + None => { + let msg = format!( + "Provided empty response for justification request {:?}", + hash, + ); + + protocol.report_peer(who, Severity::Useless(&msg)); + return; + }, + } + } + } + + self.maintain_sync(protocol); + } + + /// Maintain the sync process (download new blocks, fetch justifications). pub fn maintain_sync(&mut self, protocol: &mut Context) { let peers: Vec = self.peers.keys().map(|p| *p).collect(); for peer in peers { self.download_new(protocol, peer); } + self.justifications.dispatch(&mut self.peers, protocol); } + /// Called periodically to perform any time-based actions. + pub fn tick(&mut self, protocol: &mut Context) { + self.justifications.dispatch(&mut self.peers, protocol); + } + + /// Request a justification for the given block. + /// + /// Queues a new justification request and tries to dispatch all pending requests. + pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor, protocol: &mut Context) { + self.justifications.queue_request(&(*hash, number)); + self.justifications.dispatch(&mut self.peers, protocol); + } + + /// Notify about successful import of the given block. pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { trace!(target: "sync", "Block imported successfully {} ({})", number, hash); } + /// Notify about finalization of the given block. + pub fn block_finalized(&mut self, _hash: &B::Hash, number: NumberFor) { + self.justifications.collect_garbage(number); + } + fn block_queued(&mut self, hash: &B::Hash, number: NumberFor) { if number > self.best_queued_number { self.best_queued_number = number; @@ -324,6 +575,7 @@ impl ChainSync { self.block_queued(&hash, best_header.number().clone()) } + /// Handle new block announcement. pub(crate) fn on_block_announce(&mut self, protocol: &mut Context, who: NodeIndex, hash: B::Hash, header: &B::Header) { let number = *header.number(); if number <= As::sa(0) { @@ -376,12 +628,15 @@ impl ChainSync { block_status(&*protocol.client(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown) } + /// Handle disconnected peer. pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context, who: NodeIndex) { self.blocks.clear_peer_download(who); self.peers.remove(&who); + self.justifications.peer_disconnected(who); self.maintain_sync(protocol); } + /// Restart the sync process. pub(crate) fn restart(&mut self, protocol: &mut Context) { self.import_queue.clear(); self.blocks.clear(); @@ -403,6 +658,7 @@ impl ChainSync { } } + /// Clear all sync data. pub(crate) fn clear(&mut self) { self.blocks.clear(); self.peers.clear(); diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 9523816015..3e45188544 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -30,7 +30,7 @@ use client::block_builder::BlockBuilder; use primitives::Ed25519AuthorityId; use runtime_primitives::Justification; use runtime_primitives::generic::BlockId; -use runtime_primitives::traits::{Block as BlockT, Zero, Header, Digest, DigestItem, AuthorityIdFor}; +use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor, Zero}; use io::SyncIo; use protocol::{Context, Protocol, ProtocolContext}; use config::ProtocolConfig; @@ -190,6 +190,15 @@ impl> ImportQueue for SyncImpor fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { self.link.call(origin, blocks); } + + fn import_justification( + &self, + hash: B::Hash, + number: NumberFor, + justification: Justification, + ) -> bool { + self.block_import.import_justification(hash, number, justification).is_ok() + } } struct DummyContextExecutor(Arc>, Arc>>); @@ -366,6 +375,13 @@ impl, D> Peer { self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header); } + /// Send block finalization notifications. + pub fn send_finality_notifications(&self) { + let info = self.client.info().expect("In-mem client does not fail"); + let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap(); + self.sync.on_block_finalized(&mut TestIo::new(&self.queue, None), info.chain.finalized_hash, &header); + } + /// Restart sync for a peer. fn restart_sync(&self) { self.sync.abort(); @@ -380,6 +396,14 @@ impl, D> Peer { self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data, broadcast); } + /// Request a justification for the given block. + #[cfg(test)] + fn request_justification(&self, hash: &::primitives::H256, number: NumberFor) { + self.executor.execute_in_context(|context| { + self.sync.sync().write().request_justification(hash, number, context); + }) + } + /// Add blocks to the peer -- edit the block before adding pub fn generate_blocks(&self, count: usize, origin: BlockOrigin, mut edit_block: F) where F: FnMut(BlockBuilder) -> Block @@ -601,6 +625,15 @@ pub trait TestNetFactory: Sized { }) } + /// Send block finalization notifications for all peers. + fn send_finality_notifications(&mut self) { + self.mut_peers(|peers| { + for peer in peers { + peer.send_finality_notifications(); + } + }) + } + /// Restart sync for a peer. fn restart_peer(&mut self, i: usize) { self.peers()[i].restart_sync(); diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index 22dedf7b3e..72d105a499 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -65,6 +65,27 @@ fn sync_no_common_longer_chain_fails() { assert!(!net.peer(0).client.backend().blockchain().canon_equals_to(net.peer(1).client.backend().blockchain())); } +#[test] +fn sync_justifications() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer(0).push_blocks(20, false); + net.sync(); + + // there's currently no justification for block #10 + assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), None); + // we finalize block #10 for peer 0 with a justification + net.peer(0).client().finalize_block(BlockId::Number(10), Some(Vec::new()), true).unwrap(); + + let header = net.peer(1).client().header(&BlockId::Number(10)).unwrap().unwrap(); + net.peer(1).request_justification(&header.hash().into(), 10); + + net.sync(); + + assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), Some(Vec::new())); + assert_eq!(net.peer(1).client().justification(&BlockId::Number(10)).unwrap(), Some(Vec::new())); +} + #[test] fn sync_after_fork_works() { ::env_logger::init().ok(); diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index a261ce5666..2eeaff6385 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -78,7 +78,7 @@ pub use self::error::{ErrorKind, Error}; pub use config::{Configuration, Roles, PruningMode}; pub use chain_spec::{ChainSpec, Properties}; pub use transaction_pool::txpool::{self, Pool as TransactionPool, Options as TransactionPoolOptions, ChainApi, IntoPoolError}; -pub use client::ExecutionStrategy; +pub use client::{ExecutionStrategy, FinalityNotifications}; pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend, LightExecutor, Components, PoolApi, ComponentClient, @@ -222,6 +222,52 @@ impl Service { task_executor.spawn(events); } + { + // finality notifications + let network = Arc::downgrade(&network); + + // A utility stream that drops all ready items and only returns the last one. + // This is used to only keep the last finality notification and avoid + // overloading the sync module with notifications. + struct MostRecentNotification(futures::stream::Fuse>); + + impl Stream for MostRecentNotification { + type Item = as Stream>::Item; + type Error = as Stream>::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let mut last = None; + let last = loop { + match self.0.poll()? { + Async::Ready(Some(item)) => { last = Some(item) } + Async::Ready(None) => match last { + None => return Ok(Async::Ready(None)), + Some(last) => break last, + }, + Async::NotReady => match last { + None => return Ok(Async::NotReady), + Some(last) => break last, + }, + } + }; + + Ok(Async::Ready(Some(last))) + } + } + + let events = MostRecentNotification(client.finality_notification_stream().fuse()) + .for_each(move |notification| { + if let Some(network) = network.upgrade() { + network.on_block_finalized(notification.hash, ¬ification.header); + } + Ok(()) + }) + .select(exit.clone()) + .then(|_| Ok(())); + + task_executor.spawn(events); + } + { // extrinsic notifications let network = Arc::downgrade(&network); @@ -554,7 +600,7 @@ macro_rules! construct_service_factory { fn new_full( config: $crate::FactoryFullConfiguration, - executor: $crate::TaskExecutor + executor: $crate::TaskExecutor, ) -> Result { ( $( $full_service_init )* ) (config, executor.clone()).and_then(|service| {