Prepare sc-network for ProtocolController/NotificationService (#14080)

* Prepare `sc-network` for `ProtocolController`/`NotificationService`

The upcoming notification protocol refactoring requires that protocols
are able to communicate with `sc-network` over unique and direct links.
This means that `sc-network` side of the link has to be created before
`sc-network` is initialized and that it is allowed to consume the object
as the receiver half of the link may not implement `Clone`.

Remove request-response and notification protocols from `NetworkConfiguration`
and create a new object that contains the configurations of these protocols
and which is consumable by `sc-network`. This is needed needed because, e.g.,
the receiver half of `NotificationService` is not clonable so `sc-network`
must consume it when it's initializing the protocols in `Notifications`.

Similar principe applies to `PeerStore`/`ProtocolController`: as per current
design, protocols are created before the network so `Protocol` cannot be
the one creating the `PeerStore` object. `FullNetworkConfiguration` will be
used to store the objects that `sc-network` will use to communicate with
protocols and it will also allow protocols to allocate handles so they
can directly communicate with `sc-network`.

* Fixes

* Update client/service/src/builder.rs

Co-authored-by: Dmitry Markin <dmitry@markin.tech>

* Updates

* Doc updates + cargo-fmt

---------

Co-authored-by: Dmitry Markin <dmitry@markin.tech>
This commit is contained in:
Aaro Altonen
2023-05-11 13:27:21 +03:00
committed by GitHub
parent a62085511b
commit f36749b99e
14 changed files with 321 additions and 271 deletions
+1
View File
@@ -5488,6 +5488,7 @@ dependencies = [
"sc-consensus-grandpa",
"sc-executor",
"sc-keystore",
"sc-network",
"sc-rpc",
"sc-rpc-api",
"sc-service",
@@ -23,6 +23,7 @@ futures = { version = "0.3.21", features = ["thread-pool"]}
sc-cli = { version = "0.10.0-dev", path = "../../../client/cli" }
sp-core = { version = "7.0.0", path = "../../../primitives/core" }
sc-executor = { version = "0.10.0-dev", path = "../../../client/executor" }
sc-network = { version = "0.10.0-dev", path = "../../../client/network" }
sc-service = { version = "0.10.0-dev", path = "../../../client/service" }
sc-telemetry = { version = "4.0.0-dev", path = "../../../client/telemetry" }
sc-keystore = { version = "4.0.0-dev", path = "../../../client/keystore" }
@@ -138,7 +138,7 @@ pub fn new_partial(
}
/// Builds a new service for a full client.
pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError> {
pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
let sc_service::PartialComponents {
client,
backend,
@@ -150,15 +150,16 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
other: (block_import, grandpa_link, mut telemetry),
} = new_partial(&config)?;
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
&config.chain_spec,
);
net_config.add_notification_protocol(sc_consensus_grandpa::grandpa_peers_set_config(
grandpa_protocol_name.clone(),
));
config
.network
.extra_sets
.push(sc_consensus_grandpa::grandpa_peers_set_config(grandpa_protocol_name.clone()));
let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
grandpa_link.shared_authority_set().clone(),
@@ -168,6 +169,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
net_config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
+8 -6
View File
@@ -316,7 +316,7 @@ pub struct NewFullBase {
/// Creates a full service from the configuration.
pub fn new_full_base(
mut config: Configuration,
config: Configuration,
disable_hardware_benchmarks: bool,
with_startup_data: impl FnOnce(
&sc_consensus_babe::BabeBlockImport<Block, FullClient, FullGrandpaBlockImport>,
@@ -343,10 +343,15 @@ pub fn new_full_base(
let shared_voter_state = rpc_setup;
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
let grandpa_protocol_name = grandpa::protocol_standard_name(
&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
&config.chain_spec,
);
net_config.add_notification_protocol(grandpa::grandpa_peers_set_config(
grandpa_protocol_name.clone(),
));
let statement_handler_proto = sc_network_statement::StatementHandlerPrototype::new(
client
@@ -356,12 +361,8 @@ pub fn new_full_base(
.expect("Genesis block exists; qed"),
config.chain_spec.fork_id(),
);
config.network.extra_sets.push(statement_handler_proto.set_config());
net_config.add_notification_protocol(statement_handler_proto.set_config());
config
.network
.extra_sets
.push(grandpa::grandpa_peers_set_config(grandpa_protocol_name.clone()));
let warp_sync = Arc::new(grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
import_setup.1.shared_authority_set().clone(),
@@ -371,6 +372,7 @@ pub fn new_full_base(
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
net_config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
@@ -221,8 +221,6 @@ impl NetworkParams {
default_peers_set_num_full: self.in_peers + self.out_peers,
listen_addresses,
public_addresses,
extra_sets: Vec::new(),
request_response_protocols: Vec::new(),
node_key,
node_name: node_name.to_string(),
client_version: client_id.to_string(),
@@ -63,7 +63,7 @@ pub(crate) mod beefy_protocol_name {
}
/// Returns the configuration value to put in
/// [`sc_network::config::NetworkConfiguration::extra_sets`].
/// [`sc_network::config::FullNetworkConfiguration`].
/// For standard protocol name see [`beefy_protocol_name::gossip_protocol_name`].
pub fn beefy_peers_set_config(
gossip_protocol_name: sc_network::ProtocolName,
@@ -691,7 +691,7 @@ pub struct GrandpaParams<Block: BlockT, C, N, S, SC, VR> {
}
/// Returns the configuration value to put in
/// [`sc_network::config::NetworkConfiguration::extra_sets`].
/// [`sc_network::config::FullNetworkConfiguration`].
/// For standard protocol name see [`crate::protocol_standard_name`].
pub fn grandpa_peers_set_config(
protocol_name: ProtocolName,
+41 -13
View File
@@ -32,15 +32,16 @@ pub use crate::{
use codec::Encode;
use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId};
use prometheus_endpoint::Registry;
use zeroize::Zeroize;
pub use sc_network_common::{
role::{Role, Roles},
sync::warp::WarpSyncProvider,
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use zeroize::Zeroize;
use sp_runtime::traits::Block as BlockT;
use std::{
error::Error,
fmt, fs,
@@ -564,9 +565,6 @@ pub struct NetworkConfiguration {
/// The node key configuration, which determines the node's network identity keypair.
pub node_key: NodeKeyConfig,
/// List of request-response protocols that the node supports.
pub request_response_protocols: Vec<RequestResponseConfig>,
/// Configuration for the default set of nodes used for block syncing and transactions.
pub default_peers_set: SetConfig,
@@ -576,9 +574,6 @@ pub struct NetworkConfiguration {
/// This value is implicitly capped to `default_set.out_peers + default_set.in_peers`.
pub default_peers_set_num_full: u32,
/// Configuration for extra sets of nodes.
pub extra_sets: Vec<NonDefaultSetConfig>,
/// Client identifier. Sent over the wire for debugging purposes.
pub client_version: String,
@@ -649,10 +644,8 @@ impl NetworkConfiguration {
public_addresses: Vec::new(),
boot_nodes: Vec::new(),
node_key,
request_response_protocols: Vec::new(),
default_peers_set_num_full: default_peers_set.in_peers + default_peers_set.out_peers,
default_peers_set,
extra_sets: Vec::new(),
client_version: client_version.into(),
node_name: node_name.into(),
transport: TransportConfig::Normal { enable_mdns: false, allow_private_ip: true },
@@ -707,7 +700,7 @@ pub struct Params<Block: BlockT> {
pub executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>,
/// Network layer configuration.
pub network_config: NetworkConfiguration,
pub network_config: FullNetworkConfiguration,
/// Legacy name of the protocol to use on the wire. Should be different for each chain.
pub protocol_id: ProtocolId,
@@ -727,9 +720,44 @@ pub struct Params<Block: BlockT> {
/// TX channel for direct communication with `SyncingEngine` and `Protocol`.
pub tx: TracingUnboundedSender<crate::event::SyncEvent<Block>>,
}
/// Request response protocol configurations
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
/// Full network configuration.
pub struct FullNetworkConfiguration {
/// Installed notification protocols.
pub(crate) notification_protocols: Vec<NonDefaultSetConfig>,
/// List of request-response protocols that the node supports.
pub(crate) request_response_protocols: Vec<RequestResponseConfig>,
/// Network configuration.
pub network_config: NetworkConfiguration,
}
impl FullNetworkConfiguration {
/// Create new [`FullNetworkConfiguration`].
pub fn new(network_config: &NetworkConfiguration) -> Self {
Self {
notification_protocols: Vec::new(),
request_response_protocols: Vec::new(),
network_config: network_config.clone(),
}
}
/// Add a notification protocol.
pub fn add_notification_protocol(&mut self, config: NonDefaultSetConfig) {
self.notification_protocols.push(config);
}
/// Get reference to installed notification protocols.
pub fn notification_protocols(&self) -> &Vec<NonDefaultSetConfig> {
&self.notification_protocols
}
/// Add a request-response protocol.
pub fn add_request_response_protocol(&mut self, config: RequestResponseConfig) {
self.request_response_protocols.push(config);
}
}
#[cfg(test)]
+5 -4
View File
@@ -102,6 +102,7 @@ impl<B: BlockT> Protocol<B> {
pub fn new(
roles: Roles,
network_config: &config::NetworkConfiguration,
notification_protocols: Vec<config::NonDefaultSetConfig>,
block_announces_protocol: config::NonDefaultSetConfig,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
@@ -109,7 +110,7 @@ impl<B: BlockT> Protocol<B> {
let (peerset, peerset_handle) = {
let mut sets =
Vec::with_capacity(NUM_HARDCODED_PEERSETS + network_config.extra_sets.len());
Vec::with_capacity(NUM_HARDCODED_PEERSETS + notification_protocols.len());
let mut default_sets_reserved = HashSet::new();
for reserved in network_config.default_peers_set.reserved_nodes.iter() {
@@ -135,7 +136,7 @@ impl<B: BlockT> Protocol<B> {
NonReservedPeerMode::Deny,
});
for set_cfg in &network_config.extra_sets {
for set_cfg in &notification_protocols {
let mut reserved_nodes = HashSet::new();
for reserved in set_cfg.set_config.reserved_nodes.iter() {
reserved_nodes.insert(reserved.peer_id);
@@ -169,7 +170,7 @@ impl<B: BlockT> Protocol<B> {
handshake: block_announces_protocol.handshake.as_ref().unwrap().to_vec(),
max_notification_size: block_announces_protocol.max_notification_size,
})
.chain(network_config.extra_sets.iter().map(|s| notifications::ProtocolConfig {
.chain(notification_protocols.iter().map(|s| notifications::ProtocolConfig {
name: s.notifications_protocol.clone(),
fallback_names: s.fallback_names.clone(),
handshake: s.handshake.as_ref().map_or(roles.encode(), |h| (*h).to_vec()),
@@ -182,7 +183,7 @@ impl<B: BlockT> Protocol<B> {
peerset_handle: peerset_handle.clone(),
behaviour,
notification_protocols: iter::once(block_announces_protocol.notifications_protocol)
.chain(network_config.extra_sets.iter().map(|s| s.notifications_protocol.clone()))
.chain(notification_protocols.iter().map(|s| s.notifications_protocol.clone()))
.collect(),
bad_handshake_substreams: Default::default(),
peers: HashMap::new(),
+86 -99
View File
@@ -29,7 +29,7 @@
use crate::{
behaviour::{self, Behaviour, BehaviourOut},
config::{MultiaddrWithPeerId, Params, TransportConfig},
config::{FullNetworkConfiguration, MultiaddrWithPeerId, Params, TransportConfig},
discovery::DiscoveryConfig,
error::Error,
event::{DhtEvent, Event},
@@ -147,25 +147,24 @@ where
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new(mut params: Params<B>) -> Result<Self, Error> {
pub fn new(params: Params<B>) -> Result<Self, Error> {
let FullNetworkConfiguration {
notification_protocols,
request_response_protocols,
mut network_config,
} = params.network_config;
// Private and public keys configuration.
let local_identity = params.network_config.node_key.clone().into_keypair()?;
let local_identity = network_config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
let local_peer_id = local_public.to_peer_id();
params
.network_config
.request_response_protocols
.extend(params.request_response_protocol_configs);
params.network_config.boot_nodes = params
.network_config
network_config.boot_nodes = network_config
.boot_nodes
.into_iter()
.filter(|boot_node| boot_node.peer_id != local_peer_id)
.collect();
params.network_config.default_peers_set.reserved_nodes = params
.network_config
network_config.default_peers_set.reserved_nodes = network_config
.default_peers_set
.reserved_nodes
.into_iter()
@@ -185,36 +184,31 @@ where
// Ensure the listen addresses are consistent with the transport.
ensure_addresses_consistent_with_transport(
params.network_config.listen_addresses.iter(),
&params.network_config.transport,
network_config.listen_addresses.iter(),
&network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
params.network_config.boot_nodes.iter().map(|x| &x.multiaddr),
&params.network_config.transport,
network_config.boot_nodes.iter().map(|x| &x.multiaddr),
&network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
params
.network_config
.default_peers_set
.reserved_nodes
.iter()
.map(|x| &x.multiaddr),
&params.network_config.transport,
network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
&network_config.transport,
)?;
for extra_set in &params.network_config.extra_sets {
for notification_protocol in &notification_protocols {
ensure_addresses_consistent_with_transport(
extra_set.set_config.reserved_nodes.iter().map(|x| &x.multiaddr),
&params.network_config.transport,
notification_protocol.set_config.reserved_nodes.iter().map(|x| &x.multiaddr),
&network_config.transport,
)?;
}
ensure_addresses_consistent_with_transport(
params.network_config.public_addresses.iter(),
&params.network_config.transport,
network_config.public_addresses.iter(),
&network_config.transport,
)?;
let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);
if let Some(path) = &params.network_config.net_config_path {
if let Some(path) = &network_config.net_config_path {
fs::create_dir_all(path)?;
}
@@ -224,9 +218,58 @@ where
local_peer_id.to_base58(),
);
let (transport, bandwidth) = {
let config_mem = match network_config.transport {
TransportConfig::MemoryOnly => true,
TransportConfig::Normal { .. } => false,
};
// The yamux buffer size limit is configured to be equal to the maximum frame size
// of all protocols. 10 bytes are added to each limit for the length prefix that
// is not included in the upper layer protocols limit but is still present in the
// yamux buffer. These 10 bytes correspond to the maximum size required to encode
// a variable-length-encoding 64bits number. In other words, we make the
// assumption that no notification larger than 2^64 will ever be sent.
let yamux_maximum_buffer_size = {
let requests_max = request_response_protocols
.iter()
.map(|cfg| usize::try_from(cfg.max_request_size).unwrap_or(usize::MAX));
let responses_max = request_response_protocols
.iter()
.map(|cfg| usize::try_from(cfg.max_response_size).unwrap_or(usize::MAX));
let notifs_max = notification_protocols
.iter()
.map(|cfg| usize::try_from(cfg.max_notification_size).unwrap_or(usize::MAX));
// A "default" max is added to cover all the other protocols: ping, identify,
// kademlia, block announces, and transactions.
let default_max = cmp::max(
1024 * 1024,
usize::try_from(protocol::BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE)
.unwrap_or(usize::MAX),
);
iter::once(default_max)
.chain(requests_max)
.chain(responses_max)
.chain(notifs_max)
.max()
.expect("iterator known to always yield at least one element; qed")
.saturating_add(10)
};
transport::build_transport(
local_identity.clone(),
config_mem,
network_config.yamux_window_size,
yamux_maximum_buffer_size,
)
};
let (protocol, peerset_handle, mut known_addresses) = Protocol::new(
From::from(&params.role),
&params.network_config,
&network_config,
notification_protocols,
params.block_announce_config,
params.tx,
)?;
@@ -235,7 +278,7 @@ where
let mut boot_node_ids = HashSet::new();
// Process the bootnodes.
for bootnode in params.network_config.boot_nodes.iter() {
for bootnode in network_config.boot_nodes.iter() {
boot_node_ids.insert(bootnode.peer_id);
known_addresses.push((bootnode.peer_id, bootnode.multiaddr.clone()));
}
@@ -243,9 +286,8 @@ where
let boot_node_ids = Arc::new(boot_node_ids);
// Check for duplicate bootnodes.
params.network_config.boot_nodes.iter().try_for_each(|bootnode| {
if let Some(other) = params
.network_config
network_config.boot_nodes.iter().try_for_each(|bootnode| {
if let Some(other) = network_config
.boot_nodes
.iter()
.filter(|o| o.multiaddr == bootnode.multiaddr)
@@ -265,29 +307,25 @@ where
// Build the swarm.
let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
let user_agent = format!(
"{} ({})",
params.network_config.client_version, params.network_config.node_name
);
let user_agent =
format!("{} ({})", network_config.client_version, network_config.node_name);
let discovery_config = {
let mut config = DiscoveryConfig::new(local_public.clone());
config.with_permanent_addresses(known_addresses);
config.discovery_limit(
u64::from(params.network_config.default_peers_set.out_peers) + 15,
);
config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
config.with_kademlia(
params.genesis_hash,
params.fork_id.as_deref(),
&params.protocol_id,
);
config.with_dht_random_walk(params.network_config.enable_dht_random_walk);
config.allow_non_globals_in_dht(params.network_config.allow_non_globals_in_dht);
config.with_dht_random_walk(network_config.enable_dht_random_walk);
config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
config.use_kademlia_disjoint_query_paths(
params.network_config.kademlia_disjoint_query_paths,
network_config.kademlia_disjoint_query_paths,
);
match params.network_config.transport {
match network_config.transport {
TransportConfig::MemoryOnly => {
config.with_mdns(false);
config.allow_private_ip(false);
@@ -305,64 +343,13 @@ where
config
};
let (transport, bandwidth) = {
let config_mem = match params.network_config.transport {
TransportConfig::MemoryOnly => true,
TransportConfig::Normal { .. } => false,
};
// The yamux buffer size limit is configured to be equal to the maximum frame size
// of all protocols. 10 bytes are added to each limit for the length prefix that
// is not included in the upper layer protocols limit but is still present in the
// yamux buffer. These 10 bytes correspond to the maximum size required to encode
// a variable-length-encoding 64bits number. In other words, we make the
// assumption that no notification larger than 2^64 will ever be sent.
let yamux_maximum_buffer_size = {
let requests_max = params
.network_config
.request_response_protocols
.iter()
.map(|cfg| usize::try_from(cfg.max_request_size).unwrap_or(usize::MAX));
let responses_max =
params.network_config.request_response_protocols.iter().map(|cfg| {
usize::try_from(cfg.max_response_size).unwrap_or(usize::MAX)
});
let notifs_max = params.network_config.extra_sets.iter().map(|cfg| {
usize::try_from(cfg.max_notification_size).unwrap_or(usize::MAX)
});
// A "default" max is added to cover all the other protocols: ping, identify,
// kademlia, block announces, and transactions.
let default_max = cmp::max(
1024 * 1024,
usize::try_from(protocol::BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE)
.unwrap_or(usize::MAX),
);
iter::once(default_max)
.chain(requests_max)
.chain(responses_max)
.chain(notifs_max)
.max()
.expect("iterator known to always yield at least one element; qed")
.saturating_add(10)
};
transport::build_transport(
local_identity.clone(),
config_mem,
params.network_config.yamux_window_size,
yamux_maximum_buffer_size,
)
};
let behaviour = {
let result = Behaviour::new(
protocol,
user_agent,
local_public,
discovery_config,
params.network_config.request_response_protocols,
request_response_protocols,
peerset_handle.clone(),
);
@@ -416,14 +403,14 @@ where
};
// Listen on multiaddresses.
for addr in &params.network_config.listen_addresses {
for addr in &network_config.listen_addresses {
if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone()) {
warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
}
}
// Add external addresses.
for addr in &params.network_config.public_addresses {
for addr in &network_config.public_addresses {
Swarm::<Behaviour<B>>::add_external_address(
&mut swarm,
addr.clone(),
+26 -21
View File
@@ -37,7 +37,7 @@ use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
use sc_network::{
config::{
NetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode,
FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode,
},
utils::LruHashSet,
NotificationsSink, ProtocolName,
@@ -260,7 +260,7 @@ where
roles: Roles,
client: Arc<Client>,
metrics_registry: Option<&Registry>,
network_config: &NetworkConfiguration,
net_config: &FullNetworkConfiguration,
protocol_id: ProtocolId,
fork_id: &Option<String>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
@@ -272,52 +272,56 @@ where
warp_sync_protocol_name: Option<ProtocolName>,
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,
) -> Result<(Self, SyncingService<B>, NonDefaultSetConfig), ClientError> {
let mode = match network_config.sync_mode {
let mode = match net_config.network_config.sync_mode {
SyncOperationMode::Full => SyncMode::Full,
SyncOperationMode::Fast { skip_proofs, storage_chain_mode } =>
SyncMode::LightState { skip_proofs, storage_chain_mode },
SyncOperationMode::Warp => SyncMode::Warp,
};
let max_parallel_downloads = network_config.max_parallel_downloads;
let max_blocks_per_request = if network_config.max_blocks_per_request >
let max_parallel_downloads = net_config.network_config.max_parallel_downloads;
let max_blocks_per_request = if net_config.network_config.max_blocks_per_request >
crate::MAX_BLOCKS_IN_RESPONSE as u32
{
log::info!(target: "sync", "clamping maximum blocks per request to {}", crate::MAX_BLOCKS_IN_RESPONSE);
crate::MAX_BLOCKS_IN_RESPONSE as u32
} else {
network_config.max_blocks_per_request
net_config.network_config.max_blocks_per_request
};
let cache_capacity = NonZeroUsize::new(
(network_config.default_peers_set.in_peers as usize +
network_config.default_peers_set.out_peers as usize)
(net_config.network_config.default_peers_set.in_peers as usize +
net_config.network_config.default_peers_set.out_peers as usize)
.max(1),
)
.expect("cache capacity is not zero");
let important_peers = {
let mut imp_p = HashSet::new();
for reserved in &network_config.default_peers_set.reserved_nodes {
for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
imp_p.insert(reserved.peer_id);
}
for reserved in network_config
.extra_sets
.iter()
.flat_map(|s| s.set_config.reserved_nodes.iter())
{
imp_p.insert(reserved.peer_id);
for config in net_config.notification_protocols() {
let peer_ids = config
.set_config
.reserved_nodes
.iter()
.map(|info| info.peer_id)
.collect::<Vec<PeerId>>();
imp_p.extend(peer_ids);
}
imp_p.shrink_to_fit();
imp_p
};
let boot_node_ids = {
let mut list = HashSet::new();
for node in &network_config.boot_nodes {
for node in &net_config.network_config.boot_nodes {
list.insert(node.peer_id);
}
list.shrink_to_fit();
list
};
let default_peers_set_no_slot_peers = {
let mut no_slot_p: HashSet<PeerId> = network_config
let mut no_slot_p: HashSet<PeerId> = net_config
.network_config
.default_peers_set
.reserved_nodes
.iter()
@@ -326,11 +330,12 @@ where
no_slot_p.shrink_to_fit();
no_slot_p
};
let default_peers_set_num_full = network_config.default_peers_set_num_full as usize;
let default_peers_set_num_full =
net_config.network_config.default_peers_set_num_full as usize;
let default_peers_set_num_light = {
let total = network_config.default_peers_set.out_peers +
network_config.default_peers_set.in_peers;
total.saturating_sub(network_config.default_peers_set_num_full) as usize
let total = net_config.network_config.default_peers_set.out_peers +
net_config.network_config.default_peers_set.in_peers;
total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
};
let (chain_sync, block_announce_config) = ChainSync::new(
+27 -25
View File
@@ -50,8 +50,8 @@ use sc_consensus::{
};
use sc_network::{
config::{
MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode,
ProtocolId, Role, SyncMode, TransportConfig,
FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig,
NonReservedPeerMode, ProtocolId, Role, SyncMode, TransportConfig,
},
request_responses::ProtocolConfig as RequestResponseConfig,
types::ProtocolName,
@@ -800,20 +800,6 @@ where
network_config.transport = TransportConfig::MemoryOnly;
network_config.listen_addresses = vec![listen_addr.clone()];
network_config.allow_non_globals_in_dht = true;
network_config
.request_response_protocols
.extend(config.request_response_protocols);
network_config.extra_sets = config
.notifications_protocols
.into_iter()
.map(|p| NonDefaultSetConfig {
notifications_protocol: p,
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: Default::default(),
})
.collect();
if let Some(connect_to) = config.connect_to_peers {
let addrs = connect_to
.iter()
@@ -826,6 +812,7 @@ where
network_config.default_peers_set.reserved_nodes = addrs;
network_config.default_peers_set.non_reserved_mode = NonReservedPeerMode::Deny;
}
let mut full_net_config = FullNetworkConfiguration::new(&network_config);
let protocol_id = ProtocolId::from("test-protocol-name");
@@ -890,7 +877,7 @@ where
Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }),
client.clone(),
None,
&network_config,
&full_net_config,
protocol_id.clone(),
&fork_id,
block_announce_validator,
@@ -906,6 +893,28 @@ where
let sync_service_import_queue = Box::new(sync_service.clone());
let sync_service = Arc::new(sync_service.clone());
for config in config.request_response_protocols {
full_net_config.add_request_response_protocol(config);
}
for config in [
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
warp_protocol_config,
] {
full_net_config.add_request_response_protocol(config);
}
for protocol in config.notifications_protocols {
full_net_config.add_notification_protocol(NonDefaultSetConfig {
notifications_protocol: protocol,
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: Default::default(),
});
}
let genesis_hash =
client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
let network = NetworkWorker::new(sc_network::config::Params {
@@ -913,20 +922,13 @@ where
executor: Box::new(|f| {
tokio::spawn(f);
}),
network_config,
network_config: full_net_config,
genesis_hash,
protocol_id,
fork_id,
metrics_registry: None,
block_announce_config,
tx,
request_response_protocol_configs: [
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
warp_protocol_config,
]
.to_vec(),
})
.unwrap();
+41 -24
View File
@@ -21,7 +21,7 @@ use libp2p::{Multiaddr, PeerId};
use sc_consensus::{ImportQueue, Link};
use sc_network::{
config::{self, MultiaddrWithPeerId, ProtocolId, TransportConfig},
config::{self, FullNetworkConfiguration, MultiaddrWithPeerId, ProtocolId, TransportConfig},
event::Event,
NetworkEventStream, NetworkNotification, NetworkPeers, NetworkService, NetworkStateInfo,
NetworkWorker,
@@ -77,6 +77,7 @@ struct TestNetworkBuilder {
listen_addresses: Vec<Multiaddr>,
set_config: Option<config::SetConfig>,
chain_sync_network: Option<(NetworkServiceProvider, NetworkServiceHandle)>,
notification_protocols: Vec<config::NonDefaultSetConfig>,
config: Option<config::NetworkConfiguration>,
}
@@ -89,6 +90,7 @@ impl TestNetworkBuilder {
listen_addresses: Vec::new(),
set_config: None,
chain_sync_network: None,
notification_protocols: Vec::new(),
config: None,
}
}
@@ -98,6 +100,11 @@ impl TestNetworkBuilder {
self
}
pub fn with_notification_protocol(mut self, config: config::NonDefaultSetConfig) -> Self {
self.notification_protocols.push(config);
self
}
pub fn with_listen_addresses(mut self, addresses: Vec<Multiaddr>) -> Self {
self.listen_addresses = addresses;
self
@@ -115,13 +122,6 @@ impl TestNetworkBuilder {
);
let network_config = self.config.unwrap_or(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: self.set_config.unwrap_or_default(),
}],
listen_addresses: self.listen_addresses,
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
@@ -153,6 +153,7 @@ impl TestNetworkBuilder {
let protocol_id = ProtocolId::from("test-protocol-name");
let fork_id = Some(String::from("test-fork-id"));
let mut full_net_config = FullNetworkConfiguration::new(&network_config);
let block_request_protocol_config = {
let (handler, protocol_config) =
@@ -178,12 +179,11 @@ impl TestNetworkBuilder {
let (chain_sync_network_provider, chain_sync_network_handle) =
self.chain_sync_network.unwrap_or(NetworkServiceProvider::new());
let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000);
let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new(
Roles::from(&config::Role::Full),
client.clone(),
None,
&network_config,
&full_net_config,
protocol_id.clone(),
&None,
Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator),
@@ -197,6 +197,29 @@ impl TestNetworkBuilder {
)
.unwrap();
let mut link = self.link.unwrap_or(Box::new(chain_sync_service.clone()));
if !self.notification_protocols.is_empty() {
for config in self.notification_protocols {
full_net_config.add_notification_protocol(config);
}
} else {
full_net_config.add_notification_protocol(config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: self.set_config.unwrap_or_default(),
});
}
for config in [
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
] {
full_net_config.add_request_response_protocol(config);
}
let genesis_hash =
client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
let worker = NetworkWorker::<
@@ -209,16 +232,10 @@ impl TestNetworkBuilder {
tokio::spawn(f);
}),
genesis_hash,
network_config,
network_config: full_net_config,
protocol_id,
fork_id,
metrics_registry: None,
request_response_protocol_configs: [
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
]
.to_vec(),
tx,
})
.unwrap();
@@ -553,14 +570,14 @@ async fn fallback_name_working() {
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (node1, mut events_stream1) = TestNetworkBuilder::new()
.with_notification_protocol(config::NonDefaultSetConfig {
notifications_protocol: NEW_PROTOCOL_NAME.into(),
fallback_names: vec![PROTOCOL_NAME.into()],
max_notification_size: 1024 * 1024,
handshake: None,
set_config: Default::default(),
})
.with_config(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
notifications_protocol: NEW_PROTOCOL_NAME.into(),
fallback_names: vec![PROTOCOL_NAME.into()],
max_notification_size: 1024 * 1024,
handshake: None,
set_config: Default::default(),
}],
listen_addresses: vec![listen_addr.clone()],
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
+76 -70
View File
@@ -41,7 +41,10 @@ use sc_executor::{
NativeExecutionDispatch, RuntimeVersionOf, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
};
use sc_keystore::LocalKeystore;
use sc_network::{config::SyncMode, NetworkService, NetworkStateInfo, NetworkStatusProvider};
use sc_network::{
config::{FullNetworkConfiguration, SyncMode},
NetworkService, NetworkStateInfo, NetworkStatusProvider,
};
use sc_network_bitswap::BitswapRequestHandler;
use sc_network_common::{role::Roles, sync::warp::WarpSyncParams};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
@@ -730,6 +733,8 @@ where
pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> {
/// The service configuration.
pub config: &'a Configuration,
/// Full network configuration.
pub net_config: FullNetworkConfiguration,
/// A shared client returned by `new_full_parts`.
pub client: Arc<TCl>,
/// A shared transaction pool.
@@ -773,6 +778,7 @@ where
{
let BuildNetworkParams {
config,
mut net_config,
client,
transaction_pool,
spawn_handle,
@@ -781,8 +787,6 @@ where
warp_sync_params,
} = params;
let mut request_response_protocol_configs = Vec::new();
if warp_sync_params.is_none() && config.network.sync_mode.is_warp() {
return Err("Warp sync enabled, but no warp sync provider configured.".into())
}
@@ -803,32 +807,35 @@ where
Box::new(DefaultBlockAnnounceValidator)
};
let block_request_protocol_config = {
let (block_request_protocol_config, block_request_protocol_name) = {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = BlockRequestHandler::new(
&protocol_id,
config.chain_spec.fork_id(),
client.clone(),
config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
net_config.network_config.default_peers_set.in_peers as usize +
net_config.network_config.default_peers_set.out_peers as usize,
);
let config_name = protocol_config.name.clone();
spawn_handle.spawn("block-request-handler", Some("networking"), handler.run());
protocol_config
(protocol_config, config_name)
};
let state_request_protocol_config = {
let (state_request_protocol_config, state_request_protocol_name) = {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = StateRequestHandler::new(
&protocol_id,
config.chain_spec.fork_id(),
client.clone(),
config.network.default_peers_set_num_full as usize,
net_config.network_config.default_peers_set_num_full as usize,
);
let config_name = protocol_config.name.clone();
spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
protocol_config
(protocol_config, config_name)
};
let warp_sync_protocol_config = match warp_sync_params.as_ref() {
let (warp_sync_protocol_config, warp_request_protocol_name) = match warp_sync_params.as_ref() {
Some(WarpSyncParams::WithProvider(warp_with_provider)) => {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = WarpSyncRequestHandler::new(
@@ -841,10 +848,12 @@ where
config.chain_spec.fork_id(),
warp_with_provider.clone(),
);
let config_name = protocol_config.name.clone();
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
Some(protocol_config)
(Some(protocol_config), Some(config_name))
},
_ => None,
_ => (None, None),
};
let light_client_request_protocol_config = {
@@ -858,62 +867,23 @@ where
protocol_config
};
let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000);
let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new();
let (engine, sync_service, block_announce_config) = SyncingEngine::new(
Roles::from(&config.role),
client.clone(),
config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(),
&config.network,
protocol_id.clone(),
&config.chain_spec.fork_id().map(ToOwned::to_owned),
block_announce_validator,
warp_sync_params,
chain_sync_network_handle,
import_queue.service(),
block_request_protocol_config.name.clone(),
state_request_protocol_config.name.clone(),
warp_sync_protocol_config.as_ref().map(|config| config.name.clone()),
rx,
)?;
let sync_service_import_queue = sync_service.clone();
let sync_service = Arc::new(sync_service);
// install request handlers to `FullNetworkConfiguration`
net_config.add_request_response_protocol(block_request_protocol_config);
net_config.add_request_response_protocol(state_request_protocol_config);
net_config.add_request_response_protocol(light_client_request_protocol_config);
request_response_protocol_configs.push(config.network.ipfs_server.then(|| {
if let Some(config) = warp_sync_protocol_config {
net_config.add_request_response_protocol(config);
}
if config.network.ipfs_server {
let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler.run());
protocol_config
}));
net_config.add_request_response_protocol(protocol_config);
}
let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
let mut network_params = sc_network::config::Params::<TBl> {
role: config.role.clone(),
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", Some("networking"), fut);
})
},
network_config: config.network.clone(),
genesis_hash,
protocol_id: protocol_id.clone(),
fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned),
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_announce_config,
tx,
request_response_protocol_configs: request_response_protocol_configs
.into_iter()
.chain([
Some(block_request_protocol_config),
Some(state_request_protocol_config),
Some(light_client_request_protocol_config),
warp_sync_protocol_config,
])
.flatten()
.collect::<Vec<_>>(),
};
// crate transactions protocol and add it to the list of supported protocols of `network_params`
// create transactions protocol and add it to the list of supported protocols of
// `network_params`
let transactions_handler_proto = sc_network_transactions::TransactionsHandlerPrototype::new(
protocol_id.clone(),
client
@@ -923,12 +893,48 @@ where
.expect("Genesis block exists; qed"),
config.chain_spec.fork_id(),
);
network_params
.network_config
.extra_sets
.insert(0, transactions_handler_proto.set_config());
net_config.add_notification_protocol(transactions_handler_proto.set_config());
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000);
let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new();
let (engine, sync_service, block_announce_config) = SyncingEngine::new(
Roles::from(&config.role),
client.clone(),
config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(),
&net_config,
protocol_id.clone(),
&config.chain_spec.fork_id().map(ToOwned::to_owned),
block_announce_validator,
warp_sync_params,
chain_sync_network_handle,
import_queue.service(),
block_request_protocol_name,
state_request_protocol_name,
warp_request_protocol_name,
rx,
)?;
let sync_service_import_queue = sync_service.clone();
let sync_service = Arc::new(sync_service);
let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
let network_params = sc_network::config::Params::<TBl> {
role: config.role.clone(),
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", Some("networking"), fut);
})
},
network_config: net_config,
genesis_hash,
protocol_id: protocol_id.clone(),
fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned),
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_announce_config,
tx,
};
let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();