Network sync refactoring (part 1) (#11303)

* Remove unnecessary imports, move one internal re-export into where it is actually used, make one import explicit

* Move a few data structures down into modules

* Use generic parameters in `sc-network` instead of `chain::Client` trait

* Remove unnecessary bound
This commit is contained in:
Nazar Mokrynskyi
2022-04-29 17:02:03 +03:00
committed by GitHub
parent 887acda7d0
commit af6773aba9
15 changed files with 380 additions and 206 deletions
+97 -15
View File
@@ -40,8 +40,10 @@ use libp2p::{
};
use log::debug;
use prost::Message;
use sc_client_api::{BlockBackend, ProofProvider};
use sc_consensus::import_queue::{IncomingBlock, Origin};
use sc_peerset::PeersetHandle;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
@@ -62,17 +64,27 @@ pub use crate::request_responses::{
/// General behaviour of the network. Combines all protocols together.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll", event_process = true)]
pub struct Behaviour<B: BlockT> {
pub struct Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
/// All the substrate-specific protocols.
substrate: Protocol<B>,
substrate: Protocol<B, Client>,
/// Periodically pings and identifies the nodes we are connected to, and store information in a
/// cache.
peer_info: peer_info::PeerInfoBehaviour,
/// Discovers nodes of the network.
discovery: DiscoveryBehaviour,
/// Bitswap server for blockchain data.
bitswap: Toggle<Bitswap<B>>,
/// Generic request-reponse protocols.
bitswap: Toggle<Bitswap<B, Client>>,
/// Generic request-response protocols.
request_responses: request_responses::RequestResponsesBehaviour,
/// Queue of events to produce for the outside.
@@ -191,17 +203,27 @@ pub enum BehaviourOut<B: BlockT> {
Dht(DhtEvent, Duration),
}
impl<B: BlockT> Behaviour<B> {
impl<B, Client> Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
/// Builds a new `Behaviour`.
pub fn new(
substrate: Protocol<B>,
substrate: Protocol<B, Client>,
user_agent: String,
local_public_key: PublicKey,
disco_config: DiscoveryConfig,
block_request_protocol_config: request_responses::ProtocolConfig,
state_request_protocol_config: request_responses::ProtocolConfig,
warp_sync_protocol_config: Option<request_responses::ProtocolConfig>,
bitswap: Option<Bitswap<B>>,
bitswap: Option<Bitswap<B, Client>>,
light_client_request_protocol_config: request_responses::ProtocolConfig,
// All remaining request protocol configs.
mut request_response_protocols: Vec<request_responses::ProtocolConfig>,
@@ -293,12 +315,12 @@ impl<B: BlockT> Behaviour<B> {
}
/// Returns a shared reference to the user protocol.
pub fn user_protocol(&self) -> &Protocol<B> {
pub fn user_protocol(&self) -> &Protocol<B, Client> {
&self.substrate
}
/// Returns a mutable reference to the user protocol.
pub fn user_protocol_mut(&mut self) -> &mut Protocol<B> {
pub fn user_protocol_mut(&mut self) -> &mut Protocol<B, Client> {
&mut self.substrate
}
@@ -325,13 +347,33 @@ fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole {
}
}
impl<B: BlockT> NetworkBehaviourEventProcess<void::Void> for Behaviour<B> {
impl<B, Client> NetworkBehaviourEventProcess<void::Void> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
fn inject_event(&mut self, event: void::Void) {
void::unreachable(event)
}
}
impl<B: BlockT> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for Behaviour<B> {
impl<B, Client> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
fn inject_event(&mut self, event: CustomMessageOutcome<B>) {
match event {
CustomMessageOutcome::BlockImport(origin, blocks) =>
@@ -435,7 +477,17 @@ impl<B: BlockT> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for Behavi
}
}
impl<B: BlockT> NetworkBehaviourEventProcess<request_responses::Event> for Behaviour<B> {
impl<B, Client> NetworkBehaviourEventProcess<request_responses::Event> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
fn inject_event(&mut self, event: request_responses::Event) {
match event {
request_responses::Event::InboundRequest { peer, protocol, result } => {
@@ -457,7 +509,17 @@ impl<B: BlockT> NetworkBehaviourEventProcess<request_responses::Event> for Behav
}
}
impl<B: BlockT> NetworkBehaviourEventProcess<peer_info::PeerInfoEvent> for Behaviour<B> {
impl<B, Client> NetworkBehaviourEventProcess<peer_info::PeerInfoEvent> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
fn inject_event(&mut self, event: peer_info::PeerInfoEvent) {
let peer_info::PeerInfoEvent::Identified {
peer_id,
@@ -480,7 +542,17 @@ impl<B: BlockT> NetworkBehaviourEventProcess<peer_info::PeerInfoEvent> for Behav
}
}
impl<B: BlockT> NetworkBehaviourEventProcess<DiscoveryOut> for Behaviour<B> {
impl<B, Client> NetworkBehaviourEventProcess<DiscoveryOut> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
fn inject_event(&mut self, out: DiscoveryOut) {
match out {
DiscoveryOut::UnroutablePeer(_peer_id) => {
@@ -514,7 +586,17 @@ impl<B: BlockT> NetworkBehaviourEventProcess<DiscoveryOut> for Behaviour<B> {
}
}
impl<B: BlockT> Behaviour<B> {
impl<B, Client> Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
fn poll(
&mut self,
_cx: &mut Context,
+16 -12
View File
@@ -20,12 +20,9 @@
//! Only supports bitswap 1.2.0.
//! CID is expected to reference 256-bit Blake2b transaction hash.
use crate::{
chain::Client,
schema::bitswap::{
message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType},
Message as BitswapMessage,
},
use crate::schema::bitswap::{
message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType},
Message as BitswapMessage,
};
use cid::Version;
use core::pin::Pin;
@@ -44,10 +41,12 @@ use libp2p::{
};
use log::{debug, error, trace};
use prost::Message;
use sc_client_api::BlockBackend;
use sp_runtime::traits::Block as BlockT;
use std::{
collections::VecDeque,
io,
marker::PhantomData,
sync::Arc,
task::{Context, Poll},
};
@@ -181,19 +180,24 @@ impl Prefix {
}
/// Network behaviour that handles sending and receiving IPFS blocks.
pub struct Bitswap<B> {
client: Arc<dyn Client<B>>,
pub struct Bitswap<B, Client> {
client: Arc<Client>,
ready_blocks: VecDeque<(PeerId, BitswapMessage)>,
_block: PhantomData<B>,
}
impl<B: BlockT> Bitswap<B> {
impl<B, Client> Bitswap<B, Client> {
/// Create a new instance of the bitswap protocol handler.
pub fn new(client: Arc<dyn Client<B>>) -> Self {
Self { client, ready_blocks: Default::default() }
pub fn new(client: Arc<Client>) -> Self {
Self { client, ready_blocks: Default::default(), _block: PhantomData::default() }
}
}
impl<B: BlockT> NetworkBehaviour for Bitswap<B> {
impl<B, Client> NetworkBehaviour for Bitswap<B, Client>
where
B: BlockT,
Client: BlockBackend<B> + Send + Sync + 'static,
{
type ConnectionHandler = OneShotHandler<BitswapConfig, BitswapMessage, HandlerEvent>;
type OutEvent = void::Void;
@@ -18,7 +18,6 @@
//! `crate::request_responses::RequestResponsesBehaviour`.
use crate::{
chain::Client,
config::ProtocolId,
protocol::message::BlockAttributes,
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
@@ -33,6 +32,8 @@ use futures::{
use log::debug;
use lru::LruCache;
use prost::Message;
use sc_client_api::BlockBackend;
use sp_blockchain::HeaderBackend;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header, One, Zero},
@@ -113,8 +114,8 @@ enum SeenRequestsValue {
}
/// Handler for incoming block requests from a remote peer.
pub struct BlockRequestHandler<B: BlockT> {
client: Arc<dyn Client<B>>,
pub struct BlockRequestHandler<B: BlockT, Client> {
client: Arc<Client>,
request_receiver: mpsc::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
@@ -122,11 +123,15 @@ pub struct BlockRequestHandler<B: BlockT> {
seen_requests: LruCache<SeenRequestsKey<B>, SeenRequestsValue>,
}
impl<B: BlockT> BlockRequestHandler<B> {
impl<B, Client> BlockRequestHandler<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + BlockBackend<B> + Send + Sync + 'static,
{
/// Create a new [`BlockRequestHandler`].
pub fn new(
protocol_id: &ProtocolId,
client: Arc<dyn Client<B>>,
client: Arc<Client>,
num_peer_hint: usize,
) -> (Self, ProtocolConfig) {
// Reserve enough request slots for one request per peer when we are at the maximum
-48
View File
@@ -1,48 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Blockchain access trait
use sc_client_api::{BlockBackend, ProofProvider};
pub use sc_client_api::{StorageData, StorageKey};
pub use sc_consensus::ImportedState;
use sp_blockchain::{Error, HeaderBackend, HeaderMetadata};
use sp_runtime::traits::{Block as BlockT, BlockIdTo};
/// Local client abstraction for the network.
pub trait Client<Block: BlockT>:
HeaderBackend<Block>
+ ProofProvider<Block>
+ BlockIdTo<Block, Error = Error>
+ BlockBackend<Block>
+ HeaderMetadata<Block, Error = Error>
+ Send
+ Sync
{
}
impl<Block: BlockT, T> Client<Block> for T where
T: HeaderBackend<Block>
+ ProofProvider<Block>
+ BlockIdTo<Block, Error = Error>
+ BlockBackend<Block>
+ HeaderMetadata<Block, Error = Error>
+ Send
+ Sync
{
}
+6 -3
View File
@@ -22,7 +22,6 @@
//! See the documentation of [`Params`].
pub use crate::{
chain::Client,
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
@@ -64,7 +63,11 @@ use std::{
use zeroize::Zeroize;
/// Network initialization parameters.
pub struct Params<B: BlockT, H: ExHashT> {
pub struct Params<B, H, Client>
where
B: BlockT + 'static,
H: ExHashT,
{
/// Assigned role for our node (full, light, ...).
pub role: Role,
@@ -79,7 +82,7 @@ pub struct Params<B: BlockT, H: ExHashT> {
pub network_config: NetworkConfiguration,
/// Client that contains the blockchain.
pub chain: Arc<dyn Client<B>>,
pub chain: Arc<Client>,
/// Pool of transactions.
///
-1
View File
@@ -245,7 +245,6 @@
//! More precise usage details are still being worked on and will likely change in the future.
mod behaviour;
mod chain;
mod discovery;
mod peer_info;
mod protocol;
@@ -23,7 +23,6 @@
//! [`LightClientRequestHandler`](handler::LightClientRequestHandler).
use crate::{
chain::Client,
config::ProtocolId,
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
schema, PeerId,
@@ -32,27 +31,32 @@ use codec::{self, Decode, Encode};
use futures::{channel::mpsc, prelude::*};
use log::{debug, trace};
use prost::Message;
use sc_client_api::StorageProof;
use sc_client_api::{ProofProvider, StorageProof};
use sc_peerset::ReputationChange;
use sp_core::{
hexdisplay::HexDisplay,
storage::{ChildInfo, ChildType, PrefixedStorageKey},
};
use sp_runtime::{generic::BlockId, traits::Block};
use std::sync::Arc;
use std::{marker::PhantomData, sync::Arc};
const LOG_TARGET: &str = "light-client-request-handler";
/// Handler for incoming light client requests from a remote peer.
pub struct LightClientRequestHandler<B: Block> {
pub struct LightClientRequestHandler<B, Client> {
request_receiver: mpsc::Receiver<IncomingRequest>,
/// Blockchain client.
client: Arc<dyn Client<B>>,
client: Arc<Client>,
_block: PhantomData<B>,
}
impl<B: Block> LightClientRequestHandler<B> {
impl<B, Client> LightClientRequestHandler<B, Client>
where
B: Block,
Client: ProofProvider<B> + Send + Sync + 'static,
{
/// Create a new [`crate::block_request_handler::BlockRequestHandler`].
pub fn new(protocol_id: &ProtocolId, client: Arc<dyn Client<B>>) -> (Self, ProtocolConfig) {
pub fn new(protocol_id: &ProtocolId, client: Arc<Client>) -> (Self, ProtocolConfig) {
// For now due to lack of data on light client request handling in production systems, this
// value is chosen to match the block request limit.
let (tx, request_receiver) = mpsc::channel(20);
@@ -60,7 +64,7 @@ impl<B: Block> LightClientRequestHandler<B> {
let mut protocol_config = super::generate_protocol_config(protocol_id);
protocol_config.inbound_queue = Some(tx);
(Self { client, request_receiver }, protocol_config)
(Self { client, request_receiver, _block: PhantomData::default() }, protocol_config)
}
/// Run [`LightClientRequestHandler`].
+29 -8
View File
@@ -17,7 +17,6 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
chain::Client,
config::{self, ProtocolId, WarpSyncProvider},
error,
request_responses::RequestFailure,
@@ -49,6 +48,7 @@ use message::{
use notifications::{Notifications, NotificationsOut};
use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64};
use prost::Message as _;
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::{BlockImportError, BlockImportStatus, IncomingBlock, Origin};
use sp_arithmetic::traits::SaturatedConversion;
use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
@@ -76,6 +76,7 @@ pub mod message;
pub mod sync;
pub use notifications::{NotificationsSink, NotifsHandlerError, Ready};
use sp_blockchain::HeaderMetadata;
/// Interval at which we perform time based maintenance
const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
@@ -158,7 +159,7 @@ impl Metrics {
}
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT> {
pub struct Protocol<B: BlockT, Client> {
/// Interval at which we call `tick`.
tick_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Pending list of messages to return from `poll` as a priority.
@@ -167,10 +168,10 @@ pub struct Protocol<B: BlockT> {
genesis_hash: B::Hash,
/// State machine that handles the list of in-progress requests. Only full node peers are
/// registered.
sync: ChainSync<B>,
sync: ChainSync<B, Client>,
// All connected peers. Contains both full and light node peers.
peers: HashMap<PeerId, Peer<B>>,
chain: Arc<dyn Client<B>>,
chain: Arc<Client>,
/// List of nodes for which we perform additional logging because they are important for the
/// user.
important_peers: HashSet<PeerId>,
@@ -283,18 +284,28 @@ impl<B: BlockT> BlockAnnouncesHandshake<B> {
}
}
impl<B: BlockT> Protocol<B> {
impl<B, Client> Protocol<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
/// Create a new instance.
pub fn new(
config: ProtocolConfig,
chain: Arc<dyn Client<B>>,
chain: Arc<Client>,
protocol_id: ProtocolId,
network_config: &config::NetworkConfiguration,
notifications_protocols_handshakes: Vec<Vec<u8>>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
metrics_registry: Option<&Registry>,
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
) -> error::Result<(Protocol<B>, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
) -> error::Result<(Protocol<B, Client>, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let info = chain.info();
let sync = ChainSync::new(
config.sync_mode(),
@@ -1366,7 +1377,17 @@ pub enum CustomMessageOutcome<B: BlockT> {
None,
}
impl<B: BlockT> NetworkBehaviour for Protocol<B> {
impl<B, Client> NetworkBehaviour for Protocol<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
type OutEvent = CustomMessageOutcome<B>;
+26 -42
View File
@@ -39,9 +39,10 @@ use extra_requests::ExtraRequests;
use futures::{stream::FuturesUnordered, task::Poll, Future, FutureExt, StreamExt};
use libp2p::PeerId;
use log::{debug, error, info, trace, warn};
use sc_client_api::{BlockBackend, ProofProvider};
use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
use sp_arithmetic::traits::Saturating;
use sp_blockchain::{Error as ClientError, HeaderMetadata};
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
use sp_consensus::{
block_validation::{BlockAnnounceValidator, Validation},
BlockOrigin, BlockStatus,
@@ -54,6 +55,7 @@ use sp_runtime::{
},
EncodedJustification, Justifications,
};
pub use state::StateDownloadProgress;
use state::StateSync;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
@@ -63,6 +65,7 @@ use std::{
sync::Arc,
};
use warp::{WarpProofRequest, WarpSync, WarpSyncProvider};
pub use warp::{WarpSyncPhase, WarpSyncProgress};
mod blocks;
mod extra_requests;
@@ -194,9 +197,9 @@ struct GapSync<B: BlockT> {
/// The main data structure which contains all the state for a chains
/// active syncing strategy.
pub struct ChainSync<B: BlockT> {
pub struct ChainSync<B: BlockT, Client> {
/// Chain client.
client: Arc<dyn crate::chain::Client<B>>,
client: Arc<Client>,
/// The active peers that we are using to sync and their PeerSync status
peers: HashMap<PeerId, PeerSync<B>>,
/// A `BlockCollection` of blocks that are being downloaded from peers
@@ -228,9 +231,9 @@ pub struct ChainSync<B: BlockT> {
/// Stats per peer about the number of concurrent block announce validations.
block_announce_validation_per_peer_stats: HashMap<PeerId, usize>,
/// State sync in progress, if any.
state_sync: Option<StateSync<B>>,
state_sync: Option<StateSync<B, Client>>,
/// Warp sync in progress, if any.
warp_sync: Option<WarpSync<B>>,
warp_sync: Option<WarpSync<B, Client>>,
/// Warp sync provider.
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
/// Enable importing existing blocks. This is used used after the state download to
@@ -329,30 +332,6 @@ pub enum SyncState {
Downloading,
}
/// Reported state download progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct StateDownloadProgress {
/// Estimated download percentage.
pub percentage: u32,
/// Total state size in bytes downloaded so far.
pub size: u64,
}
/// Reported warp sync phase.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase<B: BlockT> {
/// Waiting for peers to connect.
AwaitingPeers,
/// Downloading and verifying grandpa warp proofs.
DownloadingWarpProofs,
/// Downloading state data.
DownloadingState,
/// Importing state.
ImportingState,
/// Downloading block history.
DownloadingBlocks(NumberFor<B>),
}
impl<B: BlockT> fmt::Display for WarpSyncPhase<B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
@@ -365,15 +344,6 @@ impl<B: BlockT> fmt::Display for WarpSyncPhase<B> {
}
}
/// Reported warp sync progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<B: BlockT> {
/// Estimated download percentage.
pub phase: WarpSyncPhase<B>,
/// Total bytes downloaded so far.
pub total_bytes: u64,
}
/// Syncing status and statistics.
#[derive(Clone)]
pub struct Status<B: BlockT> {
@@ -534,11 +504,21 @@ enum HasSlotForBlockAnnounceValidation {
MaximumPeerSlotsReached,
}
impl<B: BlockT> ChainSync<B> {
impl<B, Client> ChainSync<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
/// Create a new instance.
pub fn new(
mode: SyncMode,
client: Arc<dyn crate::chain::Client<B>>,
client: Arc<Client>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
@@ -2741,7 +2721,11 @@ mod test {
}
/// Send a block annoucnement for the given `header`.
fn send_block_announce(header: Header, peer_id: &PeerId, sync: &mut ChainSync<Block>) {
fn send_block_announce(
header: Header,
peer_id: &PeerId,
sync: &mut ChainSync<Block, TestClient>,
) {
let block_annnounce = BlockAnnounce {
header: header.clone(),
state: Some(BlockState::Best),
@@ -2780,7 +2764,7 @@ mod test {
/// Get a block request from `sync` and check that is matches the expected request.
fn get_block_request(
sync: &mut ChainSync<Block>,
sync: &mut ChainSync<Block, TestClient>,
from: FromBlock<Hash, u64>,
max: u32,
peer: &PeerId,
@@ -16,14 +16,11 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::StateDownloadProgress;
use crate::{
chain::{Client, ImportedState},
schema::v1::{StateEntry, StateRequest, StateResponse},
};
use crate::schema::v1::{StateEntry, StateRequest, StateResponse};
use codec::{Decode, Encode};
use log::debug;
use sc_client_api::CompactProof;
use sc_client_api::{CompactProof, ProofProvider};
use sc_consensus::ImportedState;
use smallvec::SmallVec;
use sp_core::storage::well_known_keys;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
@@ -33,18 +30,27 @@ use std::{collections::HashMap, sync::Arc};
/// State sync state machine. Accumulates partial state data until it
/// is ready to be imported.
pub struct StateSync<B: BlockT> {
pub struct StateSync<B: BlockT, Client> {
target_block: B::Hash,
target_header: B::Header,
target_root: B::Hash,
last_key: SmallVec<[Vec<u8>; 2]>,
state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
complete: bool,
client: Arc<dyn Client<B>>,
client: Arc<Client>,
imported_bytes: u64,
skip_proof: bool,
}
/// Reported state download progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct StateDownloadProgress {
/// Estimated download percentage.
pub percentage: u32,
/// Total state size in bytes downloaded so far.
pub size: u64,
}
/// Import state chunk result.
pub enum ImportResult<B: BlockT> {
/// State is complete and ready for import.
@@ -55,9 +61,13 @@ pub enum ImportResult<B: BlockT> {
BadResponse,
}
impl<B: BlockT> StateSync<B> {
impl<B, Client> StateSync<B, Client>
where
B: BlockT,
Client: ProofProvider<B> + Send + Sync + 'static,
{
/// Create a new instance.
pub fn new(client: Arc<dyn Client<B>>, target: B::Header, skip_proof: bool) -> Self {
pub fn new(client: Arc<Client>, target: B::Header, skip_proof: bool) -> Self {
Self {
client,
target_block: target.hash(),
@@ -71,7 +81,7 @@ impl<B: BlockT> StateSync<B> {
}
}
/// Validate and import a state reponse.
/// Validate and import a state response.
pub fn import(&mut self, response: StateResponse) -> ImportResult<B> {
if response.entries.is_empty() && response.proof.is_empty() {
debug!(target: "sync", "Bad state response");
@@ -17,23 +17,44 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
///! Warp sync support.
pub use super::state::ImportResult;
use super::state::StateSync;
use super::state::{ImportResult, StateSync};
use crate::schema::v1::{StateRequest, StateResponse};
pub use crate::warp_request_handler::{
EncodedProof, Request as WarpProofRequest, VerificationResult, WarpSyncProvider,
};
use crate::{
chain::Client,
schema::v1::{StateRequest, StateResponse},
WarpSyncPhase, WarpSyncProgress,
};
use sc_client_api::ProofProvider;
use sp_blockchain::HeaderBackend;
use sp_finality_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use std::sync::Arc;
enum Phase<B: BlockT> {
enum Phase<B: BlockT, Client> {
WarpProof { set_id: SetId, authorities: AuthorityList, last_hash: B::Hash },
State(StateSync<B>),
State(StateSync<B, Client>),
}
/// Reported warp sync phase.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase<B: BlockT> {
/// Waiting for peers to connect.
AwaitingPeers,
/// Downloading and verifying grandpa warp proofs.
DownloadingWarpProofs,
/// Downloading state data.
DownloadingState,
/// Importing state.
ImportingState,
/// Downloading block history.
DownloadingBlocks(NumberFor<B>),
}
/// Reported warp sync progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<B: BlockT> {
/// Estimated download percentage.
pub phase: WarpSyncPhase<B>,
/// Total bytes downloaded so far.
pub total_bytes: u64,
}
/// Import warp proof result.
@@ -45,19 +66,20 @@ pub enum WarpProofImportResult {
}
/// Warp sync state machine. Accumulates warp proofs and state.
pub struct WarpSync<B: BlockT> {
phase: Phase<B>,
client: Arc<dyn Client<B>>,
pub struct WarpSync<B: BlockT, Client> {
phase: Phase<B, Client>,
client: Arc<Client>,
warp_sync_provider: Arc<dyn WarpSyncProvider<B>>,
total_proof_bytes: u64,
}
impl<B: BlockT> WarpSync<B> {
impl<B, Client> WarpSync<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + ProofProvider<B> + 'static,
{
/// Create a new instance.
pub fn new(
client: Arc<dyn Client<B>>,
warp_sync_provider: Arc<dyn WarpSyncProvider<B>>,
) -> Self {
pub fn new(client: Arc<Client>, warp_sync_provider: Arc<dyn WarpSyncProvider<B>>) -> Self {
let last_hash = client.hash(Zero::zero()).unwrap().expect("Genesis header always exists");
let phase = Phase::WarpProof {
set_id: 0,
+95 -26
View File
@@ -54,16 +54,18 @@ use libp2p::{
ping::Failure as PingFailure,
swarm::{
AddressScore, ConnectionError, ConnectionLimits, DialError, NetworkBehaviour,
PendingConnectionError, SwarmBuilder, SwarmEvent,
PendingConnectionError, Swarm, SwarmBuilder, SwarmEvent,
},
Multiaddr, PeerId,
};
use log::{debug, error, info, trace, warn};
use metrics::{Histogram, HistogramVec, MetricSources, Metrics};
use parking_lot::Mutex;
use sc_client_api::{BlockBackend, ProofProvider};
use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link};
use sc_peerset::PeersetHandle;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{
borrow::Cow,
@@ -98,7 +100,7 @@ pub use libp2p::{
},
kad::record::Key as KademliaKey,
};
pub use signature::*;
pub use signature::Signature;
/// Substrate network service. Handles network IO and manages connectivity.
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
@@ -130,13 +132,24 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
_marker: PhantomData<H>,
}
impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
impl<B, H, Client> NetworkWorker<B, H, Client>
where
B: BlockT + 'static,
H: ExHashT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
/// Creates the network service.
///
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new(mut params: Params<B, H>) -> Result<Self, Error> {
pub fn new(mut params: Params<B, H, Client>) -> Result<Self, Error> {
// Ensure the listen addresses are consistent with the transport.
ensure_addresses_consistent_with_transport(
params.network_config.listen_addresses.iter(),
@@ -247,7 +260,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
// Build the swarm.
let client = params.chain.clone();
let (mut swarm, bandwidth): (Swarm<B>, _) = {
let (mut swarm, bandwidth): (Swarm<Behaviour<B, Client>>, _) = {
let user_agent = format!(
"{} ({})",
params.network_config.client_version, params.network_config.node_name
@@ -392,14 +405,18 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
// Listen on multiaddresses.
for addr in &params.network_config.listen_addresses {
if let Err(err) = Swarm::<B>::listen_on(&mut swarm, addr.clone()) {
if let Err(err) = Swarm::<Behaviour<B, Client>>::listen_on(&mut swarm, addr.clone()) {
warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
}
}
// Add external addresses.
for addr in &params.network_config.public_addresses {
Swarm::<B>::add_external_address(&mut swarm, addr.clone(), AddressScore::Infinite);
Swarm::<Behaviour<B, Client>>::add_external_address(
&mut swarm,
addr.clone(),
AddressScore::Infinite,
);
}
let external_addresses = Arc::new(Mutex::new(Vec::new()));
@@ -540,14 +557,14 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
/// Returns the local `PeerId`.
pub fn local_peer_id(&self) -> &PeerId {
Swarm::<B>::local_peer_id(&self.network_service)
Swarm::<Behaviour<B, Client>>::local_peer_id(&self.network_service)
}
/// Returns the list of addresses we are listening on.
///
/// Does **NOT** include a trailing `/p2p/` with our `PeerId`.
pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
Swarm::<B>::listeners(&self.network_service)
Swarm::<Behaviour<B, Client>>::listeners(&self.network_service)
}
/// Get network state.
@@ -627,7 +644,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
.collect()
};
let peer_id = Swarm::<B>::local_peer_id(&swarm).to_base58();
let peer_id = Swarm::<Behaviour<B, Client>>::local_peer_id(&swarm).to_base58();
let listened_addresses = swarm.listeners().cloned().collect();
let external_addresses = swarm.external_addresses().map(|r| &r.addr).cloned().collect();
@@ -1445,7 +1462,18 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
///
/// You are encouraged to poll this in a separate background thread or task.
#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
pub struct NetworkWorker<B, H, Client>
where
B: BlockT + 'static,
H: ExHashT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
@@ -1455,7 +1483,7 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
/// The network service that can be extracted and shared through the codebase.
service: Arc<NetworkService<B, H>>,
/// The *actual* network.
network_service: Swarm<B>,
network_service: Swarm<Behaviour<B, Client>>,
/// The import queue that was passed at initialization.
import_queue: Box<dyn ImportQueue<B>>,
/// Messages from the [`NetworkService`] that must be processed.
@@ -1473,7 +1501,18 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
tx_handler_controller: transactions::TransactionsHandlerController<H>,
}
impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
impl<B, H, Client> Future for NetworkWorker<B, H, Client>
where
B: BlockT + 'static,
H: ExHashT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
@@ -2055,10 +2094,11 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
// Update the variables shared with the `NetworkService`.
this.num_connected.store(num_connected_peers, Ordering::Relaxed);
{
let external_addresses = Swarm::<B>::external_addresses(&this.network_service)
.map(|r| &r.addr)
.cloned()
.collect();
let external_addresses =
Swarm::<Behaviour<B, Client>>::external_addresses(&this.network_service)
.map(|r| &r.addr)
.cloned()
.collect();
*this.external_addresses.lock() = external_addresses;
}
@@ -2113,17 +2153,46 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
}
}
impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {}
/// The libp2p swarm, customized for our needs.
type Swarm<B> = libp2p::swarm::Swarm<Behaviour<B>>;
// Implementation of `import_queue::Link` trait using the available local variables.
struct NetworkLink<'a, B: BlockT> {
protocol: &'a mut Swarm<B>,
impl<B, H, Client> Unpin for NetworkWorker<B, H, Client>
where
B: BlockT + 'static,
H: ExHashT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
}
impl<'a, B: BlockT> Link<B> for NetworkLink<'a, B> {
// Implementation of `import_queue::Link` trait using the available local variables.
struct NetworkLink<'a, B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
protocol: &'a mut Swarm<Behaviour<B, Client>>,
}
impl<'a, B, Client> Link<B> for NetworkLink<'a, B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
fn blocks_processed(
&mut self,
imported: usize,
@@ -18,7 +18,6 @@
//! `crate::request_responses::RequestResponsesBehaviour`.
use crate::{
chain::Client,
config::ProtocolId,
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse},
@@ -32,6 +31,7 @@ use futures::{
use log::{debug, trace};
use lru::LruCache;
use prost::Message;
use sc_client_api::ProofProvider;
use sp_runtime::{generic::BlockId, traits::Block as BlockT};
use std::{
hash::{Hash, Hasher},
@@ -96,8 +96,8 @@ enum SeenRequestsValue {
}
/// Handler for incoming block requests from a remote peer.
pub struct StateRequestHandler<B: BlockT> {
client: Arc<dyn Client<B>>,
pub struct StateRequestHandler<B: BlockT, Client> {
client: Arc<Client>,
request_receiver: mpsc::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
@@ -105,11 +105,15 @@ pub struct StateRequestHandler<B: BlockT> {
seen_requests: LruCache<SeenRequestsKey<B>, SeenRequestsValue>,
}
impl<B: BlockT> StateRequestHandler<B> {
impl<B, Client> StateRequestHandler<B, Client>
where
B: BlockT,
Client: ProofProvider<B> + Send + Sync + 'static,
{
/// Create a new [`StateRequestHandler`].
pub fn new(
protocol_id: &ProtocolId,
client: Arc<dyn Client<B>>,
client: Arc<Client>,
num_peer_hint: usize,
) -> (Self, ProtocolConfig) {
// Reserve enough request slots for one request per peer when we are at the maximum