Minor sync refactoring (#2767)

* Make maintain_sync private

* Remove sync::Context::peer_info

* Print errors if sync state mismatch

* Line width
This commit is contained in:
Pierre Krieger
2019-06-05 09:15:54 +02:00
committed by Gavin Wood
parent 4d9efbc1d5
commit 9ad9f7eebd
2 changed files with 170 additions and 150 deletions
+7 -11
View File
@@ -119,7 +119,7 @@ pub struct ProtocolStatus<B: BlockT> {
}
/// Peer information
#[derive(Debug)]
#[derive(Debug, Clone)]
struct Peer<B: BlockT, H: ExHashT> {
info: PeerInfo<B>,
/// Current block request, if any.
@@ -342,10 +342,6 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext<B> for ProtocolContext<'a,
self.network_out.disconnect_peer(who)
}
fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>> {
self.context_data.peers.get(who).map(|p| p.info.clone())
}
fn client(&self) -> &dyn Client<B> {
&*self.context_data.chain
}
@@ -906,9 +902,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
status.version
};
let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone();
self.on_demand_core.on_connect(&mut network_out, who.clone(), status.roles, status.best_number);
let mut context = ProtocolContext::new(&mut self.context_data, network_out);
self.sync.new_peer(&mut context, who.clone());
self.sync.new_peer(&mut context, who.clone(), info);
if protocol_version > 2 {
self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles);
}
@@ -1188,16 +1185,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
processed_blocks: Vec<B::Hash>,
has_error: bool
) {
self.sync.blocks_processed(processed_blocks, has_error);
let mut context =
ProtocolContext::new(&mut self.context_data, network_out);
self.sync.maintain_sync(&mut context);
let mut context = ProtocolContext::new(&mut self.context_data, network_out);
self.sync.blocks_processed(&mut context, processed_blocks, has_error);
}
/// Restart the sync process.
pub fn restart(&mut self, network_out: &mut dyn NetworkOut<B>) {
let peers = self.context_data.peers.clone();
let mut context = ProtocolContext::new(&mut self.context_data, network_out);
self.sync.restart(&mut context);
self.sync.restart(&mut context, |peer_id| peers.get(peer_id).map(|i| i.info.clone()));
}
/// Notify about successful import of the given block.
+163 -139
View File
@@ -33,7 +33,7 @@
use std::cmp::max;
use std::ops::Range;
use std::collections::{HashMap, VecDeque};
use log::{debug, trace, warn, info};
use log::{debug, trace, warn, info, error};
use crate::protocol::PeerInfo as ProtocolPeerInfo;
use network_libp2p::PeerId;
use client::{BlockStatus, ClientInfo};
@@ -81,9 +81,6 @@ pub trait Context<B: BlockT> {
/// Force disconnecting from a peer. Use this when a peer misbehaved.
fn disconnect_peer(&mut self, who: PeerId);
/// Get peer info.
fn peer_info(&self, peer: &PeerId) -> Option<ProtocolPeerInfo<B>>;
/// Request a finality proof from a peer.
fn send_finality_proof_request(&mut self, who: PeerId, request: message::FinalityProofRequest<B::Hash>);
@@ -237,82 +234,90 @@ impl<B: BlockT> ChainSync<B> {
}
/// Handle new connected peer. Call this method whenever we connect to a new peer.
pub(crate) fn new_peer(&mut self, protocol: &mut dyn Context<B>, who: PeerId) {
if let Some(info) = protocol.peer_info(&who) {
// there's nothing sync can get from the node that has no blockchain data
// (the opposite is not true, but all requests are served at protocol level)
if !info.roles.is_full() {
return;
}
pub(crate) fn new_peer(
&mut self,
protocol: &mut dyn Context<B>,
who: PeerId,
info: ProtocolPeerInfo<B>
) {
// there's nothing sync can get from the node that has no blockchain data
// (the opposite is not true, but all requests are served at protocol level)
if !info.roles.is_full() {
return;
}
let status = block_status(&*protocol.client(), &self.queue_blocks, info.best_hash);
match (status, info.best_number) {
(Err(e), _) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
protocol.report_peer(who.clone(), BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE);
protocol.disconnect_peer(who);
},
(Ok(BlockStatus::KnownBad), _) => {
info!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number);
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
},
(Ok(BlockStatus::Unknown), b) if b.is_zero() => {
info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
},
(Ok(BlockStatus::Unknown), _) if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS => {
// when actively syncing the common point moves too fast.
debug!(target:"sync", "New peer with unknown best hash {} ({}), assuming common block.", self.best_queued_hash, self.best_queued_number);
self.peers.insert(who, PeerSync {
common_number: self.best_queued_number,
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
}
(Ok(BlockStatus::Unknown), _) => {
let our_best = self.best_queued_number;
if our_best.is_zero() {
// We are at genesis, just start downloading
debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number);
self.peers.insert(who.clone(), PeerSync {
common_number: Zero::zero(),
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
self.download_new(protocol, who)
} else {
let common_best = ::std::cmp::min(our_best, info.best_number);
debug!(target:"sync",
"New peer with unknown best hash {} ({}), searching for common ancestor.",
info.best_hash,
info.best_number
);
self.peers.insert(who.clone(), PeerSync {
common_number: Zero::zero(),
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::AncestorSearch(common_best, AncestorSearchState::ExponentialBackoff(One::one())),
recently_announced: Default::default(),
});
Self::request_ancestry(protocol, who, common_best)
}
},
(Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChainWithState), _) | (Ok(BlockStatus::InChainPruned), _) => {
debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
let status = block_status(&*protocol.client(), &self.queue_blocks, info.best_hash);
match (status, info.best_number) {
(Err(e), _) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
protocol.report_peer(who.clone(), BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE);
protocol.disconnect_peer(who);
},
(Ok(BlockStatus::KnownBad), _) => {
info!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number);
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
},
(Ok(BlockStatus::Unknown), b) if b.is_zero() => {
info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
},
(Ok(BlockStatus::Unknown), _) if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS => {
// when actively syncing the common point moves too fast.
debug!(
target:"sync",
"New peer with unknown best hash {} ({}), assuming common block.",
self.best_queued_hash,
self.best_queued_number
);
self.peers.insert(who, PeerSync {
common_number: self.best_queued_number,
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
}
(Ok(BlockStatus::Unknown), _) => {
let our_best = self.best_queued_number;
if our_best.is_zero() {
// We are at genesis, just start downloading
debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number);
self.peers.insert(who.clone(), PeerSync {
common_number: info.best_number,
common_number: Zero::zero(),
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
self.download_new(protocol, who)
} else {
let common_best = ::std::cmp::min(our_best, info.best_number);
debug!(target:"sync",
"New peer with unknown best hash {} ({}), searching for common ancestor.",
info.best_hash,
info.best_number
);
self.peers.insert(who.clone(), PeerSync {
common_number: Zero::zero(),
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::AncestorSearch(common_best, AncestorSearchState::ExponentialBackoff(One::one())),
recently_announced: Default::default(),
});
Self::request_ancestry(protocol, who, common_best)
}
},
(Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChainWithState), _) | (Ok(BlockStatus::InChainPruned), _) => {
debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
self.peers.insert(who.clone(), PeerSync {
common_number: info.best_number,
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
}
}
}
@@ -488,36 +493,41 @@ impl<B: BlockT> ChainSync<B> {
_request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
) -> Option<(PeerId, B::Hash, NumberFor<B>, Justification)> {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingJustification(hash) = peer.state {
peer.state = PeerSyncState::Available;
let peer = if let Some(peer) = self.peers.get_mut(&who) {
peer
} else {
error!(target: "sync", "Called on_block_justification_data with a bad peer ID");
return None;
};
// we only request one justification at a time
match response.blocks.into_iter().next() {
Some(response) => {
if hash != response.hash {
info!("Invalid block justification provided by {}: requested: {:?} got: {:?}",
who, hash, response.hash);
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
return None;
}
if let PeerSyncState::DownloadingJustification(hash) = peer.state {
peer.state = PeerSyncState::Available;
return self.extra_requests.justifications().on_response(
who,
response.justification,
);
},
None => {
// we might have asked the peer for a justification on a block that we thought it had
// (regardless of whether it had a justification for it or not).
trace!(target: "sync", "Peer {:?} provided empty response for justification request {:?}",
who,
hash,
);
// we only request one justification at a time
match response.blocks.into_iter().next() {
Some(response) => {
if hash != response.hash {
info!("Invalid block justification provided by {}: requested: {:?} got: {:?}",
who, hash, response.hash);
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
return None;
},
}
}
return self.extra_requests.justifications().on_response(
who,
response.justification,
);
},
None => {
// we might have asked the peer for a justification on a block that we thought it had
// (regardless of whether it had a justification for it or not).
trace!(target: "sync", "Peer {:?} provided empty response for justification request {:?}",
who,
hash,
);
return None;
},
}
}
@@ -532,28 +542,33 @@ impl<B: BlockT> ChainSync<B> {
who: PeerId,
response: message::FinalityProofResponse<B::Hash>,
) -> Option<(PeerId, B::Hash, NumberFor<B>, Vec<u8>)> {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state {
peer.state = PeerSyncState::Available;
let peer = if let Some(peer) = self.peers.get_mut(&who) {
peer
} else {
error!(target: "sync", "Called on_block_finality_proof_data with a bad peer ID");
return None;
};
// we only request one finality proof at a time
if hash != response.block {
info!(
"Invalid block finality proof provided: requested: {:?} got: {:?}",
hash,
response.block,
);
if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state {
peer.state = PeerSyncState::Available;
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
return None;
}
return self.extra_requests.finality_proofs().on_response(
who,
response.proof,
// we only request one finality proof at a time
if hash != response.block {
info!(
"Invalid block finality proof provided: requested: {:?} got: {:?}",
hash,
response.block,
);
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
return None;
}
return self.extra_requests.finality_proofs().on_response(
who,
response.proof,
);
}
self.maintain_sync(protocol);
@@ -563,17 +578,18 @@ impl<B: BlockT> ChainSync<B> {
/// A batch of blocks have been processed, with or without errors.
/// Call this when a batch of blocks have been processed by the import queue, with or without
/// errors.
pub fn blocks_processed(&mut self, processed_blocks: Vec<B::Hash>, has_error: bool) {
pub fn blocks_processed(&mut self, protocol: &mut Context<B>, processed_blocks: Vec<B::Hash>, has_error: bool) {
for hash in processed_blocks {
self.queue_blocks.remove(&hash);
}
if has_error {
self.best_importing_number = Zero::zero();
}
self.maintain_sync(protocol)
}
/// Maintain the sync process (download new blocks, fetch justifications).
pub fn maintain_sync(&mut self, protocol: &mut dyn Context<B>) {
fn maintain_sync(&mut self, protocol: &mut dyn Context<B>) {
let peers: Vec<PeerId> = self.peers.keys().map(|p| p.clone()).collect();
for peer in peers {
self.download_new(protocol, peer);
@@ -716,26 +732,28 @@ impl<B: BlockT> ChainSync<B> {
let ancient_parent = parent_status == BlockStatus::InChainPruned;
let known = self.is_known(protocol, &hash);
if let Some(ref mut peer) = self.peers.get_mut(&who) {
while peer.recently_announced.len() >= ANNOUNCE_HISTORY_SIZE {
peer.recently_announced.pop_front();
}
peer.recently_announced.push_back(hash.clone());
if number > peer.best_number {
// update their best block
peer.best_number = number;
peer.best_hash = hash;
}
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
return false;
}
if header.parent_hash() == &self.best_queued_hash || known_parent {
peer.common_number = number - One::one();
} else if known {
peer.common_number = number
}
let peer = if let Some(peer) = self.peers.get_mut(&who) {
peer
} else {
error!(target: "sync", "Called on_block_announce with a bad peer ID");
return false;
};
while peer.recently_announced.len() >= ANNOUNCE_HISTORY_SIZE {
peer.recently_announced.pop_front();
}
peer.recently_announced.push_back(hash.clone());
if number > peer.best_number {
// update their best block
peer.best_number = number;
peer.best_hash = hash;
}
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
return false;
}
if header.parent_hash() == &self.best_queued_hash || known_parent {
peer.common_number = number - One::one();
} else if known {
peer.common_number = number
}
// known block case
@@ -825,7 +843,11 @@ impl<B: BlockT> ChainSync<B> {
}
/// Restart the sync process.
pub(crate) fn restart(&mut self, protocol: &mut dyn Context<B>) {
pub(crate) fn restart(
&mut self,
protocol: &mut dyn Context<B>,
mut peer_info: impl FnMut(&PeerId) -> Option<ProtocolPeerInfo<B>>
) {
self.queue_blocks.clear();
self.best_importing_number = Zero::zero();
self.blocks.clear();
@@ -835,7 +857,9 @@ impl<B: BlockT> ChainSync<B> {
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
let ids: Vec<PeerId> = self.peers.drain().map(|(id, _)| id).collect();
for id in ids {
self.new_peer(protocol, id);
if let Some(info) = peer_info(&id) {
self.new_peer(protocol, id, info);
}
}
}