mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 11:07:56 +00:00
Unify ChainSync actions under one enum (follow-up) (#2317)
Get rid of public `ChainSync::..._requests()` functions and return all requests as actions. --------- Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
This commit is contained in:
@@ -191,6 +191,10 @@ pub enum ChainSyncAction<B: BlockT> {
|
||||
SendBlockRequest { peer_id: PeerId, request: BlockRequest<B> },
|
||||
/// Drop stale block request.
|
||||
CancelBlockRequest { peer_id: PeerId },
|
||||
/// Send state request to peer.
|
||||
SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest },
|
||||
/// Send warp proof request to peer.
|
||||
SendWarpProofRequest { peer_id: PeerId, request: WarpProofRequest<B> },
|
||||
/// Peer misbehaved. Disconnect, report it and cancel the block request to it.
|
||||
DropPeer(BadPeer),
|
||||
/// Import blocks.
|
||||
@@ -1420,11 +1424,6 @@ where
|
||||
.any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
|
||||
}
|
||||
|
||||
/// Check if the peer is known to the sync state machine. Used for sanity checks.
|
||||
pub fn is_peer_known(&self, peer_id: &PeerId) -> bool {
|
||||
self.peers.contains_key(peer_id)
|
||||
}
|
||||
|
||||
/// Get the set of downloaded blocks that are ready to be queued for import.
|
||||
fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
|
||||
self.blocks
|
||||
@@ -1537,7 +1536,7 @@ where
|
||||
}
|
||||
|
||||
/// Get justification requests scheduled by sync to be sent out.
|
||||
pub fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
|
||||
fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
|
||||
let peers = &mut self.peers;
|
||||
let mut matcher = self.extra_justifications.matcher();
|
||||
std::iter::from_fn(move || {
|
||||
@@ -1564,7 +1563,7 @@ where
|
||||
}
|
||||
|
||||
/// Get block requests scheduled by sync to be sent out.
|
||||
pub fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
|
||||
fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
|
||||
if self.mode == SyncMode::Warp {
|
||||
return self
|
||||
.warp_target_block_request()
|
||||
@@ -1691,7 +1690,7 @@ where
|
||||
}
|
||||
|
||||
/// Get a state request scheduled by sync to be sent out (if any).
|
||||
pub fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> {
|
||||
fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> {
|
||||
if self.allowed_requests.is_empty() {
|
||||
return None
|
||||
}
|
||||
@@ -1737,7 +1736,7 @@ where
|
||||
}
|
||||
|
||||
/// Get a warp proof request scheduled by sync to be sent out (if any).
|
||||
pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> {
|
||||
fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> {
|
||||
if let Some(sync) = &self.warp_sync {
|
||||
if self.allowed_requests.is_empty() ||
|
||||
sync.is_complete() ||
|
||||
@@ -2025,7 +2024,38 @@ where
|
||||
|
||||
/// Get pending actions to perform.
|
||||
#[must_use]
|
||||
pub fn take_actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> {
|
||||
pub fn actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> {
|
||||
let block_requests = self
|
||||
.block_requests()
|
||||
.into_iter()
|
||||
.map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request });
|
||||
self.actions.extend(block_requests);
|
||||
|
||||
let justification_requests = self
|
||||
.justification_requests()
|
||||
.into_iter()
|
||||
.map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request });
|
||||
self.actions.extend(justification_requests);
|
||||
|
||||
let state_request = self
|
||||
.state_request()
|
||||
.into_iter()
|
||||
.map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request });
|
||||
self.actions.extend(state_request);
|
||||
|
||||
let warp_proof_request = self
|
||||
.warp_sync_request()
|
||||
.into_iter()
|
||||
.map(|(peer_id, request)| ChainSyncAction::SendWarpProofRequest { peer_id, request });
|
||||
self.actions.extend(warp_proof_request);
|
||||
|
||||
std::mem::take(&mut self.actions).into_iter()
|
||||
}
|
||||
|
||||
/// A version of `actions()` that doesn't schedule extra requests. For testing only.
|
||||
#[cfg(test)]
|
||||
#[must_use]
|
||||
fn take_actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> {
|
||||
std::mem::take(&mut self.actions).into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ use crate::{
|
||||
schema::v1::{StateRequest, StateResponse},
|
||||
service::{
|
||||
self,
|
||||
chain_sync::{SyncingService, ToServiceCommand},
|
||||
syncing_service::{SyncingService, ToServiceCommand},
|
||||
},
|
||||
types::{
|
||||
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
|
||||
@@ -713,16 +713,13 @@ where
|
||||
self.is_major_syncing
|
||||
.store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);
|
||||
|
||||
// Process actions requested by `ChainSync` during `select!`.
|
||||
// Process actions requested by `ChainSync`.
|
||||
self.process_chain_sync_actions();
|
||||
|
||||
// Send outbound requests on `ChanSync`'s behalf.
|
||||
self.send_chain_sync_requests();
|
||||
}
|
||||
}
|
||||
|
||||
fn process_chain_sync_actions(&mut self) {
|
||||
self.chain_sync.take_actions().for_each(|action| match action {
|
||||
self.chain_sync.actions().for_each(|action| match action {
|
||||
ChainSyncAction::SendBlockRequest { peer_id, request } => {
|
||||
// Sending block request implies dropping obsolete pending response as we are not
|
||||
// interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]).
|
||||
@@ -741,7 +738,25 @@ where
|
||||
ChainSyncAction::CancelBlockRequest { peer_id } => {
|
||||
let removed = self.pending_responses.remove(&peer_id);
|
||||
|
||||
trace!(target: LOG_TARGET, "Processed {action:?}., response removed: {removed}.");
|
||||
trace!(target: LOG_TARGET, "Processed {action:?}, response removed: {removed}.");
|
||||
},
|
||||
ChainSyncAction::SendStateRequest { peer_id, request } => {
|
||||
self.send_state_request(peer_id, request);
|
||||
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.",
|
||||
);
|
||||
},
|
||||
ChainSyncAction::SendWarpProofRequest { peer_id, request } => {
|
||||
self.send_warp_proof_request(peer_id, request.clone());
|
||||
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Processed `ChainSyncAction::SendWarpProofRequest` to {}, request: {:?}.",
|
||||
peer_id,
|
||||
request,
|
||||
);
|
||||
},
|
||||
ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => {
|
||||
self.pending_responses.remove(&peer_id);
|
||||
@@ -1104,26 +1119,8 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_chain_sync_requests(&mut self) {
|
||||
for (peer_id, request) in self.chain_sync.block_requests() {
|
||||
self.send_block_request(peer_id, request);
|
||||
}
|
||||
|
||||
if let Some((peer_id, request)) = self.chain_sync.state_request() {
|
||||
self.send_state_request(peer_id, request);
|
||||
}
|
||||
|
||||
for (peer_id, request) in self.chain_sync.justification_requests() {
|
||||
self.send_block_request(peer_id, request);
|
||||
}
|
||||
|
||||
if let Some((peer_id, request)) = self.chain_sync.warp_sync_request() {
|
||||
self.send_warp_sync_request(peer_id, request);
|
||||
}
|
||||
}
|
||||
|
||||
fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest<B>) {
|
||||
if !self.chain_sync.is_peer_known(&peer_id) {
|
||||
if !self.peers.contains_key(&peer_id) {
|
||||
trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
|
||||
debug_assert!(false);
|
||||
return
|
||||
@@ -1139,7 +1136,7 @@ where
|
||||
}
|
||||
|
||||
fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) {
|
||||
if !self.chain_sync.is_peer_known(&peer_id) {
|
||||
if !self.peers.contains_key(&peer_id) {
|
||||
trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
|
||||
debug_assert!(false);
|
||||
return
|
||||
@@ -1168,8 +1165,8 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn send_warp_sync_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
|
||||
if !self.chain_sync.is_peer_known(&peer_id) {
|
||||
fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
|
||||
if !self.peers.contains_key(&peer_id) {
|
||||
trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
|
||||
debug_assert!(false);
|
||||
return
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
|
||||
//! Blockchain syncing implementation in Substrate.
|
||||
|
||||
pub use service::chain_sync::SyncingService;
|
||||
pub use service::syncing_service::SyncingService;
|
||||
pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider};
|
||||
|
||||
mod block_announce_validator;
|
||||
|
||||
@@ -16,8 +16,8 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
//! `ChainSync`-related service code
|
||||
//! `SyncingEngine`-related service code
|
||||
|
||||
pub mod chain_sync;
|
||||
pub mod mock;
|
||||
pub mod network;
|
||||
pub mod syncing_service;
|
||||
|
||||
+3
-3
@@ -34,7 +34,7 @@ use std::{
|
||||
},
|
||||
};
|
||||
|
||||
/// Commands send to `ChainSync`
|
||||
/// Commands send to `SyncingEngine`
|
||||
pub enum ToServiceCommand<B: BlockT> {
|
||||
SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
|
||||
RequestJustification(B::Hash, NumberFor<B>),
|
||||
@@ -63,7 +63,7 @@ pub enum ToServiceCommand<B: BlockT> {
|
||||
// },
|
||||
}
|
||||
|
||||
/// Handle for communicating with `ChainSync` asynchronously
|
||||
/// Handle for communicating with `SyncingEngine` asynchronously
|
||||
#[derive(Clone)]
|
||||
pub struct SyncingService<B: BlockT> {
|
||||
tx: TracingUnboundedSender<ToServiceCommand<B>>,
|
||||
@@ -148,7 +148,7 @@ impl<B: BlockT> SyncingService<B> {
|
||||
|
||||
/// Get sync status
|
||||
///
|
||||
/// Returns an error if `ChainSync` has terminated.
|
||||
/// Returns an error if `SyncingEngine` has terminated.
|
||||
pub async fn status(&self) -> Result<SyncStatus<B>, ()> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
|
||||
@@ -42,7 +42,7 @@ const LOG_TARGET: &'static str = "sync";
|
||||
pub struct EncodedProof(pub Vec<u8>);
|
||||
|
||||
/// Warp sync request
|
||||
#[derive(Encode, Decode, Debug)]
|
||||
#[derive(Encode, Decode, Debug, Clone)]
|
||||
pub struct WarpProofRequest<B: BlockT> {
|
||||
/// Start collecting proofs from this block.
|
||||
pub begin: B::Hash,
|
||||
|
||||
Reference in New Issue
Block a user