Move block/state/warpc sync requests/responses to ChainSync (#12739)

* Move block/state/warpc sync requests/responses to `ChainSync`

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Apply review suggestions

* cargo-fmt + doc fix

* Fix tests

Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Aaro Altonen
2022-11-22 10:19:17 +02:00
committed by GitHub
parent 4cb24da8f2
commit 1b5d52deb2
16 changed files with 1094 additions and 1126 deletions
+31 -365
View File
@@ -20,10 +20,9 @@ use crate::config;
use bytes::Bytes;
use codec::{Decode, DecodeAll, Encode};
use futures::{channel::oneshot, prelude::*};
use futures::prelude::*;
use libp2p::{
core::{connection::ConnectionId, transport::ListenerId, ConnectedPoint},
request_response::OutboundFailure,
swarm::{
ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters,
@@ -43,15 +42,9 @@ use sc_network_common::{
config::NonReservedPeerMode,
error,
protocol::{role::Roles, ProtocolName},
request_responses::RequestFailure,
sync::{
message::{
BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest,
BlockResponse, BlockState,
},
warp::{EncodedProof, WarpProofRequest},
BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest,
OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PollBlockAnnounceValidation,
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockData, BlockResponse, BlockState},
BadPeer, ChainSync, ImportResult, OnBlockData, PollBlockAnnounceValidation, PollResult,
SyncStatus,
},
utils::{interval, LruHashSet},
@@ -102,18 +95,12 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;
mod rep {
use sc_peerset::ReputationChange as Rep;
/// Reputation change when a peer doesn't respond in time to our messages.
pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
/// Reputation change when a peer refuses a request.
pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
/// Reputation change when we are a light client and a peer is behind us.
pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer");
/// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
/// Peer has different genesis.
pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
/// Peer is on unsupported protocol version.
pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
/// Peer role does not match (e.g. light peer connecting to another light peer).
pub const BAD_ROLE: Rep = Rep::new_fatal("Unsupported role");
/// Peer send us a block announcement that failed at validation.
@@ -204,19 +191,10 @@ pub struct Protocol<B: BlockT, Client> {
block_announce_data_cache: LruCache<B::Hash, Vec<u8>>,
}
#[derive(Debug)]
enum PeerRequest<B: BlockT> {
Block(BlockRequest<B>),
State,
WarpProof,
}
/// Peer information
#[derive(Debug)]
struct Peer<B: BlockT> {
info: PeerInfo<B>,
/// Current request, if any. Started by emitting [`CustomMessageOutcome::BlockRequest`].
request: Option<(PeerRequest<B>, oneshot::Receiver<Result<Vec<u8>, RequestFailure>>)>,
/// Holds a set of blocks known to this peer.
known_blocks: LruHashSet<B::Hash>,
}
@@ -432,7 +410,7 @@ where
/// Returns the number of peers we're connected to and that are being queried.
pub fn num_active_peers(&self) -> usize {
self.peers.values().filter(|p| p.request.is_some()).count()
self.chain_sync.num_active_peers()
}
/// Current global sync state.
@@ -521,106 +499,6 @@ where
self.peerset_handle.report_peer(who, reputation)
}
/// Must be called in response to a [`CustomMessageOutcome::BlockRequest`] being emitted.
/// Must contain the same `PeerId` and request that have been emitted.
pub fn on_block_response(
&mut self,
peer_id: PeerId,
request: BlockRequest<B>,
response: OpaqueBlockResponse,
) -> CustomMessageOutcome<B> {
let blocks = match self.chain_sync.block_response_into_blocks(&request, response) {
Ok(blocks) => blocks,
Err(err) => {
debug!(target: "sync", "Failed to decode block response from {}: {}", peer_id, err);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
return CustomMessageOutcome::None
},
};
let block_response = BlockResponse::<B> { id: request.id, blocks };
let blocks_range = || match (
block_response
.blocks
.first()
.and_then(|b| b.header.as_ref().map(|h| h.number())),
block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target: "sync", "BlockResponse {} from {} with {} blocks {}",
block_response.id,
peer_id,
block_response.blocks.len(),
blocks_range(),
);
if request.fields == BlockAttributes::JUSTIFICATION {
match self.chain_sync.on_block_justification(peer_id, block_response) {
Ok(OnBlockJustification::Nothing) => CustomMessageOutcome::None,
Ok(OnBlockJustification::Import { peer, hash, number, justifications }) =>
CustomMessageOutcome::JustificationImport(peer, hash, number, justifications),
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
},
}
} else {
match self.chain_sync.on_block_data(&peer_id, Some(request), block_response) {
Ok(OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(OnBlockData::Request(peer, req)) =>
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req),
Ok(OnBlockData::Continue) => CustomMessageOutcome::None,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
},
}
}
}
/// Must be called in response to a [`CustomMessageOutcome::StateRequest`] being emitted.
/// Must contain the same `PeerId` and request that have been emitted.
pub fn on_state_response(
&mut self,
peer_id: PeerId,
response: OpaqueStateResponse,
) -> CustomMessageOutcome<B> {
match self.chain_sync.on_state_data(&peer_id, response) {
Ok(OnStateData::Import(origin, block)) =>
CustomMessageOutcome::BlockImport(origin, vec![block]),
Ok(OnStateData::Continue) => CustomMessageOutcome::None,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
},
}
}
/// Must be called in response to a [`CustomMessageOutcome::WarpSyncRequest`] being emitted.
/// Must contain the same `PeerId` and request that have been emitted.
pub fn on_warp_sync_response(
&mut self,
peer_id: PeerId,
response: EncodedProof,
) -> CustomMessageOutcome<B> {
match self.chain_sync.on_warp_sync_data(&peer_id, response) {
Ok(()) => CustomMessageOutcome::None,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
},
}
}
/// Perform time based maintenance.
///
/// > **Note**: This method normally doesn't have to be called except for testing purposes.
@@ -721,7 +599,6 @@ where
best_hash: status.best_hash,
best_number: status.best_number,
},
request: None,
known_blocks: LruHashSet::new(
NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
),
@@ -750,12 +627,7 @@ where
.push_back(CustomMessageOutcome::PeerNewBest(who, status.best_number));
if let Some(req) = req {
self.pending_messages.push_back(prepare_block_request(
self.chain_sync.as_ref(),
&mut self.peers,
who,
req,
));
self.chain_sync.send_block_request(who, req);
}
Ok(())
@@ -921,8 +793,10 @@ where
match blocks_to_import {
Ok(OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(OnBlockData::Request(peer, req)) =>
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req),
Ok(OnBlockData::Request(peer, req)) => {
self.chain_sync.send_block_request(peer, req);
CustomMessageOutcome::None
},
Ok(OnBlockData::Continue) => CustomMessageOutcome::None,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
@@ -963,14 +837,7 @@ where
let results = self.chain_sync.on_blocks_processed(imported, count, results);
for result in results {
match result {
Ok((id, req)) => {
self.pending_messages.push_back(prepare_block_request(
self.chain_sync.as_ref(),
&mut self.peers,
id,
req,
));
},
Ok((id, req)) => self.chain_sync.send_block_request(id, req),
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu)
@@ -1096,16 +963,6 @@ where
}
}
/// Encode implementation-specific block request.
pub fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result<Vec<u8>, String> {
self.chain_sync.encode_block_request(request)
}
/// Encode implementation-specific state request.
pub fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String> {
self.chain_sync.encode_state_request(request)
}
fn report_metrics(&self) {
if let Some(metrics) = &self.metrics {
let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX);
@@ -1136,49 +993,6 @@ where
}
}
fn prepare_block_request<B: BlockT>(
chain_sync: &dyn ChainSync<B>,
peers: &mut HashMap<PeerId, Peer<B>>,
who: PeerId,
request: BlockRequest<B>,
) -> CustomMessageOutcome<B> {
let (tx, rx) = oneshot::channel();
if let Some(ref mut peer) = peers.get_mut(&who) {
peer.request = Some((PeerRequest::Block(request.clone()), rx));
}
let request = chain_sync.create_opaque_block_request(&request);
CustomMessageOutcome::BlockRequest { target: who, request, pending_response: tx }
}
fn prepare_state_request<B: BlockT>(
peers: &mut HashMap<PeerId, Peer<B>>,
who: PeerId,
request: OpaqueStateRequest,
) -> CustomMessageOutcome<B> {
let (tx, rx) = oneshot::channel();
if let Some(ref mut peer) = peers.get_mut(&who) {
peer.request = Some((PeerRequest::State, rx));
}
CustomMessageOutcome::StateRequest { target: who, request, pending_response: tx }
}
fn prepare_warp_sync_request<B: BlockT>(
peers: &mut HashMap<PeerId, Peer<B>>,
who: PeerId,
request: WarpProofRequest<B>,
) -> CustomMessageOutcome<B> {
let (tx, rx) = oneshot::channel();
if let Some(ref mut peer) = peers.get_mut(&who) {
peer.request = Some((PeerRequest::WarpProof, rx));
}
CustomMessageOutcome::WarpSyncRequest { target: who, request, pending_response: tx }
}
/// Outcome of an incoming custom message.
#[derive(Debug)]
#[must_use]
@@ -1210,24 +1024,6 @@ pub enum CustomMessageOutcome<B: BlockT> {
remote: PeerId,
messages: Vec<(ProtocolName, Bytes)>,
},
/// A new block request must be emitted.
BlockRequest {
target: PeerId,
request: OpaqueBlockRequest,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// A new storage request must be emitted.
StateRequest {
target: PeerId,
request: OpaqueStateRequest,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// A new warp sync request must be emitted.
WarpSyncRequest {
target: PeerId,
request: WarpProofRequest<B>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// Peer has a reported a new head of chain.
PeerNewBest(PeerId, NumberFor<B>),
/// Now connected to a new peer for syncing purposes.
@@ -1305,165 +1101,35 @@ where
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message))
}
// Check for finished outgoing requests.
let mut finished_block_requests = Vec::new();
let mut finished_state_requests = Vec::new();
let mut finished_warp_sync_requests = Vec::new();
for (id, peer) in self.peers.iter_mut() {
if let Peer { request: Some((_, pending_response)), .. } = peer {
match pending_response.poll_unpin(cx) {
Poll::Ready(Ok(Ok(resp))) => {
let (req, _) = peer.request.take().unwrap();
match req {
PeerRequest::Block(req) => {
let response =
match self.chain_sync.decode_block_response(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(
target: "sync",
"Failed to decode block response from peer {:?}: {:?}.",
id,
e
);
self.peerset_handle.report_peer(*id, rep::BAD_MESSAGE);
self.behaviour
.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
continue
},
};
finished_block_requests.push((*id, req, response));
},
PeerRequest::State => {
let response =
match self.chain_sync.decode_state_response(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(
target: "sync",
"Failed to decode state response from peer {:?}: {:?}.",
id,
e
);
self.peerset_handle.report_peer(*id, rep::BAD_MESSAGE);
self.behaviour
.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
continue
},
};
finished_state_requests.push((*id, response));
},
PeerRequest::WarpProof => {
finished_warp_sync_requests.push((*id, resp));
},
}
},
Poll::Ready(Ok(Err(e))) => {
peer.request.take();
debug!(target: "sync", "Request to peer {:?} failed: {:?}.", id, e);
match e {
RequestFailure::Network(OutboundFailure::Timeout) => {
self.peerset_handle.report_peer(*id, rep::TIMEOUT);
self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
},
RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
self.peerset_handle.report_peer(*id, rep::BAD_PROTOCOL);
self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
},
RequestFailure::Network(OutboundFailure::DialFailure) => {
self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
},
RequestFailure::Refused => {
self.peerset_handle.report_peer(*id, rep::REFUSED);
self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
},
RequestFailure::Network(OutboundFailure::ConnectionClosed) |
RequestFailure::NotConnected => {
self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
},
RequestFailure::UnknownProtocol => {
debug_assert!(
false,
"Block request protocol should always be known."
);
},
RequestFailure::Obsolete => {
debug_assert!(
false,
"Can not receive `RequestFailure::Obsolete` after dropping the \
response receiver.",
);
},
}
},
Poll::Ready(Err(oneshot::Canceled)) => {
peer.request.take();
trace!(
target: "sync",
"Request to peer {:?} failed due to oneshot being canceled.",
id,
);
self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
},
Poll::Pending => {},
}
}
}
for (id, req, response) in finished_block_requests {
let ev = self.on_block_response(id, req, response);
self.pending_messages.push_back(ev);
}
for (id, response) in finished_state_requests {
let ev = self.on_state_response(id, response);
self.pending_messages.push_back(ev);
}
for (id, response) in finished_warp_sync_requests {
let ev = self.on_warp_sync_response(id, EncodedProof(response));
self.pending_messages.push_back(ev);
}
while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) {
self.tick();
}
for (id, request) in self
.chain_sync
.block_requests()
.map(|(peer_id, request)| (peer_id, request))
.collect::<Vec<_>>()
{
let event =
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, id, request);
self.pending_messages.push_back(event);
}
if let Some((id, request)) = self.chain_sync.state_request() {
let event = prepare_state_request(&mut self.peers, id, request);
self.pending_messages.push_back(event);
}
for (id, request) in self.chain_sync.justification_requests().collect::<Vec<_>>() {
let event =
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, id, request);
self.pending_messages.push_back(event);
}
if let Some((id, request)) = self.chain_sync.warp_sync_request() {
let event = prepare_warp_sync_request(&mut self.peers, id, request);
self.pending_messages.push_back(event);
}
// Advance the state of `ChainSync`
//
// Process any received requests received from `NetworkService` and
// check if there is any block announcement validation finished.
while let Poll::Ready(result) = self.chain_sync.poll(cx) {
match self.process_block_announce_validation_result(result) {
CustomMessageOutcome::None => {},
outcome => self.pending_messages.push_back(outcome),
match result {
PollResult::Import(import) => self.pending_messages.push_back(match import {
ImportResult::BlockImport(origin, blocks) =>
CustomMessageOutcome::BlockImport(origin, blocks),
ImportResult::JustificationImport(origin, hash, number, justifications) =>
CustomMessageOutcome::JustificationImport(
origin,
hash,
number,
justifications,
),
}),
PollResult::Announce(announce) =>
match self.process_block_announce_validation_result(announce) {
CustomMessageOutcome::None => {},
outcome => self.pending_messages.push_back(outcome),
},
}
}
while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) {
self.tick();
}
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message))
}