Move requests-responses and polling from ChainSync to SyncingEngine (#1650)

Move request-response handling from `ChainSync` to `SyncingEngine` as
part of [Sync
2.0](https://github.com/paritytech/polkadot-sdk/issues/534) refactoring
aimed at making `ChainSync` a pure state machine.

Resolves https://github.com/paritytech/polkadot-sdk/issues/502.

---------

Co-authored-by: Aaro Altonen <48052676+altonen@users.noreply.github.com>
Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Dmitry Markin
2023-09-27 19:44:37 +03:00
committed by GitHub
parent 02284a3e82
commit 14e5d23348
8 changed files with 608 additions and 542 deletions
+360 -22
View File
@@ -23,10 +23,12 @@ use crate::{
block_announce_validator::{
BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
},
block_relay_protocol::BlockDownloader,
block_relay_protocol::{BlockDownloader, BlockResponseError},
pending_responses::{PendingResponses, ResponseEvent},
schema::v1::{StateRequest, StateResponse},
service::{self, chain_sync::ToServiceCommand},
warp::WarpSyncParams,
ChainSync, ClientError, SyncingService,
BlockRequestEvent, ChainSync, ClientError, SyncingService,
};
use codec::{Decode, Encode};
@@ -36,24 +38,32 @@ use futures::{
FutureExt, StreamExt,
};
use futures_timer::Delay;
use libp2p::PeerId;
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, trace};
use prometheus_endpoint::{
register, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
};
use prost::Message;
use schnellru::{ByLength, LruMap};
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
use sc_network::{
config::{FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId},
config::{
FullNetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake,
ProtocolId, SetConfig,
},
request_responses::{IfDisconnected, RequestFailure},
utils::LruHashSet,
NotificationsSink, ProtocolName, ReputationChange,
};
use sc_network_common::{
role::Roles,
sync::{
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, SyncEvent,
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockRequest, BlockState},
warp::{EncodedProof, WarpProofRequest},
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, OpaqueStateRequest,
OpaqueStateResponse, PeerRequest, SyncEvent,
},
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
@@ -63,6 +73,7 @@ use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero};
use std::{
collections::{HashMap, HashSet},
iter,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -98,6 +109,9 @@ const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(30);
/// before it starts evicting peers.
const INITIAL_EVICTION_WAIT_PERIOD: Duration = Duration::from_secs(2 * 60);
/// Maximum allowed size for a block announce.
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
mod rep {
use sc_network::ReputationChange as Rep;
/// Peer has different genesis.
@@ -106,6 +120,14 @@ mod rep {
pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
/// Block announce substream with the peer has been inactive too long
pub const INACTIVE_SUBSTREAM: Rep = Rep::new(-(1 << 10), "Inactive block announce substream");
/// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
/// Peer is on unsupported protocol version.
pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
/// Reputation change when a peer refuses a request.
pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
/// Reputation change when a peer doesn't respond in time to our messages.
pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
}
struct Metrics {
@@ -277,6 +299,18 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Instant when the last notification was sent or received.
last_notification_io: Instant,
/// Pending responses
pending_responses: PendingResponses<B>,
/// Block downloader
block_downloader: Arc<dyn BlockDownloader<B>>,
/// Protocol name used to send out state requests
state_request_protocol_name: ProtocolName,
/// Protocol name used to send out warp sync requests
warp_sync_protocol_name: Option<ProtocolName>,
}
impl<B: BlockT, Client> SyncingEngine<B, Client>
@@ -381,24 +415,32 @@ where
let warp_sync_target_block_header_rx = warp_sync_target_block_header_rx
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());
let (chain_sync, block_announce_config) = ChainSync::new(
mode,
client.clone(),
let block_announce_config = Self::get_block_announce_proto_config(
protocol_id,
fork_id,
roles,
client.info().best_number,
client.info().best_hash,
client
.block_hash(Zero::zero())
.ok()
.flatten()
.expect("Genesis block exists; qed"),
);
let block_announce_protocol_name = block_announce_config.notifications_protocol.clone();
let chain_sync = ChainSync::new(
mode,
client.clone(),
block_announce_protocol_name.clone(),
max_parallel_downloads,
max_blocks_per_request,
warp_sync_config,
metrics_registry,
network_service.clone(),
import_queue,
block_downloader,
state_request_protocol_name,
warp_sync_protocol_name,
)?;
let block_announce_protocol_name = block_announce_config.notifications_protocol.clone();
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
let num_connected = Arc::new(AtomicUsize::new(0));
let is_major_syncing = Arc::new(AtomicBool::new(false));
@@ -455,6 +497,10 @@ where
} else {
None
},
pending_responses: PendingResponses::new(),
block_downloader,
state_request_protocol_name,
warp_sync_protocol_name,
},
SyncingService::new(tx, num_connected, is_major_syncing),
block_announce_config,
@@ -682,11 +728,23 @@ where
ToServiceCommand::BlocksProcessed(imported, count, results) => {
for result in self.chain_sync.on_blocks_processed(imported, count, results) {
match result {
Ok((id, req)) => self.chain_sync.send_block_request(id, req),
Err(BadPeer(id, repu)) => {
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(id, repu)
Ok(event) => match event {
BlockRequestEvent::SendRequest { peer_id, request } => {
// drop obsolete pending response first
self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request);
},
BlockRequestEvent::RemoveStale { peer_id } => {
self.pending_responses.remove(&peer_id);
},
},
Err(BadPeer(peer_id, repu)) => {
self.pending_responses.remove(&peer_id);
self.network_service.disconnect_peer(
peer_id,
self.block_announce_protocol_name.clone(),
);
self.network_service.report_peer(peer_id, repu)
},
}
}
@@ -715,7 +773,7 @@ where
let _ = tx.send(status);
},
ToServiceCommand::NumActivePeers(tx) => {
let _ = tx.send(self.chain_sync.num_active_peers());
let _ = tx.send(self.num_active_peers());
},
ToServiceCommand::SyncState(tx) => {
let _ = tx.send(self.chain_sync.status());
@@ -817,8 +875,13 @@ where
Poll::Pending => {},
}
// Drive `ChainSync`.
while let Poll::Ready(()) = self.chain_sync.poll(cx) {}
// Send outbound requests on `ChainSync`'s behalf.
self.send_chain_sync_requests();
// Poll & process pending responses.
while let Poll::Ready(Some(event)) = self.pending_responses.poll_next_unpin(cx) {
self.process_response_event(event);
}
// Poll block announce validations last, because if a block announcement was received
// through the event stream between `SyncingEngine` and `Protocol` and the validation
@@ -860,6 +923,7 @@ where
}
self.chain_sync.peer_disconnected(&peer_id);
self.pending_responses.remove(&peer_id);
self.event_streams.retain(|stream| {
stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok()
});
@@ -991,7 +1055,7 @@ where
}
if let Some(req) = req {
self.chain_sync.send_block_request(peer_id, req);
self.send_block_request(peer_id, req);
}
self.event_streams
@@ -999,4 +1063,278 @@ where
Ok(())
}
/// Ask `ChainSync` which requests it wants to make and dispatch each of them.
///
/// The order is: block requests, the optional state request, justification
/// requests (sent through the block request path) and finally the optional
/// warp sync request.
fn send_chain_sync_requests(&mut self) {
    let block_requests = self.chain_sync.block_requests();
    for (peer_id, request) in block_requests {
        self.send_block_request(peer_id, request);
    }

    let state_request = self.chain_sync.state_request();
    if let Some((peer_id, request)) = state_request {
        self.send_state_request(peer_id, request);
    }

    // Justifications are fetched with ordinary block requests.
    let justification_requests = self.chain_sync.justification_requests();
    for (peer_id, request) in justification_requests {
        self.send_block_request(peer_id, request);
    }

    let warp_sync_request = self.chain_sync.warp_sync_request();
    if let Some((peer_id, request)) = warp_sync_request {
        self.send_warp_sync_request(peer_id, request);
    }
}
/// Start downloading blocks from `peer_id` and register the pending response.
///
/// Does nothing (besides tracing and a debug assertion) when `ChainSync` does
/// not know the peer.
fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest<B>) {
    if !self.chain_sync.is_peer_known(&peer_id) {
        trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
        debug_assert!(false);
        return
    }

    // A copy of the request is kept next to the pending response so that it
    // is still available when the response arrives.
    let pending_request = PeerRequest::Block(request.clone());

    let block_downloader = self.block_downloader.clone();
    let response_future =
        async move { block_downloader.download_blocks(peer_id, request).await }.boxed();

    self.pending_responses.insert(peer_id, pending_request, response_future);
}
/// Send a state request to `peer_id` and register the pending response.
///
/// The response receiver is registered before the request is encoded. If
/// encoding fails, the sender is dropped without being used, so the registered
/// receiver resolves as canceled.
fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) {
    if !self.chain_sync.is_peer_known(&peer_id) {
        trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
        debug_assert!(false);
        return
    }

    let (tx, rx) = oneshot::channel();
    self.pending_responses.insert(peer_id, PeerRequest::State, rx.boxed());

    let data = match Self::encode_state_request(&request) {
        Ok(data) => data,
        Err(err) => {
            log::warn!(
                target: LOG_TARGET,
                "Failed to encode state request {request:?}: {err:?}",
            );
            return
        },
    };

    self.network_service.start_request(
        peer_id,
        self.state_request_protocol_name.clone(),
        data,
        tx,
        IfDisconnected::ImmediateError,
    );
}
/// Send a warp proof request to `peer_id` and register the pending response.
///
/// When no warp sync protocol name is configured, only a warning is logged;
/// the response receiver has already been registered at that point and its
/// sender is dropped unused.
fn send_warp_sync_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
    if !self.chain_sync.is_peer_known(&peer_id) {
        trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
        debug_assert!(false);
        return
    }

    let (response_tx, response_rx) = oneshot::channel();
    self.pending_responses
        .insert(peer_id, PeerRequest::WarpProof, response_rx.boxed());

    if let Some(name) = self.warp_sync_protocol_name.as_ref() {
        self.network_service.start_request(
            peer_id,
            name.clone(),
            request.encode(),
            response_tx,
            IfDisconnected::ImmediateError,
        );
    } else {
        log::warn!(
            target: LOG_TARGET,
            "Trying to send warp sync request when no protocol is configured {request:?}",
        );
    }
}
/// Encode an opaque state request into its wire representation.
///
/// The opaque payload is downcast to the protobuf `StateRequest` and
/// serialized with `encode_to_vec`.
///
/// # Errors
///
/// Returns a descriptive message when the opaque payload is not a
/// `StateRequest`.
fn encode_state_request(request: &OpaqueStateRequest) -> Result<Vec<u8>, String> {
    let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| {
        // This function handles a *request*; the message previously said
        // "response", which pointed at the wrong code path.
        "Failed to downcast opaque state request during encoding, this is an \
            implementation bug."
            .to_string()
    })?;

    Ok(request.encode_to_vec())
}
/// Decode raw response bytes into an opaque state response.
///
/// # Errors
///
/// Returns a message containing the decoder's error when `response` is not a
/// valid `StateResponse`.
fn decode_state_response(response: &[u8]) -> Result<OpaqueStateResponse, String> {
    match StateResponse::decode(response) {
        Ok(decoded) => Ok(OpaqueStateResponse(Box::new(decoded))),
        Err(error) => Err(format!("Failed to decode state response: {error}")),
    }
}
/// Handle the outcome of a request that was registered in `pending_responses`.
///
/// Failed requests (network failure or canceled channel) are dealt with first:
/// depending on the failure the peer is reported and/or disconnected from the
/// block announce protocol. A successful response is then routed to
/// `ChainSync` according to the kind of request it answers.
fn process_response_event(&mut self, response_event: ResponseEvent<B>) {
    let ResponseEvent { peer_id, request, response } = response_event;

    // Peel off both failure layers up front so the success path reads linearly.
    let resp = match response {
        Ok(Ok(resp)) => resp,
        Ok(Err(e)) => {
            debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");

            // Pick the reputation change (if any) for this failure. The two
            // "impossible" failures only assert in debug builds and otherwise
            // leave the peer alone.
            let reputation_change = match e {
                RequestFailure::Network(OutboundFailure::Timeout) => Some(rep::TIMEOUT),
                RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
                    Some(rep::BAD_PROTOCOL),
                RequestFailure::Refused => Some(rep::REFUSED),
                RequestFailure::Network(OutboundFailure::DialFailure) |
                RequestFailure::Network(OutboundFailure::ConnectionClosed) |
                RequestFailure::NotConnected => None,
                RequestFailure::UnknownProtocol => {
                    debug_assert!(false, "Block request protocol should always be known.");
                    return
                },
                RequestFailure::Obsolete => {
                    debug_assert!(
                        false,
                        "Can not receive `RequestFailure::Obsolete` after dropping the \
                            response receiver.",
                    );
                    return
                },
            };

            // Report first (when applicable), then drop the peer.
            if let Some(change) = reputation_change {
                self.network_service.report_peer(peer_id, change);
            }
            self.network_service
                .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
            return
        },
        Err(oneshot::Canceled) => {
            trace!(
                target: LOG_TARGET,
                "Request to peer {peer_id:?} failed due to oneshot being canceled.",
            );
            self.network_service
                .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
            return
        },
    };

    match request {
        PeerRequest::Block(req) => {
            let blocks = match self.block_downloader.block_response_into_blocks(&req, resp) {
                Ok(blocks) => blocks,
                Err(BlockResponseError::DecodeFailed(e)) => {
                    debug!(
                        target: LOG_TARGET,
                        "Failed to decode block response from peer {:?}: {:?}.",
                        peer_id,
                        e
                    );
                    self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
                    self.network_service
                        .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
                    return
                },
                Err(BlockResponseError::ExtractionFailed(e)) => {
                    debug!(
                        target: LOG_TARGET,
                        "Failed to extract blocks from peer response {:?}: {:?}.",
                        peer_id,
                        e
                    );
                    // Extraction failures are reported, but the peer is not disconnected.
                    self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
                    return
                },
            };

            // `ChainSync` may immediately ask for a follow-up block request.
            if let Some((peer_id, new_req)) =
                self.chain_sync.on_block_response(peer_id, req, blocks)
            {
                self.send_block_request(peer_id, new_req);
            }
        },
        PeerRequest::State => match Self::decode_state_response(&resp[..]) {
            Ok(response) => {
                self.chain_sync.on_state_response(peer_id, response);
            },
            Err(e) => {
                debug!(
                    target: LOG_TARGET,
                    "Failed to decode state response from peer {peer_id:?}: {e:?}.",
                );
                self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
                self.network_service
                    .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
            },
        },
        PeerRequest::WarpProof => {
            self.chain_sync.on_warp_sync_response(peer_id, EncodedProof(resp));
        },
    }
}
/// Returns the number of peers we're connected to and that are being queried.
///
/// This is the current length of `pending_responses`, i.e. the number of
/// requests still awaiting a response.
// NOTE(review): presumably `pending_responses` holds at most one entry per peer
// (it is inserted into and removed from by `peer_id`) — confirm against
// `PendingResponses`.
fn num_active_peers(&self) -> usize {
self.pending_responses.len()
}
/// Build the notification protocol configuration for block announcements.
///
/// The main protocol name is derived from the hex-encoded genesis hash (and
/// the fork id, when one is given); a name derived from `protocol_id` is
/// registered as the only fallback. The handshake carries our roles, best
/// block number, best block hash and the genesis hash.
fn get_block_announce_proto_config(
    protocol_id: ProtocolId,
    fork_id: &Option<String>,
    roles: Roles,
    best_number: NumberFor<B>,
    best_hash: B::Hash,
    genesis_hash: B::Hash,
) -> NonDefaultSetConfig {
    let genesis_bytes = genesis_hash.as_ref();
    let genesis_hex = array_bytes::bytes2hex("", genesis_bytes);

    let block_announces_protocol = match fork_id {
        Some(fork_id) => format!("/{}/{}/block-announces/1", genesis_hex, fork_id),
        None => format!("/{}/block-announces/1", genesis_hex),
    };

    let handshake = NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
        roles,
        best_number,
        best_hash,
        genesis_hash,
    ));

    // NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement
    // protocol is still hardcoded into the peerset.
    let set_config = SetConfig {
        in_peers: 0,
        out_peers: 0,
        reserved_nodes: Vec::new(),
        non_reserved_mode: NonReservedPeerMode::Deny,
    };

    NonDefaultSetConfig {
        notifications_protocol: block_announces_protocol.into(),
        fallback_names: iter::once(
            format!("/{}/block-announces/1", protocol_id.as_ref()).into(),
        )
        .collect(),
        max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE,
        handshake: Some(handshake),
        set_config,
    }
}
}