mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 17:28:00 +00:00
Get rid of NetworkService in ChainSync (#2143)
Move peer banning from `ChainSync` to `SyncingEngine`.
This commit is contained in:
@@ -32,7 +32,6 @@ use crate::{
|
||||
blocks::BlockCollection,
|
||||
extra_requests::ExtraRequests,
|
||||
schema::v1::StateResponse,
|
||||
service::network::NetworkServiceHandle,
|
||||
state::{ImportResult, StateSync},
|
||||
types::{
|
||||
BadPeer, Metrics, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, SyncMode, SyncState,
|
||||
@@ -50,7 +49,6 @@ use log::{debug, error, info, trace, warn};
|
||||
|
||||
use sc_client_api::{BlockBackend, ProofProvider};
|
||||
use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
|
||||
use sc_network::types::ProtocolName;
|
||||
use sc_network_common::sync::message::{
|
||||
BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
|
||||
};
|
||||
@@ -212,7 +210,7 @@ pub struct ImportJustificationsAction<B: BlockT> {
|
||||
|
||||
/// Result of [`ChainSync::on_block_data`].
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum OnBlockData<Block: BlockT> {
|
||||
enum OnBlockData<Block: BlockT> {
|
||||
/// The block should be imported.
|
||||
Import(ImportBlocksAction<Block>),
|
||||
/// A new block request needs to be made to the given peer.
|
||||
@@ -223,7 +221,7 @@ pub enum OnBlockData<Block: BlockT> {
|
||||
|
||||
/// Result of [`ChainSync::on_block_justification`].
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum OnBlockJustification<Block: BlockT> {
|
||||
enum OnBlockJustification<Block: BlockT> {
|
||||
/// The justification needs no further handling.
|
||||
Nothing,
|
||||
/// The justification should be imported.
|
||||
@@ -237,7 +235,7 @@ pub enum OnBlockJustification<Block: BlockT> {
|
||||
|
||||
// Result of [`ChainSync::on_state_data`].
|
||||
#[derive(Debug)]
|
||||
pub enum OnStateData<Block: BlockT> {
|
||||
enum OnStateData<Block: BlockT> {
|
||||
/// The block and state that should be imported.
|
||||
Import(BlockOrigin, IncomingBlock<Block>),
|
||||
/// A new state request needs to be made to the given peer.
|
||||
@@ -247,7 +245,7 @@ pub enum OnStateData<Block: BlockT> {
|
||||
/// Action that the parent of [`ChainSync`] should perform after reporting block response with
|
||||
/// [`ChainSync::on_block_response`].
|
||||
pub enum OnBlockResponse<B: BlockT> {
|
||||
/// Nothing to do
|
||||
/// Nothing to do.
|
||||
Nothing,
|
||||
/// Perform block request.
|
||||
SendBlockRequest { peer_id: PeerId, request: BlockRequest<B> },
|
||||
@@ -255,6 +253,19 @@ pub enum OnBlockResponse<B: BlockT> {
|
||||
ImportBlocks(ImportBlocksAction<B>),
|
||||
/// Import justifications.
|
||||
ImportJustifications(ImportJustificationsAction<B>),
|
||||
/// Invalid block response, the peer should be disconnected and reported.
|
||||
DisconnectPeer(BadPeer),
|
||||
}
|
||||
|
||||
/// Action that the parent of [`ChainSync`] should perform after reporting state response with
|
||||
/// [`ChainSync::on_state_response`].
|
||||
pub enum OnStateResponse<B: BlockT> {
|
||||
/// Nothing to do.
|
||||
Nothing,
|
||||
/// Import blocks.
|
||||
ImportBlocks(ImportBlocksAction<B>),
|
||||
/// Invalid state response, the peer should be disconnected and reported.
|
||||
DisconnectPeer(BadPeer),
|
||||
}
|
||||
|
||||
/// The main data structure which contains all the state for a chains
|
||||
@@ -302,10 +313,6 @@ pub struct ChainSync<B: BlockT, Client> {
|
||||
import_existing: bool,
|
||||
/// Gap download process.
|
||||
gap_sync: Option<GapSync<B>>,
|
||||
/// Handle for communicating with `NetworkService`
|
||||
network_service: NetworkServiceHandle,
|
||||
/// Protocol name used for block announcements
|
||||
block_announce_protocol_name: ProtocolName,
|
||||
}
|
||||
|
||||
/// All the data we have about a Peer that we are trying to sync with
|
||||
@@ -396,11 +403,9 @@ where
|
||||
pub fn new(
|
||||
mode: SyncMode,
|
||||
client: Arc<Client>,
|
||||
block_announce_protocol_name: ProtocolName,
|
||||
max_parallel_downloads: u32,
|
||||
max_blocks_per_request: u32,
|
||||
warp_sync_config: Option<WarpSyncConfig<B>>,
|
||||
network_service: NetworkServiceHandle,
|
||||
) -> Result<Self, ClientError> {
|
||||
let mut sync = Self {
|
||||
client,
|
||||
@@ -420,10 +425,8 @@ where
|
||||
warp_sync: None,
|
||||
import_existing: false,
|
||||
gap_sync: None,
|
||||
network_service,
|
||||
warp_sync_config,
|
||||
warp_sync_target_block_header: None,
|
||||
block_announce_protocol_name,
|
||||
};
|
||||
|
||||
sync.reset_sync_start_point()?;
|
||||
@@ -431,9 +434,9 @@ where
|
||||
}
|
||||
|
||||
/// Get peer's best hash & number.
|
||||
pub fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>> {
|
||||
pub fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo<B>> {
|
||||
self.peers
|
||||
.get(who)
|
||||
.get(peer_id)
|
||||
.map(|p| PeerInfo { best_hash: p.best_hash, best_number: p.best_number })
|
||||
}
|
||||
|
||||
@@ -509,7 +512,7 @@ where
|
||||
#[must_use]
|
||||
pub fn new_peer(
|
||||
&mut self,
|
||||
who: PeerId,
|
||||
peer_id: PeerId,
|
||||
best_hash: B::Hash,
|
||||
best_number: NumberFor<B>,
|
||||
) -> Result<Option<BlockRequest<B>>, BadPeer> {
|
||||
@@ -517,19 +520,21 @@ where
|
||||
match self.block_status(&best_hash) {
|
||||
Err(e) => {
|
||||
debug!(target:LOG_TARGET, "Error reading blockchain: {e}");
|
||||
Err(BadPeer(who, rep::BLOCKCHAIN_READ_ERROR))
|
||||
Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
|
||||
},
|
||||
Ok(BlockStatus::KnownBad) => {
|
||||
info!("💔 New peer {who} with known bad best block {best_hash} ({best_number}).");
|
||||
Err(BadPeer(who, rep::BAD_BLOCK))
|
||||
info!(
|
||||
"💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
|
||||
);
|
||||
Err(BadPeer(peer_id, rep::BAD_BLOCK))
|
||||
},
|
||||
Ok(BlockStatus::Unknown) => {
|
||||
if best_number.is_zero() {
|
||||
info!(
|
||||
"💔 New peer {} with unknown genesis hash {} ({}).",
|
||||
who, best_hash, best_number,
|
||||
peer_id, best_hash, best_number,
|
||||
);
|
||||
return Err(BadPeer(who, rep::GENESIS_MISMATCH))
|
||||
return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH))
|
||||
}
|
||||
|
||||
// If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have
|
||||
@@ -539,14 +544,14 @@ where
|
||||
debug!(
|
||||
target:LOG_TARGET,
|
||||
"New peer {} with unknown best hash {} ({}), assuming common block.",
|
||||
who,
|
||||
peer_id,
|
||||
self.best_queued_hash,
|
||||
self.best_queued_number
|
||||
);
|
||||
self.peers.insert(
|
||||
who,
|
||||
peer_id,
|
||||
PeerSync {
|
||||
peer_id: who,
|
||||
peer_id,
|
||||
common_number: self.best_queued_number,
|
||||
best_hash,
|
||||
best_number,
|
||||
@@ -560,7 +565,7 @@ where
|
||||
let (state, req) = if self.best_queued_number.is_zero() {
|
||||
debug!(
|
||||
target:LOG_TARGET,
|
||||
"New peer {who} with best hash {best_hash} ({best_number}).",
|
||||
"New peer {peer_id} with best hash {best_hash} ({best_number}).",
|
||||
);
|
||||
|
||||
(PeerSyncState::Available, None)
|
||||
@@ -570,7 +575,7 @@ where
|
||||
debug!(
|
||||
target:LOG_TARGET,
|
||||
"New peer {} with unknown best hash {} ({}), searching for common ancestor.",
|
||||
who,
|
||||
peer_id,
|
||||
best_hash,
|
||||
best_number
|
||||
);
|
||||
@@ -585,11 +590,11 @@ where
|
||||
)
|
||||
};
|
||||
|
||||
self.allowed_requests.add(&who);
|
||||
self.allowed_requests.add(&peer_id);
|
||||
self.peers.insert(
|
||||
who,
|
||||
peer_id,
|
||||
PeerSync {
|
||||
peer_id: who,
|
||||
peer_id,
|
||||
common_number: Zero::zero(),
|
||||
best_hash,
|
||||
best_number,
|
||||
@@ -618,19 +623,19 @@ where
|
||||
Ok(BlockStatus::InChainPruned) => {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"New peer {who} with known best hash {best_hash} ({best_number}).",
|
||||
"New peer {peer_id} with known best hash {best_hash} ({best_number}).",
|
||||
);
|
||||
self.peers.insert(
|
||||
who,
|
||||
peer_id,
|
||||
PeerSync {
|
||||
peer_id: who,
|
||||
peer_id,
|
||||
common_number: std::cmp::min(self.best_queued_number, best_number),
|
||||
best_hash,
|
||||
best_number,
|
||||
state: PeerSyncState::Available,
|
||||
},
|
||||
);
|
||||
self.allowed_requests.add(&who);
|
||||
self.allowed_requests.add(&peer_id);
|
||||
Ok(None)
|
||||
},
|
||||
}
|
||||
@@ -717,41 +722,41 @@ where
|
||||
|
||||
/// Submit a block response for processing.
|
||||
#[must_use]
|
||||
pub fn on_block_data(
|
||||
fn on_block_data(
|
||||
&mut self,
|
||||
who: &PeerId,
|
||||
peer_id: &PeerId,
|
||||
request: Option<BlockRequest<B>>,
|
||||
response: BlockResponse<B>,
|
||||
) -> Result<OnBlockData<B>, BadPeer> {
|
||||
self.downloaded_blocks += response.blocks.len();
|
||||
let mut gap = false;
|
||||
let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(who) {
|
||||
let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
|
||||
let mut blocks = response.blocks;
|
||||
if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
|
||||
trace!(target: LOG_TARGET, "Reversing incoming block list");
|
||||
blocks.reverse()
|
||||
}
|
||||
self.allowed_requests.add(who);
|
||||
self.allowed_requests.add(peer_id);
|
||||
if let Some(request) = request {
|
||||
match &mut peer.state {
|
||||
PeerSyncState::DownloadingNew(_) => {
|
||||
self.blocks.clear_peer_download(who);
|
||||
self.blocks.clear_peer_download(peer_id);
|
||||
peer.state = PeerSyncState::Available;
|
||||
if let Some(start_block) =
|
||||
validate_blocks::<B>(&blocks, who, Some(request))?
|
||||
validate_blocks::<B>(&blocks, peer_id, Some(request))?
|
||||
{
|
||||
self.blocks.insert(start_block, blocks, *who);
|
||||
self.blocks.insert(start_block, blocks, *peer_id);
|
||||
}
|
||||
self.ready_blocks()
|
||||
},
|
||||
PeerSyncState::DownloadingGap(_) => {
|
||||
peer.state = PeerSyncState::Available;
|
||||
if let Some(gap_sync) = &mut self.gap_sync {
|
||||
gap_sync.blocks.clear_peer_download(who);
|
||||
gap_sync.blocks.clear_peer_download(peer_id);
|
||||
if let Some(start_block) =
|
||||
validate_blocks::<B>(&blocks, who, Some(request))?
|
||||
validate_blocks::<B>(&blocks, peer_id, Some(request))?
|
||||
{
|
||||
gap_sync.blocks.insert(start_block, blocks, *who);
|
||||
gap_sync.blocks.insert(start_block, blocks, *peer_id);
|
||||
}
|
||||
gap = true;
|
||||
let blocks: Vec<_> = gap_sync
|
||||
@@ -787,17 +792,17 @@ where
|
||||
);
|
||||
blocks
|
||||
} else {
|
||||
debug!(target: LOG_TARGET, "Unexpected gap block response from {who}");
|
||||
return Err(BadPeer(*who, rep::NO_BLOCK))
|
||||
debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
|
||||
return Err(BadPeer(*peer_id, rep::NO_BLOCK))
|
||||
}
|
||||
},
|
||||
PeerSyncState::DownloadingStale(_) => {
|
||||
peer.state = PeerSyncState::Available;
|
||||
if blocks.is_empty() {
|
||||
debug!(target: LOG_TARGET, "Empty block response from {who}");
|
||||
return Err(BadPeer(*who, rep::NO_BLOCK))
|
||||
debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
|
||||
return Err(BadPeer(*peer_id, rep::NO_BLOCK))
|
||||
}
|
||||
validate_blocks::<B>(&blocks, who, Some(request))?;
|
||||
validate_blocks::<B>(&blocks, peer_id, Some(request))?;
|
||||
blocks
|
||||
.into_iter()
|
||||
.map(|b| {
|
||||
@@ -810,7 +815,7 @@ where
|
||||
body: b.body,
|
||||
indexed_body: None,
|
||||
justifications,
|
||||
origin: Some(*who),
|
||||
origin: Some(*peer_id),
|
||||
allow_missing_state: true,
|
||||
import_existing: self.import_existing,
|
||||
skip_execution: self.skip_execution(),
|
||||
@@ -827,23 +832,23 @@ where
|
||||
"Got ancestry block #{} ({}) from peer {}",
|
||||
current,
|
||||
block.hash,
|
||||
who,
|
||||
peer_id,
|
||||
);
|
||||
maybe_our_block_hash.filter(|x| x == &block.hash)
|
||||
},
|
||||
(None, _) => {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"Invalid response when searching for ancestor from {who}",
|
||||
"Invalid response when searching for ancestor from {peer_id}",
|
||||
);
|
||||
return Err(BadPeer(*who, rep::UNKNOWN_ANCESTOR))
|
||||
return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR))
|
||||
},
|
||||
(_, Err(e)) => {
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"❌ Error answering legitimate blockchain query: {e}",
|
||||
);
|
||||
return Err(BadPeer(*who, rep::BLOCKCHAIN_READ_ERROR))
|
||||
return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR))
|
||||
},
|
||||
};
|
||||
if matching_hash.is_some() {
|
||||
@@ -856,7 +861,7 @@ where
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
|
||||
*who,
|
||||
*peer_id,
|
||||
peer.common_number,
|
||||
self.best_queued_number,
|
||||
);
|
||||
@@ -865,7 +870,7 @@ where
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Ancestry search: updating peer {} common number from={} => to={}.",
|
||||
*who,
|
||||
*peer_id,
|
||||
peer.common_number,
|
||||
*current,
|
||||
);
|
||||
@@ -875,9 +880,9 @@ where
|
||||
if matching_hash.is_none() && current.is_zero() {
|
||||
trace!(
|
||||
target:LOG_TARGET,
|
||||
"Ancestry search: genesis mismatch for peer {who}",
|
||||
"Ancestry search: genesis mismatch for peer {peer_id}",
|
||||
);
|
||||
return Err(BadPeer(*who, rep::GENESIS_MISMATCH))
|
||||
return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH))
|
||||
}
|
||||
if let Some((next_state, next_num)) =
|
||||
handle_ancestor_search_state(state, *current, matching_hash.is_some())
|
||||
@@ -887,7 +892,10 @@ where
|
||||
start: *start,
|
||||
state: next_state,
|
||||
};
|
||||
return Ok(OnBlockData::Request(*who, ancestry_request::<B>(next_num)))
|
||||
return Ok(OnBlockData::Request(
|
||||
*peer_id,
|
||||
ancestry_request::<B>(next_num),
|
||||
))
|
||||
} else {
|
||||
// Ancestry search is complete. Check if peer is on a stale fork unknown
|
||||
// to us and add it to sync targets if necessary.
|
||||
@@ -908,7 +916,7 @@ where
|
||||
target: LOG_TARGET,
|
||||
"Added fork target {} for {}",
|
||||
peer.best_hash,
|
||||
who,
|
||||
peer_id,
|
||||
);
|
||||
self.fork_targets
|
||||
.entry(peer.best_hash)
|
||||
@@ -918,7 +926,7 @@ where
|
||||
peers: Default::default(),
|
||||
})
|
||||
.peers
|
||||
.insert(*who);
|
||||
.insert(*peer_id);
|
||||
}
|
||||
peer.state = PeerSyncState::Available;
|
||||
Vec::new()
|
||||
@@ -928,32 +936,32 @@ where
|
||||
peer.state = PeerSyncState::Available;
|
||||
if let Some(warp_sync) = &mut self.warp_sync {
|
||||
if blocks.len() == 1 {
|
||||
validate_blocks::<B>(&blocks, who, Some(request))?;
|
||||
validate_blocks::<B>(&blocks, peer_id, Some(request))?;
|
||||
match warp_sync.import_target_block(
|
||||
blocks.pop().expect("`blocks` len checked above."),
|
||||
) {
|
||||
warp::TargetBlockImportResult::Success =>
|
||||
return Ok(OnBlockData::Continue),
|
||||
warp::TargetBlockImportResult::BadResponse =>
|
||||
return Err(BadPeer(*who, rep::VERIFICATION_FAIL)),
|
||||
return Err(BadPeer(*peer_id, rep::VERIFICATION_FAIL)),
|
||||
}
|
||||
} else if blocks.is_empty() {
|
||||
debug!(target: LOG_TARGET, "Empty block response from {who}");
|
||||
return Err(BadPeer(*who, rep::NO_BLOCK))
|
||||
debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
|
||||
return Err(BadPeer(*peer_id, rep::NO_BLOCK))
|
||||
} else {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"Too many blocks ({}) in warp target block response from {}",
|
||||
blocks.len(),
|
||||
who,
|
||||
peer_id,
|
||||
);
|
||||
return Err(BadPeer(*who, rep::NOT_REQUESTED))
|
||||
return Err(BadPeer(*peer_id, rep::NOT_REQUESTED))
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"Logic error: we think we are downloading warp target block from {}, but no warp sync is happening.",
|
||||
who,
|
||||
peer_id,
|
||||
);
|
||||
return Ok(OnBlockData::Continue)
|
||||
}
|
||||
@@ -965,7 +973,7 @@ where
|
||||
}
|
||||
} else {
|
||||
// When request.is_none() this is a block announcement. Just accept blocks.
|
||||
validate_blocks::<B>(&blocks, who, None)?;
|
||||
validate_blocks::<B>(&blocks, peer_id, None)?;
|
||||
blocks
|
||||
.into_iter()
|
||||
.map(|b| {
|
||||
@@ -978,7 +986,7 @@ where
|
||||
body: b.body,
|
||||
indexed_body: None,
|
||||
justifications,
|
||||
origin: Some(*who),
|
||||
origin: Some(*peer_id),
|
||||
allow_missing_state: true,
|
||||
import_existing: false,
|
||||
skip_execution: true,
|
||||
@@ -989,7 +997,7 @@ where
|
||||
}
|
||||
} else {
|
||||
// We don't know of this peer, so we also did not request anything from it.
|
||||
return Err(BadPeer(*who, rep::NOT_REQUESTED))
|
||||
return Err(BadPeer(*peer_id, rep::NOT_REQUESTED))
|
||||
};
|
||||
|
||||
Ok(OnBlockData::Import(self.validate_and_queue_blocks(new_blocks, gap)))
|
||||
@@ -997,12 +1005,12 @@ where
|
||||
|
||||
/// Submit a justification response for processing.
|
||||
#[must_use]
|
||||
pub fn on_block_justification(
|
||||
fn on_block_justification(
|
||||
&mut self,
|
||||
who: PeerId,
|
||||
peer_id: PeerId,
|
||||
response: BlockResponse<B>,
|
||||
) -> Result<OnBlockJustification<B>, BadPeer> {
|
||||
let peer = if let Some(peer) = self.peers.get_mut(&who) {
|
||||
let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
peer
|
||||
} else {
|
||||
error!(
|
||||
@@ -1012,7 +1020,7 @@ where
|
||||
return Ok(OnBlockJustification::Nothing)
|
||||
};
|
||||
|
||||
self.allowed_requests.add(&who);
|
||||
self.allowed_requests.add(&peer_id);
|
||||
if let PeerSyncState::DownloadingJustification(hash) = peer.state {
|
||||
peer.state = PeerSyncState::Available;
|
||||
|
||||
@@ -1022,11 +1030,11 @@ where
|
||||
warn!(
|
||||
target: LOG_TARGET,
|
||||
"💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
|
||||
who,
|
||||
peer_id,
|
||||
hash,
|
||||
block.hash,
|
||||
);
|
||||
return Err(BadPeer(who, rep::BAD_JUSTIFICATION))
|
||||
return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION))
|
||||
}
|
||||
|
||||
block
|
||||
@@ -1037,14 +1045,14 @@ where
|
||||
// had but didn't (regardless of whether it had a justification for it or not).
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Peer {who:?} provided empty response for justification request {hash:?}",
|
||||
"Peer {peer_id:?} provided empty response for justification request {hash:?}",
|
||||
);
|
||||
|
||||
None
|
||||
};
|
||||
|
||||
if let Some((peer_id, hash, number, justifications)) =
|
||||
self.extra_justifications.on_response(who, justification)
|
||||
self.extra_justifications.on_response(peer_id, justification)
|
||||
{
|
||||
return Ok(OnBlockJustification::Import { peer_id, hash, number, justifications })
|
||||
}
|
||||
@@ -1105,7 +1113,7 @@ where
|
||||
pub fn on_validated_block_announce(
|
||||
&mut self,
|
||||
is_best: bool,
|
||||
who: PeerId,
|
||||
peer_id: PeerId,
|
||||
announce: &BlockAnnounce<B::Header>,
|
||||
) {
|
||||
let number = *announce.header.number();
|
||||
@@ -1116,7 +1124,7 @@ where
|
||||
let ancient_parent = parent_status == BlockStatus::InChainPruned;
|
||||
|
||||
let known = self.is_known(&hash);
|
||||
let peer = if let Some(peer) = self.peers.get_mut(&who) {
|
||||
let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
peer
|
||||
} else {
|
||||
error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID");
|
||||
@@ -1124,7 +1132,7 @@ where
|
||||
};
|
||||
|
||||
if let PeerSyncState::AncestorSearch { .. } = peer.state {
|
||||
trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", who);
|
||||
trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1138,20 +1146,20 @@ where
|
||||
// is either one further ahead or it's the one they just announced, if we know about it.
|
||||
if is_best {
|
||||
if known && self.best_queued_number >= number {
|
||||
self.update_peer_common_number(&who, number);
|
||||
self.update_peer_common_number(&peer_id, number);
|
||||
} else if announce.header.parent_hash() == &self.best_queued_hash ||
|
||||
known_parent && self.best_queued_number >= number
|
||||
{
|
||||
self.update_peer_common_number(&who, number.saturating_sub(One::one()));
|
||||
self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
|
||||
}
|
||||
}
|
||||
self.allowed_requests.add(&who);
|
||||
self.allowed_requests.add(&peer_id);
|
||||
|
||||
// known block case
|
||||
if known || self.is_already_downloading(&hash) {
|
||||
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
|
||||
trace!(target: "sync", "Known block announce from {}: {}", peer_id, hash);
|
||||
if let Some(target) = self.fork_targets.get_mut(&hash) {
|
||||
target.peers.insert(who);
|
||||
target.peers.insert(peer_id);
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -1160,7 +1168,7 @@ where
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Ignored ancient block announced from {}: {} {:?}",
|
||||
who,
|
||||
peer_id,
|
||||
hash,
|
||||
announce.header,
|
||||
);
|
||||
@@ -1171,7 +1179,7 @@ where
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Added sync target for block announced from {}: {} {:?}",
|
||||
who,
|
||||
peer_id,
|
||||
hash,
|
||||
announce.summary(),
|
||||
);
|
||||
@@ -1183,22 +1191,22 @@ where
|
||||
peers: Default::default(),
|
||||
})
|
||||
.peers
|
||||
.insert(who);
|
||||
.insert(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify that a sync peer has disconnected.
|
||||
#[must_use]
|
||||
pub fn peer_disconnected(&mut self, who: &PeerId) -> Option<ImportBlocksAction<B>> {
|
||||
self.blocks.clear_peer_download(who);
|
||||
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Option<ImportBlocksAction<B>> {
|
||||
self.blocks.clear_peer_download(peer_id);
|
||||
if let Some(gap_sync) = &mut self.gap_sync {
|
||||
gap_sync.blocks.clear_peer_download(who)
|
||||
gap_sync.blocks.clear_peer_download(peer_id)
|
||||
}
|
||||
self.peers.remove(who);
|
||||
self.extra_justifications.peer_disconnected(who);
|
||||
self.peers.remove(peer_id);
|
||||
self.extra_justifications.peer_disconnected(peer_id);
|
||||
self.allowed_requests.set_all();
|
||||
self.fork_targets.retain(|_, target| {
|
||||
target.peers.remove(who);
|
||||
target.peers.remove(peer_id);
|
||||
!target.peers.is_empty()
|
||||
});
|
||||
|
||||
@@ -1565,12 +1573,7 @@ where
|
||||
number,
|
||||
justifications,
|
||||
}),
|
||||
Err(BadPeer(id, repu)) => {
|
||||
self.network_service
|
||||
.disconnect_peer(id, self.block_announce_protocol_name.clone());
|
||||
self.network_service.report_peer(id, repu);
|
||||
OnBlockResponse::Nothing
|
||||
},
|
||||
Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer),
|
||||
}
|
||||
} else {
|
||||
match self.on_block_data(&peer_id, Some(request), block_response) {
|
||||
@@ -1578,12 +1581,7 @@ where
|
||||
Ok(OnBlockData::Request(peer_id, request)) =>
|
||||
OnBlockResponse::SendBlockRequest { peer_id, request },
|
||||
Ok(OnBlockData::Continue) => OnBlockResponse::Nothing,
|
||||
Err(BadPeer(id, repu)) => {
|
||||
self.network_service
|
||||
.disconnect_peer(id, self.block_announce_protocol_name.clone());
|
||||
self.network_service.report_peer(id, repu);
|
||||
OnBlockResponse::Nothing
|
||||
},
|
||||
Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1594,26 +1592,12 @@ where
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
response: OpaqueStateResponse,
|
||||
) -> Option<ImportBlocksAction<B>> {
|
||||
) -> OnStateResponse<B> {
|
||||
match self.on_state_data(&peer_id, response) {
|
||||
Ok(OnStateData::Import(origin, block)) =>
|
||||
Some(ImportBlocksAction { origin, blocks: vec![block] }),
|
||||
Ok(OnStateData::Continue) => None,
|
||||
Err(BadPeer(id, repu)) => {
|
||||
self.network_service
|
||||
.disconnect_peer(id, self.block_announce_protocol_name.clone());
|
||||
self.network_service.report_peer(id, repu);
|
||||
None
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Submit a warp proof response received.
|
||||
pub fn on_warp_sync_response(&mut self, peer_id: PeerId, response: EncodedProof) {
|
||||
if let Err(BadPeer(id, repu)) = self.on_warp_sync_data(&peer_id, response) {
|
||||
self.network_service
|
||||
.disconnect_peer(id, self.block_announce_protocol_name.clone());
|
||||
self.network_service.report_peer(id, repu);
|
||||
OnStateResponse::ImportBlocks(ImportBlocksAction { origin, blocks: vec![block] }),
|
||||
Ok(OnStateData::Continue) => OnStateResponse::Nothing,
|
||||
Err(bad_peer) => OnStateResponse::DisconnectPeer(bad_peer),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1851,7 +1835,7 @@ where
|
||||
|
||||
fn on_state_data(
|
||||
&mut self,
|
||||
who: &PeerId,
|
||||
peer_id: &PeerId,
|
||||
response: OpaqueStateResponse,
|
||||
) -> Result<OnStateData<B>, BadPeer> {
|
||||
let response: Box<StateResponse> = response.0.downcast().map_err(|_error| {
|
||||
@@ -1860,10 +1844,10 @@ where
|
||||
"Failed to downcast opaque state response, this is an implementation bug."
|
||||
);
|
||||
|
||||
BadPeer(*who, rep::BAD_RESPONSE)
|
||||
BadPeer(*peer_id, rep::BAD_RESPONSE)
|
||||
})?;
|
||||
|
||||
if let Some(peer) = self.peers.get_mut(who) {
|
||||
if let Some(peer) = self.peers.get_mut(peer_id) {
|
||||
if let PeerSyncState::DownloadingState = peer.state {
|
||||
peer.state = PeerSyncState::Available;
|
||||
self.allowed_requests.set_all();
|
||||
@@ -1873,7 +1857,7 @@ where
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"Importing state data from {} with {} keys, {} proof nodes.",
|
||||
who,
|
||||
peer_id,
|
||||
response.entries.len(),
|
||||
response.proof.len(),
|
||||
);
|
||||
@@ -1882,14 +1866,14 @@ where
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"Importing state data from {} with {} keys, {} proof nodes.",
|
||||
who,
|
||||
peer_id,
|
||||
response.entries.len(),
|
||||
response.proof.len(),
|
||||
);
|
||||
sync.import_state(*response)
|
||||
} else {
|
||||
debug!(target: LOG_TARGET, "Ignored obsolete state response from {who}");
|
||||
return Err(BadPeer(*who, rep::NOT_REQUESTED))
|
||||
debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
|
||||
return Err(BadPeer(*peer_id, rep::NOT_REQUESTED))
|
||||
};
|
||||
|
||||
match import_result {
|
||||
@@ -1912,14 +1896,20 @@ where
|
||||
},
|
||||
ImportResult::Continue => Ok(OnStateData::Continue),
|
||||
ImportResult::BadResponse => {
|
||||
debug!(target: LOG_TARGET, "Bad state data received from {who}");
|
||||
Err(BadPeer(*who, rep::BAD_BLOCK))
|
||||
debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
|
||||
Err(BadPeer(*peer_id, rep::BAD_BLOCK))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn on_warp_sync_data(&mut self, who: &PeerId, response: EncodedProof) -> Result<(), BadPeer> {
|
||||
if let Some(peer) = self.peers.get_mut(who) {
|
||||
/// Submit a warp proof response received.
|
||||
#[must_use]
|
||||
pub fn on_warp_sync_response(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
response: EncodedProof,
|
||||
) -> Result<(), BadPeer> {
|
||||
if let Some(peer) = self.peers.get_mut(peer_id) {
|
||||
if let PeerSyncState::DownloadingWarpProof = peer.state {
|
||||
peer.state = PeerSyncState::Available;
|
||||
self.allowed_requests.set_all();
|
||||
@@ -1929,20 +1919,20 @@ where
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"Importing warp proof data from {}, {} bytes.",
|
||||
who,
|
||||
peer_id,
|
||||
response.0.len(),
|
||||
);
|
||||
sync.import_warp_proof(response)
|
||||
} else {
|
||||
debug!(target: LOG_TARGET, "Ignored obsolete warp sync response from {who}");
|
||||
return Err(BadPeer(*who, rep::NOT_REQUESTED))
|
||||
debug!(target: LOG_TARGET, "Ignored obsolete warp sync response from {peer_id}");
|
||||
return Err(BadPeer(*peer_id, rep::NOT_REQUESTED))
|
||||
};
|
||||
|
||||
match import_result {
|
||||
WarpProofImportResult::Success => Ok(()),
|
||||
WarpProofImportResult::BadResponse => {
|
||||
debug!(target: LOG_TARGET, "Bad proof data received from {who}");
|
||||
Err(BadPeer(*who, rep::BAD_BLOCK))
|
||||
debug!(target: LOG_TARGET, "Bad proof data received from {peer_id}");
|
||||
Err(BadPeer(*peer_id, rep::BAD_BLOCK))
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -1979,11 +1969,11 @@ where
|
||||
has_error |= result.is_err();
|
||||
|
||||
match result {
|
||||
Ok(BlockImportStatus::ImportedKnown(number, who)) =>
|
||||
if let Some(peer) = who {
|
||||
Ok(BlockImportStatus::ImportedKnown(number, peer_id)) =>
|
||||
if let Some(peer) = peer_id {
|
||||
self.update_peer_common_number(&peer, number);
|
||||
},
|
||||
Ok(BlockImportStatus::ImportedUnknown(number, aux, who)) => {
|
||||
Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
|
||||
if aux.clear_justification_requests {
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
@@ -2001,13 +1991,13 @@ where
|
||||
}
|
||||
|
||||
if aux.bad_justification {
|
||||
if let Some(ref peer) = who {
|
||||
if let Some(ref peer) = peer_id {
|
||||
warn!("💔 Sent block with bad justification to import");
|
||||
output.push(Err(BadPeer(*peer, rep::BAD_JUSTIFICATION)));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(peer) = who {
|
||||
if let Some(peer) = peer_id {
|
||||
self.update_peer_common_number(&peer, number);
|
||||
}
|
||||
let state_sync_complete =
|
||||
@@ -2046,8 +2036,8 @@ where
|
||||
self.gap_sync = None;
|
||||
}
|
||||
},
|
||||
Err(BlockImportError::IncompleteHeader(who)) =>
|
||||
if let Some(peer) = who {
|
||||
Err(BlockImportError::IncompleteHeader(peer_id)) =>
|
||||
if let Some(peer) = peer_id {
|
||||
warn!(
|
||||
target: LOG_TARGET,
|
||||
"💔 Peer sent block with incomplete header to import",
|
||||
@@ -2055,23 +2045,23 @@ where
|
||||
output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER)));
|
||||
output.extend(self.restart());
|
||||
},
|
||||
Err(BlockImportError::VerificationFailed(who, e)) => {
|
||||
let extra_message =
|
||||
who.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
|
||||
Err(BlockImportError::VerificationFailed(peer_id, e)) => {
|
||||
let extra_message = peer_id
|
||||
.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
|
||||
|
||||
warn!(
|
||||
target: LOG_TARGET,
|
||||
"💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
|
||||
);
|
||||
|
||||
if let Some(peer) = who {
|
||||
if let Some(peer) = peer_id {
|
||||
output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL)));
|
||||
}
|
||||
|
||||
output.extend(self.restart());
|
||||
},
|
||||
Err(BlockImportError::BadBlock(who)) =>
|
||||
if let Some(peer) = who {
|
||||
Err(BlockImportError::BadBlock(peer_id)) =>
|
||||
if let Some(peer) = peer_id {
|
||||
warn!(
|
||||
target: LOG_TARGET,
|
||||
"💔 Block {hash:?} received from peer {peer} has been blacklisted",
|
||||
@@ -2365,7 +2355,7 @@ where
|
||||
/// It is expected that `blocks` are in ascending order.
|
||||
fn validate_blocks<Block: BlockT>(
|
||||
blocks: &Vec<BlockData<Block>>,
|
||||
who: &PeerId,
|
||||
peer_id: &PeerId,
|
||||
request: Option<BlockRequest<Block>>,
|
||||
) -> Result<Option<NumberFor<Block>>, BadPeer> {
|
||||
if let Some(request) = request {
|
||||
@@ -2373,12 +2363,12 @@ fn validate_blocks<Block: BlockT>(
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
|
||||
who,
|
||||
peer_id,
|
||||
request.max,
|
||||
blocks.len(),
|
||||
);
|
||||
|
||||
return Err(BadPeer(*who, rep::NOT_REQUESTED))
|
||||
return Err(BadPeer(*peer_id, rep::NOT_REQUESTED))
|
||||
}
|
||||
|
||||
let block_header =
|
||||
@@ -2398,7 +2388,7 @@ fn validate_blocks<Block: BlockT>(
|
||||
block_header,
|
||||
);
|
||||
|
||||
return Err(BadPeer(*who, rep::NOT_REQUESTED))
|
||||
return Err(BadPeer(*peer_id, rep::NOT_REQUESTED))
|
||||
}
|
||||
|
||||
if request.fields.contains(BlockAttributes::HEADER) &&
|
||||
@@ -2406,20 +2396,20 @@ fn validate_blocks<Block: BlockT>(
|
||||
{
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Missing requested header for a block in response from {who}.",
|
||||
"Missing requested header for a block in response from {peer_id}.",
|
||||
);
|
||||
|
||||
return Err(BadPeer(*who, rep::BAD_RESPONSE))
|
||||
return Err(BadPeer(*peer_id, rep::BAD_RESPONSE))
|
||||
}
|
||||
|
||||
if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
|
||||
{
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Missing requested body for a block in response from {who}.",
|
||||
"Missing requested body for a block in response from {peer_id}.",
|
||||
);
|
||||
|
||||
return Err(BadPeer(*who, rep::BAD_RESPONSE))
|
||||
return Err(BadPeer(*peer_id, rep::BAD_RESPONSE))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2430,11 +2420,11 @@ fn validate_blocks<Block: BlockT>(
|
||||
debug!(
|
||||
target:LOG_TARGET,
|
||||
"Bad header received from {}. Expected hash {:?}, got {:?}",
|
||||
who,
|
||||
peer_id,
|
||||
b.hash,
|
||||
hash,
|
||||
);
|
||||
return Err(BadPeer(*who, rep::BAD_BLOCK))
|
||||
return Err(BadPeer(*peer_id, rep::BAD_BLOCK))
|
||||
}
|
||||
}
|
||||
if let (Some(header), Some(body)) = (&b.header, &b.body) {
|
||||
@@ -2448,11 +2438,11 @@ fn validate_blocks<Block: BlockT>(
|
||||
target:LOG_TARGET,
|
||||
"Bad extrinsic root for a block {} received from {}. Expected {:?}, got {:?}",
|
||||
b.hash,
|
||||
who,
|
||||
peer_id,
|
||||
expected,
|
||||
got,
|
||||
);
|
||||
return Err(BadPeer(*who, rep::BAD_BLOCK))
|
||||
return Err(BadPeer(*peer_id, rep::BAD_BLOCK))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user