diff --git a/substrate/core/finality-grandpa/src/communication.rs b/substrate/core/finality-grandpa/src/communication.rs index 9d661ae31b..fe6e2a0477 100644 --- a/substrate/core/finality-grandpa/src/communication.rs +++ b/substrate/core/finality-grandpa/src/communication.rs @@ -17,6 +17,7 @@ //! Incoming message streams that verify signatures, and outgoing message streams //! that sign or re-shape. +use std::collections::HashMap; use grandpa::VoterSet; use futures::prelude::*; use futures::sync::mpsc; diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 02b2ff7963..49343cf208 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -468,6 +468,7 @@ impl, H: ExHashT> Protocol { } let number = header.number().clone(); let hash = header.hash(); + let parent_hash = header.parent_hash().clone(); let justification = if get_justification { self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None) } else { None }; let block_data = message::generic::BlockData { hash: hash, @@ -484,7 +485,7 @@ impl, H: ExHashT> Protocol { if number == As::sa(0) { break; } - id = BlockId::Number(number - As::sa(1)) + id = BlockId::Hash(parent_hash) } } } @@ -705,12 +706,11 @@ impl, H: ExHashT> Protocol { let mut peers = self.context_data.peers.write(); let hash = header.hash(); for (who, ref mut peer) in peers.iter_mut() { - if peer.known_blocks.insert(hash.clone()) { - trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who); - self.send_message(io, *who, GenericMessage::BlockAnnounce(message::BlockAnnounce { - header: header.clone() - })); - } + trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who); + peer.known_blocks.insert(hash); + self.send_message(io, *who, GenericMessage::BlockAnnounce(message::BlockAnnounce { + header: header.clone() + })); } } diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index dcb38d7829..dc929899c5 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -40,6 +40,9 @@ const MAJOR_SYNC_BLOCKS: usize = 5; const JUSTIFICATION_RETRY_WAIT: Duration = Duration::from_secs(10); // Number of recently announced blocks to track for each peer. const ANNOUNCE_HISTORY_SIZE: usize = 64; +// Max number of blocks to download for unknown forks. +// TODO: this should take finality into account. See https://github.com/paritytech/substrate/issues/1606 +const MAX_UNKNOWN_FORK_DOWNLOAD_LEN: u32 = 32; struct PeerSync { pub common_number: NumberFor, @@ -385,16 +388,20 @@ impl ChainSync { &mut self, protocol: &mut Context, who: NodeIndex, - _request: message::BlockRequest, + request: message::BlockRequest, response: message::BlockResponse ) -> Option<(BlockOrigin, Vec>)> { let new_blocks: Vec> = if let Some(ref mut peer) = self.peers.get_mut(&who) { + let mut blocks = response.blocks; + if request.direction == message::Direction::Descending { + trace!(target: "sync", "Reversing incoming block list"); + blocks.reverse(); + } match peer.state { PeerSyncState::DownloadingNew(start_block) => { self.blocks.clear_peer_download(who); peer.state = PeerSyncState::Available; - - self.blocks.insert(start_block, response.blocks, who); + self.blocks.insert(start_block, blocks, who); self.blocks .drain(self.best_queued_number + As::sa(1)) .into_iter() @@ -410,7 +417,7 @@ impl ChainSync { }, PeerSyncState::DownloadingStale(_) => { peer.state = PeerSyncState::Available; - response.blocks.into_iter().map(|b| { + blocks.into_iter().map(|b| { IncomingBlock { hash: b.hash, header: b.header, @@ -421,7 +428,7 @@ impl ChainSync { }).collect() }, PeerSyncState::AncestorSearch(n) => { - match response.blocks.get(0) { + match blocks.get(0) { Some(ref block) => { trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", n, block.hash, who); match protocol.client().block_hash(n) { @@ -621,7 +628,8 @@ impl ChainSync { let stale = number <= self.best_queued_number; if stale { if !(known_parent || self.is_already_downloading(header.parent_hash())) { - trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", who, hash, header); + trace!(target: "sync", "Considering new unknown stale block announced from {}: {} {:?}", who, hash, header); + self.download_unknown_stale(protocol, who, &hash); } else { trace!(target: "sync", "Considering new stale block announced from {}: {} {:?}", who, hash, header); self.download_stale(protocol, who, &hash); @@ -679,7 +687,7 @@ impl ChainSync { self.peers.clear(); } - // Download old block. + // Download old block with known parent. fn download_stale(&mut self, protocol: &mut Context, who: NodeIndex, hash: &B::Hash) { if let Some(ref mut peer) = self.peers.get_mut(&who) { match peer.state { @@ -700,6 +708,27 @@ impl ChainSync { } } + // Download old block with unknown parent. + fn download_unknown_stale(&mut self, protocol: &mut Context, who: NodeIndex, hash: &B::Hash) { + if let Some(ref mut peer) = self.peers.get_mut(&who) { + match peer.state { + PeerSyncState::Available => { + let request = message::generic::BlockRequest { + id: 0, + fields: self.required_block_attributes.clone(), + from: message::FromBlock::Hash(*hash), + to: None, + direction: message::Direction::Descending, + max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN), + }; + peer.state = PeerSyncState::DownloadingStale(*hash); + protocol.send_message(who, GenericMessage::BlockRequest(request)); + }, + _ => (), + } + } + } + // Issue a request for a peer to download new blocks, if any are available fn download_new(&mut self, protocol: &mut Context, who: NodeIndex) { if let Some(ref mut peer) = self.peers.get_mut(&who) { diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 72671d7240..e0887b0df0 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -399,6 +399,11 @@ impl, D> Peer { self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data, broadcast); } + /// Announce a block to peers. + pub fn announce_block(&self, block: Hash) { + self.sync.announce_block(&mut TestIo::new(&self.queue, None), block); + } + /// Request a justification for the given block. #[cfg(test)] fn request_justification(&self, hash: &::primitives::H256, number: NumberFor) { @@ -408,15 +413,25 @@ impl, D> Peer { } /// Add blocks to the peer -- edit the block before adding - pub fn generate_blocks(&self, count: usize, origin: BlockOrigin, mut edit_block: F) + pub fn generate_blocks(&self, count: usize, origin: BlockOrigin, edit_block: F) + where F: FnMut(BlockBuilder) -> Block + { + let best_hash = self.client.info().unwrap().chain.best_hash; + self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block) + } + + /// Add blocks to the peer -- edit the block before adding. The chain will + /// start at the given block iD. + pub fn generate_blocks_at(&self, mut at: BlockId, count: usize, origin: BlockOrigin, mut edit_block: F) where F: FnMut(BlockBuilder) -> Block { for _ in 0..count { - let builder = self.client.new_block().unwrap(); + let builder = self.client.new_block_at(&at).unwrap(); let block = edit_block(builder); let hash = block.header.hash(); trace!("Generating {}, (#{}, parent={})", hash, block.header.number, block.header.parent_hash); let header = block.header.clone(); + at = BlockId::Hash(hash); // NOTE: if we use a non-synchronous queue in the test-net in the future, // this may not work. @@ -435,9 +450,16 @@ impl, D> Peer { /// Push blocks to the peer (simplified: with or without a TX) pub fn push_blocks(&self, count: usize, with_tx: bool) { + let best_hash = self.client.info().unwrap().chain.best_hash; + self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx); + } + + /// Push blocks to the peer (simplified: with or without a TX) starting from + /// given hash. + pub fn push_blocks_at(&self, at: BlockId, count: usize, with_tx: bool) { let mut nonce = 0; if with_tx { - self.generate_blocks(count, BlockOrigin::File, |mut builder| { + self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| { let transfer = Transfer { from: Keyring::Alice.to_raw_public().into(), to: Keyring::Alice.to_raw_public().into(), @@ -450,7 +472,7 @@ impl, D> Peer { builder.bake().unwrap() }); } else { - self.generate_blocks(count, BlockOrigin::File, |builder| builder.bake().unwrap()); + self.generate_blocks_at(at, count, BlockOrigin::File, |builder| builder.bake().unwrap()); } } diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index dbea4fdfe2..1313bf4c2c 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -178,3 +178,42 @@ fn blocks_are_not_announced_by_light_nodes() { assert_eq!(net.peer(1).client.backend().blockchain().info().unwrap().best_number, 1); assert_eq!(net.peer(2).client.backend().blockchain().info().unwrap().best_number, 0); } + +#[test] +fn can_sync_small_non_best_forks() { + let _ = ::env_logger::try_init(); + let mut net = TestNet::new(2); + net.sync_step(); + net.peer(0).push_blocks(30, false); + net.peer(1).push_blocks(30, false); + + // small fork + reorg on peer 1. + net.peer(0).push_blocks_at(BlockId::Number(30), 2, true); + let small_hash = net.peer(0).client().info().unwrap().chain.best_hash; + net.peer(0).push_blocks_at(BlockId::Number(30), 10, false); + assert_eq!(net.peer(0).client().info().unwrap().chain.best_number, 40); + + // peer 1 only ever had the long fork. + net.peer(1).push_blocks(10, false); + assert_eq!(net.peer(1).client().info().unwrap().chain.best_number, 40); + + assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); + assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none()); + + net.sync(); + + // synchronization: 0 synced to longer chain and 1 didn't sync to small chain. + + assert_eq!(net.peer(0).client().info().unwrap().chain.best_number, 40); + + assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); + assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); + + net.peer(0).announce_block(small_hash); + net.sync(); + + // after announcing, peer 1 downloads the block. + + assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); + assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); +}