Download unknown but announced forks (#1604)

* test reproducing fork sync issues

* update to new announce_block API

* Download unknown forks

* Reverted download_stale

* Avoid cloning the hash

* Typo
This commit is contained in:
Robert Habermeier
2019-01-28 16:41:33 -03:00
committed by GitHub
parent 431ad3ca76
commit ced9e72824
5 changed files with 109 additions and 18 deletions
@@ -17,6 +17,7 @@
//! Incoming message streams that verify signatures, and outgoing message streams
//! that sign or re-shape.
use std::collections::HashMap;
use grandpa::VoterSet;
use futures::prelude::*;
use futures::sync::mpsc;
+7 -7
View File
@@ -468,6 +468,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
let number = header.number().clone();
let hash = header.hash();
let parent_hash = header.parent_hash().clone();
let justification = if get_justification { self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None) } else { None };
let block_data = message::generic::BlockData {
hash: hash,
@@ -484,7 +485,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
if number == As::sa(0) {
break;
}
id = BlockId::Number(number - As::sa(1))
id = BlockId::Hash(parent_hash)
}
}
}
@@ -705,12 +706,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let mut peers = self.context_data.peers.write();
let hash = header.hash();
for (who, ref mut peer) in peers.iter_mut() {
if peer.known_blocks.insert(hash.clone()) {
trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who);
self.send_message(io, *who, GenericMessage::BlockAnnounce(message::BlockAnnounce {
header: header.clone()
}));
}
trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who);
peer.known_blocks.insert(hash);
self.send_message(io, *who, GenericMessage::BlockAnnounce(message::BlockAnnounce {
header: header.clone()
}));
}
}
+36 -7
View File
@@ -40,6 +40,9 @@ const MAJOR_SYNC_BLOCKS: usize = 5;
const JUSTIFICATION_RETRY_WAIT: Duration = Duration::from_secs(10);
// Number of recently announced blocks to track for each peer.
const ANNOUNCE_HISTORY_SIZE: usize = 64;
// Max number of blocks to download for unknown forks.
// TODO: this should take finality into account. See https://github.com/paritytech/substrate/issues/1606
const MAX_UNKNOWN_FORK_DOWNLOAD_LEN: u32 = 32;
struct PeerSync<B: BlockT> {
pub common_number: NumberFor<B>,
@@ -385,16 +388,20 @@ impl<B: BlockT> ChainSync<B> {
&mut self,
protocol: &mut Context<B>,
who: NodeIndex,
_request: message::BlockRequest<B>,
request: message::BlockRequest<B>,
response: message::BlockResponse<B>
) -> Option<(BlockOrigin, Vec<IncomingBlock<B>>)> {
let new_blocks: Vec<IncomingBlock<B>> = if let Some(ref mut peer) = self.peers.get_mut(&who) {
let mut blocks = response.blocks;
if request.direction == message::Direction::Descending {
trace!(target: "sync", "Reversing incoming block list");
blocks.reverse();
}
match peer.state {
PeerSyncState::DownloadingNew(start_block) => {
self.blocks.clear_peer_download(who);
peer.state = PeerSyncState::Available;
self.blocks.insert(start_block, response.blocks, who);
self.blocks.insert(start_block, blocks, who);
self.blocks
.drain(self.best_queued_number + As::sa(1))
.into_iter()
@@ -410,7 +417,7 @@ impl<B: BlockT> ChainSync<B> {
},
PeerSyncState::DownloadingStale(_) => {
peer.state = PeerSyncState::Available;
response.blocks.into_iter().map(|b| {
blocks.into_iter().map(|b| {
IncomingBlock {
hash: b.hash,
header: b.header,
@@ -421,7 +428,7 @@ impl<B: BlockT> ChainSync<B> {
}).collect()
},
PeerSyncState::AncestorSearch(n) => {
match response.blocks.get(0) {
match blocks.get(0) {
Some(ref block) => {
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", n, block.hash, who);
match protocol.client().block_hash(n) {
@@ -621,7 +628,8 @@ impl<B: BlockT> ChainSync<B> {
let stale = number <= self.best_queued_number;
if stale {
if !(known_parent || self.is_already_downloading(header.parent_hash())) {
trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", who, hash, header);
trace!(target: "sync", "Considering new unknown stale block announced from {}: {} {:?}", who, hash, header);
self.download_unknown_stale(protocol, who, &hash);
} else {
trace!(target: "sync", "Considering new stale block announced from {}: {} {:?}", who, hash, header);
self.download_stale(protocol, who, &hash);
@@ -679,7 +687,7 @@ impl<B: BlockT> ChainSync<B> {
self.peers.clear();
}
// Download old block.
// Download old block with known parent.
fn download_stale(&mut self, protocol: &mut Context<B>, who: NodeIndex, hash: &B::Hash) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
match peer.state {
@@ -700,6 +708,27 @@ impl<B: BlockT> ChainSync<B> {
}
}
// Download old block with unknown parent.
fn download_unknown_stale(&mut self, protocol: &mut Context<B>, who: NodeIndex, hash: &B::Hash) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
match peer.state {
PeerSyncState::Available => {
let request = message::generic::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from: message::FromBlock::Hash(*hash),
to: None,
direction: message::Direction::Descending,
max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN),
};
peer.state = PeerSyncState::DownloadingStale(*hash);
protocol.send_message(who, GenericMessage::BlockRequest(request));
},
_ => (),
}
}
}
// Issue a request for a peer to download new blocks, if any are available
fn download_new(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
+26 -4
View File
@@ -399,6 +399,11 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data, broadcast);
}
/// Announce a block to peers.
pub fn announce_block(&self, block: Hash) {
self.sync.announce_block(&mut TestIo::new(&self.queue, None), block);
}
/// Request a justification for the given block.
#[cfg(test)]
fn request_justification(&self, hash: &::primitives::H256, number: NumberFor<Block>) {
@@ -408,15 +413,25 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
}
/// Add blocks to the peer -- edit the block before adding
pub fn generate_blocks<F>(&self, count: usize, origin: BlockOrigin, mut edit_block: F)
pub fn generate_blocks<F>(&self, count: usize, origin: BlockOrigin, edit_block: F)
where F: FnMut(BlockBuilder<Block, PeersClient>) -> Block
{
let best_hash = self.client.info().unwrap().chain.best_hash;
self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block)
}
/// Add blocks to the peer -- edit the block before adding. The chain will
/// start at the given block iD.
pub fn generate_blocks_at<F>(&self, mut at: BlockId<Block>, count: usize, origin: BlockOrigin, mut edit_block: F)
where F: FnMut(BlockBuilder<Block, PeersClient>) -> Block
{
for _ in 0..count {
let builder = self.client.new_block().unwrap();
let builder = self.client.new_block_at(&at).unwrap();
let block = edit_block(builder);
let hash = block.header.hash();
trace!("Generating {}, (#{}, parent={})", hash, block.header.number, block.header.parent_hash);
let header = block.header.clone();
at = BlockId::Hash(hash);
// NOTE: if we use a non-synchronous queue in the test-net in the future,
// this may not work.
@@ -435,9 +450,16 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
/// Push blocks to the peer (simplified: with or without a TX)
pub fn push_blocks(&self, count: usize, with_tx: bool) {
let best_hash = self.client.info().unwrap().chain.best_hash;
self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx);
}
/// Push blocks to the peer (simplified: with or without a TX) starting from
/// given hash.
pub fn push_blocks_at(&self, at: BlockId<Block>, count: usize, with_tx: bool) {
let mut nonce = 0;
if with_tx {
self.generate_blocks(count, BlockOrigin::File, |mut builder| {
self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| {
let transfer = Transfer {
from: Keyring::Alice.to_raw_public().into(),
to: Keyring::Alice.to_raw_public().into(),
@@ -450,7 +472,7 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
builder.bake().unwrap()
});
} else {
self.generate_blocks(count, BlockOrigin::File, |builder| builder.bake().unwrap());
self.generate_blocks_at(at, count, BlockOrigin::File, |builder| builder.bake().unwrap());
}
}
+39
View File
@@ -178,3 +178,42 @@ fn blocks_are_not_announced_by_light_nodes() {
assert_eq!(net.peer(1).client.backend().blockchain().info().unwrap().best_number, 1);
assert_eq!(net.peer(2).client.backend().blockchain().info().unwrap().best_number, 0);
}
#[test]
fn can_sync_small_non_best_forks() {
let _ = ::env_logger::try_init();
let mut net = TestNet::new(2);
net.sync_step();
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
// small fork + reorg on peer 1.
net.peer(0).push_blocks_at(BlockId::Number(30), 2, true);
let small_hash = net.peer(0).client().info().unwrap().chain.best_hash;
net.peer(0).push_blocks_at(BlockId::Number(30), 10, false);
assert_eq!(net.peer(0).client().info().unwrap().chain.best_number, 40);
// peer 1 only ever had the long fork.
net.peer(1).push_blocks(10, false);
assert_eq!(net.peer(1).client().info().unwrap().chain.best_number, 40);
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none());
net.sync();
// synchronization: 0 synced to longer chain and 1 didn't sync to small chain.
assert_eq!(net.peer(0).client().info().unwrap().chain.best_number, 40);
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
net.peer(0).announce_block(small_hash);
net.sync();
// after announcing, peer 1 downloads the block.
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
}