mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-17 16:01:04 +00:00
Network sync refactoring (part 5) (#11825)
* Make `chain_sync` an explicit networking parameter instead of offering factory method * Derive `Copy` on `SyncMode` and remove cloning
This commit is contained in:
@@ -96,14 +96,8 @@ where
|
|||||||
/// valid.
|
/// valid.
|
||||||
pub import_queue: Box<dyn ImportQueue<B>>,
|
pub import_queue: Box<dyn ImportQueue<B>>,
|
||||||
|
|
||||||
/// Factory function that creates a new instance of chain sync.
|
/// Instance of chain sync implementation.
|
||||||
pub create_chain_sync: Box<
|
pub chain_sync: Box<dyn ChainSync<B>>,
|
||||||
dyn FnOnce(
|
|
||||||
sc_network_common::sync::SyncMode,
|
|
||||||
Arc<Client>,
|
|
||||||
Option<Arc<dyn WarpSyncProvider<B>>>,
|
|
||||||
) -> crate::error::Result<Box<dyn ChainSync<B>>>,
|
|
||||||
>,
|
|
||||||
|
|
||||||
/// Registry for recording prometheus metrics to.
|
/// Registry for recording prometheus metrics to.
|
||||||
pub metrics_registry: Option<Registry>,
|
pub metrics_registry: Option<Registry>,
|
||||||
@@ -138,8 +132,8 @@ where
|
|||||||
/// both outgoing and incoming requests.
|
/// both outgoing and incoming requests.
|
||||||
pub state_request_protocol_config: RequestResponseConfig,
|
pub state_request_protocol_config: RequestResponseConfig,
|
||||||
|
|
||||||
/// Optional warp sync protocol support. Include protocol config and sync provider.
|
/// Optional warp sync protocol config.
|
||||||
pub warp_sync: Option<(Arc<dyn WarpSyncProvider<B>>, RequestResponseConfig)>,
|
pub warp_sync_protocol_config: Option<RequestResponseConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Role of the local node.
|
/// Role of the local node.
|
||||||
@@ -352,7 +346,7 @@ impl From<multiaddr::Error> for ParseErr {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Sync operation mode.
|
/// Sync operation mode.
|
||||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
||||||
pub enum SyncMode {
|
pub enum SyncMode {
|
||||||
/// Full block download and verification.
|
/// Full block download and verification.
|
||||||
Full,
|
Full,
|
||||||
|
|||||||
@@ -30,7 +30,7 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
behaviour::{self, Behaviour, BehaviourOut},
|
behaviour::{self, Behaviour, BehaviourOut},
|
||||||
bitswap::Bitswap,
|
bitswap::Bitswap,
|
||||||
config::{self, parse_str_addr, Params, TransportConfig},
|
config::{parse_str_addr, Params, TransportConfig},
|
||||||
discovery::DiscoveryConfig,
|
discovery::DiscoveryConfig,
|
||||||
error::Error,
|
error::Error,
|
||||||
network_state::{
|
network_state::{
|
||||||
@@ -60,7 +60,7 @@ use metrics::{Histogram, HistogramVec, MetricSources, Metrics};
|
|||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use sc_client_api::{BlockBackend, ProofProvider};
|
use sc_client_api::{BlockBackend, ProofProvider};
|
||||||
use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link};
|
use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link};
|
||||||
use sc_network_common::sync::{SyncMode, SyncState, SyncStatus};
|
use sc_network_common::sync::{SyncState, SyncStatus};
|
||||||
use sc_peerset::PeersetHandle;
|
use sc_peerset::PeersetHandle;
|
||||||
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
|
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
|
||||||
use sp_blockchain::{HeaderBackend, HeaderMetadata};
|
use sp_blockchain::{HeaderBackend, HeaderMetadata};
|
||||||
@@ -239,21 +239,6 @@ where
|
|||||||
|
|
||||||
let default_notif_handshake_message = Roles::from(¶ms.role).encode();
|
let default_notif_handshake_message = Roles::from(¶ms.role).encode();
|
||||||
|
|
||||||
let (warp_sync_provider, warp_sync_protocol_config) = match params.warp_sync {
|
|
||||||
Some((p, c)) => (Some(p), Some(c)),
|
|
||||||
None => (None, None),
|
|
||||||
};
|
|
||||||
|
|
||||||
let chain_sync = (params.create_chain_sync)(
|
|
||||||
match params.network_config.sync_mode {
|
|
||||||
config::SyncMode::Full => SyncMode::Full,
|
|
||||||
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
|
|
||||||
SyncMode::LightState { skip_proofs, storage_chain_mode },
|
|
||||||
config::SyncMode::Warp => SyncMode::Warp,
|
|
||||||
},
|
|
||||||
params.chain.clone(),
|
|
||||||
warp_sync_provider,
|
|
||||||
)?;
|
|
||||||
let (protocol, peerset_handle, mut known_addresses) = Protocol::new(
|
let (protocol, peerset_handle, mut known_addresses) = Protocol::new(
|
||||||
From::from(¶ms.role),
|
From::from(¶ms.role),
|
||||||
params.chain.clone(),
|
params.chain.clone(),
|
||||||
@@ -266,7 +251,7 @@ where
|
|||||||
)
|
)
|
||||||
.collect(),
|
.collect(),
|
||||||
params.metrics_registry.as_ref(),
|
params.metrics_registry.as_ref(),
|
||||||
chain_sync,
|
params.chain_sync,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// List of multiaddresses that we know in the network.
|
// List of multiaddresses that we know in the network.
|
||||||
@@ -303,7 +288,6 @@ where
|
|||||||
let is_major_syncing = Arc::new(AtomicBool::new(false));
|
let is_major_syncing = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
// Build the swarm.
|
// Build the swarm.
|
||||||
let client = params.chain.clone();
|
|
||||||
let (mut swarm, bandwidth): (Swarm<Behaviour<B, Client>>, _) = {
|
let (mut swarm, bandwidth): (Swarm<Behaviour<B, Client>>, _) = {
|
||||||
let user_agent = format!(
|
let user_agent = format!(
|
||||||
"{} ({})",
|
"{} ({})",
|
||||||
@@ -389,7 +373,7 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
let behaviour = {
|
let behaviour = {
|
||||||
let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(client));
|
let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(params.chain));
|
||||||
let result = Behaviour::new(
|
let result = Behaviour::new(
|
||||||
protocol,
|
protocol,
|
||||||
user_agent,
|
user_agent,
|
||||||
@@ -397,7 +381,7 @@ where
|
|||||||
discovery_config,
|
discovery_config,
|
||||||
params.block_request_protocol_config,
|
params.block_request_protocol_config,
|
||||||
params.state_request_protocol_config,
|
params.state_request_protocol_config,
|
||||||
warp_sync_protocol_config,
|
params.warp_sync_protocol_config,
|
||||||
bitswap,
|
bitswap,
|
||||||
params.light_client_request_protocol_config,
|
params.light_client_request_protocol_config,
|
||||||
params.network_config.request_response_protocols,
|
params.network_config.request_response_protocols,
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ type TestNetworkService = NetworkService<
|
|||||||
/// > **Note**: We return the events stream in order to not possibly lose events between the
|
/// > **Note**: We return the events stream in order to not possibly lose events between the
|
||||||
/// > construction of the service and the moment the events stream is grabbed.
|
/// > construction of the service and the moment the events stream is grabbed.
|
||||||
fn build_test_full_node(
|
fn build_test_full_node(
|
||||||
config: config::NetworkConfiguration,
|
network_config: config::NetworkConfiguration,
|
||||||
) -> (Arc<TestNetworkService>, impl Stream<Item = Event>) {
|
) -> (Arc<TestNetworkService>, impl Stream<Item = Event>) {
|
||||||
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
|
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
|
||||||
|
|
||||||
@@ -111,35 +111,36 @@ fn build_test_full_node(
|
|||||||
protocol_config
|
protocol_config
|
||||||
};
|
};
|
||||||
|
|
||||||
let max_parallel_downloads = config.max_parallel_downloads;
|
let chain_sync = ChainSync::new(
|
||||||
|
match network_config.sync_mode {
|
||||||
|
config::SyncMode::Full => sc_network_common::sync::SyncMode::Full,
|
||||||
|
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
|
||||||
|
sc_network_common::sync::SyncMode::LightState { skip_proofs, storage_chain_mode },
|
||||||
|
config::SyncMode::Warp => sc_network_common::sync::SyncMode::Warp,
|
||||||
|
},
|
||||||
|
client.clone(),
|
||||||
|
Box::new(DefaultBlockAnnounceValidator),
|
||||||
|
network_config.max_parallel_downloads,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
let worker = NetworkWorker::new(config::Params {
|
let worker = NetworkWorker::new(config::Params {
|
||||||
role: config::Role::Full,
|
role: config::Role::Full,
|
||||||
executor: None,
|
executor: None,
|
||||||
transactions_handler_executor: Box::new(|task| {
|
transactions_handler_executor: Box::new(|task| {
|
||||||
async_std::task::spawn(task);
|
async_std::task::spawn(task);
|
||||||
}),
|
}),
|
||||||
network_config: config,
|
network_config,
|
||||||
chain: client.clone(),
|
chain: client.clone(),
|
||||||
transaction_pool: Arc::new(crate::config::EmptyTransactionPool),
|
transaction_pool: Arc::new(config::EmptyTransactionPool),
|
||||||
protocol_id,
|
protocol_id,
|
||||||
import_queue,
|
import_queue,
|
||||||
create_chain_sync: Box::new(
|
chain_sync: Box::new(chain_sync),
|
||||||
move |sync_mode, chain, warp_sync_provider| match ChainSync::new(
|
|
||||||
sync_mode,
|
|
||||||
chain,
|
|
||||||
Box::new(DefaultBlockAnnounceValidator),
|
|
||||||
max_parallel_downloads,
|
|
||||||
warp_sync_provider,
|
|
||||||
) {
|
|
||||||
Ok(chain_sync) => Ok(Box::new(chain_sync)),
|
|
||||||
Err(error) => Err(Box::new(error).into()),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
metrics_registry: None,
|
metrics_registry: None,
|
||||||
block_request_protocol_config,
|
block_request_protocol_config,
|
||||||
state_request_protocol_config,
|
state_request_protocol_config,
|
||||||
light_client_request_protocol_config,
|
light_client_request_protocol_config,
|
||||||
warp_sync: None,
|
warp_sync_protocol_config: None,
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|||||||
@@ -838,10 +838,25 @@ where
|
|||||||
protocol_config
|
protocol_config
|
||||||
};
|
};
|
||||||
|
|
||||||
let max_parallel_downloads = network_config.max_parallel_downloads;
|
|
||||||
let block_announce_validator = config
|
let block_announce_validator = config
|
||||||
.block_announce_validator
|
.block_announce_validator
|
||||||
.unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator));
|
.unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator));
|
||||||
|
let chain_sync = ChainSync::new(
|
||||||
|
match network_config.sync_mode {
|
||||||
|
SyncMode::Full => sc_network_common::sync::SyncMode::Full,
|
||||||
|
SyncMode::Fast { skip_proofs, storage_chain_mode } =>
|
||||||
|
sc_network_common::sync::SyncMode::LightState {
|
||||||
|
skip_proofs,
|
||||||
|
storage_chain_mode,
|
||||||
|
},
|
||||||
|
SyncMode::Warp => sc_network_common::sync::SyncMode::Warp,
|
||||||
|
},
|
||||||
|
client.clone(),
|
||||||
|
block_announce_validator,
|
||||||
|
network_config.max_parallel_downloads,
|
||||||
|
Some(warp_sync),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
let network = NetworkWorker::new(sc_network::config::Params {
|
let network = NetworkWorker::new(sc_network::config::Params {
|
||||||
role: if config.is_authority { Role::Authority } else { Role::Full },
|
role: if config.is_authority { Role::Authority } else { Role::Full },
|
||||||
executor: None,
|
executor: None,
|
||||||
@@ -853,23 +868,12 @@ where
|
|||||||
transaction_pool: Arc::new(EmptyTransactionPool),
|
transaction_pool: Arc::new(EmptyTransactionPool),
|
||||||
protocol_id,
|
protocol_id,
|
||||||
import_queue,
|
import_queue,
|
||||||
create_chain_sync: Box::new(move |sync_mode, chain, warp_sync_provider| {
|
chain_sync: Box::new(chain_sync),
|
||||||
match ChainSync::new(
|
|
||||||
sync_mode,
|
|
||||||
chain,
|
|
||||||
block_announce_validator,
|
|
||||||
max_parallel_downloads,
|
|
||||||
warp_sync_provider,
|
|
||||||
) {
|
|
||||||
Ok(chain_sync) => Ok(Box::new(chain_sync)),
|
|
||||||
Err(error) => Err(Box::new(error).into()),
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
metrics_registry: None,
|
metrics_registry: None,
|
||||||
block_request_protocol_config,
|
block_request_protocol_config,
|
||||||
state_request_protocol_config,
|
state_request_protocol_config,
|
||||||
light_client_request_protocol_config,
|
light_client_request_protocol_config,
|
||||||
warp_sync: Some((warp_sync, warp_protocol_config)),
|
warp_sync_protocol_config: Some(warp_protocol_config),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|||||||
@@ -760,13 +760,15 @@ where
|
|||||||
protocol_config
|
protocol_config
|
||||||
};
|
};
|
||||||
|
|
||||||
let warp_sync_params = warp_sync.map(|provider| {
|
let (warp_sync_provider, warp_sync_protocol_config) = warp_sync
|
||||||
// Allow both outgoing and incoming requests.
|
.map(|provider| {
|
||||||
let (handler, protocol_config) =
|
// Allow both outgoing and incoming requests.
|
||||||
WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone());
|
let (handler, protocol_config) =
|
||||||
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
|
WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone());
|
||||||
(provider, protocol_config)
|
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
|
||||||
});
|
(Some(provider), Some(protocol_config))
|
||||||
|
})
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let light_client_request_protocol_config = {
|
let light_client_request_protocol_config = {
|
||||||
// Allow both outgoing and incoming requests.
|
// Allow both outgoing and incoming requests.
|
||||||
@@ -776,7 +778,18 @@ where
|
|||||||
protocol_config
|
protocol_config
|
||||||
};
|
};
|
||||||
|
|
||||||
let max_parallel_downloads = config.network.max_parallel_downloads;
|
let chain_sync = ChainSync::new(
|
||||||
|
match config.network.sync_mode {
|
||||||
|
SyncMode::Full => sc_network_common::sync::SyncMode::Full,
|
||||||
|
SyncMode::Fast { skip_proofs, storage_chain_mode } =>
|
||||||
|
sc_network_common::sync::SyncMode::LightState { skip_proofs, storage_chain_mode },
|
||||||
|
SyncMode::Warp => sc_network_common::sync::SyncMode::Warp,
|
||||||
|
},
|
||||||
|
client.clone(),
|
||||||
|
block_announce_validator,
|
||||||
|
config.network.max_parallel_downloads,
|
||||||
|
warp_sync_provider,
|
||||||
|
)?;
|
||||||
let network_params = sc_network::config::Params {
|
let network_params = sc_network::config::Params {
|
||||||
role: config.role.clone(),
|
role: config.role.clone(),
|
||||||
executor: {
|
executor: {
|
||||||
@@ -796,22 +809,11 @@ where
|
|||||||
transaction_pool: transaction_pool_adapter as _,
|
transaction_pool: transaction_pool_adapter as _,
|
||||||
protocol_id,
|
protocol_id,
|
||||||
import_queue: Box::new(import_queue),
|
import_queue: Box::new(import_queue),
|
||||||
create_chain_sync: Box::new(
|
chain_sync: Box::new(chain_sync),
|
||||||
move |sync_mode, chain, warp_sync_provider| match ChainSync::new(
|
|
||||||
sync_mode,
|
|
||||||
chain,
|
|
||||||
block_announce_validator,
|
|
||||||
max_parallel_downloads,
|
|
||||||
warp_sync_provider,
|
|
||||||
) {
|
|
||||||
Ok(chain_sync) => Ok(Box::new(chain_sync)),
|
|
||||||
Err(error) => Err(Box::new(error).into()),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
|
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
|
||||||
block_request_protocol_config,
|
block_request_protocol_config,
|
||||||
state_request_protocol_config,
|
state_request_protocol_config,
|
||||||
warp_sync: warp_sync_params,
|
warp_sync_protocol_config,
|
||||||
light_client_request_protocol_config,
|
light_client_request_protocol_config,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user