Move syncing code from sc-network-common to sc-network-sync (#1912)

This PR moves syncing-related code from `sc-network-common` to
`sc-network-sync`.

Unfortunately, some parts are tightly integrated with networking, so
they were left in `sc-network-common` for now:

1. `SyncMode` in `common/src/sync.rs` (used in `NetworkConfiguration`).
2. `BlockAnnouncesHandshake`, `BlockRequest`, `BlockResponse`, etc. in
`common/src/sync/message.rs` (used in `src/protocol.rs` and
`src/protocol/message.rs`).

More substantial refactoring is needed to decouple syncing and
networking completely, including getting rid of the hardcoded sync
protocol.

## Release notes

Move syncing-related code from `sc-network-common` to `sc-network-sync`.
Delete `ChainSync` trait as it's never used (the only implementation is
accessed directly from `SyncingEngine` and exposes a lot of public
methods that are not part of the trait). Some new trait(s) for syncing
will likely be introduced as part of Sync 2.0 refactoring to represent
syncing strategies.
This commit is contained in:
Dmitry Markin
2023-11-01 15:10:33 +02:00
committed by GitHub
parent 9ca267328e
commit 1cd6acdff3
35 changed files with 3903 additions and 4041 deletions
Generated
+5
View File
@@ -14995,6 +14995,7 @@ dependencies = [
"sc-network",
"sc-network-common",
"sc-network-gossip",
"sc-network-sync",
"sc-network-test",
"sc-telemetry",
"sc-transaction-pool-api",
@@ -15211,6 +15212,7 @@ dependencies = [
"sc-client-api",
"sc-network",
"sc-network-common",
"sc-network-sync",
"sp-blockchain",
"sp-runtime",
]
@@ -15363,6 +15365,7 @@ dependencies = [
"quickcheck",
"sc-network",
"sc-network-common",
"sc-network-sync",
"schnellru",
"sp-runtime",
"substrate-prometheus-endpoint",
@@ -15403,6 +15406,7 @@ dependencies = [
"parity-scale-codec",
"sc-network",
"sc-network-common",
"sc-network-sync",
"sp-consensus",
"sp-statement-store",
"substrate-prometheus-endpoint",
@@ -15489,6 +15493,7 @@ dependencies = [
"parity-scale-codec",
"sc-network",
"sc-network-common",
"sc-network-sync",
"sc-utils",
"sp-consensus",
"sp-runtime",
@@ -37,6 +37,7 @@ sc-consensus = { path = "../common" }
sc-network = { path = "../../network" }
sc-network-gossip = { path = "../../network-gossip" }
sc-network-common = { path = "../../network/common" }
sc-network-sync = { path = "../../network/sync" }
sc-telemetry = { path = "../../telemetry" }
sc-utils = { path = "../../utils" }
sp-api = { path = "../../../primitives/api" }
@@ -59,7 +59,7 @@ use crate::{
use gossip::{
FullCatchUpMessage, FullCommitMessage, GossipMessage, GossipValidator, PeerReport, VoteMessage,
};
use sc_network_common::sync::SyncEventStream;
use sc_network_sync::SyncEventStream;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_consensus_grandpa::{AuthorityId, AuthoritySignature, RoundNumber, SetId as SetIdNumber};
@@ -33,11 +33,9 @@ use sc_network::{
NetworkSyncForkRequest, NotificationSenderError, NotificationSenderT as NotificationSender,
PeerId, ReputationChange,
};
use sc_network_common::{
role::ObservedRole,
sync::{SyncEvent as SyncStreamEvent, SyncEventStream},
};
use sc_network_common::role::ObservedRole;
use sc_network_gossip::Validator;
use sc_network_sync::{SyncEvent as SyncStreamEvent, SyncEventStream};
use sc_network_test::{Block, Hash};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_consensus_grandpa::AuthorityList;
@@ -23,7 +23,7 @@ use crate::{
BlockNumberOps, GrandpaJustification, SharedAuthoritySet,
};
use sc_client_api::Backend as ClientBackend;
use sc_network_common::sync::warp::{EncodedProof, VerificationResult, WarpSyncProvider};
use sc_network_sync::warp::{EncodedProof, VerificationResult, WarpSyncProvider};
use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
use sp_consensus_grandpa::{AuthorityList, SetId, GRANDPA_ENGINE_ID};
use sp_runtime::{
+1
View File
@@ -19,6 +19,7 @@ futures-timer = "3.0.1"
log = "0.4.17"
sc-client-api = { path = "../api" }
sc-network-common = { path = "../network/common" }
sc-network-sync = { path = "../network/sync" }
sc-network = { path = "../network" }
sp-blockchain = { path = "../../primitives/blockchain" }
sp-runtime = { path = "../../primitives/runtime" }
+1 -1
View File
@@ -21,7 +21,7 @@ use ansi_term::Colour;
use log::info;
use sc_client_api::ClientInfo;
use sc_network::NetworkStatus;
use sc_network_common::sync::{
use sc_network_sync::{
warp::{WarpSyncPhase, WarpSyncProgress},
SyncState, SyncStatus,
};
+1 -1
View File
@@ -24,7 +24,7 @@ use futures_timer::Delay;
use log::{debug, info, trace};
use sc_client_api::{BlockchainEvents, UsageProvider};
use sc_network::NetworkStatusProvider;
use sc_network_common::sync::SyncStatusProvider;
use sc_network_sync::SyncStatusProvider;
use sp_blockchain::HeaderMetadata;
use sp_runtime::traits::{Block as BlockT, Header};
use std::{collections::VecDeque, fmt::Display, sync::Arc, time::Duration};
@@ -24,6 +24,7 @@ tracing = "0.1.29"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus" }
sc-network = { path = "../network" }
sc-network-common = { path = "../network/common" }
sc-network-sync = { path = "../network/sync" }
sp-runtime = { path = "../../primitives/runtime" }
[dev-dependencies]
@@ -22,7 +22,7 @@ use crate::{
};
use sc_network::{event::Event, types::ProtocolName, ReputationChange};
use sc_network_common::sync::SyncEvent;
use sc_network_sync::SyncEvent;
use futures::{
channel::mpsc::{channel, Receiver, Sender},
@@ -338,7 +338,8 @@ mod tests {
config::MultiaddrWithPeerId, NetworkBlock, NetworkEventStream, NetworkNotification,
NetworkPeers, NotificationSenderError, NotificationSenderT as NotificationSender,
};
use sc_network_common::{role::ObservedRole, sync::SyncEventStream};
use sc_network_common::role::ObservedRole;
use sc_network_sync::SyncEventStream;
use sp_runtime::{
testing::H256,
traits::{Block as BlockT, NumberFor},
+1 -1
View File
@@ -71,7 +71,7 @@ use libp2p::{multiaddr, PeerId};
use sc_network::{
types::ProtocolName, NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
};
use sc_network_common::sync::SyncEventStream;
use sc_network_sync::SyncEventStream;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::iter;
-342
View File
@@ -19,150 +19,6 @@
//! Abstract interfaces and data structures related to network sync.
pub mod message;
pub mod metrics;
pub mod warp;
use crate::{role::Roles, types::ReputationChange};
use futures::Stream;
use libp2p_identity::PeerId;
use message::{BlockAnnounce, BlockRequest, BlockResponse};
use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock};
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Justifications,
};
use warp::WarpSyncProgress;
use std::{any::Any, fmt, fmt::Formatter, pin::Pin, sync::Arc};
/// The sync status of a peer we are trying to sync with
#[derive(Debug)]
pub struct PeerInfo<Block: BlockT> {
/// Their best block hash.
pub best_hash: Block::Hash,
/// Their best block number.
pub best_number: NumberFor<Block>,
}
/// Info about a peer's known state (both full and light).
#[derive(Clone, Debug)]
pub struct ExtendedPeerInfo<B: BlockT> {
/// Roles
pub roles: Roles,
/// Peer best block hash
pub best_hash: B::Hash,
/// Peer best block number
pub best_number: NumberFor<B>,
}
/// Reported sync state.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum SyncState<BlockNumber> {
/// Initial sync is complete, keep-up sync is active.
Idle,
/// Actively catching up with the chain.
Downloading { target: BlockNumber },
/// All blocks are downloaded and are being imported.
Importing { target: BlockNumber },
}
impl<BlockNumber> SyncState<BlockNumber> {
/// Are we actively catching up with the chain?
pub fn is_major_syncing(&self) -> bool {
!matches!(self, SyncState::Idle)
}
}
/// Reported state download progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct StateDownloadProgress {
/// Estimated download percentage.
pub percentage: u32,
/// Total state size in bytes downloaded so far.
pub size: u64,
}
/// Syncing status and statistics.
#[derive(Debug, Clone)]
pub struct SyncStatus<Block: BlockT> {
/// Current global sync state.
pub state: SyncState<NumberFor<Block>>,
/// Target sync block number.
pub best_seen_block: Option<NumberFor<Block>>,
/// Number of peers participating in syncing.
pub num_peers: u32,
/// Number of peers known to `SyncingEngine` (both full and light).
pub num_connected_peers: u32,
/// Number of blocks queued for import
pub queued_blocks: u32,
/// State sync status in progress, if any.
pub state_sync: Option<StateDownloadProgress>,
/// Warp sync in progress, if any.
pub warp_sync: Option<WarpSyncProgress<Block>>,
}
/// A peer did not behave as expected and should be reported.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BadPeer(pub PeerId, pub ReputationChange);
impl fmt::Display for BadPeer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Bad peer {}; Reputation change: {:?}", self.0, self.1)
}
}
impl std::error::Error for BadPeer {}
/// Action that the parent of [`ChainSync`] should perform if we want to import blocks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportBlocksAction<B: BlockT> {
pub origin: BlockOrigin,
pub blocks: Vec<IncomingBlock<B>>,
}
/// Result of [`ChainSync::on_block_data`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OnBlockData<Block: BlockT> {
/// The block should be imported.
Import(ImportBlocksAction<Block>),
/// A new block request needs to be made to the given peer.
Request(PeerId, BlockRequest<Block>),
/// Continue processing events.
Continue,
}
/// Result of [`ChainSync::on_block_justification`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OnBlockJustification<Block: BlockT> {
/// The justification needs no further handling.
Nothing,
/// The justification should be imported.
Import {
peer_id: PeerId,
hash: Block::Hash,
number: NumberFor<Block>,
justifications: Justifications,
},
}
/// Result of `ChainSync::on_state_data`.
#[derive(Debug)]
pub enum OnStateData<Block: BlockT> {
/// The block and state that should be imported.
Import(BlockOrigin, IncomingBlock<Block>),
/// A new state request needs to be made to the given peer.
Continue,
}
/// Block or justification request polled from `ChainSync`
#[derive(Debug)]
pub enum ImportResult<B: BlockT> {
BlockImport(BlockOrigin, Vec<IncomingBlock<B>>),
JustificationImport(RuntimeOrigin, B::Hash, NumberFor<B>, Justifications),
}
/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
@@ -197,201 +53,3 @@ impl Default for SyncMode {
Self::Full
}
}
#[derive(Debug)]
pub struct Metrics {
pub queued_blocks: u32,
pub fork_targets: u32,
pub justifications: metrics::Metrics,
}
#[derive(Debug)]
pub enum PeerRequest<B: BlockT> {
Block(BlockRequest<B>),
State,
WarpProof,
}
#[derive(Debug)]
pub enum PeerRequestType {
Block,
State,
WarpProof,
}
impl<B: BlockT> PeerRequest<B> {
pub fn get_type(&self) -> PeerRequestType {
match self {
PeerRequest::Block(_) => PeerRequestType::Block,
PeerRequest::State => PeerRequestType::State,
PeerRequest::WarpProof => PeerRequestType::WarpProof,
}
}
}
/// Wrapper for implementation-specific state request.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
pub struct OpaqueStateRequest(pub Box<dyn Any + Send>);
impl fmt::Debug for OpaqueStateRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OpaqueStateRequest").finish()
}
}
/// Wrapper for implementation-specific state response.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
pub struct OpaqueStateResponse(pub Box<dyn Any + Send>);
impl fmt::Debug for OpaqueStateResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OpaqueStateResponse").finish()
}
}
/// Provides high-level status of syncing.
#[async_trait::async_trait]
pub trait SyncStatusProvider<Block: BlockT>: Send + Sync {
/// Get high-level view of the syncing status.
async fn status(&self) -> Result<SyncStatus<Block>, ()>;
}
#[async_trait::async_trait]
impl<T, Block> SyncStatusProvider<Block> for Arc<T>
where
T: ?Sized,
T: SyncStatusProvider<Block>,
Block: BlockT,
{
async fn status(&self) -> Result<SyncStatus<Block>, ()> {
T::status(self).await
}
}
/// Syncing-related events that other protocols can subscribe to.
pub enum SyncEvent {
/// Peer that the syncing implementation is tracking connected.
PeerConnected(PeerId),
/// Peer that the syncing implementation was tracking disconnected.
PeerDisconnected(PeerId),
}
pub trait SyncEventStream: Send + Sync {
/// Subscribe to syncing-related events.
fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>>;
}
impl<T> SyncEventStream for Arc<T>
where
T: ?Sized,
T: SyncEventStream,
{
fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
T::event_stream(self, name)
}
}
/// Something that represents the syncing strategy to download past and future blocks of the chain.
pub trait ChainSync<Block: BlockT>: Send {
/// Returns the state of the sync of the given peer.
///
/// Returns `None` if the peer is unknown.
fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<Block>>;
/// Returns the current sync status.
fn status(&self) -> SyncStatus<Block>;
/// Number of active forks requests. This includes
/// requests that are pending or could be issued right away.
fn num_sync_requests(&self) -> usize;
/// Number of downloaded blocks.
fn num_downloaded_blocks(&self) -> usize;
/// Returns the current number of peers stored within this state machine.
fn num_peers(&self) -> usize;
/// Handle a new connected peer.
///
/// Call this method whenever we connect to a new peer.
#[must_use]
fn new_peer(
&mut self,
who: PeerId,
best_hash: Block::Hash,
best_number: NumberFor<Block>,
) -> Result<Option<BlockRequest<Block>>, BadPeer>;
/// Signal that a new best block has been imported.
fn update_chain_info(&mut self, best_hash: &Block::Hash, best_number: NumberFor<Block>);
/// Schedule a justification request for the given block.
fn request_justification(&mut self, hash: &Block::Hash, number: NumberFor<Block>);
/// Clear all pending justification requests.
fn clear_justification_requests(&mut self);
/// Request syncing for the given block from given set of peers.
fn set_sync_fork_request(
&mut self,
peers: Vec<PeerId>,
hash: &Block::Hash,
number: NumberFor<Block>,
);
/// Handle a response from the remote to a block request that we made.
///
/// `request` must be the original request that triggered `response`.
/// or `None` if data comes from the block announcement.
///
/// If this corresponds to a valid block, this outputs the block that
/// must be imported in the import queue.
#[must_use]
fn on_block_data(
&mut self,
who: &PeerId,
request: Option<BlockRequest<Block>>,
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;
/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
#[must_use]
fn on_block_justification(
&mut self,
who: PeerId,
response: BlockResponse<Block>,
) -> Result<OnBlockJustification<Block>, BadPeer>;
/// Call this when a justification has been processed by the import queue,
/// with or without errors.
fn on_justification_import(
&mut self,
hash: Block::Hash,
number: NumberFor<Block>,
success: bool,
);
/// Notify about finalization of the given block.
fn on_block_finalized(&mut self, hash: &Block::Hash, number: NumberFor<Block>);
/// Notify about pre-validated block announcement.
fn on_validated_block_announce(
&mut self,
is_best: bool,
who: PeerId,
announce: &BlockAnnounce<Block::Header>,
);
/// Call when a peer has disconnected.
/// Canceled obsolete block request may result in some blocks being ready for
/// import, so this functions checks for such blocks and returns them.
#[must_use]
fn peer_disconnected(&mut self, who: &PeerId) -> Option<ImportBlocksAction<Block>>;
/// Return some key metrics.
fn metrics(&self) -> Metrics;
}
@@ -1,101 +0,0 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use codec::{Decode, Encode};
pub use sp_consensus_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::fmt;
/// Scale-encoded warp sync proof response.
pub struct EncodedProof(pub Vec<u8>);
/// Warp sync request
#[derive(Encode, Decode, Debug)]
pub struct WarpProofRequest<B: BlockT> {
/// Start collecting proofs from this block.
pub begin: B::Hash,
}
/// Proof verification result.
pub enum VerificationResult<Block: BlockT> {
/// Proof is valid, but the target was not reached.
Partial(SetId, AuthorityList, Block::Hash),
/// Target finality is proved.
Complete(SetId, AuthorityList, Block::Header),
}
/// Warp sync backend. Handles retrieving and verifying warp sync proofs.
pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
/// Generate proof starting at given block hash. The proof is accumulated until maximum proof
/// size is reached.
fn generate(
&self,
start: Block::Hash,
) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
/// Verify warp proof against current set of authorities.
fn verify(
&self,
proof: &EncodedProof,
set_id: SetId,
authorities: AuthorityList,
) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
/// Get current list of authorities. This is supposed to be genesis authorities when starting
/// sync.
fn current_authorities(&self) -> AuthorityList;
}
/// Reported warp sync phase.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase<Block: BlockT> {
/// Waiting for peers to connect.
AwaitingPeers { required_peers: usize },
/// Waiting for target block to be received.
AwaitingTargetBlock,
/// Downloading and verifying grandpa warp proofs.
DownloadingWarpProofs,
/// Downloading target block.
DownloadingTargetBlock,
/// Downloading state data.
DownloadingState,
/// Importing state.
ImportingState,
/// Downloading block history.
DownloadingBlocks(NumberFor<Block>),
}
impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::AwaitingPeers { required_peers } =>
write!(f, "Waiting for {required_peers} peers to be connected"),
Self::AwaitingTargetBlock => write!(f, "Waiting for target block to be received"),
Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
Self::DownloadingState => write!(f, "Downloading state"),
Self::ImportingState => write!(f, "Importing state"),
Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
}
}
}
/// Reported warp sync progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<Block: BlockT> {
/// Estimated download percentage.
pub phase: WarpSyncPhase<Block>,
/// Total bytes downloaded so far.
pub total_bytes: u64,
}
+7 -7
View File
@@ -30,7 +30,11 @@ pub use crate::{
types::ProtocolName,
};
pub use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId};
pub use libp2p::{
build_multiaddr,
identity::{self, ed25519, Keypair},
multiaddr, Multiaddr, PeerId,
};
use crate::peer_store::PeerStoreHandle;
use codec::Encode;
@@ -39,9 +43,10 @@ use zeroize::Zeroize;
pub use sc_network_common::{
role::{Role, Roles},
sync::{warp::WarpSyncProvider, SyncMode},
sync::SyncMode,
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;
@@ -58,11 +63,6 @@ use std::{
str::{self, FromStr},
};
pub use libp2p::{
build_multiaddr,
identity::{self, ed25519},
};
/// Protocol name prefix, transmitted on the wire for legacy protocol names.
/// I.e., `dot` in `/dot/sync/2`. Should be unique for each chain. Always UTF-8.
/// Deprecated in favour of genesis hash & fork ID based protocol names.
+1 -8
View File
@@ -266,14 +266,7 @@ pub use event::{DhtEvent, Event, SyncEvent};
#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use request_responses::{Config, IfDisconnected, RequestFailure};
pub use sc_network_common::{
role::ObservedRole,
sync::{
warp::{WarpSyncPhase, WarpSyncProgress},
ExtendedPeerInfo, StateDownloadProgress, SyncEventStream, SyncState, SyncStatusProvider,
},
types::ReputationChange,
};
pub use sc_network_common::{role::ObservedRole, types::ReputationChange};
pub use service::{
signature::Signature,
traits::{
@@ -21,6 +21,7 @@ libp2p = "0.51.3"
log = "0.4.17"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus" }
sc-network-common = { path = "../common" }
sc-network-sync = { path = "../sync" }
sc-network = { path = ".." }
sp-consensus = { path = "../../../primitives/consensus/common" }
sp-statement-store = { path = "../../../primitives/statement-store" }
@@ -39,10 +39,8 @@ use sc_network::{
utils::{interval, LruHashSet},
NetworkEventStream, NetworkNotification, NetworkPeers,
};
use sc_network_common::{
role::ObservedRole,
sync::{SyncEvent, SyncEventStream},
};
use sc_network_common::role::ObservedRole;
use sc_network_sync::{SyncEvent, SyncEventStream};
use sp_statement_store::{
Hash, NetworkPriority, Statement, StatementSource, StatementStore, SubmitResult,
};
@@ -24,7 +24,6 @@ use crate::{
BlockResponse as BlockResponseSchema, BlockResponse, Direction,
},
service::network::NetworkServiceHandle,
MAX_BLOCKS_IN_RESPONSE,
};
use codec::{Decode, DecodeAll, Encode};
@@ -54,6 +53,9 @@ use std::{
time::Duration,
};
/// Maximum blocks per response.
pub(crate) const MAX_BLOCKS_IN_RESPONSE: usize = 128;
const LOG_TARGET: &str = "sync";
const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+26 -23
View File
@@ -24,12 +24,21 @@ use crate::{
BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
},
block_relay_protocol::{BlockDownloader, BlockResponseError},
block_request_handler::MAX_BLOCKS_IN_RESPONSE,
chain_sync::{
BlockRequestAction, ChainSync, ImportBlocksAction, ImportJustificationsAction,
OnBlockResponse,
},
pending_responses::{PendingResponses, ResponseEvent},
schema::v1::{StateRequest, StateResponse},
service::{self, chain_sync::ToServiceCommand},
warp::WarpSyncParams,
BlockRequestAction, ChainSync, ClientError, ImportBlocksAction, ImportJustificationsAction,
OnBlockResponse, SyncingService,
service::{
self,
chain_sync::{SyncingService, ToServiceCommand},
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
},
warp::{EncodedProof, WarpProofRequest, WarpSyncParams},
};
use codec::{Decode, Encode};
@@ -61,15 +70,10 @@ use sc_network::{
};
use sc_network_common::{
role::Roles,
sync::{
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockRequest, BlockState},
warp::{EncodedProof, WarpProofRequest},
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, OpaqueStateRequest,
OpaqueStateResponse, PeerRequest, SyncEvent,
},
sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockRequest, BlockState},
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain::HeaderMetadata;
use sp_blockchain::{Error as ClientError, HeaderMetadata};
use sp_consensus::block_validation::BlockAnnounceValidator;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero};
@@ -363,18 +367,17 @@ where
) -> Result<(Self, SyncingService<B>, NonDefaultSetConfig), ClientError> {
let mode = net_config.network_config.sync_mode;
let max_parallel_downloads = net_config.network_config.max_parallel_downloads;
let max_blocks_per_request = if net_config.network_config.max_blocks_per_request >
crate::MAX_BLOCKS_IN_RESPONSE as u32
{
log::info!(
target: LOG_TARGET,
"clamping maximum blocks per request to {}",
crate::MAX_BLOCKS_IN_RESPONSE,
);
crate::MAX_BLOCKS_IN_RESPONSE as u32
} else {
net_config.network_config.max_blocks_per_request
};
let max_blocks_per_request =
if net_config.network_config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
log::info!(
target: LOG_TARGET,
"clamping maximum blocks per request to {}",
MAX_BLOCKS_IN_RESPONSE,
);
MAX_BLOCKS_IN_RESPONSE as u32
} else {
net_config.network_config.max_blocks_per_request
};
let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
net_config.network_config.default_peers_set.out_peers)
.max(1);
@@ -16,11 +16,13 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{PeerSync, PeerSyncState};
use crate::{
chain_sync::{PeerSync, PeerSyncState},
request_metrics::Metrics,
};
use fork_tree::ForkTree;
use libp2p::PeerId;
use log::{debug, trace, warn};
use sc_network_common::sync::metrics::Metrics;
use sp_blockchain::Error as ClientError;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use std::{
@@ -343,7 +345,7 @@ impl<'a, B: BlockT> Matcher<'a, B> {
#[cfg(test)]
mod tests {
use super::*;
use crate::PeerSync;
use crate::chain_sync::PeerSync;
use quickcheck::{Arbitrary, Gen, QuickCheck};
use sp_blockchain::Error as ClientError;
use sp_test_primitives::{Block, BlockNumber, Hash};
File diff suppressed because it is too large Load Diff
+2 -59
View File
@@ -23,65 +23,8 @@ use crate::block_relay_protocol::{BlockDownloader as BlockDownloaderT, BlockResp
use futures::channel::oneshot;
use libp2p::PeerId;
use sc_network::RequestFailure;
use sc_network_common::sync::{
message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse},
BadPeer, ChainSync as ChainSyncT, ImportBlocksAction, Metrics, OnBlockData,
OnBlockJustification, PeerInfo, SyncStatus,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};
mockall::mock! {
pub ChainSync<Block: BlockT> {}
impl<Block: BlockT> ChainSyncT<Block> for ChainSync<Block> {
fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<Block>>;
fn status(&self) -> SyncStatus<Block>;
fn num_sync_requests(&self) -> usize;
fn num_downloaded_blocks(&self) -> usize;
fn num_peers(&self) -> usize;
fn new_peer(
&mut self,
who: PeerId,
best_hash: Block::Hash,
best_number: NumberFor<Block>,
) -> Result<Option<BlockRequest<Block>>, BadPeer>;
fn update_chain_info(&mut self, best_hash: &Block::Hash, best_number: NumberFor<Block>);
fn request_justification(&mut self, hash: &Block::Hash, number: NumberFor<Block>);
fn clear_justification_requests(&mut self);
fn set_sync_fork_request(
&mut self,
peers: Vec<PeerId>,
hash: &Block::Hash,
number: NumberFor<Block>,
);
fn on_block_data(
&mut self,
who: &PeerId,
request: Option<BlockRequest<Block>>,
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;
fn on_block_justification(
&mut self,
who: PeerId,
response: BlockResponse<Block>,
) -> Result<OnBlockJustification<Block>, BadPeer>;
fn on_justification_import(
&mut self,
hash: Block::Hash,
number: NumberFor<Block>,
success: bool,
);
fn on_block_finalized(&mut self, hash: &Block::Hash, number: NumberFor<Block>);
fn on_validated_block_announce(
&mut self,
is_best: bool,
who: PeerId,
announce: &BlockAnnounce<Block::Header>,
);
fn peer_disconnected(&mut self, who: &PeerId) -> Option<ImportBlocksAction<Block>>;
fn metrics(&self) -> Metrics;
}
}
use sc_network_common::sync::message::{BlockData, BlockRequest};
use sp_runtime::traits::Block as BlockT;
mockall::mock! {
pub BlockDownloader<Block: BlockT> {}
@@ -19,6 +19,7 @@
//! [`PendingResponses`] is responsible for keeping track of pending responses and
//! polling them.
use crate::types::PeerRequest;
use futures::{
channel::oneshot,
future::BoxFuture,
@@ -28,11 +29,13 @@ use futures::{
use libp2p::PeerId;
use log::error;
use sc_network::request_responses::RequestFailure;
use sc_network_common::sync::PeerRequest;
use sp_runtime::traits::Block as BlockT;
use std::task::{Context, Poll};
use tokio_stream::StreamMap;
/// Log target for this file.
const LOG_TARGET: &'static str = "sync";
/// Response result.
type ResponseResult = Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled>;
@@ -74,7 +77,7 @@ impl<B: BlockT> PendingResponses<B> {
.is_some()
{
error!(
target: crate::LOG_TARGET,
target: LOG_TARGET,
"Discarded pending response from peer {peer_id}, request type: {request_type:?}.",
);
debug_assert!(false);
@@ -16,14 +16,13 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::types::{ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider};
use futures::{channel::oneshot, Stream};
use libp2p::PeerId;
use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link};
use sc_network::{NetworkBlock, NetworkSyncForkRequest};
use sc_network_common::sync::{
ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_runtime::traits::{Block as BlockT, NumberFor};
+4 -2
View File
@@ -18,12 +18,14 @@
//! State sync support.
use crate::schema::v1::{StateEntry, StateRequest, StateResponse};
use crate::{
schema::v1::{StateEntry, StateRequest, StateResponse},
types::StateDownloadProgress,
};
use codec::{Decode, Encode};
use log::debug;
use sc_client_api::{CompactProof, ProofProvider};
use sc_consensus::ImportedState;
use sc_network_common::sync::StateDownloadProgress;
use smallvec::SmallVec;
use sp_core::storage::well_known_keys;
use sp_runtime::{
+206
View File
@@ -0,0 +1,206 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Common syncing types.
use futures::Stream;
use sc_network_common::{role::Roles, types::ReputationChange};
use libp2p::PeerId;
use crate::warp::WarpSyncProgress;
use sc_network_common::sync::message::BlockRequest;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{any::Any, fmt, fmt::Formatter, pin::Pin, sync::Arc};
pub use sc_network_common::sync::SyncMode;
/// The sync status of a peer we are trying to sync with
#[derive(Debug)]
pub struct PeerInfo<Block: BlockT> {
/// Their best block hash.
pub best_hash: Block::Hash,
/// Their best block number.
pub best_number: NumberFor<Block>,
}
/// Info about a peer's known state (both full and light).
#[derive(Clone, Debug)]
pub struct ExtendedPeerInfo<B: BlockT> {
/// Roles
pub roles: Roles,
/// Peer best block hash
pub best_hash: B::Hash,
/// Peer best block number
pub best_number: NumberFor<B>,
}
/// Reported sync state.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum SyncState<BlockNumber> {
/// Initial sync is complete, keep-up sync is active.
Idle,
/// Actively catching up with the chain.
Downloading { target: BlockNumber },
/// All blocks are downloaded and are being imported.
Importing { target: BlockNumber },
}
impl<BlockNumber> SyncState<BlockNumber> {
/// Are we actively catching up with the chain?
pub fn is_major_syncing(&self) -> bool {
!matches!(self, SyncState::Idle)
}
}
/// Reported state download progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct StateDownloadProgress {
/// Estimated download percentage.
pub percentage: u32,
/// Total state size in bytes downloaded so far.
pub size: u64,
}
/// Syncing status and statistics.
#[derive(Debug, Clone)]
pub struct SyncStatus<Block: BlockT> {
/// Current global sync state.
pub state: SyncState<NumberFor<Block>>,
/// Target sync block number.
pub best_seen_block: Option<NumberFor<Block>>,
/// Number of peers participating in syncing.
pub num_peers: u32,
/// Number of peers known to `SyncingEngine` (both full and light).
pub num_connected_peers: u32,
/// Number of blocks queued for import
pub queued_blocks: u32,
/// State sync status in progress, if any.
pub state_sync: Option<StateDownloadProgress>,
/// Warp sync in progress, if any.
pub warp_sync: Option<WarpSyncProgress<Block>>,
}
/// A peer did not behave as expected and should be reported.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BadPeer(pub PeerId, pub ReputationChange);
impl fmt::Display for BadPeer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Bad peer {}; Reputation change: {:?}", self.0, self.1)
}
}
impl std::error::Error for BadPeer {}
#[derive(Debug)]
pub struct Metrics {
pub queued_blocks: u32,
pub fork_targets: u32,
pub justifications: crate::request_metrics::Metrics,
}
#[derive(Debug)]
pub enum PeerRequest<B: BlockT> {
Block(BlockRequest<B>),
State,
WarpProof,
}
#[derive(Debug)]
pub enum PeerRequestType {
Block,
State,
WarpProof,
}
impl<B: BlockT> PeerRequest<B> {
pub fn get_type(&self) -> PeerRequestType {
match self {
PeerRequest::Block(_) => PeerRequestType::Block,
PeerRequest::State => PeerRequestType::State,
PeerRequest::WarpProof => PeerRequestType::WarpProof,
}
}
}
/// Wrapper for implementation-specific state request.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
pub struct OpaqueStateRequest(pub Box<dyn Any + Send>);
impl fmt::Debug for OpaqueStateRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OpaqueStateRequest").finish()
}
}
/// Wrapper for implementation-specific state response.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
pub struct OpaqueStateResponse(pub Box<dyn Any + Send>);
impl fmt::Debug for OpaqueStateResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OpaqueStateResponse").finish()
}
}
/// Provides high-level status of syncing.
#[async_trait::async_trait]
pub trait SyncStatusProvider<Block: BlockT>: Send + Sync {
/// Get high-level view of the syncing status.
async fn status(&self) -> Result<SyncStatus<Block>, ()>;
}
#[async_trait::async_trait]
impl<T, Block> SyncStatusProvider<Block> for Arc<T>
where
T: ?Sized,
T: SyncStatusProvider<Block>,
Block: BlockT,
{
async fn status(&self) -> Result<SyncStatus<Block>, ()> {
T::status(self).await
}
}
/// Syncing-related events that other protocols can subscribe to.
pub enum SyncEvent {
/// Peer that the syncing implementation is tracking connected.
PeerConnected(PeerId),
/// Peer that the syncing implementation was tracking disconnected.
PeerDisconnected(PeerId),
}
pub trait SyncEventStream: Send + Sync {
/// Subscribe to syncing-related events.
fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>>;
}
impl<T> SyncEventStream for Arc<T>
where
T: ?Sized,
T: SyncEventStream,
{
fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
T::event_stream(self, name)
}
}
+87 -8
View File
@@ -18,28 +18,107 @@
//! Warp sync support.
pub use sp_consensus_grandpa::{AuthorityList, SetId};
use crate::{
schema::v1::{StateRequest, StateResponse},
state::{ImportResult, StateSync},
};
use codec::{Decode, Encode};
use futures::channel::oneshot;
use log::error;
use sc_client_api::ProofProvider;
use sc_network_common::sync::{
message::{BlockAttributes, BlockData, BlockRequest, Direction, FromBlock},
warp::{
EncodedProof, VerificationResult, WarpProofRequest, WarpSyncPhase, WarpSyncProgress,
WarpSyncProvider,
},
use sc_network_common::sync::message::{
BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
};
use sp_blockchain::HeaderBackend;
use sp_consensus_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero};
use std::sync::Arc;
use std::{fmt, sync::Arc};
/// Log target for this file.
const LOG_TARGET: &'static str = "sync";
/// Scale-encoded warp sync proof response.
pub struct EncodedProof(pub Vec<u8>);
/// Warp sync request
#[derive(Encode, Decode, Debug)]
pub struct WarpProofRequest<B: BlockT> {
/// Start collecting proofs from this block.
pub begin: B::Hash,
}
/// Proof verification result.
pub enum VerificationResult<Block: BlockT> {
/// Proof is valid, but the target was not reached.
Partial(SetId, AuthorityList, Block::Hash),
/// Target finality is proved.
Complete(SetId, AuthorityList, Block::Header),
}
/// Warp sync backend. Handles retrieving and verifying warp sync proofs.
pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
/// Generate proof starting at given block hash. The proof is accumulated until maximum proof
/// size is reached.
fn generate(
&self,
start: Block::Hash,
) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
/// Verify warp proof against current set of authorities.
fn verify(
&self,
proof: &EncodedProof,
set_id: SetId,
authorities: AuthorityList,
) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
/// Get current list of authorities. This is supposed to be genesis authorities when starting
/// sync.
fn current_authorities(&self) -> AuthorityList;
}
/// Reported warp sync phase.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase<Block: BlockT> {
/// Waiting for peers to connect.
AwaitingPeers { required_peers: usize },
/// Waiting for target block to be received.
AwaitingTargetBlock,
/// Downloading and verifying grandpa warp proofs.
DownloadingWarpProofs,
/// Downloading target block.
DownloadingTargetBlock,
/// Downloading state data.
DownloadingState,
/// Importing state.
ImportingState,
/// Downloading block history.
DownloadingBlocks(NumberFor<Block>),
}
impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::AwaitingPeers { required_peers } =>
write!(f, "Waiting for {required_peers} peers to be connected"),
Self::AwaitingTargetBlock => write!(f, "Waiting for target block to be received"),
Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
Self::DownloadingState => write!(f, "Downloading state"),
Self::ImportingState => write!(f, "Importing state"),
Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
}
}
}
/// Reported warp sync progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<Block: BlockT> {
/// Estimated download percentage.
pub phase: WarpSyncPhase<Block>,
/// Total bytes downloaded so far.
pub total_bytes: u64,
}
/// The different types of warp syncing, passed to `build_network`.
pub enum WarpSyncParams<Block: BlockT> {
/// Standard warp sync for the chain.
@@ -20,13 +20,13 @@ use codec::Decode;
use futures::{channel::oneshot, stream::StreamExt};
use log::debug;
use crate::warp::{EncodedProof, WarpProofRequest, WarpSyncProvider};
use sc_network::{
config::ProtocolId,
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
};
use sc_network_common::sync::warp::{EncodedProof, WarpProofRequest, WarpSyncProvider};
use sp_runtime::traits::Block as BlockT;
use std::{sync::Arc, time::Duration};
+4 -5
View File
@@ -60,16 +60,15 @@ use sc_network::{
Multiaddr, NetworkBlock, NetworkService, NetworkStateInfo, NetworkSyncForkRequest,
NetworkWorker,
};
use sc_network_common::{
role::Roles,
sync::warp::{AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncProvider},
};
use sc_network_common::role::Roles;
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_request_handler::BlockRequestHandler,
service::{chain_sync::SyncingService, network::NetworkServiceProvider},
state_request_handler::StateRequestHandler,
warp::WarpSyncParams,
warp::{
AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider,
},
warp_request_handler,
};
use sc_service::client::Client;
@@ -21,6 +21,7 @@ log = "0.4.17"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus" }
sc-network = { path = ".." }
sc-network-common = { path = "../common" }
sc-network-sync = { path = "../sync" }
sc-utils = { path = "../../utils" }
sp-runtime = { path = "../../../primitives/runtime" }
sp-consensus = { path = "../../../primitives/consensus/common" }
@@ -42,11 +42,8 @@ use sc_network::{
utils::{interval, LruHashSet},
NetworkEventStream, NetworkNotification, NetworkPeers,
};
use sc_network_common::{
role::ObservedRole,
sync::{SyncEvent, SyncEventStream},
ExHashT,
};
use sc_network_common::{role::ObservedRole, ExHashT};
use sc_network_sync::{SyncEvent, SyncEventStream};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::Block as BlockT;
+1 -1
View File
@@ -23,7 +23,7 @@ use futures_timer::Delay;
use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_client_api::{ClientInfo, UsageProvider};
use sc_network::{config::Role, NetworkStatus, NetworkStatusProvider};
use sc_network_common::sync::{SyncStatus, SyncStatusProvider};
use sc_network_sync::{SyncStatus, SyncStatusProvider};
use sc_telemetry::{telemetry, TelemetryHandle, SUBSTRATE_INFO};
use sc_transaction_pool_api::{MaintainedTransactionPool, PoolStatus};
use sc_utils::metrics::register_globals;