Add exponential search for ancestor search (#1875)

* feat: add exponential search

* add tests

* chore: improve code
This commit is contained in:
Marcio Diaz
2019-03-15 11:15:08 +01:00
committed by GitHub
parent a57f6bbe6a
commit 23177ca8a4
2 changed files with 169 additions and 46 deletions
+91 -46
View File
@@ -27,7 +27,7 @@ use consensus::import_queue::{ImportQueue, IncomingBlock};
use client::error::Error as ClientError;
use crate::blocks::BlockCollection;
use runtime_primitives::Justification;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero, CheckedSub};
use runtime_primitives::generic::BlockId;
use crate::message::{self, generic::Message as GenericMessage};
use crate::config::Roles;
@@ -58,9 +58,19 @@ struct PeerSync<B: BlockT> {
pub recently_announced: VecDeque<B::Hash>,
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum AncestorSearchState<B: BlockT> {
/// Use exponential backoff to find an ancestor, then switch to binary search.
/// We keep track of the exponent.
ExponentialBackoff(NumberFor<B>),
/// Using binary search to find the best ancestor.
/// We keep track of left and right bounds.
BinarySearch(NumberFor<B>, NumberFor<B>),
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum PeerSyncState<B: BlockT> {
AncestorSearch(NumberFor<B>),
AncestorSearch(NumberFor<B>, AncestorSearchState<B>),
Available,
DownloadingNew(NumberFor<B>),
DownloadingStale(B::Hash),
@@ -138,9 +148,9 @@ impl<B: BlockT> PendingJustifications<B> {
peer_best_number >= request.1 &&
!self.previous_requests
.get(&request)
.map(|requests| requests.iter().any(|i| i.0 == peer))
.unwrap_or(false)
.get(&request)
.map(|requests| requests.iter().any(|i| i.0 == peer))
.unwrap_or(false)
};
if !peer_eligible {
@@ -228,9 +238,9 @@ impl<B: BlockT> PendingJustifications<B> {
if success {
if self.justifications.finalize_root(&request.0).is_none() {
warn!(target: "sync", "Imported justification for {:?} {:?} which isn't a root in the tree: {:?}",
request.0,
request.1,
self.justifications.roots().collect::<Vec<_>>(),
request.0,
request.1,
self.justifications.roots().collect::<Vec<_>>(),
);
return;
};
@@ -419,7 +429,8 @@ impl<B: BlockT> ChainSync<B> {
let previous_state = self.state(&previous_best_seen);
if let Some(info) = protocol.peer_info(who) {
match (block_status(&*protocol.client(), &self.queue_blocks, info.best_hash), info.best_number) {
let status = block_status(&*protocol.client(), &self.queue_blocks, info.best_hash);
match (status, info.best_number) {
(Err(e), _) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
let reason = format!("Error legimimately reading blockchain status: {:?}", e);
@@ -453,7 +464,7 @@ impl<B: BlockT> ChainSync<B> {
common_number: As::sa(0),
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::AncestorSearch(common_best),
state: PeerSyncState::AncestorSearch(common_best, AncestorSearchState::ExponentialBackoff(As::sa(1))),
recently_announced: Default::default(),
});
Self::request_ancestry(protocol, who, common_best)
@@ -500,6 +511,44 @@ impl<B: BlockT> ChainSync<B> {
}
}
fn handle_ancestor_search_state(
state: AncestorSearchState<B>,
curr_block_num: NumberFor<B>,
block_hash_match: bool,
) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
match state {
AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
if block_hash_match && next_distance_to_tip == As::sa(1) {
// We found the ancestor in the first step so there is no need to execute binary search.
return None;
}
if block_hash_match {
let left = curr_block_num;
let right = left + next_distance_to_tip / As::sa(2);
let middle = left + (right - left) / As::sa(2);
Some((AncestorSearchState::BinarySearch(left, right), middle))
} else {
let next_block_num = curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or(As::sa(0));
let next_distance_to_tip = next_distance_to_tip * As::sa(2);
Some((AncestorSearchState::ExponentialBackoff(next_distance_to_tip), next_block_num))
}
},
AncestorSearchState::BinarySearch(mut left, mut right) => {
if left >= curr_block_num {
return None;
}
if block_hash_match {
left = curr_block_num;
} else {
right = curr_block_num;
}
assert!(right >= left);
let middle = left + (right - left) / As::sa(2);
Some((AncestorSearchState::BinarySearch(left, right), middle))
},
}
}
/// Handle new block data.
pub(crate) fn on_block_data(
&mut self,
@@ -514,7 +563,8 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Reversing incoming block list");
blocks.reverse();
}
match peer.state {
let peer_state = peer.state.clone();
match peer_state {
PeerSyncState::DownloadingNew(start_block) => {
self.blocks.clear_peer_download(who);
peer.state = PeerSyncState::Available;
@@ -544,43 +594,38 @@ impl<B: BlockT> ChainSync<B> {
}
}).collect()
},
PeerSyncState::AncestorSearch(n) => {
match blocks.get(0) {
Some(ref block) => {
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", n, block.hash, who);
match protocol.client().block_hash(n) {
Ok(Some(block_hash)) if block_hash == block.hash => {
if peer.common_number < n {
peer.common_number = n;
}
peer.state = PeerSyncState::Available;
trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", who, block.hash, n);
vec![]
},
Ok(our_best) if n > As::sa(0) => {
trace!(target:"sync", "Ancestry block mismatch for peer {}: theirs: {} ({}), ours: {:?}", who, block.hash, n, our_best);
let n = n - As::sa(1);
peer.state = PeerSyncState::AncestorSearch(n);
Self::request_ancestry(protocol, who, n);
return;
},
Ok(_) => { // genesis mismatch
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer".to_string()));
return;
},
Err(e) => {
let reason = format!("Error answering legitimate blockchain query: {:?}", e);
protocol.report_peer(who, Severity::Useless(reason));
return;
}
}
PeerSyncState::AncestorSearch(num, state) => {
let block_hash_match = match (blocks.get(0), protocol.client().block_hash(num)) {
(Some(ref block), Ok(maybe_our_block_hash)) => {
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who);
maybe_our_block_hash.map_or(false, |x| x == block.hash)
},
None => {
(None, _) => {
trace!(target:"sync", "Invalid response when searching for ancestor from {}", who);
protocol.report_peer(who, Severity::Bad("Invalid response when searching for ancestor".to_string()));
return;
}
},
(_, Err(e)) => {
let reason = format!("Error answering legitimate blockchain query: {:?}", e);
protocol.report_peer(who, Severity::Useless(reason));
return;
},
};
if block_hash_match && peer.common_number < num {
peer.common_number = num;
}
if !block_hash_match && num == As::sa(0) {
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer".to_string()));
return;
}
if let Some((next_state, next_block_num)) = Self::handle_ancestor_search_state(state, num, block_hash_match) {
peer.state = PeerSyncState::AncestorSearch(next_block_num, next_state);
Self::request_ancestry(protocol, who, next_block_num);
return;
} else {
peer.state = PeerSyncState::Available;
vec![]
}
},
PeerSyncState::Available | PeerSyncState::DownloadingJustification(..) => Vec::new(),
@@ -745,7 +790,7 @@ impl<B: BlockT> ChainSync<B> {
}
// Update common blocks
for (n, peer) in self.peers.iter_mut() {
if let PeerSyncState::AncestorSearch(_) = peer.state {
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
// Abort search.
peer.state = PeerSyncState::Available;
}
@@ -782,7 +827,7 @@ impl<B: BlockT> ChainSync<B> {
peer.best_number = number;
peer.best_hash = hash;
}
if let PeerSyncState::AncestorSearch(_) = peer.state {
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
return;
}
if header.parent_hash() == &self.best_queued_hash || known_parent {
+78
View File
@@ -24,6 +24,24 @@ use std::thread;
use std::time::Duration;
use super::*;
fn test_ancestor_search_when_common_is(n: usize) {
let _ = ::env_logger::try_init();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(n, false);
net.peer(1).push_blocks(n, false);
net.peer(2).push_blocks(n, false);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.restart_peer(0);
net.sync();
assert!(net.peer(0).client.backend().as_in_memory().blockchain()
.canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));
}
#[test]
fn sync_peers_works() {
let _ = ::env_logger::try_init();
@@ -140,6 +158,66 @@ fn sync_from_two_peers_with_ancestry_search_works() {
.canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));
}
#[test]
fn ancestry_search_works_when_common_is_hundred() {
let _ = ::env_logger::try_init();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(100, false);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.restart_peer(0);
net.sync();
assert!(net.peer(0).client.backend().as_in_memory().blockchain()
.canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));
}
#[test]
fn ancestry_search_works_when_backoff_is_one() {
let _ = ::env_logger::try_init();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(1, false);
net.peer(1).push_blocks(2, false);
net.peer(2).push_blocks(2, false);
net.restart_peer(0);
net.sync();
assert!(net.peer(0).client.backend().as_in_memory().blockchain()
.canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));
}
#[test]
fn ancestry_search_works_when_ancestor_is_genesis() {
let _ = ::env_logger::try_init();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(13, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.restart_peer(0);
net.sync();
assert!(net.peer(0).client.backend().as_in_memory().blockchain()
.canon_equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));
}
#[test]
fn ancestry_search_works_when_common_is_one() {
test_ancestor_search_when_common_is(1);
}
#[test]
fn ancestry_search_works_when_common_is_two() {
test_ancestor_search_when_common_is(2);
}
#[test]
fn sync_long_chain_works() {
let mut net = TestNet::new(2);