From af6773aba9c490c11f729ef34a4c25bb61cca4ef Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 29 Apr 2022 17:02:03 +0300 Subject: [PATCH] Network sync refactoring (part 1) (#11303) * Remove unnecessary imports, move one internal re-export into where it is actually used, make one import explicit * Move a few data structures down into modules * Use generic parameters in `sc-network` instead of `chain::Client` trait * Remove unnecessary bound --- substrate/client/network/src/behaviour.rs | 112 +++++++++++++--- substrate/client/network/src/bitswap.rs | 28 ++-- .../network/src/block_request_handler.rs | 15 ++- substrate/client/network/src/chain.rs | 48 ------- substrate/client/network/src/config.rs | 9 +- substrate/client/network/src/lib.rs | 1 - .../src/light_client_requests/handler.rs | 20 +-- substrate/client/network/src/protocol.rs | 37 ++++-- substrate/client/network/src/protocol/sync.rs | 68 ++++------ .../client/network/src/protocol/sync/state.rs | 32 +++-- .../client/network/src/protocol/sync/warp.rs | 56 +++++--- substrate/client/network/src/service.rs | 121 ++++++++++++++---- .../network/src/state_request_handler.rs | 14 +- substrate/client/network/test/src/lib.rs | 4 +- substrate/client/service/src/lib.rs | 21 ++- 15 files changed, 380 insertions(+), 206 deletions(-) delete mode 100644 substrate/client/network/src/chain.rs diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index f6a7e9eb87..5ff3ba1ad4 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -40,8 +40,10 @@ use libp2p::{ }; use log::debug; use prost::Message; +use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::import_queue::{IncomingBlock, Origin}; use sc_peerset::PeersetHandle; +use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_consensus::BlockOrigin; use sp_runtime::{ traits::{Block as BlockT, NumberFor}, @@ -62,17 +64,27 @@ pub use crate::request_responses::{ /// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourOut", poll_method = "poll", event_process = true)] -pub struct Behaviour { +pub struct Behaviour +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ /// All the substrate-specific protocols. - substrate: Protocol, + substrate: Protocol, /// Periodically pings and identifies the nodes we are connected to, and store information in a /// cache. peer_info: peer_info::PeerInfoBehaviour, /// Discovers nodes of the network. discovery: DiscoveryBehaviour, /// Bitswap server for blockchain data. - bitswap: Toggle>, - /// Generic request-reponse protocols. + bitswap: Toggle>, + /// Generic request-response protocols. request_responses: request_responses::RequestResponsesBehaviour, /// Queue of events to produce for the outside. @@ -191,17 +203,27 @@ pub enum BehaviourOut { Dht(DhtEvent, Duration), } -impl Behaviour { +impl Behaviour +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ /// Builds a new `Behaviour`. pub fn new( - substrate: Protocol, + substrate: Protocol, user_agent: String, local_public_key: PublicKey, disco_config: DiscoveryConfig, block_request_protocol_config: request_responses::ProtocolConfig, state_request_protocol_config: request_responses::ProtocolConfig, warp_sync_protocol_config: Option, - bitswap: Option>, + bitswap: Option>, light_client_request_protocol_config: request_responses::ProtocolConfig, // All remaining request protocol configs. mut request_response_protocols: Vec, @@ -293,12 +315,12 @@ impl Behaviour { } /// Returns a shared reference to the user protocol. - pub fn user_protocol(&self) -> &Protocol { + pub fn user_protocol(&self) -> &Protocol { &self.substrate } /// Returns a mutable reference to the user protocol. - pub fn user_protocol_mut(&mut self) -> &mut Protocol { + pub fn user_protocol_mut(&mut self) -> &mut Protocol { &mut self.substrate } @@ -325,13 +347,33 @@ fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole { } } -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for Behaviour +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ fn inject_event(&mut self, event: void::Void) { void::unreachable(event) } } -impl NetworkBehaviourEventProcess> for Behaviour { +impl NetworkBehaviourEventProcess> for Behaviour +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ fn inject_event(&mut self, event: CustomMessageOutcome) { match event { CustomMessageOutcome::BlockImport(origin, blocks) => @@ -435,7 +477,17 @@ impl NetworkBehaviourEventProcess> for Behavi } } -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for Behaviour +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ fn inject_event(&mut self, event: request_responses::Event) { match event { request_responses::Event::InboundRequest { peer, protocol, result } => { @@ -457,7 +509,17 @@ impl NetworkBehaviourEventProcess for Behav } } -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for Behaviour +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ fn inject_event(&mut self, event: peer_info::PeerInfoEvent) { let peer_info::PeerInfoEvent::Identified { peer_id, @@ -480,7 +542,17 @@ impl NetworkBehaviourEventProcess for Behav } } -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for Behaviour +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ fn inject_event(&mut self, out: DiscoveryOut) { match out { DiscoveryOut::UnroutablePeer(_peer_id) => { @@ -514,7 +586,17 @@ impl NetworkBehaviourEventProcess for Behaviour { } } -impl Behaviour { +impl Behaviour +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ fn poll( &mut self, _cx: &mut Context, diff --git a/substrate/client/network/src/bitswap.rs b/substrate/client/network/src/bitswap.rs index da80ab8cb3..c46990997c 100644 --- a/substrate/client/network/src/bitswap.rs +++ b/substrate/client/network/src/bitswap.rs @@ -20,12 +20,9 @@ //! Only supports bitswap 1.2.0. //! CID is expected to reference 256-bit Blake2b transaction hash. -use crate::{ - chain::Client, - schema::bitswap::{ - message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType}, - Message as BitswapMessage, - }, +use crate::schema::bitswap::{ + message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType}, + Message as BitswapMessage, }; use cid::Version; use core::pin::Pin; @@ -44,10 +41,12 @@ use libp2p::{ }; use log::{debug, error, trace}; use prost::Message; +use sc_client_api::BlockBackend; use sp_runtime::traits::Block as BlockT; use std::{ collections::VecDeque, io, + marker::PhantomData, sync::Arc, task::{Context, Poll}, }; @@ -181,19 +180,24 @@ impl Prefix { } /// Network behaviour that handles sending and receiving IPFS blocks. -pub struct Bitswap { - client: Arc>, +pub struct Bitswap { + client: Arc, ready_blocks: VecDeque<(PeerId, BitswapMessage)>, + _block: PhantomData, } -impl Bitswap { +impl Bitswap { /// Create a new instance of the bitswap protocol handler. - pub fn new(client: Arc>) -> Self { - Self { client, ready_blocks: Default::default() } + pub fn new(client: Arc) -> Self { + Self { client, ready_blocks: Default::default(), _block: PhantomData::default() } } } -impl NetworkBehaviour for Bitswap { +impl NetworkBehaviour for Bitswap +where + B: BlockT, + Client: BlockBackend + Send + Sync + 'static, +{ type ConnectionHandler = OneShotHandler; type OutEvent = void::Void; diff --git a/substrate/client/network/src/block_request_handler.rs b/substrate/client/network/src/block_request_handler.rs index e1fe9ebf8d..2e238c0163 100644 --- a/substrate/client/network/src/block_request_handler.rs +++ b/substrate/client/network/src/block_request_handler.rs @@ -18,7 +18,6 @@ //! `crate::request_responses::RequestResponsesBehaviour`. use crate::{ - chain::Client, config::ProtocolId, protocol::message::BlockAttributes, request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, @@ -33,6 +32,8 @@ use futures::{ use log::debug; use lru::LruCache; use prost::Message; +use sc_client_api::BlockBackend; +use sp_blockchain::HeaderBackend; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header, One, Zero}, @@ -113,8 +114,8 @@ enum SeenRequestsValue { } /// Handler for incoming block requests from a remote peer. -pub struct BlockRequestHandler { - client: Arc>, +pub struct BlockRequestHandler { + client: Arc, request_receiver: mpsc::Receiver, /// Maps from request to number of times we have seen this request. /// @@ -122,11 +123,15 @@ pub struct BlockRequestHandler { seen_requests: LruCache, SeenRequestsValue>, } -impl BlockRequestHandler { +impl BlockRequestHandler +where + B: BlockT, + Client: HeaderBackend + BlockBackend + Send + Sync + 'static, +{ /// Create a new [`BlockRequestHandler`]. pub fn new( protocol_id: &ProtocolId, - client: Arc>, + client: Arc, num_peer_hint: usize, ) -> (Self, ProtocolConfig) { // Reserve enough request slots for one request per peer when we are at the maximum diff --git a/substrate/client/network/src/chain.rs b/substrate/client/network/src/chain.rs deleted file mode 100644 index c66cc2ce1d..0000000000 --- a/substrate/client/network/src/chain.rs +++ /dev/null @@ -1,48 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! Blockchain access trait - -use sc_client_api::{BlockBackend, ProofProvider}; -pub use sc_client_api::{StorageData, StorageKey}; -pub use sc_consensus::ImportedState; -use sp_blockchain::{Error, HeaderBackend, HeaderMetadata}; -use sp_runtime::traits::{Block as BlockT, BlockIdTo}; - -/// Local client abstraction for the network. -pub trait Client: - HeaderBackend - + ProofProvider - + BlockIdTo - + BlockBackend - + HeaderMetadata - + Send - + Sync -{ -} - -impl Client for T where - T: HeaderBackend - + ProofProvider - + BlockIdTo - + BlockBackend - + HeaderMetadata - + Send - + Sync -{ -} diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs index 40aefe9a3e..2b448ed14e 100644 --- a/substrate/client/network/src/config.rs +++ b/substrate/client/network/src/config.rs @@ -22,7 +22,6 @@ //! See the documentation of [`Params`]. pub use crate::{ - chain::Client, request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, @@ -64,7 +63,11 @@ use std::{ use zeroize::Zeroize; /// Network initialization parameters. -pub struct Params { +pub struct Params +where + B: BlockT + 'static, + H: ExHashT, +{ /// Assigned role for our node (full, light, ...). pub role: Role, @@ -79,7 +82,7 @@ pub struct Params { pub network_config: NetworkConfiguration, /// Client that contains the blockchain. - pub chain: Arc>, + pub chain: Arc, /// Pool of transactions. /// diff --git a/substrate/client/network/src/lib.rs b/substrate/client/network/src/lib.rs index d9f5b3de1b..973e0b15b7 100644 --- a/substrate/client/network/src/lib.rs +++ b/substrate/client/network/src/lib.rs @@ -245,7 +245,6 @@ //! More precise usage details are still being worked on and will likely change in the future. mod behaviour; -mod chain; mod discovery; mod peer_info; mod protocol; diff --git a/substrate/client/network/src/light_client_requests/handler.rs b/substrate/client/network/src/light_client_requests/handler.rs index fb258304f2..cb9bd96076 100644 --- a/substrate/client/network/src/light_client_requests/handler.rs +++ b/substrate/client/network/src/light_client_requests/handler.rs @@ -23,7 +23,6 @@ //! [`LightClientRequestHandler`](handler::LightClientRequestHandler). use crate::{ - chain::Client, config::ProtocolId, request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, schema, PeerId, @@ -32,27 +31,32 @@ use codec::{self, Decode, Encode}; use futures::{channel::mpsc, prelude::*}; use log::{debug, trace}; use prost::Message; -use sc_client_api::StorageProof; +use sc_client_api::{ProofProvider, StorageProof}; use sc_peerset::ReputationChange; use sp_core::{ hexdisplay::HexDisplay, storage::{ChildInfo, ChildType, PrefixedStorageKey}, }; use sp_runtime::{generic::BlockId, traits::Block}; -use std::sync::Arc; +use std::{marker::PhantomData, sync::Arc}; const LOG_TARGET: &str = "light-client-request-handler"; /// Handler for incoming light client requests from a remote peer. -pub struct LightClientRequestHandler { +pub struct LightClientRequestHandler { request_receiver: mpsc::Receiver, /// Blockchain client. - client: Arc>, + client: Arc, + _block: PhantomData, } -impl LightClientRequestHandler { +impl LightClientRequestHandler +where + B: Block, + Client: ProofProvider + Send + Sync + 'static, +{ /// Create a new [`crate::block_request_handler::BlockRequestHandler`]. - pub fn new(protocol_id: &ProtocolId, client: Arc>) -> (Self, ProtocolConfig) { + pub fn new(protocol_id: &ProtocolId, client: Arc) -> (Self, ProtocolConfig) { // For now due to lack of data on light client request handling in production systems, this // value is chosen to match the block request limit. let (tx, request_receiver) = mpsc::channel(20); @@ -60,7 +64,7 @@ impl LightClientRequestHandler { let mut protocol_config = super::generate_protocol_config(protocol_id); protocol_config.inbound_queue = Some(tx); - (Self { client, request_receiver }, protocol_config) + (Self { client, request_receiver, _block: PhantomData::default() }, protocol_config) } /// Run [`LightClientRequestHandler`]. diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 8497dfd940..9999d278a2 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -17,7 +17,6 @@ // along with this program. If not, see . use crate::{ - chain::Client, config::{self, ProtocolId, WarpSyncProvider}, error, request_responses::RequestFailure, @@ -49,6 +48,7 @@ use message::{ use notifications::{Notifications, NotificationsOut}; use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64}; use prost::Message as _; +use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider}; use sc_consensus::import_queue::{BlockImportError, BlockImportStatus, IncomingBlock, Origin}; use sp_arithmetic::traits::SaturatedConversion; use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin}; @@ -76,6 +76,7 @@ pub mod message; pub mod sync; pub use notifications::{NotificationsSink, NotifsHandlerError, Ready}; +use sp_blockchain::HeaderMetadata; /// Interval at which we perform time based maintenance const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100); @@ -158,7 +159,7 @@ impl Metrics { } // Lock must always be taken in order declared here. -pub struct Protocol { +pub struct Protocol { /// Interval at which we call `tick`. tick_timeout: Pin + Send>>, /// Pending list of messages to return from `poll` as a priority. @@ -167,10 +168,10 @@ pub struct Protocol { genesis_hash: B::Hash, /// State machine that handles the list of in-progress requests. Only full node peers are /// registered. - sync: ChainSync, + sync: ChainSync, // All connected peers. Contains both full and light node peers. peers: HashMap>, - chain: Arc>, + chain: Arc, /// List of nodes for which we perform additional logging because they are important for the /// user. important_peers: HashSet, @@ -283,18 +284,28 @@ impl BlockAnnouncesHandshake { } } -impl Protocol { +impl Protocol +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ /// Create a new instance. pub fn new( config: ProtocolConfig, - chain: Arc>, + chain: Arc, protocol_id: ProtocolId, network_config: &config::NetworkConfiguration, notifications_protocols_handshakes: Vec>, block_announce_validator: Box + Send>, metrics_registry: Option<&Registry>, warp_sync_provider: Option>>, - ) -> error::Result<(Protocol, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { + ) -> error::Result<(Protocol, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let info = chain.info(); let sync = ChainSync::new( config.sync_mode(), @@ -1366,7 +1377,17 @@ pub enum CustomMessageOutcome { None, } -impl NetworkBehaviour for Protocol { +impl NetworkBehaviour for Protocol +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ type ConnectionHandler = ::ConnectionHandler; type OutEvent = CustomMessageOutcome; diff --git a/substrate/client/network/src/protocol/sync.rs b/substrate/client/network/src/protocol/sync.rs index 749366f6c1..7e6a2a3c78 100644 --- a/substrate/client/network/src/protocol/sync.rs +++ b/substrate/client/network/src/protocol/sync.rs @@ -39,9 +39,10 @@ use extra_requests::ExtraRequests; use futures::{stream::FuturesUnordered, task::Poll, Future, FutureExt, StreamExt}; use libp2p::PeerId; use log::{debug, error, info, trace, warn}; +use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; use sp_arithmetic::traits::Saturating; -use sp_blockchain::{Error as ClientError, HeaderMetadata}; +use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; use sp_consensus::{ block_validation::{BlockAnnounceValidator, Validation}, BlockOrigin, BlockStatus, @@ -54,6 +55,7 @@ use sp_runtime::{ }, EncodedJustification, Justifications, }; +pub use state::StateDownloadProgress; use state::StateSync; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, @@ -63,6 +65,7 @@ use std::{ sync::Arc, }; use warp::{WarpProofRequest, WarpSync, WarpSyncProvider}; +pub use warp::{WarpSyncPhase, WarpSyncProgress}; mod blocks; mod extra_requests; @@ -194,9 +197,9 @@ struct GapSync { /// The main data structure which contains all the state for a chains /// active syncing strategy. -pub struct ChainSync { +pub struct ChainSync { /// Chain client. - client: Arc>, + client: Arc, /// The active peers that we are using to sync and their PeerSync status peers: HashMap>, /// A `BlockCollection` of blocks that are being downloaded from peers @@ -228,9 +231,9 @@ pub struct ChainSync { /// Stats per peer about the number of concurrent block announce validations. block_announce_validation_per_peer_stats: HashMap, /// State sync in progress, if any. - state_sync: Option>, + state_sync: Option>, /// Warp sync in progress, if any. - warp_sync: Option>, + warp_sync: Option>, /// Warp sync provider. warp_sync_provider: Option>>, /// Enable importing existing blocks. This is used used after the state download to @@ -329,30 +332,6 @@ pub enum SyncState { Downloading, } -/// Reported state download progress. -#[derive(Clone, Eq, PartialEq, Debug)] -pub struct StateDownloadProgress { - /// Estimated download percentage. - pub percentage: u32, - /// Total state size in bytes downloaded so far. - pub size: u64, -} - -/// Reported warp sync phase. -#[derive(Clone, Eq, PartialEq, Debug)] -pub enum WarpSyncPhase { - /// Waiting for peers to connect. - AwaitingPeers, - /// Downloading and verifying grandpa warp proofs. - DownloadingWarpProofs, - /// Downloading state data. - DownloadingState, - /// Importing state. - ImportingState, - /// Downloading block history. - DownloadingBlocks(NumberFor), -} - impl fmt::Display for WarpSyncPhase { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -365,15 +344,6 @@ impl fmt::Display for WarpSyncPhase { } } -/// Reported warp sync progress. -#[derive(Clone, Eq, PartialEq, Debug)] -pub struct WarpSyncProgress { - /// Estimated download percentage. - pub phase: WarpSyncPhase, - /// Total bytes downloaded so far. - pub total_bytes: u64, -} - /// Syncing status and statistics. #[derive(Clone)] pub struct Status { @@ -534,11 +504,21 @@ enum HasSlotForBlockAnnounceValidation { MaximumPeerSlotsReached, } -impl ChainSync { +impl ChainSync +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ /// Create a new instance. pub fn new( mode: SyncMode, - client: Arc>, + client: Arc, block_announce_validator: Box + Send>, max_parallel_downloads: u32, warp_sync_provider: Option>>, @@ -2741,7 +2721,11 @@ mod test { } /// Send a block annoucnement for the given `header`. - fn send_block_announce(header: Header, peer_id: &PeerId, sync: &mut ChainSync) { + fn send_block_announce( + header: Header, + peer_id: &PeerId, + sync: &mut ChainSync, + ) { let block_annnounce = BlockAnnounce { header: header.clone(), state: Some(BlockState::Best), @@ -2780,7 +2764,7 @@ mod test { /// Get a block request from `sync` and check that is matches the expected request. fn get_block_request( - sync: &mut ChainSync, + sync: &mut ChainSync, from: FromBlock, max: u32, peer: &PeerId, diff --git a/substrate/client/network/src/protocol/sync/state.rs b/substrate/client/network/src/protocol/sync/state.rs index 0df862a483..4eddc4c608 100644 --- a/substrate/client/network/src/protocol/sync/state.rs +++ b/substrate/client/network/src/protocol/sync/state.rs @@ -16,14 +16,11 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use super::StateDownloadProgress; -use crate::{ - chain::{Client, ImportedState}, - schema::v1::{StateEntry, StateRequest, StateResponse}, -}; +use crate::schema::v1::{StateEntry, StateRequest, StateResponse}; use codec::{Decode, Encode}; use log::debug; -use sc_client_api::CompactProof; +use sc_client_api::{CompactProof, ProofProvider}; +use sc_consensus::ImportedState; use smallvec::SmallVec; use sp_core::storage::well_known_keys; use sp_runtime::traits::{Block as BlockT, Header, NumberFor}; @@ -33,18 +30,27 @@ use std::{collections::HashMap, sync::Arc}; /// State sync state machine. Accumulates partial state data until it /// is ready to be imported. -pub struct StateSync { +pub struct StateSync { target_block: B::Hash, target_header: B::Header, target_root: B::Hash, last_key: SmallVec<[Vec; 2]>, state: HashMap, (Vec<(Vec, Vec)>, Vec>)>, complete: bool, - client: Arc>, + client: Arc, imported_bytes: u64, skip_proof: bool, } +/// Reported state download progress. +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct StateDownloadProgress { + /// Estimated download percentage. + pub percentage: u32, + /// Total state size in bytes downloaded so far. + pub size: u64, +} + /// Import state chunk result. pub enum ImportResult { /// State is complete and ready for import. @@ -55,9 +61,13 @@ pub enum ImportResult { BadResponse, } -impl StateSync { +impl StateSync +where + B: BlockT, + Client: ProofProvider + Send + Sync + 'static, +{ /// Create a new instance. - pub fn new(client: Arc>, target: B::Header, skip_proof: bool) -> Self { + pub fn new(client: Arc, target: B::Header, skip_proof: bool) -> Self { Self { client, target_block: target.hash(), @@ -71,7 +81,7 @@ impl StateSync { } } - /// Validate and import a state reponse. + /// Validate and import a state response. pub fn import(&mut self, response: StateResponse) -> ImportResult { if response.entries.is_empty() && response.proof.is_empty() { debug!(target: "sync", "Bad state response"); diff --git a/substrate/client/network/src/protocol/sync/warp.rs b/substrate/client/network/src/protocol/sync/warp.rs index f12deb2dbb..fa2c23a0b3 100644 --- a/substrate/client/network/src/protocol/sync/warp.rs +++ b/substrate/client/network/src/protocol/sync/warp.rs @@ -17,23 +17,44 @@ // along with this program. If not, see . ///! Warp sync support. -pub use super::state::ImportResult; -use super::state::StateSync; +use super::state::{ImportResult, StateSync}; +use crate::schema::v1::{StateRequest, StateResponse}; pub use crate::warp_request_handler::{ EncodedProof, Request as WarpProofRequest, VerificationResult, WarpSyncProvider, }; -use crate::{ - chain::Client, - schema::v1::{StateRequest, StateResponse}, - WarpSyncPhase, WarpSyncProgress, -}; +use sc_client_api::ProofProvider; +use sp_blockchain::HeaderBackend; use sp_finality_grandpa::{AuthorityList, SetId}; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; use std::sync::Arc; -enum Phase { +enum Phase { WarpProof { set_id: SetId, authorities: AuthorityList, last_hash: B::Hash }, - State(StateSync), + State(StateSync), +} + +/// Reported warp sync phase. +#[derive(Clone, Eq, PartialEq, Debug)] +pub enum WarpSyncPhase { + /// Waiting for peers to connect. + AwaitingPeers, + /// Downloading and verifying grandpa warp proofs. + DownloadingWarpProofs, + /// Downloading state data. + DownloadingState, + /// Importing state. + ImportingState, + /// Downloading block history. + DownloadingBlocks(NumberFor), +} + +/// Reported warp sync progress. +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct WarpSyncProgress { + /// Estimated download percentage. + pub phase: WarpSyncPhase, + /// Total bytes downloaded so far. + pub total_bytes: u64, } /// Import warp proof result. @@ -45,19 +66,20 @@ pub enum WarpProofImportResult { } /// Warp sync state machine. Accumulates warp proofs and state. -pub struct WarpSync { - phase: Phase, - client: Arc>, +pub struct WarpSync { + phase: Phase, + client: Arc, warp_sync_provider: Arc>, total_proof_bytes: u64, } -impl WarpSync { +impl WarpSync +where + B: BlockT, + Client: HeaderBackend + ProofProvider + 'static, +{ /// Create a new instance. - pub fn new( - client: Arc>, - warp_sync_provider: Arc>, - ) -> Self { + pub fn new(client: Arc, warp_sync_provider: Arc>) -> Self { let last_hash = client.hash(Zero::zero()).unwrap().expect("Genesis header always exists"); let phase = Phase::WarpProof { set_id: 0, diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index e88e792495..6ffc0ec49b 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -54,16 +54,18 @@ use libp2p::{ ping::Failure as PingFailure, swarm::{ AddressScore, ConnectionError, ConnectionLimits, DialError, NetworkBehaviour, - PendingConnectionError, SwarmBuilder, SwarmEvent, + PendingConnectionError, Swarm, SwarmBuilder, SwarmEvent, }, Multiaddr, PeerId, }; use log::{debug, error, info, trace, warn}; use metrics::{Histogram, HistogramVec, MetricSources, Metrics}; use parking_lot::Mutex; +use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link}; use sc_peerset::PeersetHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_runtime::traits::{Block as BlockT, NumberFor}; use std::{ borrow::Cow, @@ -98,7 +100,7 @@ pub use libp2p::{ }, kad::record::Key as KademliaKey, }; -pub use signature::*; +pub use signature::Signature; /// Substrate network service. Handles network IO and manages connectivity. pub struct NetworkService { @@ -130,13 +132,24 @@ pub struct NetworkService { _marker: PhantomData, } -impl NetworkWorker { +impl NetworkWorker +where + B: BlockT + 'static, + H: ExHashT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ /// Creates the network service. /// /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new(mut params: Params) -> Result { + pub fn new(mut params: Params) -> Result { // Ensure the listen addresses are consistent with the transport. ensure_addresses_consistent_with_transport( params.network_config.listen_addresses.iter(), @@ -247,7 +260,7 @@ impl NetworkWorker { // Build the swarm. let client = params.chain.clone(); - let (mut swarm, bandwidth): (Swarm, _) = { + let (mut swarm, bandwidth): (Swarm>, _) = { let user_agent = format!( "{} ({})", params.network_config.client_version, params.network_config.node_name @@ -392,14 +405,18 @@ impl NetworkWorker { // Listen on multiaddresses. for addr in ¶ms.network_config.listen_addresses { - if let Err(err) = Swarm::::listen_on(&mut swarm, addr.clone()) { + if let Err(err) = Swarm::>::listen_on(&mut swarm, addr.clone()) { warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err) } } // Add external addresses. for addr in ¶ms.network_config.public_addresses { - Swarm::::add_external_address(&mut swarm, addr.clone(), AddressScore::Infinite); + Swarm::>::add_external_address( + &mut swarm, + addr.clone(), + AddressScore::Infinite, + ); } let external_addresses = Arc::new(Mutex::new(Vec::new())); @@ -540,14 +557,14 @@ impl NetworkWorker { /// Returns the local `PeerId`. pub fn local_peer_id(&self) -> &PeerId { - Swarm::::local_peer_id(&self.network_service) + Swarm::>::local_peer_id(&self.network_service) } /// Returns the list of addresses we are listening on. /// /// Does **NOT** include a trailing `/p2p/` with our `PeerId`. pub fn listen_addresses(&self) -> impl Iterator { - Swarm::::listeners(&self.network_service) + Swarm::>::listeners(&self.network_service) } /// Get network state. @@ -627,7 +644,7 @@ impl NetworkWorker { .collect() }; - let peer_id = Swarm::::local_peer_id(&swarm).to_base58(); + let peer_id = Swarm::>::local_peer_id(&swarm).to_base58(); let listened_addresses = swarm.listeners().cloned().collect(); let external_addresses = swarm.external_addresses().map(|r| &r.addr).cloned().collect(); @@ -1445,7 +1462,18 @@ enum ServiceToWorkerMsg { /// /// You are encouraged to poll this in a separate background thread or task. #[must_use = "The NetworkWorker must be polled in order for the network to advance"] -pub struct NetworkWorker { +pub struct NetworkWorker +where + B: BlockT + 'static, + H: ExHashT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. external_addresses: Arc>>, /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. @@ -1455,7 +1483,7 @@ pub struct NetworkWorker { /// The network service that can be extracted and shared through the codebase. service: Arc>, /// The *actual* network. - network_service: Swarm, + network_service: Swarm>, /// The import queue that was passed at initialization. import_queue: Box>, /// Messages from the [`NetworkService`] that must be processed. @@ -1473,7 +1501,18 @@ pub struct NetworkWorker { tx_handler_controller: transactions::TransactionsHandlerController, } -impl Future for NetworkWorker { +impl Future for NetworkWorker +where + B: BlockT + 'static, + H: ExHashT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { @@ -2055,10 +2094,11 @@ impl Future for NetworkWorker { // Update the variables shared with the `NetworkService`. this.num_connected.store(num_connected_peers, Ordering::Relaxed); { - let external_addresses = Swarm::::external_addresses(&this.network_service) - .map(|r| &r.addr) - .cloned() - .collect(); + let external_addresses = + Swarm::>::external_addresses(&this.network_service) + .map(|r| &r.addr) + .cloned() + .collect(); *this.external_addresses.lock() = external_addresses; } @@ -2113,17 +2153,46 @@ impl Future for NetworkWorker { } } -impl Unpin for NetworkWorker {} - -/// The libp2p swarm, customized for our needs. -type Swarm = libp2p::swarm::Swarm>; - -// Implementation of `import_queue::Link` trait using the available local variables. -struct NetworkLink<'a, B: BlockT> { - protocol: &'a mut Swarm, +impl Unpin for NetworkWorker +where + B: BlockT + 'static, + H: ExHashT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ } -impl<'a, B: BlockT> Link for NetworkLink<'a, B> { +// Implementation of `import_queue::Link` trait using the available local variables. +struct NetworkLink<'a, B, Client> +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ + protocol: &'a mut Swarm>, +} + +impl<'a, B, Client> Link for NetworkLink<'a, B, Client> +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ fn blocks_processed( &mut self, imported: usize, diff --git a/substrate/client/network/src/state_request_handler.rs b/substrate/client/network/src/state_request_handler.rs index 10a77061a0..3e208e22e3 100644 --- a/substrate/client/network/src/state_request_handler.rs +++ b/substrate/client/network/src/state_request_handler.rs @@ -18,7 +18,6 @@ //! `crate::request_responses::RequestResponsesBehaviour`. use crate::{ - chain::Client, config::ProtocolId, request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse}, @@ -32,6 +31,7 @@ use futures::{ use log::{debug, trace}; use lru::LruCache; use prost::Message; +use sc_client_api::ProofProvider; use sp_runtime::{generic::BlockId, traits::Block as BlockT}; use std::{ hash::{Hash, Hasher}, @@ -96,8 +96,8 @@ enum SeenRequestsValue { } /// Handler for incoming block requests from a remote peer. -pub struct StateRequestHandler { - client: Arc>, +pub struct StateRequestHandler { + client: Arc, request_receiver: mpsc::Receiver, /// Maps from request to number of times we have seen this request. /// @@ -105,11 +105,15 @@ pub struct StateRequestHandler { seen_requests: LruCache, SeenRequestsValue>, } -impl StateRequestHandler { +impl StateRequestHandler +where + B: BlockT, + Client: ProofProvider + Send + Sync + 'static, +{ /// Create a new [`StateRequestHandler`]. pub fn new( protocol_id: &ProtocolId, - client: Arc>, + client: Arc, num_peer_hint: usize, ) -> (Self, ProtocolConfig) { // Reserve enough request slots for one request per peer when we are at the maximum diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 552879f35d..1760c08759 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -223,7 +223,7 @@ pub struct Peer { block_import: BlockImportAdapter, select_chain: Option>, backend: Option>, - network: NetworkWorker::Hash>, + network: NetworkWorker::Hash, PeersFullClient>, imported_blocks_stream: Pin> + Send>>, finality_notification_stream: Pin> + Send>>, listen_addr: Multiaddr, @@ -498,7 +498,7 @@ where } /// Get a reference to the network worker. - pub fn network(&self) -> &NetworkWorker::Hash> { + pub fn network(&self) -> &NetworkWorker::Hash, PeersFullClient> { &self.network } diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 6d9d999942..a48e6168c5 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -39,8 +39,10 @@ use std::{collections::HashMap, io, net::SocketAddr, pin::Pin}; use codec::{Decode, Encode}; use futures::{Future, FutureExt, StreamExt}; use log::{debug, error, warn}; +use sc_client_api::{BlockBackend, ProofProvider}; use sc_network::PeerId; use sc_utils::mpsc::TracingUnboundedReceiver; +use sp_blockchain::HeaderMetadata; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header as HeaderT}, @@ -135,11 +137,18 @@ pub struct PartialComponents + HeaderBackend, + C: BlockchainEvents + + HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, H: sc_network::ExHashT, >( role: Role, - mut network: sc_network::NetworkWorker, + mut network: sc_network::NetworkWorker, client: Arc, mut rpc_rx: TracingUnboundedReceiver>, should_have_peers: bool, @@ -461,7 +470,13 @@ where impl sc_network::config::TransactionPool for TransactionPoolAdapter where - C: sc_network::config::Client + Send + Sync, + C: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, Pool: 'static + TransactionPool, B: BlockT, H: std::hash::Hash + Eq + sp_runtime::traits::Member + sp_runtime::traits::MaybeSerialize,