Move code from sc-network-common back to sc-network (#13592)

* Move service tests to `client/network/tests`

These tests depend on `sc-network` and `sc-network-sync` so they should
live outside the crate.

* Move some configs from `sc-network-common` to `sc-network`

* Move `NetworkService` traits to `sc-network`

* Move request-responses to `sc-network`

* Remove more stuff

* Remove rest of configs from `sc-network-common` to `sc-network`

* Remove more stuff

* Fix warnings

* Update client/network/src/request_responses.rs

Co-authored-by: Dmitry Markin <dmitry@markin.tech>

* Fix cargo doc

---------

Co-authored-by: Dmitry Markin <dmitry@markin.tech>
This commit is contained in:
Aaro Altonen
2023-03-14 14:06:40 +02:00
committed by GitHub
parent 5d718e45c1
commit 9ced14e2de
85 changed files with 1411 additions and 1434 deletions
+10 -2
View File
@@ -8101,6 +8101,7 @@ dependencies = [
"quickcheck",
"rand 0.8.5",
"sc-client-api",
"sc-network",
"sc-network-common",
"sp-api",
"sp-authority-discovery",
@@ -8163,7 +8164,7 @@ dependencies = [
"sc-chain-spec-derive",
"sc-client-api",
"sc-executor",
"sc-network-common",
"sc-network",
"sc-telemetry",
"serde",
"serde_json",
@@ -8751,6 +8752,7 @@ dependencies = [
"futures-timer",
"log",
"sc-client-api",
"sc-network",
"sc-network-common",
"sp-blockchain",
"sp-runtime",
@@ -8787,6 +8789,7 @@ dependencies = [
"futures-timer",
"ip_network",
"libp2p",
"linked_hash_set",
"log",
"lru",
"mockall",
@@ -8838,6 +8841,7 @@ dependencies = [
"sc-block-builder",
"sc-client-api",
"sc-consensus",
"sc-network",
"sc-network-common",
"sp-blockchain",
"sp-consensus",
@@ -8861,7 +8865,6 @@ dependencies = [
"futures",
"futures-timer",
"libp2p",
"linked_hash_set",
"parity-scale-codec",
"prost-build",
"sc-consensus",
@@ -8890,6 +8893,7 @@ dependencies = [
"log",
"lru",
"quickcheck",
"sc-network",
"sc-network-common",
"sc-peerset",
"sp-runtime",
@@ -8911,6 +8915,7 @@ dependencies = [
"prost",
"prost-build",
"sc-client-api",
"sc-network",
"sc-network-common",
"sc-peerset",
"sp-blockchain",
@@ -8939,6 +8944,7 @@ dependencies = [
"sc-block-builder",
"sc-client-api",
"sc-consensus",
"sc-network",
"sc-network-common",
"sc-peerset",
"sc-utils",
@@ -8997,6 +9003,7 @@ dependencies = [
"log",
"parity-scale-codec",
"pin-project",
"sc-network",
"sc-network-common",
"sc-peerset",
"sc-utils",
@@ -9026,6 +9033,7 @@ dependencies = [
"sc-block-builder",
"sc-client-api",
"sc-client-db",
"sc-network",
"sc-network-common",
"sc-peerset",
"sc-transaction-pool",
+2 -4
View File
@@ -31,10 +31,8 @@ use node_primitives::Block;
use sc_client_api::BlockBackend;
use sc_consensus_babe::{self, SlotProportion};
use sc_executor::NativeElseWasmExecutor;
use sc_network::NetworkService;
use sc_network_common::{
protocol::event::Event, service::NetworkEventStream, sync::warp::WarpSyncParams,
};
use sc_network::{event::Event, NetworkEventStream, NetworkService};
use sc_network_common::sync::warp::WarpSyncParams;
use sc_network_sync::SyncingService;
use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
@@ -28,6 +28,7 @@ rand = "0.8.5"
thiserror = "1.0"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" }
sc-client-api = { version = "4.0.0-dev", path = "../api" }
sc-network = { version = "0.10.0-dev", path = "../network/" }
sc-network-common = { version = "0.10.0-dev", path = "../network/common" }
sp-api = { version = "4.0.0-dev", path = "../../primitives/api" }
sp-authority-discovery = { version = "4.0.0-dev", path = "../../primitives/authority-discovery" }
@@ -40,7 +40,7 @@ use futures::{
};
use libp2p::{Multiaddr, PeerId};
use sc_network_common::protocol::event::DhtEvent;
use sc_network::event::DhtEvent;
use sp_authority_discovery::AuthorityId;
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::Block as BlockT;
@@ -43,9 +43,8 @@ use log::{debug, error, log_enabled};
use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64};
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
use sc_network_common::{
protocol::event::DhtEvent,
service::{KademliaKey, NetworkDHTProvider, NetworkSigner, NetworkStateInfo, Signature},
use sc_network::{
event::DhtEvent, KademliaKey, NetworkDHTProvider, NetworkSigner, NetworkStateInfo, Signature,
};
use sp_api::{ApiError, ProvideRuntimeApi};
use sp_authority_discovery::{
@@ -29,11 +29,16 @@ use futures::{
sink::SinkExt,
task::LocalSpawn,
};
use libp2p::{core::multiaddr, identity::Keypair, PeerId};
use libp2p::{
core::multiaddr,
identity::{error::SigningError, Keypair},
kad::record::Key as KademliaKey,
PeerId,
};
use prometheus_endpoint::prometheus::default_registry;
use sc_client_api::HeaderBackend;
use sc_network_common::service::{KademliaKey, Signature, SigningError};
use sc_network::Signature;
use sp_api::{ApiRef, ProvideRuntimeApi};
use sp_keystore::{testing::KeyStore, CryptoStore};
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
+1 -1
View File
@@ -19,7 +19,7 @@ serde_json = "1.0.85"
sc-client-api = { version = "4.0.0-dev", path = "../api" }
sc-chain-spec-derive = { version = "4.0.0-dev", path = "./derive" }
sc-executor = { version = "0.10.0-dev", path = "../executor" }
sc-network-common = { version = "0.10.0-dev", path = "../network/common" }
sc-network = { version = "0.10.0-dev", path = "../network" }
sc-telemetry = { version = "4.0.0-dev", path = "../telemetry" }
sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" }
sp-core = { version = "7.0.0", path = "../../primitives/core" }
@@ -20,7 +20,7 @@
#![warn(missing_docs)]
use crate::{extension::GetExtension, ChainType, Properties, RuntimeGenesis};
use sc_network_common::config::MultiaddrWithPeerId;
use sc_network::config::MultiaddrWithPeerId;
use sc_telemetry::TelemetryEndpoints;
use serde::{Deserialize, Serialize};
use serde_json as json;
+1 -1
View File
@@ -189,7 +189,7 @@ pub use self::{
};
pub use sc_chain_spec_derive::{ChainSpecExtension, ChainSpecGroup};
use sc_network_common::config::MultiaddrWithPeerId;
use sc_network::config::MultiaddrWithPeerId;
use sc_telemetry::TelemetryEndpoints;
use serde::{de::DeserializeOwned, Serialize};
use sp_core::storage::Storage;
+8 -12
View File
@@ -251,19 +251,15 @@ pub enum SyncMode {
Warp,
}
impl Into<sc_network_common::config::SyncMode> for SyncMode {
fn into(self) -> sc_network_common::config::SyncMode {
impl Into<sc_network::config::SyncMode> for SyncMode {
fn into(self) -> sc_network::config::SyncMode {
match self {
SyncMode::Full => sc_network_common::config::SyncMode::Full,
SyncMode::Fast => sc_network_common::config::SyncMode::Fast {
skip_proofs: false,
storage_chain_mode: false,
},
SyncMode::FastUnsafe => sc_network_common::config::SyncMode::Fast {
skip_proofs: true,
storage_chain_mode: false,
},
SyncMode::Warp => sc_network_common::config::SyncMode::Warp,
SyncMode::Full => sc_network::config::SyncMode::Full,
SyncMode::Fast =>
sc_network::config::SyncMode::Fast { skip_proofs: false, storage_chain_mode: false },
SyncMode::FastUnsafe =>
sc_network::config::SyncMode::Fast { skip_proofs: true, storage_chain_mode: false },
SyncMode::Warp => sc_network::config::SyncMode::Warp,
}
}
}
@@ -18,8 +18,12 @@
use crate::{arg_enums::SyncMode, params::node_key_params::NodeKeyParams};
use clap::Args;
use sc_network::{config::NetworkConfiguration, multiaddr::Protocol};
use sc_network_common::config::{NodeKeyConfig, NonReservedPeerMode, SetConfig, TransportConfig};
use sc_network::{
config::{
NetworkConfiguration, NodeKeyConfig, NonReservedPeerMode, SetConfig, TransportConfig,
},
multiaddr::Protocol,
};
use sc_service::{
config::{Multiaddr, MultiaddrWithPeerId},
ChainSpec, ChainType,
@@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use clap::Args;
use sc_network_common::config::{identity::ed25519, NodeKeyConfig};
use sc_network::config::{identity::ed25519, NodeKeyConfig};
use sp_core::H256;
use std::{path::PathBuf, str::FromStr};
@@ -92,7 +92,7 @@ impl NodeKeyParams {
let secret = if let Some(node_key) = self.node_key.as_ref() {
parse_ed25519_secret(node_key)?
} else {
sc_network_common::config::Secret::File(
sc_network::config::Secret::File(
self.node_key_file
.clone()
.unwrap_or_else(|| net_config_dir.join(NODE_KEY_ED25519_FILE)),
@@ -111,10 +111,10 @@ fn invalid_node_key(e: impl std::fmt::Display) -> error::Error {
}
/// Parse a Ed25519 secret key from a hex string into a `sc_network::Secret`.
fn parse_ed25519_secret(hex: &str) -> error::Result<sc_network_common::config::Ed25519Secret> {
fn parse_ed25519_secret(hex: &str) -> error::Result<sc_network::config::Ed25519Secret> {
H256::from_str(hex).map_err(invalid_node_key).and_then(|bytes| {
ed25519::SecretKey::from_bytes(bytes)
.map(sc_network_common::config::Secret::Input)
.map(sc_network::config::Secret::Input)
.map_err(invalid_node_key)
})
}
@@ -123,7 +123,7 @@ fn parse_ed25519_secret(hex: &str) -> error::Result<sc_network_common::config::E
mod tests {
use super::*;
use clap::ValueEnum;
use sc_network_common::config::identity::{ed25519, Keypair};
use sc_network::config::identity::{ed25519, Keypair};
use std::fs;
#[test]
@@ -140,7 +140,7 @@ mod tests {
node_key_file: None,
};
params.node_key(net_config_dir).and_then(|c| match c {
NodeKeyConfig::Ed25519(sc_network_common::config::Secret::Input(ref ski))
NodeKeyConfig::Ed25519(sc_network::config::Secret::Input(ref ski))
if node_key_type == NodeKeyType::Ed25519 && &sk[..] == ski.as_ref() =>
Ok(()),
_ => Err(error::Error::Input("Unexpected node key config".into())),
@@ -200,7 +200,7 @@ mod tests {
let dir = PathBuf::from(net_config_dir.clone());
let typ = params.node_key_type;
params.node_key(net_config_dir).and_then(move |c| match c {
NodeKeyConfig::Ed25519(sc_network_common::config::Secret::File(ref f))
NodeKeyConfig::Ed25519(sc_network::config::Secret::File(ref f))
if typ == NodeKeyType::Ed25519 && f == &dir.join(NODE_KEY_ED25519_FILE) =>
Ok(()),
_ => Err(error::Error::Input("Unexpected node key config".into())),
@@ -67,9 +67,8 @@ pub(crate) mod beefy_protocol_name {
/// For standard protocol name see [`beefy_protocol_name::gossip_protocol_name`].
pub fn beefy_peers_set_config(
gossip_protocol_name: sc_network::ProtocolName,
) -> sc_network_common::config::NonDefaultSetConfig {
let mut cfg =
sc_network_common::config::NonDefaultSetConfig::new(gossip_protocol_name, 1024 * 1024);
) -> sc_network::config::NonDefaultSetConfig {
let mut cfg = sc_network::config::NonDefaultSetConfig::new(gossip_protocol_name, 1024 * 1024);
cfg.allow_non_reserved(25, 25);
cfg
}
@@ -23,8 +23,10 @@ use futures::{
};
use log::{debug, trace};
use sc_client_api::BlockBackend;
use sc_network::{config as netconfig, config::RequestResponseConfig, PeerId, ReputationChange};
use sc_network_common::protocol::ProtocolName;
use sc_network::{
config as netconfig, config::RequestResponseConfig, types::ProtocolName, PeerId,
ReputationChange,
};
use sp_consensus_beefy::BEEFY_ENGINE_ID;
use sp_runtime::traits::Block;
use std::{marker::PhantomData, sync::Arc};
@@ -22,10 +22,9 @@ use codec::Encode;
use futures::channel::{oneshot, oneshot::Canceled};
use log::{debug, warn};
use parking_lot::Mutex;
use sc_network::{PeerId, ProtocolName};
use sc_network_common::{
use sc_network::{
request_responses::{IfDisconnected, RequestFailure},
service::NetworkRequest,
NetworkRequest, PeerId, ProtocolName,
};
use sp_consensus_beefy::{crypto::AuthorityId, ValidatorSet};
use sp_runtime::traits::{Block, NumberFor};
+1 -2
View File
@@ -38,8 +38,7 @@ use parking_lot::Mutex;
use prometheus::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
use sc_consensus::BlockImport;
use sc_network::ProtocolName;
use sc_network_common::service::NetworkRequest;
use sc_network::{NetworkRequest, ProtocolName};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
use sp_api::{HeaderT, NumberFor, ProvideRuntimeApi};
use sp_blockchain::{
@@ -91,7 +91,7 @@ use parity_scale_codec::{Decode, Encode};
use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64};
use rand::seq::SliceRandom;
use sc_network::{PeerId, ReputationChange};
use sc_network_common::protocol::role::ObservedRole;
use sc_network_common::role::ObservedRole;
use sc_network_gossip::{MessageIntent, ValidatorContext};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
@@ -820,7 +820,7 @@ impl<Block: BlockT> Inner<Block> {
fn note_set(&mut self, set_id: SetId, authorities: Vec<AuthorityId>) -> MaybeMessage<Block> {
let local_view = match self.local_view {
ref mut x @ None => x.get_or_insert(LocalView::new(set_id, Round(1))),
Some(ref mut v) =>
Some(ref mut v) => {
if v.set_id == set_id {
let diff_authorities = self.authorities.iter().collect::<HashSet<_>>() !=
authorities.iter().collect::<HashSet<_>>();
@@ -841,7 +841,8 @@ impl<Block: BlockT> Inner<Block> {
return None
} else {
v
},
}
},
};
local_view.update_set(set_id);
@@ -46,7 +46,7 @@ use finality_grandpa::{
Message::{Precommit, Prevote, PrimaryPropose},
};
use parity_scale_codec::{Decode, Encode};
use sc_network::ReputationChange;
use sc_network::{NetworkBlock, NetworkSyncForkRequest, ReputationChange};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
use sp_keystore::SyncCryptoStorePtr;
@@ -59,10 +59,7 @@ use crate::{
use gossip::{
FullCatchUpMessage, FullCommitMessage, GossipMessage, GossipValidator, PeerReport, VoteMessage,
};
use sc_network_common::{
service::{NetworkBlock, NetworkSyncForkRequest},
sync::SyncEventStream,
};
use sc_network_common::sync::SyncEventStream;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_consensus_grandpa::{AuthorityId, AuthoritySignature, RoundNumber, SetId as SetIdNumber};
@@ -77,7 +74,7 @@ pub(crate) const NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(2 *
pub mod grandpa_protocol_name {
use sc_chain_spec::ChainSpec;
use sc_network_common::protocol::ProtocolName;
use sc_network::types::ProtocolName;
pub(crate) const NAME: &str = "/grandpa/1";
/// Old names for the notifications protocol, used for backward compatibility.
@@ -25,14 +25,16 @@ use super::{
use crate::{communication::grandpa_protocol_name, environment::SharedVoterSetState};
use futures::prelude::*;
use parity_scale_codec::Encode;
use sc_network::{config::Role, Multiaddr, PeerId, ReputationChange};
use sc_network::{
config::{MultiaddrWithPeerId, Role},
event::Event as NetworkEvent,
types::ProtocolName,
Multiaddr, NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NetworkSyncForkRequest, NotificationSenderError, NotificationSenderT as NotificationSender,
PeerId, ReputationChange,
};
use sc_network_common::{
config::MultiaddrWithPeerId,
protocol::{event::Event as NetworkEvent, role::ObservedRole, ProtocolName},
service::{
NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NetworkSyncForkRequest, NotificationSender, NotificationSenderError,
},
role::ObservedRole,
sync::{SyncEvent as SyncStreamEvent, SyncEventStream},
};
use sc_network_gossip::Validator;
@@ -68,7 +68,7 @@ use sc_client_api::{
StorageProvider, TransactionFor,
};
use sc_consensus::BlockImport;
use sc_network_common::protocol::ProtocolName;
use sc_network::types::ProtocolName;
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use sp_api::ProvideRuntimeApi;
@@ -695,19 +695,19 @@ pub struct GrandpaParams<Block: BlockT, C, N, S, SC, VR> {
/// For standard protocol name see [`crate::protocol_standard_name`].
pub fn grandpa_peers_set_config(
protocol_name: ProtocolName,
) -> sc_network_common::config::NonDefaultSetConfig {
) -> sc_network::config::NonDefaultSetConfig {
use communication::grandpa_protocol_name;
sc_network_common::config::NonDefaultSetConfig {
sc_network::config::NonDefaultSetConfig {
notifications_protocol: protocol_name,
fallback_names: grandpa_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect(),
// Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot.
max_notification_size: 1024 * 1024,
handshake: None,
set_config: sc_network_common::config::SetConfig {
set_config: sc_network::config::SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: sc_network_common::config::NonReservedPeerMode::Deny,
non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny,
},
}
}
+1
View File
@@ -19,5 +19,6 @@ futures-timer = "3.0.1"
log = "0.4.17"
sc-client-api = { version = "4.0.0-dev", path = "../api" }
sc-network-common = { version = "0.10.0-dev", path = "../network/common" }
sc-network = { version = "0.10.0-dev", path = "../network" }
sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" }
sp-runtime = { version = "7.0.0", path = "../../primitives/runtime" }
+4 -6
View File
@@ -20,12 +20,10 @@ use crate::OutputFormat;
use ansi_term::Colour;
use log::info;
use sc_client_api::ClientInfo;
use sc_network_common::{
service::NetworkStatus,
sync::{
warp::{WarpSyncPhase, WarpSyncProgress},
SyncState, SyncStatus,
},
use sc_network::NetworkStatus;
use sc_network_common::sync::{
warp::{WarpSyncPhase, WarpSyncProgress},
SyncState, SyncStatus,
};
use sp_runtime::traits::{Block as BlockT, CheckedDiv, NumberFor, Saturating, Zero};
use std::{fmt, time::Instant};
+2 -1
View File
@@ -23,7 +23,8 @@ use futures::prelude::*;
use futures_timer::Delay;
use log::{debug, info, trace};
use sc_client_api::{BlockchainEvents, UsageProvider};
use sc_network_common::{service::NetworkStatusProvider, sync::SyncStatusProvider};
use sc_network::NetworkStatusProvider;
use sc_network_common::sync::SyncStatusProvider;
use sp_blockchain::HeaderMetadata;
use sp_runtime::traits::{Block as BlockT, Header};
use std::{collections::VecDeque, fmt::Display, sync::Arc, time::Duration};
@@ -22,6 +22,7 @@ log = "0.4.17"
lru = "0.8.1"
tracing = "0.1.29"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" }
sc-network = { version = "0.10.0-dev", path = "../network/" }
sc-network-common = { version = "0.10.0-dev", path = "../network/common" }
sc-peerset = { version = "4.0.0-dev", path = "../peerset" }
sp-runtime = { version = "7.0.0", path = "../../primitives/runtime" }
+6 -12
View File
@@ -21,10 +21,8 @@ use crate::{
Network, Syncing, Validator,
};
use sc_network_common::{
protocol::{event::Event, ProtocolName},
sync::SyncEvent,
};
use sc_network::{event::Event, types::ProtocolName};
use sc_network_common::sync::SyncEvent;
use sc_peerset::ReputationChange;
use futures::{
@@ -340,15 +338,11 @@ mod tests {
future::poll_fn,
};
use quickcheck::{Arbitrary, Gen, QuickCheck};
use sc_network_common::{
config::MultiaddrWithPeerId,
protocol::role::ObservedRole,
service::{
NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NotificationSender, NotificationSenderError,
},
sync::SyncEventStream,
use sc_network::{
config::MultiaddrWithPeerId, NetworkBlock, NetworkEventStream, NetworkNotification,
NetworkPeers, NotificationSenderError, NotificationSenderT as NotificationSender,
};
use sc_network_common::{role::ObservedRole, sync::SyncEventStream};
use sp_runtime::{
testing::H256,
traits::{Block as BlockT, NumberFor},
+3 -4
View File
@@ -68,11 +68,10 @@ pub use self::{
};
use libp2p::{multiaddr, PeerId};
use sc_network_common::{
protocol::ProtocolName,
service::{NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers},
sync::SyncEventStream,
use sc_network::{
types::ProtocolName, NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
};
use sc_network_common::sync::SyncEventStream;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::iter;
@@ -22,7 +22,8 @@ use ahash::AHashSet;
use libp2p::PeerId;
use lru::LruCache;
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network_common::protocol::{role::ObservedRole, ProtocolName};
use sc_network::types::ProtocolName;
use sc_network_common::role::ObservedRole;
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use std::{collections::HashMap, iter, num::NonZeroUsize, sync::Arc, time, time::Instant};
@@ -525,13 +526,10 @@ mod tests {
use super::*;
use crate::multiaddr::Multiaddr;
use futures::prelude::*;
use sc_network_common::{
config::MultiaddrWithPeerId,
protocol::event::Event,
service::{
NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NotificationSender, NotificationSenderError,
},
use sc_network::{
config::MultiaddrWithPeerId, event::Event, NetworkBlock, NetworkEventStream,
NetworkNotification, NetworkPeers, NotificationSenderError,
NotificationSenderT as NotificationSender,
};
use sc_peerset::ReputationChange;
use sp_runtime::{
@@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use libp2p::PeerId;
use sc_network_common::protocol::role::ObservedRole;
use sc_network_common::role::ObservedRole;
use sp_runtime::traits::Block as BlockT;
/// Validates consensus messages.
+1
View File
@@ -26,6 +26,7 @@ futures = "0.3.21"
futures-timer = "3.0.2"
ip_network = "0.4.1"
libp2p = { version = "0.50.0", features = ["dns", "identify", "kad", "macros", "mdns", "mplex", "noise", "ping", "tcp", "tokio", "yamux", "websocket"] }
linked_hash_set = "0.1.3"
log = "0.4.17"
lru = "0.8.1"
mockall = "0.11.3"
@@ -24,6 +24,7 @@ prost = "0.11"
thiserror = "1.0"
unsigned-varint = { version = "0.7.1", features = ["futures", "asynchronous_codec"] }
sc-client-api = { version = "4.0.0-dev", path = "../../api" }
sc-network = { version = "0.10.0-dev", path = "../" }
sc-network-common = { version = "0.10.0-dev", path = "../common" }
sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
sp-runtime = { version = "7.0.0", path = "../../../primitives/runtime" }
+5 -4
View File
@@ -26,9 +26,9 @@ use libp2p::core::PeerId;
use log::{debug, error, trace};
use prost::Message;
use sc_client_api::BlockBackend;
use sc_network_common::{
protocol::ProtocolName,
use sc_network::{
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
types::ProtocolName,
};
use schema::bitswap::{
message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType},
@@ -127,8 +127,9 @@ impl<B: BlockT> BitswapRequestHandler<B> {
};
match pending_response.send(response) {
Ok(()) =>
trace!(target: LOG_TARGET, "Handled bitswap request from {peer}.",),
Ok(()) => {
trace!(target: LOG_TARGET, "Handled bitswap request from {peer}.",)
},
Err(_) => debug!(
target: LOG_TARGET,
"Failed to handle light client request from {peer}: {}",
@@ -26,7 +26,6 @@ codec = { package = "parity-scale-codec", version = "3.2.2", features = [
futures = "0.3.21"
futures-timer = "3.0.2"
libp2p = { version = "0.50.0", features = ["request-response", "kad"] }
linked_hash_set = "0.1.3"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" }
smallvec = "1.8.0"
sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" }
@@ -1,702 +0,0 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Configuration of the networking layer.
pub use crate::{
protocol::{self, role::Role},
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
sync::warp::WarpSyncProvider,
ExHashT,
};
pub use libp2p::{build_multiaddr, core::PublicKey, identity};
use codec::Encode;
use libp2p::{
identity::{ed25519, Keypair},
multiaddr, Multiaddr, PeerId,
};
use zeroize::Zeroize;
use std::{
error::Error,
fmt, fs,
io::{self, Write},
iter,
net::Ipv4Addr,
path::{Path, PathBuf},
str,
str::FromStr,
};
/// Protocol name prefix, transmitted on the wire for legacy protocol names.
/// I.e., `dot` in `/dot/sync/2`. Should be unique for each chain. Always UTF-8.
/// Deprecated in favour of genesis hash & fork ID based protocol names.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>);
impl<'a> From<&'a str> for ProtocolId {
fn from(bytes: &'a str) -> ProtocolId {
Self(bytes.as_bytes().into())
}
}
impl AsRef<str> for ProtocolId {
fn as_ref(&self) -> &str {
str::from_utf8(&self.0[..])
.expect("the only way to build a ProtocolId is through a UTF-8 String; qed")
}
}
impl fmt::Debug for ProtocolId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(self.as_ref(), f)
}
}
/// Parses a string address and splits it into Multiaddress and PeerId, if
/// valid.
///
/// # Example
///
/// ```
/// # use libp2p::{Multiaddr, PeerId};
/// # use sc_network_common::config::parse_str_addr;
/// let (peer_id, addr) = parse_str_addr(
/// "/ip4/198.51.100.19/tcp/30333/p2p/QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV"
/// ).unwrap();
/// assert_eq!(peer_id, "QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV".parse::<PeerId>().unwrap());
/// assert_eq!(addr, "/ip4/198.51.100.19/tcp/30333".parse::<Multiaddr>().unwrap());
/// ```
pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> {
let addr: Multiaddr = addr_str.parse()?;
parse_addr(addr)
}
/// Splits a Multiaddress into a Multiaddress and PeerId.
pub fn parse_addr(mut addr: Multiaddr) -> Result<(PeerId, Multiaddr), ParseErr> {
let who = match addr.pop() {
Some(multiaddr::Protocol::P2p(key)) =>
PeerId::from_multihash(key).map_err(|_| ParseErr::InvalidPeerId)?,
_ => return Err(ParseErr::PeerIdMissing),
};
Ok((who, addr))
}
/// Address of a node, including its identity.
///
/// This struct represents a decoded version of a multiaddress that ends with `/p2p/<peerid>`.
///
/// # Example
///
/// ```
/// # use libp2p::{Multiaddr, PeerId};
/// # use sc_network_common::config::MultiaddrWithPeerId;
/// let addr: MultiaddrWithPeerId =
/// "/ip4/198.51.100.19/tcp/30333/p2p/QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV".parse().unwrap();
/// assert_eq!(addr.peer_id.to_base58(), "QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV");
/// assert_eq!(addr.multiaddr.to_string(), "/ip4/198.51.100.19/tcp/30333");
/// ```
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(try_from = "String", into = "String")]
pub struct MultiaddrWithPeerId {
/// Address of the node.
pub multiaddr: Multiaddr,
/// Its identity.
pub peer_id: PeerId,
}
impl MultiaddrWithPeerId {
/// Concatenates the multiaddress and peer ID into one multiaddress containing both.
pub fn concat(&self) -> Multiaddr {
let proto = multiaddr::Protocol::P2p(From::from(self.peer_id));
self.multiaddr.clone().with(proto)
}
}
impl fmt::Display for MultiaddrWithPeerId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.concat(), f)
}
}
impl FromStr for MultiaddrWithPeerId {
type Err = ParseErr;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (peer_id, multiaddr) = parse_str_addr(s)?;
Ok(Self { peer_id, multiaddr })
}
}
impl From<MultiaddrWithPeerId> for String {
fn from(ma: MultiaddrWithPeerId) -> String {
format!("{}", ma)
}
}
impl TryFrom<String> for MultiaddrWithPeerId {
type Error = ParseErr;
fn try_from(string: String) -> Result<Self, Self::Error> {
string.parse()
}
}
/// Error that can be generated by `parse_str_addr`.
#[derive(Debug)]
pub enum ParseErr {
/// Error while parsing the multiaddress.
MultiaddrParse(multiaddr::Error),
/// Multihash of the peer ID is invalid.
InvalidPeerId,
/// The peer ID is missing from the address.
PeerIdMissing,
}
impl fmt::Display for ParseErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::MultiaddrParse(err) => write!(f, "{}", err),
Self::InvalidPeerId => write!(f, "Peer id at the end of the address is invalid"),
Self::PeerIdMissing => write!(f, "Peer id is missing from the address"),
}
}
}
impl std::error::Error for ParseErr {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::MultiaddrParse(err) => Some(err),
Self::InvalidPeerId => None,
Self::PeerIdMissing => None,
}
}
}
impl From<multiaddr::Error> for ParseErr {
fn from(err: multiaddr::Error) -> ParseErr {
Self::MultiaddrParse(err)
}
}
/// Configuration for a set of nodes.
#[derive(Clone, Debug)]
pub struct SetConfig {
/// Maximum allowed number of incoming substreams related to this set.
pub in_peers: u32,
/// Number of outgoing substreams related to this set that we're trying to maintain.
pub out_peers: u32,
/// List of reserved node addresses.
pub reserved_nodes: Vec<MultiaddrWithPeerId>,
/// Whether nodes that aren't in [`SetConfig::reserved_nodes`] are accepted or automatically
/// refused.
pub non_reserved_mode: NonReservedPeerMode,
}
impl Default for SetConfig {
fn default() -> Self {
Self {
in_peers: 25,
out_peers: 75,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
}
}
}
/// Custom handshake for the notification protocol
#[derive(Debug, Clone)]
pub struct NotificationHandshake(Vec<u8>);
impl NotificationHandshake {
/// Create new `NotificationHandshake` from an object that implements `Encode`
pub fn new<H: Encode>(handshake: H) -> Self {
Self(handshake.encode())
}
/// Create new `NotificationHandshake` from raw bytes
pub fn from_bytes(bytes: Vec<u8>) -> Self {
Self(bytes)
}
}
impl std::ops::Deref for NotificationHandshake {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Extension to [`SetConfig`] for sets that aren't the default set.
///
/// > **Note**: As new fields might be added in the future, please consider using the `new` method
/// > and modifiers instead of creating this struct manually.
#[derive(Clone, Debug)]
pub struct NonDefaultSetConfig {
/// Name of the notifications protocols of this set. A substream on this set will be
/// considered established once this protocol is open.
///
/// > **Note**: This field isn't present for the default set, as this is handled internally
/// > by the networking code.
pub notifications_protocol: protocol::ProtocolName,
/// If the remote reports that it doesn't support the protocol indicated in the
/// `notifications_protocol` field, then each of these fallback names will be tried one by
/// one.
///
/// If a fallback is used, it will be reported in
/// `sc_network::protocol::event::Event::NotificationStreamOpened::negotiated_fallback`
pub fallback_names: Vec<protocol::ProtocolName>,
/// Handshake of the protocol
///
/// NOTE: Currently custom handshakes are not fully supported. See issue #5685 for more
/// details. This field is temporarily used to allow moving the hardcoded block announcement
/// protocol out of `protocol.rs`.
pub handshake: Option<NotificationHandshake>,
/// Maximum allowed size of single notifications.
pub max_notification_size: u64,
/// Base configuration.
pub set_config: SetConfig,
}
impl NonDefaultSetConfig {
/// Creates a new [`NonDefaultSetConfig`]. Zero slots and accepts only reserved nodes.
pub fn new(notifications_protocol: protocol::ProtocolName, max_notification_size: u64) -> Self {
Self {
notifications_protocol,
max_notification_size,
fallback_names: Vec::new(),
handshake: None,
set_config: SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Deny,
},
}
}
/// Modifies the configuration to allow non-reserved nodes.
pub fn allow_non_reserved(&mut self, in_peers: u32, out_peers: u32) {
self.set_config.in_peers = in_peers;
self.set_config.out_peers = out_peers;
self.set_config.non_reserved_mode = NonReservedPeerMode::Accept;
}
/// Add a node to the list of reserved nodes.
pub fn add_reserved(&mut self, peer: MultiaddrWithPeerId) {
self.set_config.reserved_nodes.push(peer);
}
/// Add a list of protocol names used for backward compatibility.
///
/// See the explanations in [`NonDefaultSetConfig::fallback_names`].
pub fn add_fallback_names(&mut self, fallback_names: Vec<protocol::ProtocolName>) {
self.fallback_names.extend(fallback_names);
}
}
/// Configuration for the transport layer.
#[derive(Clone, Debug)]
pub enum TransportConfig {
/// Normal transport mode.
Normal {
/// If true, the network will use mDNS to discover other libp2p nodes on the local network
/// and connect to them if they support the same chain.
enable_mdns: bool,
/// If true, allow connecting to private IPv4/IPv6 addresses (as defined in
/// [RFC1918](https://tools.ietf.org/html/rfc1918)). Irrelevant for addresses that have
/// been passed in `::sc_network::config::NetworkConfiguration::boot_nodes`.
allow_private_ip: bool,
},
/// Only allow connections within the same process.
/// Only addresses of the form `/memory/...` will be supported.
MemoryOnly,
}
/// The policy for connections to non-reserved peers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReservedPeerMode {
/// Accept them. This is the default.
Accept,
/// Deny them.
Deny,
}
impl NonReservedPeerMode {
/// Attempt to parse the peer mode from a string.
pub fn parse(s: &str) -> Option<Self> {
match s {
"accept" => Some(Self::Accept),
"deny" => Some(Self::Deny),
_ => None,
}
}
}
/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SyncMode {
/// Full block download and verification.
Full,
/// Download blocks and the latest state.
Fast {
/// Skip state proof download and verification.
skip_proofs: bool,
/// Download indexed transactions for recent blocks.
storage_chain_mode: bool,
},
/// Warp sync - verify authority set transitions and the latest state.
Warp,
}
impl SyncMode {
/// Returns if `self` is [`Self::Warp`].
pub fn is_warp(&self) -> bool {
matches!(self, Self::Warp)
}
/// Returns if `self` is [`Self::Fast`].
pub fn is_fast(&self) -> bool {
matches!(self, Self::Fast { .. })
}
}
impl Default for SyncMode {
fn default() -> Self {
Self::Full
}
}
/// Network service configuration.
#[derive(Clone, Debug)]
pub struct NetworkConfiguration {
/// Directory path to store network-specific configuration. None means nothing will be saved.
pub net_config_path: Option<PathBuf>,
/// Multiaddresses to listen for incoming connections.
pub listen_addresses: Vec<Multiaddr>,
/// Multiaddresses to advertise. Detected automatically if empty.
pub public_addresses: Vec<Multiaddr>,
/// List of initial node addresses
pub boot_nodes: Vec<MultiaddrWithPeerId>,
/// The node key configuration, which determines the node's network identity keypair.
pub node_key: NodeKeyConfig,
/// List of request-response protocols that the node supports.
pub request_response_protocols: Vec<RequestResponseConfig>,
/// Configuration for the default set of nodes used for block syncing and transactions.
pub default_peers_set: SetConfig,
/// Number of substreams to reserve for full nodes for block syncing and transactions.
/// Any other slot will be dedicated to light nodes.
///
/// This value is implicitly capped to `default_set.out_peers + default_set.in_peers`.
pub default_peers_set_num_full: u32,
/// Configuration for extra sets of nodes.
pub extra_sets: Vec<NonDefaultSetConfig>,
/// Client identifier. Sent over the wire for debugging purposes.
pub client_version: String,
/// Name of the node. Sent over the wire for debugging purposes.
pub node_name: String,
/// Configuration for the transport layer.
pub transport: TransportConfig,
/// Maximum number of peers to ask the same blocks in parallel.
pub max_parallel_downloads: u32,
/// Initial syncing mode.
pub sync_mode: SyncMode,
/// True if Kademlia random discovery should be enabled.
///
/// If true, the node will automatically randomly walk the DHT in order to find new peers.
pub enable_dht_random_walk: bool,
/// Should we insert non-global addresses into the DHT?
pub allow_non_globals_in_dht: bool,
/// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in
/// the presence of potentially adversarial nodes.
pub kademlia_disjoint_query_paths: bool,
/// Enable serving block data over IPFS bitswap.
pub ipfs_server: bool,
/// Size of Yamux receive window of all substreams. `None` for the default (256kiB).
/// Any value less than 256kiB is invalid.
///
/// # Context
///
/// By design, notifications substreams on top of Yamux connections only allow up to `N` bytes
/// to be transferred at a time, where `N` is the Yamux receive window size configurable here.
/// This means, in practice, that every `N` bytes must be acknowledged by the receiver before
/// the sender can send more data. The maximum bandwidth of each notifications substream is
/// therefore `N / round_trip_time`.
///
/// It is recommended to leave this to `None`, and use a request-response protocol instead if
/// a large amount of data must be transferred. The reason why the value is configurable is
/// that some Substrate users mis-use notification protocols to send large amounts of data.
/// As such, this option isn't designed to stay and will likely get removed in the future.
///
/// Note that configuring a value here isn't a modification of the Yamux protocol, but rather
/// a modification of the way the implementation works. Different nodes with different
/// configured values remain compatible with each other.
pub yamux_window_size: Option<u32>,
}
impl NetworkConfiguration {
/// Create new default configuration
pub fn new<SN: Into<String>, SV: Into<String>>(
node_name: SN,
client_version: SV,
node_key: NodeKeyConfig,
net_config_path: Option<PathBuf>,
) -> Self {
let default_peers_set = SetConfig::default();
Self {
net_config_path,
listen_addresses: Vec::new(),
public_addresses: Vec::new(),
boot_nodes: Vec::new(),
node_key,
request_response_protocols: Vec::new(),
default_peers_set_num_full: default_peers_set.in_peers + default_peers_set.out_peers,
default_peers_set,
extra_sets: Vec::new(),
client_version: client_version.into(),
node_name: node_name.into(),
transport: TransportConfig::Normal { enable_mdns: false, allow_private_ip: true },
max_parallel_downloads: 5,
sync_mode: SyncMode::Full,
enable_dht_random_walk: true,
allow_non_globals_in_dht: false,
kademlia_disjoint_query_paths: false,
yamux_window_size: None,
ipfs_server: false,
}
}
/// Create new default configuration for localhost-only connection with random port (useful for
/// testing)
pub fn new_local() -> NetworkConfiguration {
let mut config =
NetworkConfiguration::new("test-node", "test-client", Default::default(), None);
config.listen_addresses =
vec![iter::once(multiaddr::Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
.chain(iter::once(multiaddr::Protocol::Tcp(0)))
.collect()];
config.allow_non_globals_in_dht = true;
config
}
/// Create new default configuration for localhost-only connection with random port (useful for
/// testing)
pub fn new_memory() -> NetworkConfiguration {
let mut config =
NetworkConfiguration::new("test-node", "test-client", Default::default(), None);
config.listen_addresses =
vec![iter::once(multiaddr::Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
.chain(iter::once(multiaddr::Protocol::Tcp(0)))
.collect()];
config.allow_non_globals_in_dht = true;
config
}
}
/// The configuration of a node's secret key, describing the type of key
/// and how it is obtained. A node's identity keypair is the result of
/// the evaluation of the node key configuration.
#[derive(Clone, Debug)]
pub enum NodeKeyConfig {
/// A Ed25519 secret key configuration.
Ed25519(Secret<ed25519::SecretKey>),
}
impl Default for NodeKeyConfig {
fn default() -> NodeKeyConfig {
Self::Ed25519(Secret::New)
}
}
/// The options for obtaining a Ed25519 secret key.
pub type Ed25519Secret = Secret<ed25519::SecretKey>;
/// The configuration options for obtaining a secret key `K`.
#[derive(Clone)]
pub enum Secret<K> {
/// Use the given secret key `K`.
Input(K),
/// Read the secret key from a file. If the file does not exist,
/// it is created with a newly generated secret key `K`. The format
/// of the file is determined by `K`:
///
/// * `ed25519::SecretKey`: An unencoded 32 bytes Ed25519 secret key.
File(PathBuf),
/// Always generate a new secret key `K`.
New,
}
impl<K> fmt::Debug for Secret<K> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Input(_) => f.debug_tuple("Secret::Input").finish(),
Self::File(path) => f.debug_tuple("Secret::File").field(path).finish(),
Self::New => f.debug_tuple("Secret::New").finish(),
}
}
}
impl NodeKeyConfig {
/// Evaluate a `NodeKeyConfig` to obtain an identity `Keypair`:
///
/// * If the secret is configured as input, the corresponding keypair is returned.
///
/// * If the secret is configured as a file, it is read from that file, if it exists. Otherwise
/// a new secret is generated and stored. In either case, the keypair obtained from the
/// secret is returned.
///
/// * If the secret is configured to be new, it is generated and the corresponding keypair is
/// returned.
pub fn into_keypair(self) -> io::Result<Keypair> {
use NodeKeyConfig::*;
match self {
Ed25519(Secret::New) => Ok(Keypair::generate_ed25519()),
Ed25519(Secret::Input(k)) => Ok(Keypair::Ed25519(k.into())),
Ed25519(Secret::File(f)) => get_secret(
f,
|mut b| match String::from_utf8(b.to_vec()).ok().and_then(|s| {
if s.len() == 64 {
array_bytes::hex2bytes(&s).ok()
} else {
None
}
}) {
Some(s) => ed25519::SecretKey::from_bytes(s),
_ => ed25519::SecretKey::from_bytes(&mut b),
},
ed25519::SecretKey::generate,
|b| b.as_ref().to_vec(),
)
.map(ed25519::Keypair::from)
.map(Keypair::Ed25519),
}
}
}
/// Load a secret key from a file, if it exists, or generate a
/// new secret key and write it to that file. In either case,
/// the secret key is returned.
fn get_secret<P, F, G, E, W, K>(file: P, parse: F, generate: G, serialize: W) -> io::Result<K>
where
P: AsRef<Path>,
F: for<'r> FnOnce(&'r mut [u8]) -> Result<K, E>,
G: FnOnce() -> K,
E: Error + Send + Sync + 'static,
W: Fn(&K) -> Vec<u8>,
{
std::fs::read(&file)
.and_then(|mut sk_bytes| {
parse(&mut sk_bytes).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
})
.or_else(|e| {
if e.kind() == io::ErrorKind::NotFound {
file.as_ref().parent().map_or(Ok(()), fs::create_dir_all)?;
let sk = generate();
let mut sk_vec = serialize(&sk);
write_secret_file(file, &sk_vec)?;
sk_vec.zeroize();
Ok(sk)
} else {
Err(e)
}
})
}
/// Write secret bytes to a file.
fn write_secret_file<P>(path: P, sk_bytes: &[u8]) -> io::Result<()>
where
P: AsRef<Path>,
{
let mut file = open_secret_file(&path)?;
file.write_all(sk_bytes)
}
/// Opens a file containing a secret key in write mode.
#[cfg(unix)]
fn open_secret_file<P>(path: P) -> io::Result<fs::File>
where
P: AsRef<Path>,
{
use std::os::unix::fs::OpenOptionsExt;
fs::OpenOptions::new().write(true).create_new(true).mode(0o600).open(path)
}
/// Opens a file containing a secret key in write mode.
#[cfg(not(unix))]
fn open_secret_file<P>(path: P) -> Result<fs::File, io::Error>
where
P: AsRef<Path>,
{
fs::OpenOptions::new().write(true).create_new(true).open(path)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn tempdir_with_prefix(prefix: &str) -> TempDir {
tempfile::Builder::new().prefix(prefix).tempdir().unwrap()
}
fn secret_bytes(kp: &Keypair) -> Vec<u8> {
let Keypair::Ed25519(p) = kp;
p.secret().as_ref().iter().cloned().collect()
}
#[test]
fn test_secret_file() {
let tmp = tempdir_with_prefix("x");
std::fs::remove_dir(tmp.path()).unwrap(); // should be recreated
let file = tmp.path().join("x").to_path_buf();
let kp1 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap();
assert!(file.is_file() && secret_bytes(&kp1) == secret_bytes(&kp2))
}
#[test]
fn test_secret_input() {
let sk = ed25519::SecretKey::generate();
let kp1 = NodeKeyConfig::Ed25519(Secret::Input(sk.clone())).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::Input(sk)).into_keypair().unwrap();
assert!(secret_bytes(&kp1) == secret_bytes(&kp2));
}
#[test]
fn test_secret_new() {
let kp1 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap();
assert!(secret_bytes(&kp1) != secret_bytes(&kp2));
}
}
+1 -6
View File
@@ -18,14 +18,9 @@
//! Common data structures of the networking layer.
pub mod config;
pub mod error;
pub mod message;
pub mod protocol;
pub mod request_responses;
pub mod service;
pub mod role;
pub mod sync;
pub mod utils;
/// Minimum Requirements for a Hash within Networking
pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {}
@@ -1,155 +0,0 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Collection of generic data structures for request-response protocols.
use crate::protocol::ProtocolName;
use futures::channel::{mpsc, oneshot};
use libp2p::{request_response::OutboundFailure, PeerId};
use sc_peerset::ReputationChange;
use std::time::Duration;
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
/// Name of the protocol on the wire. Should be something like `/foo/bar`.
pub name: ProtocolName,
/// Fallback on the wire protocol names to support.
pub fallback_names: Vec<ProtocolName>,
/// Maximum allowed size, in bytes, of a request.
///
/// Any request larger than this value will be declined as a way to avoid allocating too
/// much memory for it.
pub max_request_size: u64,
/// Maximum allowed size, in bytes, of a response.
///
/// Any response larger than this value will be declined as a way to avoid allocating too
/// much memory for it.
pub max_response_size: u64,
/// Duration after which emitted requests are considered timed out.
///
/// If you expect the response to come back quickly, you should set this to a smaller duration.
pub request_timeout: Duration,
/// Channel on which the networking service will send incoming requests.
///
/// Every time a peer sends a request to the local node using this protocol, the networking
/// service will push an element on this channel. The receiving side of this channel then has
/// to pull this element, process the request, and send back the response to send back to the
/// peer.
///
/// The size of the channel has to be carefully chosen. If the channel is full, the networking
/// service will discard the incoming request send back an error to the peer. Consequently,
/// the channel being full is an indicator that the node is overloaded.
///
/// You can typically set the size of the channel to `T / d`, where `T` is the
/// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
/// build a response.
///
/// Can be `None` if the local node does not support answering incoming requests.
/// If this is `None`, then the local node will not advertise support for this protocol towards
/// other peers. If this is `Some` but the channel is closed, then the local node will
/// advertise support for this protocol, but any incoming request will lead to an error being
/// sent back.
pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
}
/// A single request received by a peer on a request-response protocol.
#[derive(Debug)]
pub struct IncomingRequest {
/// Who sent the request.
pub peer: PeerId,
/// Request sent by the remote. Will always be smaller than
/// [`ProtocolConfig::max_request_size`].
pub payload: Vec<u8>,
/// Channel to send back the response.
///
/// There are two ways to indicate that handling the request failed:
///
/// 1. Drop `pending_response` and thus not changing the reputation of the peer.
///
/// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
/// the given peer.
pub pending_response: oneshot::Sender<OutgoingResponse>,
}
/// Response for an incoming request to be send by a request protocol handler.
#[derive(Debug)]
pub struct OutgoingResponse {
/// The payload of the response.
///
/// `Err(())` if none is available e.g. due an error while handling the request.
pub result: Result<Vec<u8>, ()>,
/// Reputation changes accrued while handling the request. To be applied to the reputation of
/// the peer sending the request.
pub reputation_changes: Vec<ReputationChange>,
/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
/// peer.
///
/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
/// > outgoing data for each TCP socket, and it is not possible for a user
/// > application to inspect this buffer. This channel here is not actually notified
/// > when the response has been fully sent out, but rather when it has fully been
/// > written to the buffer managed by the operating system.
pub sent_feedback: Option<oneshot::Sender<()>>,
}
/// When sending a request, what to do on a disconnected recipient.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
/// Try to connect to the peer.
TryConnect,
/// Just fail if the destination is not yet connected.
ImmediateError,
}
/// Convenience functions for `IfDisconnected`.
impl IfDisconnected {
/// Shall we connect to a disconnected peer?
pub fn should_connect(self) -> bool {
match self {
Self::TryConnect => true,
Self::ImmediateError => false,
}
}
}
/// Error in a request.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
#[error("We are not currently connected to the requested peer.")]
NotConnected,
#[error("Given protocol hasn't been registered.")]
UnknownProtocol,
#[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
Refused,
#[error("The remote replied, but the local node is no longer interested in the response.")]
Obsolete,
/// Problem on the network.
#[error("Problem on the network: {0}")]
Network(OutboundFailure),
}
+1 -1
View File
@@ -22,7 +22,7 @@ pub mod message;
pub mod metrics;
pub mod warp;
use crate::protocol::role::Roles;
use crate::role::Roles;
use futures::Stream;
use libp2p::PeerId;
@@ -19,7 +19,7 @@
//! Network packet message types. These get serialized and put into the lower level protocol
//! payload.
use crate::protocol::role::Roles;
use crate::role::Roles;
use bitflags::bitflags;
use codec::{Decode, Encode, Error, Input, Output};
@@ -26,6 +26,7 @@ log = "0.4.16"
prost = "0.11"
sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
sc-client-api = { version = "4.0.0-dev", path = "../../api" }
sc-network = { version = "0.10.0-dev", path = "../" }
sc-network-common = { version = "0.10.0-dev", path = "../common" }
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
sp-core = { version = "7.0.0", path = "../../../primitives/core" }
@@ -18,13 +18,13 @@
//! Helpers for outgoing and incoming light client requests.
/// For incoming light client requests.
pub mod handler;
use sc_network_common::{config::ProtocolId, request_responses::ProtocolConfig};
use sc_network::{config::ProtocolId, request_responses::ProtocolConfig};
use std::time::Duration;
/// For incoming light client requests.
pub mod handler;
/// Generate the light client protocol name from the genesis hash and fork id.
fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
let genesis_hash = genesis_hash.as_ref();
@@ -29,7 +29,7 @@ use libp2p::PeerId;
use log::{debug, trace};
use prost::Message;
use sc_client_api::{BlockBackend, ProofProvider};
use sc_network_common::{
use sc_network::{
config::ProtocolId,
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
};
+4 -9
View File
@@ -18,9 +18,11 @@
use crate::{
discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
event::DhtEvent,
peer_info,
protocol::{CustomMessageOutcome, NotificationsSink, Protocol},
request_responses,
request_responses::{self, IfDisconnected, ProtocolConfig, RequestFailure},
types::ProtocolName,
};
use bytes::Bytes;
@@ -32,14 +34,7 @@ use libp2p::{
swarm::NetworkBehaviour,
};
use sc_network_common::{
protocol::{
event::DhtEvent,
role::{ObservedRole, Roles},
ProtocolName,
},
request_responses::{IfDisconnected, ProtocolConfig, RequestFailure},
};
use sc_network_common::role::{ObservedRole, Roles};
use sc_peerset::{PeersetHandle, ReputationChange};
use sp_runtime::traits::Block as BlockT;
use std::{collections::HashSet, time::Duration};
+699 -9
View File
@@ -21,21 +21,671 @@
//! The [`Params`] struct is the struct that must be passed in order to initialize the networking.
//! See the documentation of [`Params`].
pub use sc_network_common::{
config::{NetworkConfiguration, ProtocolId},
protocol::role::Role,
pub use crate::{
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
sync::warp::WarpSyncProvider,
ExHashT,
types::ProtocolName,
};
pub use libp2p::{build_multiaddr, core::PublicKey, identity};
use codec::Encode;
use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId};
use prometheus_endpoint::Registry;
use sc_network_common::config::NonDefaultSetConfig;
use std::{future::Future, pin::Pin, sync::Arc};
pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT};
use zeroize::Zeroize;
use std::{
error::Error,
fmt, fs,
future::Future,
io::{self, Write},
iter,
net::Ipv4Addr,
path::{Path, PathBuf},
pin::Pin,
str::{self, FromStr},
sync::Arc,
};
pub use libp2p::{
build_multiaddr,
identity::{self, ed25519},
};
/// Protocol name prefix, transmitted on the wire for legacy protocol names.
/// I.e., `dot` in `/dot/sync/2`. Should be unique for each chain. Always UTF-8.
/// Deprecated in favour of genesis hash & fork ID based protocol names.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>);
impl<'a> From<&'a str> for ProtocolId {
fn from(bytes: &'a str) -> ProtocolId {
Self(bytes.as_bytes().into())
}
}
impl AsRef<str> for ProtocolId {
fn as_ref(&self) -> &str {
str::from_utf8(&self.0[..])
.expect("the only way to build a ProtocolId is through a UTF-8 String; qed")
}
}
impl fmt::Debug for ProtocolId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(self.as_ref(), f)
}
}
/// Parses a string address and splits it into Multiaddress and PeerId, if
/// valid.
///
/// # Example
///
/// ```
/// # use libp2p::{Multiaddr, PeerId};
/// use sc_network::config::parse_str_addr;
/// let (peer_id, addr) = parse_str_addr(
/// "/ip4/198.51.100.19/tcp/30333/p2p/QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV"
/// ).unwrap();
/// assert_eq!(peer_id, "QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV".parse::<PeerId>().unwrap());
/// assert_eq!(addr, "/ip4/198.51.100.19/tcp/30333".parse::<Multiaddr>().unwrap());
/// ```
pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> {
let addr: Multiaddr = addr_str.parse()?;
parse_addr(addr)
}
/// Splits a Multiaddress into a Multiaddress and PeerId.
pub fn parse_addr(mut addr: Multiaddr) -> Result<(PeerId, Multiaddr), ParseErr> {
let who = match addr.pop() {
Some(multiaddr::Protocol::P2p(key)) =>
PeerId::from_multihash(key).map_err(|_| ParseErr::InvalidPeerId)?,
_ => return Err(ParseErr::PeerIdMissing),
};
Ok((who, addr))
}
/// Address of a node, including its identity.
///
/// This struct represents a decoded version of a multiaddress that ends with `/p2p/<peerid>`.
///
/// # Example
///
/// ```
/// # use libp2p::{Multiaddr, PeerId};
/// use sc_network::config::MultiaddrWithPeerId;
/// let addr: MultiaddrWithPeerId =
/// "/ip4/198.51.100.19/tcp/30333/p2p/QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV".parse().unwrap();
/// assert_eq!(addr.peer_id.to_base58(), "QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV");
/// assert_eq!(addr.multiaddr.to_string(), "/ip4/198.51.100.19/tcp/30333");
/// ```
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(try_from = "String", into = "String")]
pub struct MultiaddrWithPeerId {
/// Address of the node.
pub multiaddr: Multiaddr,
/// Its identity.
pub peer_id: PeerId,
}
impl MultiaddrWithPeerId {
/// Concatenates the multiaddress and peer ID into one multiaddress containing both.
pub fn concat(&self) -> Multiaddr {
let proto = multiaddr::Protocol::P2p(From::from(self.peer_id));
self.multiaddr.clone().with(proto)
}
}
impl fmt::Display for MultiaddrWithPeerId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.concat(), f)
}
}
impl FromStr for MultiaddrWithPeerId {
type Err = ParseErr;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (peer_id, multiaddr) = parse_str_addr(s)?;
Ok(Self { peer_id, multiaddr })
}
}
impl From<MultiaddrWithPeerId> for String {
fn from(ma: MultiaddrWithPeerId) -> String {
format!("{}", ma)
}
}
impl TryFrom<String> for MultiaddrWithPeerId {
type Error = ParseErr;
fn try_from(string: String) -> Result<Self, Self::Error> {
string.parse()
}
}
/// Error that can be generated by `parse_str_addr`.
#[derive(Debug)]
pub enum ParseErr {
/// Error while parsing the multiaddress.
MultiaddrParse(multiaddr::Error),
/// Multihash of the peer ID is invalid.
InvalidPeerId,
/// The peer ID is missing from the address.
PeerIdMissing,
}
impl fmt::Display for ParseErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::MultiaddrParse(err) => write!(f, "{}", err),
Self::InvalidPeerId => write!(f, "Peer id at the end of the address is invalid"),
Self::PeerIdMissing => write!(f, "Peer id is missing from the address"),
}
}
}
impl std::error::Error for ParseErr {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::MultiaddrParse(err) => Some(err),
Self::InvalidPeerId => None,
Self::PeerIdMissing => None,
}
}
}
impl From<multiaddr::Error> for ParseErr {
fn from(err: multiaddr::Error) -> ParseErr {
Self::MultiaddrParse(err)
}
}
/// Custom handshake for the notification protocol
#[derive(Debug, Clone)]
pub struct NotificationHandshake(Vec<u8>);
impl NotificationHandshake {
/// Create new `NotificationHandshake` from an object that implements `Encode`
pub fn new<H: Encode>(handshake: H) -> Self {
Self(handshake.encode())
}
/// Create new `NotificationHandshake` from raw bytes
pub fn from_bytes(bytes: Vec<u8>) -> Self {
Self(bytes)
}
}
impl std::ops::Deref for NotificationHandshake {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Configuration for the transport layer.
#[derive(Clone, Debug)]
pub enum TransportConfig {
/// Normal transport mode.
Normal {
/// If true, the network will use mDNS to discover other libp2p nodes on the local network
/// and connect to them if they support the same chain.
enable_mdns: bool,
/// If true, allow connecting to private IPv4/IPv6 addresses (as defined in
/// [RFC1918](https://tools.ietf.org/html/rfc1918)). Irrelevant for addresses that have
/// been passed in `::sc_network::config::NetworkConfiguration::boot_nodes`.
allow_private_ip: bool,
},
/// Only allow connections within the same process.
/// Only addresses of the form `/memory/...` will be supported.
MemoryOnly,
}
/// The policy for connections to non-reserved peers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReservedPeerMode {
/// Accept them. This is the default.
Accept,
/// Deny them.
Deny,
}
impl NonReservedPeerMode {
/// Attempt to parse the peer mode from a string.
pub fn parse(s: &str) -> Option<Self> {
match s {
"accept" => Some(Self::Accept),
"deny" => Some(Self::Deny),
_ => None,
}
}
}
/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SyncMode {
/// Full block download and verification.
Full,
/// Download blocks and the latest state.
Fast {
/// Skip state proof download and verification.
skip_proofs: bool,
/// Download indexed transactions for recent blocks.
storage_chain_mode: bool,
},
/// Warp sync - verify authority set transitions and the latest state.
Warp,
}
impl SyncMode {
/// Returns if `self` is [`Self::Warp`].
pub fn is_warp(&self) -> bool {
matches!(self, Self::Warp)
}
/// Returns if `self` is [`Self::Fast`].
pub fn is_fast(&self) -> bool {
matches!(self, Self::Fast { .. })
}
}
impl Default for SyncMode {
fn default() -> Self {
Self::Full
}
}
/// The configuration of a node's secret key, describing the type of key
/// and how it is obtained. A node's identity keypair is the result of
/// the evaluation of the node key configuration.
#[derive(Clone, Debug)]
pub enum NodeKeyConfig {
/// A Ed25519 secret key configuration.
Ed25519(Secret<ed25519::SecretKey>),
}
impl Default for NodeKeyConfig {
fn default() -> NodeKeyConfig {
Self::Ed25519(Secret::New)
}
}
/// The options for obtaining a Ed25519 secret key.
pub type Ed25519Secret = Secret<ed25519::SecretKey>;
/// The configuration options for obtaining a secret key `K`.
#[derive(Clone)]
pub enum Secret<K> {
/// Use the given secret key `K`.
Input(K),
/// Read the secret key from a file. If the file does not exist,
/// it is created with a newly generated secret key `K`. The format
/// of the file is determined by `K`:
///
/// * `ed25519::SecretKey`: An unencoded 32 bytes Ed25519 secret key.
File(PathBuf),
/// Always generate a new secret key `K`.
New,
}
impl<K> fmt::Debug for Secret<K> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Input(_) => f.debug_tuple("Secret::Input").finish(),
Self::File(path) => f.debug_tuple("Secret::File").field(path).finish(),
Self::New => f.debug_tuple("Secret::New").finish(),
}
}
}
impl NodeKeyConfig {
/// Evaluate a `NodeKeyConfig` to obtain an identity `Keypair`:
///
/// * If the secret is configured as input, the corresponding keypair is returned.
///
/// * If the secret is configured as a file, it is read from that file, if it exists. Otherwise
/// a new secret is generated and stored. In either case, the keypair obtained from the
/// secret is returned.
///
/// * If the secret is configured to be new, it is generated and the corresponding keypair is
/// returned.
pub fn into_keypair(self) -> io::Result<Keypair> {
use NodeKeyConfig::*;
match self {
Ed25519(Secret::New) => Ok(Keypair::generate_ed25519()),
Ed25519(Secret::Input(k)) => Ok(Keypair::Ed25519(k.into())),
Ed25519(Secret::File(f)) => get_secret(
f,
|mut b| match String::from_utf8(b.to_vec()).ok().and_then(|s| {
if s.len() == 64 {
array_bytes::hex2bytes(&s).ok()
} else {
None
}
}) {
Some(s) => ed25519::SecretKey::from_bytes(s),
_ => ed25519::SecretKey::from_bytes(&mut b),
},
ed25519::SecretKey::generate,
|b| b.as_ref().to_vec(),
)
.map(ed25519::Keypair::from)
.map(Keypair::Ed25519),
}
}
}
/// Load a secret key from a file, if it exists, or generate a
/// new secret key and write it to that file. In either case,
/// the secret key is returned.
fn get_secret<P, F, G, E, W, K>(file: P, parse: F, generate: G, serialize: W) -> io::Result<K>
where
P: AsRef<Path>,
F: for<'r> FnOnce(&'r mut [u8]) -> Result<K, E>,
G: FnOnce() -> K,
E: Error + Send + Sync + 'static,
W: Fn(&K) -> Vec<u8>,
{
std::fs::read(&file)
.and_then(|mut sk_bytes| {
parse(&mut sk_bytes).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
})
.or_else(|e| {
if e.kind() == io::ErrorKind::NotFound {
file.as_ref().parent().map_or(Ok(()), fs::create_dir_all)?;
let sk = generate();
let mut sk_vec = serialize(&sk);
write_secret_file(file, &sk_vec)?;
sk_vec.zeroize();
Ok(sk)
} else {
Err(e)
}
})
}
/// Write secret bytes to a file.
fn write_secret_file<P>(path: P, sk_bytes: &[u8]) -> io::Result<()>
where
P: AsRef<Path>,
{
let mut file = open_secret_file(&path)?;
file.write_all(sk_bytes)
}
/// Opens a file containing a secret key in write mode.
#[cfg(unix)]
fn open_secret_file<P>(path: P) -> io::Result<fs::File>
where
P: AsRef<Path>,
{
use std::os::unix::fs::OpenOptionsExt;
fs::OpenOptions::new().write(true).create_new(true).mode(0o600).open(path)
}
/// Opens a file containing a secret key in write mode.
#[cfg(not(unix))]
fn open_secret_file<P>(path: P) -> Result<fs::File, io::Error>
where
P: AsRef<Path>,
{
fs::OpenOptions::new().write(true).create_new(true).open(path)
}
/// Configuration for a set of nodes.
#[derive(Clone, Debug)]
pub struct SetConfig {
/// Maximum allowed number of incoming substreams related to this set.
pub in_peers: u32,
/// Number of outgoing substreams related to this set that we're trying to maintain.
pub out_peers: u32,
/// List of reserved node addresses.
pub reserved_nodes: Vec<MultiaddrWithPeerId>,
/// Whether nodes that aren't in [`SetConfig::reserved_nodes`] are accepted or automatically
/// refused.
pub non_reserved_mode: NonReservedPeerMode,
}
impl Default for SetConfig {
fn default() -> Self {
Self {
in_peers: 25,
out_peers: 75,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
}
}
}
/// Extension to [`SetConfig`] for sets that aren't the default set.
///
/// > **Note**: As new fields might be added in the future, please consider using the `new` method
/// > and modifiers instead of creating this struct manually.
#[derive(Clone, Debug)]
pub struct NonDefaultSetConfig {
/// Name of the notifications protocols of this set. A substream on this set will be
/// considered established once this protocol is open.
///
/// > **Note**: This field isn't present for the default set, as this is handled internally
/// > by the networking code.
pub notifications_protocol: ProtocolName,
/// If the remote reports that it doesn't support the protocol indicated in the
/// `notifications_protocol` field, then each of these fallback names will be tried one by
/// one.
///
/// If a fallback is used, it will be reported in
/// `sc_network::protocol::event::Event::NotificationStreamOpened::negotiated_fallback`
pub fallback_names: Vec<ProtocolName>,
/// Handshake of the protocol
///
/// NOTE: Currently custom handshakes are not fully supported. See issue #5685 for more
/// details. This field is temporarily used to allow moving the hardcoded block announcement
/// protocol out of `protocol.rs`.
pub handshake: Option<NotificationHandshake>,
/// Maximum allowed size of single notifications.
pub max_notification_size: u64,
/// Base configuration.
pub set_config: SetConfig,
}
impl NonDefaultSetConfig {
/// Creates a new [`NonDefaultSetConfig`]. Zero slots and accepts only reserved nodes.
pub fn new(notifications_protocol: ProtocolName, max_notification_size: u64) -> Self {
Self {
notifications_protocol,
max_notification_size,
fallback_names: Vec::new(),
handshake: None,
set_config: SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Deny,
},
}
}
/// Modifies the configuration to allow non-reserved nodes.
pub fn allow_non_reserved(&mut self, in_peers: u32, out_peers: u32) {
self.set_config.in_peers = in_peers;
self.set_config.out_peers = out_peers;
self.set_config.non_reserved_mode = NonReservedPeerMode::Accept;
}
/// Add a node to the list of reserved nodes.
pub fn add_reserved(&mut self, peer: MultiaddrWithPeerId) {
self.set_config.reserved_nodes.push(peer);
}
/// Add a list of protocol names used for backward compatibility.
///
/// See the explanations in [`NonDefaultSetConfig::fallback_names`].
pub fn add_fallback_names(&mut self, fallback_names: Vec<ProtocolName>) {
self.fallback_names.extend(fallback_names);
}
}
/// Network service configuration.
#[derive(Clone, Debug)]
pub struct NetworkConfiguration {
/// Directory path to store network-specific configuration. None means nothing will be saved.
pub net_config_path: Option<PathBuf>,
/// Multiaddresses to listen for incoming connections.
pub listen_addresses: Vec<Multiaddr>,
/// Multiaddresses to advertise. Detected automatically if empty.
pub public_addresses: Vec<Multiaddr>,
/// List of initial node addresses
pub boot_nodes: Vec<MultiaddrWithPeerId>,
/// The node key configuration, which determines the node's network identity keypair.
pub node_key: NodeKeyConfig,
/// List of request-response protocols that the node supports.
pub request_response_protocols: Vec<RequestResponseConfig>,
/// Configuration for the default set of nodes used for block syncing and transactions.
pub default_peers_set: SetConfig,
/// Number of substreams to reserve for full nodes for block syncing and transactions.
/// Any other slot will be dedicated to light nodes.
///
/// This value is implicitly capped to `default_set.out_peers + default_set.in_peers`.
pub default_peers_set_num_full: u32,
/// Configuration for extra sets of nodes.
pub extra_sets: Vec<NonDefaultSetConfig>,
/// Client identifier. Sent over the wire for debugging purposes.
pub client_version: String,
/// Name of the node. Sent over the wire for debugging purposes.
pub node_name: String,
/// Configuration for the transport layer.
pub transport: TransportConfig,
/// Maximum number of peers to ask the same blocks in parallel.
pub max_parallel_downloads: u32,
/// Initial syncing mode.
pub sync_mode: SyncMode,
/// True if Kademlia random discovery should be enabled.
///
/// If true, the node will automatically randomly walk the DHT in order to find new peers.
pub enable_dht_random_walk: bool,
/// Should we insert non-global addresses into the DHT?
pub allow_non_globals_in_dht: bool,
/// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in
/// the presence of potentially adversarial nodes.
pub kademlia_disjoint_query_paths: bool,
/// Enable serving block data over IPFS bitswap.
pub ipfs_server: bool,
/// Size of Yamux receive window of all substreams. `None` for the default (256kiB).
/// Any value less than 256kiB is invalid.
///
/// # Context
///
/// By design, notifications substreams on top of Yamux connections only allow up to `N` bytes
/// to be transferred at a time, where `N` is the Yamux receive window size configurable here.
/// This means, in practice, that every `N` bytes must be acknowledged by the receiver before
/// the sender can send more data. The maximum bandwidth of each notifications substream is
/// therefore `N / round_trip_time`.
///
/// It is recommended to leave this to `None`, and use a request-response protocol instead if
/// a large amount of data must be transferred. The reason why the value is configurable is
/// that some Substrate users mis-use notification protocols to send large amounts of data.
/// As such, this option isn't designed to stay and will likely get removed in the future.
///
/// Note that configuring a value here isn't a modification of the Yamux protocol, but rather
/// a modification of the way the implementation works. Different nodes with different
/// configured values remain compatible with each other.
pub yamux_window_size: Option<u32>,
}
impl NetworkConfiguration {
/// Create new default configuration
pub fn new<SN: Into<String>, SV: Into<String>>(
node_name: SN,
client_version: SV,
node_key: NodeKeyConfig,
net_config_path: Option<PathBuf>,
) -> Self {
let default_peers_set = SetConfig::default();
Self {
net_config_path,
listen_addresses: Vec::new(),
public_addresses: Vec::new(),
boot_nodes: Vec::new(),
node_key,
request_response_protocols: Vec::new(),
default_peers_set_num_full: default_peers_set.in_peers + default_peers_set.out_peers,
default_peers_set,
extra_sets: Vec::new(),
client_version: client_version.into(),
node_name: node_name.into(),
transport: TransportConfig::Normal { enable_mdns: false, allow_private_ip: true },
max_parallel_downloads: 5,
sync_mode: SyncMode::Full,
enable_dht_random_walk: true,
allow_non_globals_in_dht: false,
kademlia_disjoint_query_paths: false,
yamux_window_size: None,
ipfs_server: false,
}
}
/// Create new default configuration for localhost-only connection with random port (useful for
/// testing)
pub fn new_local() -> NetworkConfiguration {
let mut config =
NetworkConfiguration::new("test-node", "test-client", Default::default(), None);
config.listen_addresses =
vec![iter::once(multiaddr::Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
.chain(iter::once(multiaddr::Protocol::Tcp(0)))
.collect()];
config.allow_non_globals_in_dht = true;
config
}
/// Create new default configuration for localhost-only connection with random port (useful for
/// testing)
pub fn new_memory() -> NetworkConfiguration {
let mut config =
NetworkConfiguration::new("test-node", "test-client", Default::default(), None);
config.listen_addresses =
vec![iter::once(multiaddr::Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
.chain(iter::once(multiaddr::Protocol::Tcp(0)))
.collect()];
config.allow_non_globals_in_dht = true;
config
}
}
/// Network initialization parameters.
pub struct Params<Client> {
@@ -67,3 +717,43 @@ pub struct Params<Client> {
/// Request response protocol configurations
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn tempdir_with_prefix(prefix: &str) -> TempDir {
tempfile::Builder::new().prefix(prefix).tempdir().unwrap()
}
fn secret_bytes(kp: &Keypair) -> Vec<u8> {
let Keypair::Ed25519(p) = kp;
p.secret().as_ref().iter().cloned().collect()
}
#[test]
fn test_secret_file() {
let tmp = tempdir_with_prefix("x");
std::fs::remove_dir(tmp.path()).unwrap(); // should be recreated
let file = tmp.path().join("x").to_path_buf();
let kp1 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap();
assert!(file.is_file() && secret_bytes(&kp1) == secret_bytes(&kp2))
}
#[test]
fn test_secret_input() {
let sk = ed25519::SecretKey::generate();
let kp1 = NodeKeyConfig::Ed25519(Secret::Input(sk.clone())).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::Input(sk)).into_keypair().unwrap();
assert!(secret_bytes(&kp1) == secret_bytes(&kp2));
}
#[test]
fn test_secret_new() {
let kp1 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap();
assert!(secret_bytes(&kp1) != secret_bytes(&kp2));
}
}
+3 -2
View File
@@ -46,6 +46,8 @@
//! active mechanism that asks nodes for the addresses they are listening on. Whenever we learn
//! of a node's address, you must call `add_self_reported_address`.
use crate::{config::ProtocolId, utils::LruHashSet};
use array_bytes::bytes2hex;
use futures::prelude::*;
use futures_timer::Delay;
@@ -73,7 +75,6 @@ use libp2p::{
},
};
use log::{debug, info, trace, warn};
use sc_network_common::{config::ProtocolId, utils::LruHashSet};
use sp_core::hexdisplay::HexDisplay;
use std::{
cmp,
@@ -904,6 +905,7 @@ mod tests {
use super::{
kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig, DiscoveryOut,
};
use crate::config::ProtocolId;
use futures::prelude::*;
use libp2p::{
core::{
@@ -915,7 +917,6 @@ mod tests {
swarm::{Executor, Swarm, SwarmEvent},
yamux, Multiaddr,
};
use sc_network_common::config::ProtocolId;
use sp_core::hash::H256;
use std::{collections::HashSet, pin::Pin, task::Poll};
@@ -18,7 +18,8 @@
//! Substrate network possible errors.
use crate::{config::TransportConfig, protocol::ProtocolName};
use crate::{config::TransportConfig, types::ProtocolName};
use libp2p::{Multiaddr, PeerId};
use std::fmt;
@@ -19,11 +19,13 @@
//! Network event types. These are are not the part of the protocol, but rather
//! events that happen on the network like DHT get/put results received.
use super::ProtocolName;
use crate::protocol::role::ObservedRole;
use crate::types::ProtocolName;
use bytes::Bytes;
use libp2p::{core::PeerId, kad::record::Key};
use sc_network_common::role::ObservedRole;
/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
#[must_use]
+18 -14
View File
@@ -248,36 +248,40 @@ mod behaviour;
mod discovery;
mod peer_info;
mod protocol;
mod request_responses;
mod service;
mod transport;
pub mod config;
pub mod error;
pub mod event;
pub mod network_state;
pub mod request_responses;
pub mod types;
pub mod utils;
pub use event::{DhtEvent, Event};
#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig};
pub use sc_network_common::{
protocol::{
event::{DhtEvent, Event},
role::ObservedRole,
ProtocolName,
},
request_responses::{IfDisconnected, RequestFailure},
service::{
KademliaKey, NetworkBlock, NetworkDHTProvider, NetworkRequest, NetworkSigner,
NetworkStateInfo, NetworkStatus, NetworkStatusProvider, NetworkSyncForkRequest, Signature,
SigningError,
},
role::ObservedRole,
sync::{
warp::{WarpSyncPhase, WarpSyncProgress},
ExtendedPeerInfo, StateDownloadProgress, SyncEventStream, SyncState, SyncStatusProvider,
},
};
pub use service::{
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender,
NotificationSenderReady, OutboundFailure, PublicKey,
signature::Signature,
traits::{
KademliaKey, NetworkBlock, NetworkDHTProvider, NetworkEventStream, NetworkNotification,
NetworkPeers, NetworkRequest, NetworkSigner, NetworkStateInfo, NetworkStatus,
NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT,
NotificationSenderError, NotificationSenderReady,
},
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure,
PublicKey,
};
pub use types::ProtocolName;
pub use sc_peerset::ReputationChange;
+3 -1
View File
@@ -16,6 +16,8 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::utils::interval;
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::{
@@ -36,8 +38,8 @@ use libp2p::{
Multiaddr,
};
use log::{debug, error, trace};
use sc_network_common::utils::interval;
use smallvec::SmallVec;
use std::{
collections::hash_map::Entry,
pin::Pin,
+14 -12
View File
@@ -16,7 +16,11 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::config;
use crate::{
config::{self, NonReservedPeerMode},
error,
types::ProtocolName,
};
use bytes::Bytes;
use codec::{DecodeAll, Encode};
@@ -29,27 +33,25 @@ use libp2p::{
Multiaddr, PeerId,
};
use log::{debug, error, warn};
use message::{generic::Message as GenericMessage, Message};
use notifications::{Notifications, NotificationsOut};
use sc_network_common::{
config::NonReservedPeerMode,
error,
protocol::{role::Roles, ProtocolName},
sync::message::BlockAnnouncesHandshake,
};
use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{HashMap, HashSet, VecDeque},
iter,
task::Poll,
};
use message::{generic::Message as GenericMessage, Message};
use notifications::{Notifications, NotificationsOut};
pub use notifications::{NotificationsSink, NotifsHandlerError, Ready};
mod notifications;
pub mod message;
pub use notifications::{NotificationsSink, NotifsHandlerError, Ready};
/// Maximum size used for notifications in the block announce and transaction protocols.
// Must be equal to `max(MAX_BLOCK_ANNOUNCE_SIZE, MAX_TRANSACTIONS_SIZE)`.
pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = 16 * 1024 * 1024;
@@ -93,7 +95,7 @@ impl<B: BlockT> Protocol<B> {
pub fn new(
roles: Roles,
network_config: &config::NetworkConfiguration,
block_announces_protocol: sc_network_common::config::NonDefaultSetConfig,
block_announces_protocol: config::NonDefaultSetConfig,
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let mut known_addresses = Vec::new();
@@ -61,7 +61,7 @@ pub mod generic {
use sc_client_api::StorageProof;
use sc_network_common::{
message::RequestId,
protocol::role::Roles,
role::Roles,
sync::message::{
generic::{BlockRequest, BlockResponse},
BlockAnnounce,
@@ -16,8 +16,11 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::protocol::notifications::handler::{
self, NotificationsSink, NotifsHandlerIn, NotifsHandlerOut, NotifsHandlerProto,
use crate::{
protocol::notifications::handler::{
self, NotificationsSink, NotifsHandlerIn, NotifsHandlerOut, NotifsHandlerProto,
},
types::ProtocolName,
};
use bytes::BytesMut;
@@ -35,7 +38,6 @@ use libp2p::{
use log::{error, trace, warn};
use parking_lot::RwLock;
use rand::distributions::{Distribution as _, Uniform};
use sc_network_common::protocol::ProtocolName;
use sc_peerset::DropReason;
use smallvec::SmallVec;
use std::{
@@ -57,9 +57,12 @@
//! It is illegal to send a [`NotifsHandlerIn::Open`] before a previously-emitted
//! [`NotifsHandlerIn::Open`] has gotten an answer.
use crate::protocol::notifications::upgrade::{
NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutSubstream,
UpgradeCollec,
use crate::{
protocol::notifications::upgrade::{
NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutSubstream,
UpgradeCollec,
},
types::ProtocolName,
};
use bytes::BytesMut;
@@ -77,7 +80,6 @@ use libp2p::{
};
use log::error;
use parking_lot::{Mutex, RwLock};
use sc_network_common::protocol::ProtocolName;
use std::{
collections::VecDeque,
mem,
@@ -945,8 +947,9 @@ pub mod tests {
Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) =>
Poll::Ready(Some(message)),
Poll::Pending => Poll::Ready(None),
Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) =>
panic!("sink closed"),
Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) => {
panic!("sink closed")
},
})
.await
}
@@ -103,8 +103,8 @@ impl<T: Future<Output = Result<O, E>>, O, E> Future for FutWithUsize<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::types::ProtocolName as ProtoName;
use libp2p::core::upgrade::{ProtocolName, UpgradeInfo};
use sc_network_common::protocol::ProtocolName as ProtoName;
// TODO: move to mocks
mockall::mock! {
@@ -16,7 +16,6 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use asynchronous_codec::Framed;
/// Notifications protocol.
///
/// The Substrate notifications protocol consists in the following:
@@ -35,11 +34,15 @@ use asynchronous_codec::Framed;
///
/// Notification substreams are unidirectional. If A opens a substream with B, then B is
/// encouraged but not required to open a substream to A as well.
use crate::types::ProtocolName;
use asynchronous_codec::Framed;
use bytes::BytesMut;
use futures::prelude::*;
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use log::{error, warn};
use sc_network_common::protocol::ProtocolName;
use unsigned_varint::codec::UviBytes;
use std::{
convert::Infallible,
io, mem,
@@ -47,7 +50,6 @@ use std::{
task::{Context, Poll},
vec,
};
use unsigned_varint::codec::UviBytes;
/// Maximum allowed size of the two handshake messages, in bytes.
const MAX_HANDSHAKE_SIZE: usize = 1024;
+147 -13
View File
@@ -34,7 +34,8 @@
//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
//! is used to handle incoming requests.
use crate::ReputationChange;
use crate::{types::ProtocolName, ReputationChange};
use futures::{
channel::{mpsc, oneshot},
prelude::*,
@@ -43,7 +44,7 @@ use libp2p::{
core::{connection::ConnectionId, Multiaddr, PeerId},
request_response::{
handler::RequestResponseHandler, ProtocolSupport, RequestResponse, RequestResponseCodec,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
RequestResponseEvent, RequestResponseMessage, ResponseChannel,
},
swarm::{
behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure},
@@ -52,12 +53,9 @@ use libp2p::{
PollParameters,
},
};
use sc_network_common::{
protocol::ProtocolName,
request_responses::{
IfDisconnected, IncomingRequest, OutgoingResponse, ProtocolConfig, RequestFailure,
},
};
use sc_peerset::{PeersetHandle, BANNED_THRESHOLD};
use std::{
collections::{hash_map::Entry, HashMap},
io, iter,
@@ -66,8 +64,138 @@ use std::{
time::{Duration, Instant},
};
pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId};
use sc_peerset::{PeersetHandle, BANNED_THRESHOLD};
pub use libp2p::request_response::{
InboundFailure, OutboundFailure, RequestId, RequestResponseConfig,
};
/// Error in a request.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
#[error("We are not currently connected to the requested peer.")]
NotConnected,
#[error("Given protocol hasn't been registered.")]
UnknownProtocol,
#[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
Refused,
#[error("The remote replied, but the local node is no longer interested in the response.")]
Obsolete,
#[error("Problem on the network: {0}")]
Network(OutboundFailure),
}
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
/// Name of the protocol on the wire. Should be something like `/foo/bar`.
pub name: ProtocolName,
/// Fallback on the wire protocol names to support.
pub fallback_names: Vec<ProtocolName>,
/// Maximum allowed size, in bytes, of a request.
///
/// Any request larger than this value will be declined as a way to avoid allocating too
/// much memory for it.
pub max_request_size: u64,
/// Maximum allowed size, in bytes, of a response.
///
/// Any response larger than this value will be declined as a way to avoid allocating too
/// much memory for it.
pub max_response_size: u64,
/// Duration after which emitted requests are considered timed out.
///
/// If you expect the response to come back quickly, you should set this to a smaller duration.
pub request_timeout: Duration,
/// Channel on which the networking service will send incoming requests.
///
/// Every time a peer sends a request to the local node using this protocol, the networking
/// service will push an element on this channel. The receiving side of this channel then has
/// to pull this element, process the request, and send back the response to send back to the
/// peer.
///
/// The size of the channel has to be carefully chosen. If the channel is full, the networking
/// service will discard the incoming request send back an error to the peer. Consequently,
/// the channel being full is an indicator that the node is overloaded.
///
/// You can typically set the size of the channel to `T / d`, where `T` is the
/// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
/// build a response.
///
/// Can be `None` if the local node does not support answering incoming requests.
/// If this is `None`, then the local node will not advertise support for this protocol towards
/// other peers. If this is `Some` but the channel is closed, then the local node will
/// advertise support for this protocol, but any incoming request will lead to an error being
/// sent back.
pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
}
/// A single request received by a peer on a request-response protocol.
#[derive(Debug)]
pub struct IncomingRequest {
/// Who sent the request.
pub peer: PeerId,
/// Request sent by the remote. Will always be smaller than
/// [`ProtocolConfig::max_request_size`].
pub payload: Vec<u8>,
/// Channel to send back the response.
///
/// There are two ways to indicate that handling the request failed:
///
/// 1. Drop `pending_response` and thus not changing the reputation of the peer.
///
/// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
/// the given peer.
pub pending_response: oneshot::Sender<OutgoingResponse>,
}
/// Response for an incoming request to be send by a request protocol handler.
#[derive(Debug)]
pub struct OutgoingResponse {
/// The payload of the response.
///
/// `Err(())` if none is available e.g. due an error while handling the request.
pub result: Result<Vec<u8>, ()>,
/// Reputation changes accrued while handling the request. To be applied to the reputation of
/// the peer sending the request.
pub reputation_changes: Vec<ReputationChange>,
/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
/// peer.
///
/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
/// > outgoing data for each TCP socket, and it is not possible for a user
/// > application to inspect this buffer. This channel here is not actually notified
/// > when the response has been fully sent out, but rather when it has fully been
/// > written to the buffer managed by the operating system.
pub sent_feedback: Option<oneshot::Sender<()>>,
}
/// When sending a request, what to do on a disconnected recipient.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
/// Try to connect to the peer.
TryConnect,
/// Just fail if the destination is not yet connected.
ImmediateError,
}
/// Convenience functions for `IfDisconnected`.
impl IfDisconnected {
/// Shall we connect to a disconnected peer?
pub fn should_connect(self) -> bool {
match self {
Self::TryConnect => true,
Self::ImmediateError => false,
}
}
}
/// Event generated by the [`RequestResponsesBehaviour`].
#[derive(Debug)]
@@ -103,7 +231,12 @@ pub enum Event {
},
/// A request protocol handler issued reputation changes for the given peer.
ReputationChanges { peer: PeerId, changes: Vec<ReputationChange> },
ReputationChanges {
/// Peer whose reputation needs to be adjust.
peer: PeerId,
/// Reputation changes.
changes: Vec<ReputationChange>,
},
}
/// Combination of a protocol name and a request id.
@@ -344,7 +477,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
)
}
},
FromSwarm::DialFailure(DialFailure { peer_id, error, handler }) =>
FromSwarm::DialFailure(DialFailure { peer_id, error, handler }) => {
for (p_name, p_handler) in handler.into_iter() {
if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
proto.on_swarm_event(FromSwarm::DialFailure(DialFailure {
@@ -359,7 +492,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
p_name,
)
}
},
}
},
FromSwarm::ListenerClosed(e) =>
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerClosed(e));
+28 -26
View File
@@ -29,13 +29,27 @@
use crate::{
behaviour::{self, Behaviour, BehaviourOut},
config::Params,
config::{MultiaddrWithPeerId, Params, TransportConfig},
discovery::DiscoveryConfig,
error::Error,
event::{DhtEvent, Event},
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
protocol::{self, NotificationsSink, NotifsHandlerError, Protocol, Ready},
transport, ReputationChange,
request_responses::{IfDisconnected, RequestFailure},
service::{
signature::{Signature, SigningError},
traits::{
NetworkDHTProvider, NetworkEventStream, NetworkNotification, NetworkPeers,
NetworkRequest, NetworkSigner, NetworkStateInfo, NetworkStatus, NetworkStatusProvider,
NotificationSender as NotificationSenderT, NotificationSenderError,
NotificationSenderReady as NotificationSenderReadyT,
},
},
transport,
types::ProtocolName,
ReputationChange,
};
use futures::{channel::oneshot, prelude::*};
@@ -55,26 +69,13 @@ use libp2p::{
use log::{debug, error, info, trace, warn};
use metrics::{Histogram, HistogramVec, MetricSources, Metrics};
use parking_lot::Mutex;
use sc_network_common::{
config::{MultiaddrWithPeerId, TransportConfig},
error::Error,
protocol::{
event::{DhtEvent, Event},
ProtocolName,
},
request_responses::{IfDisconnected, RequestFailure},
service::{
NetworkDHTProvider, NetworkEventStream, NetworkNotification, NetworkPeers, NetworkSigner,
NetworkStateInfo, NetworkStatus, NetworkStatusProvider,
NotificationSender as NotificationSenderT, NotificationSenderError,
NotificationSenderReady as NotificationSenderReadyT, Signature, SigningError,
},
ExHashT,
};
use sc_network_common::ExHashT;
use sc_peerset::PeersetHandle;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::{Block as BlockT, Zero};
use std::{
cmp,
collections::{HashMap, HashSet},
@@ -90,14 +91,13 @@ use std::{
};
pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey};
mod metrics;
mod out_events;
#[cfg(test)]
mod tests;
pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey};
use sc_network_common::service::NetworkRequest;
pub mod signature;
pub mod traits;
/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
/// Used as a template parameter of [`SwarmEvent`] below.
@@ -1432,10 +1432,11 @@ where
},
}
},
SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) =>
SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
for change in changes {
self.network_service.behaviour().user_protocol().report_peer(peer, change);
},
}
},
SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
peer_id,
info:
@@ -1467,10 +1468,11 @@ where
.user_protocol_mut()
.add_default_set_discovered_nodes(iter::once(peer_id));
},
SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) =>
SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
if let Some(metrics) = self.metrics.as_ref() {
metrics.kademlia_random_queries_total.inc();
},
}
},
SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
remote,
protocol,
@@ -31,11 +31,12 @@
//! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the
//! collection.
use crate::event::Event;
use futures::{prelude::*, ready, stream::FusedStream};
use log::error;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_network_common::protocol::event::Event;
use std::{
backtrace::Backtrace,
cell::RefCell,
@@ -19,10 +19,12 @@
// If you read this, you are very thorough, congratulations.
use libp2p::{
identity::{error::SigningError, Keypair, PublicKey},
identity::{Keypair, PublicKey},
PeerId,
};
pub use libp2p::identity::error::SigningError;
/// A result of signing a message with a network identity. Since `PeerId` is potentially a hash of a
/// `PublicKey`, you need to reveal the `PublicKey` next to the signature, so the verifier can check
/// if the signature was made by the entity that controls a given `PeerId`.
@@ -1,240 +0,0 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{config, NetworkService, NetworkWorker};
use futures::prelude::*;
use libp2p::Multiaddr;
use sc_consensus::{ImportQueue, Link};
use sc_network_common::{
config::{NonDefaultSetConfig, ProtocolId, SetConfig, TransportConfig},
protocol::{event::Event, role::Roles},
service::NetworkEventStream,
};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_request_handler::BlockRequestHandler,
engine::SyncingEngine,
service::network::{NetworkServiceHandle, NetworkServiceProvider},
state_request_handler::StateRequestHandler,
};
use sp_runtime::traits::Block as BlockT;
use std::sync::Arc;
use substrate_test_runtime_client::{
runtime::{Block as TestBlock, Hash as TestHash},
TestClientBuilder, TestClientBuilderExt as _,
};
#[cfg(test)]
mod service;
type TestNetworkWorker = NetworkWorker<TestBlock, TestHash>;
type TestNetworkService = NetworkService<TestBlock, TestHash>;
const PROTOCOL_NAME: &str = "/foo";
struct TestNetwork {
network: TestNetworkWorker,
}
impl TestNetwork {
pub fn new(network: TestNetworkWorker) -> Self {
Self { network }
}
pub fn start_network(
self,
) -> (Arc<TestNetworkService>, (impl Stream<Item = Event> + std::marker::Unpin)) {
let worker = self.network;
let service = worker.service().clone();
let event_stream = service.event_stream("test");
tokio::spawn(worker.run());
(service, event_stream)
}
}
struct TestNetworkBuilder {
import_queue: Option<Box<dyn ImportQueue<TestBlock>>>,
link: Option<Box<dyn Link<TestBlock>>>,
client: Option<Arc<substrate_test_runtime_client::TestClient>>,
listen_addresses: Vec<Multiaddr>,
set_config: Option<SetConfig>,
chain_sync_network: Option<(NetworkServiceProvider, NetworkServiceHandle)>,
config: Option<config::NetworkConfiguration>,
}
impl TestNetworkBuilder {
pub fn new() -> Self {
Self {
import_queue: None,
link: None,
client: None,
listen_addresses: Vec::new(),
set_config: None,
chain_sync_network: None,
config: None,
}
}
pub fn with_config(mut self, config: config::NetworkConfiguration) -> Self {
self.config = Some(config);
self
}
pub fn with_listen_addresses(mut self, addresses: Vec<Multiaddr>) -> Self {
self.listen_addresses = addresses;
self
}
pub fn with_set_config(mut self, set_config: SetConfig) -> Self {
self.set_config = Some(set_config);
self
}
pub fn build(mut self) -> TestNetwork {
let client = self.client.as_mut().map_or(
Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0),
|v| v.clone(),
);
let network_config = self.config.unwrap_or(config::NetworkConfiguration {
extra_sets: vec![NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: self.set_config.unwrap_or_default(),
}],
listen_addresses: self.listen_addresses,
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
#[derive(Clone)]
struct PassThroughVerifier(bool);
#[async_trait::async_trait]
impl<B: BlockT> sc_consensus::Verifier<B> for PassThroughVerifier {
async fn verify(
&mut self,
mut block: sc_consensus::BlockImportParams<B, ()>,
) -> Result<sc_consensus::BlockImportParams<B, ()>, String> {
block.finalized = self.0;
block.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain);
Ok(block)
}
}
let mut import_queue =
self.import_queue.unwrap_or(Box::new(sc_consensus::BasicQueue::new(
PassThroughVerifier(false),
Box::new(client.clone()),
None,
&sp_core::testing::TaskExecutor::new(),
None,
)));
let protocol_id = ProtocolId::from("test-protocol-name");
let fork_id = Some(String::from("test-fork-id"));
let block_request_protocol_config = {
let (handler, protocol_config) =
BlockRequestHandler::new(&protocol_id, None, client.clone(), 50);
tokio::spawn(handler.run().boxed());
protocol_config
};
let state_request_protocol_config = {
let (handler, protocol_config) =
StateRequestHandler::new(&protocol_id, None, client.clone(), 50);
tokio::spawn(handler.run().boxed());
protocol_config
};
let light_client_request_protocol_config = {
let (handler, protocol_config) =
LightClientRequestHandler::new(&protocol_id, None, client.clone());
tokio::spawn(handler.run().boxed());
protocol_config
};
let (chain_sync_network_provider, chain_sync_network_handle) =
self.chain_sync_network.unwrap_or(NetworkServiceProvider::new());
let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new(
Roles::from(&config::Role::Full),
client.clone(),
None,
&network_config,
protocol_id.clone(),
&None,
Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator),
None,
chain_sync_network_handle,
import_queue.service(),
block_request_protocol_config.name.clone(),
state_request_protocol_config.name.clone(),
None,
)
.unwrap();
let mut link = self.link.unwrap_or(Box::new(chain_sync_service.clone()));
let worker = NetworkWorker::<
substrate_test_runtime_client::runtime::Block,
substrate_test_runtime_client::runtime::Hash,
>::new(config::Params {
block_announce_config,
role: config::Role::Full,
executor: Box::new(|f| {
tokio::spawn(f);
}),
network_config,
chain: client.clone(),
protocol_id,
fork_id,
metrics_registry: None,
request_response_protocol_configs: [
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
]
.to_vec(),
})
.unwrap();
let service = worker.service().clone();
tokio::spawn(async move {
let _ = chain_sync_network_provider.run(service).await;
});
tokio::spawn(async move {
loop {
futures::future::poll_fn(|cx| {
import_queue.poll_actions(cx, &mut *link);
std::task::Poll::Ready(())
})
.await;
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
}
});
let stream = worker.service().event_stream("syncing");
tokio::spawn(engine.run(stream));
TestNetwork::new(worker)
}
}
@@ -20,17 +20,20 @@
use crate::{
config::MultiaddrWithPeerId,
protocol::{event::Event, ProtocolName},
event::Event,
request_responses::{IfDisconnected, RequestFailure},
service::signature::Signature,
types::ProtocolName,
};
use futures::{channel::oneshot, Stream};
pub use libp2p::{identity::error::SigningError, kad::record::Key as KademliaKey};
use libp2p::{Multiaddr, PeerId};
use sc_peerset::ReputationChange;
pub use signature::Signature;
use std::{collections::HashSet, future::Future, pin::Pin, sync::Arc};
mod signature;
pub use libp2p::{identity::error::SigningError, kad::record::Key as KademliaKey};
/// Signer with network identity
pub trait NetworkSigner {
@@ -16,6 +16,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! `sc-network` type definitions
use libp2p::core::upgrade;
use std::{
borrow::Borrow,
fmt,
@@ -24,11 +28,6 @@ use std::{
sync::Arc,
};
use libp2p::core::upgrade;
pub mod event;
pub mod role;
/// The protocol name transmitted on the wire.
#[derive(Debug, Clone)]
pub enum ProtocolName {
@@ -16,9 +16,12 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! `sc-network` utilities
use futures::{stream::unfold, FutureExt, Stream, StreamExt};
use futures_timer::Delay;
use linked_hash_set::LinkedHashSet;
use std::{hash::Hash, num::NonZeroUsize, time::Duration};
/// Creates a stream that returns a new value every `duration`.
+1
View File
@@ -32,6 +32,7 @@ fork-tree = { version = "3.0.0", path = "../../../utils/fork-tree" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" }
sc-client-api = { version = "4.0.0-dev", path = "../../api" }
sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" }
sc-network = { version = "0.10.0-dev", path = "../" }
sc-network-common = { version = "0.10.0-dev", path = "../common" }
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../../utils" }
@@ -18,6 +18,7 @@
//! `crate::request_responses::RequestResponsesBehaviour`.
use crate::schema::v1::{block_request::FromBlock, BlockResponse, Direction};
use codec::{Decode, Encode};
use futures::{
channel::{mpsc, oneshot},
@@ -27,17 +28,19 @@ use libp2p::PeerId;
use log::debug;
use lru::LruCache;
use prost::Message;
use sc_client_api::BlockBackend;
use sc_network_common::{
use sc_network::{
config::ProtocolId,
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
sync::message::BlockAttributes,
};
use sc_network_common::sync::message::BlockAttributes;
use sp_blockchain::HeaderBackend;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header, One, Zero},
};
use std::{
cmp::min,
hash::{Hash, Hasher},
+9 -5
View File
@@ -24,22 +24,27 @@ use crate::{
ChainSync, ClientError, SyncingService,
};
use codec::{Decode, DecodeAll, Encode};
use futures::{FutureExt, Stream, StreamExt};
use futures_timer::Delay;
use libp2p::PeerId;
use lru::LruCache;
use prometheus_endpoint::{
register, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
};
use codec::{Decode, DecodeAll, Encode};
use futures_timer::Delay;
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
use sc_network_common::{
use sc_network::{
config::{
NetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode,
},
protocol::{event::Event, role::Roles, ProtocolName},
event::Event,
utils::LruHashSet,
ProtocolName,
};
use sc_network_common::{
role::Roles,
sync::{
message::{
generic::{BlockData, BlockResponse},
@@ -49,7 +54,6 @@ use sc_network_common::{
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, PollBlockAnnounceValidation, SyncEvent,
SyncMode,
},
utils::LruHashSet,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain::HeaderMetadata;
+25 -19
View File
@@ -28,23 +28,13 @@
//! the network, or whenever a block has been successfully verified, call the appropriate method in
//! order to update it.
pub mod block_request_handler;
pub mod blocks;
pub mod engine;
pub mod mock;
mod schema;
pub mod service;
pub mod state;
pub mod state_request_handler;
pub mod warp;
pub mod warp_request_handler;
use crate::{
blocks::BlockCollection,
schema::v1::{StateRequest, StateResponse},
state::StateSync,
warp::{WarpProofImportResult, WarpSync},
};
use codec::{Decode, DecodeAll, Encode};
use extra_requests::ExtraRequests;
use futures::{
@@ -52,18 +42,22 @@ use futures::{
};
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, error, info, trace, warn};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use prost::Message;
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_client_api::{BlockBackend, ProofProvider};
use sc_consensus::{
import_queue::ImportQueueService, BlockImportError, BlockImportStatus, IncomingBlock,
};
use sc_network_common::{
use sc_network::{
config::{
NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig,
},
protocol::{role::Roles, ProtocolName},
request_responses::{IfDisconnected, RequestFailure},
types::ProtocolName,
};
use sc_network_common::{
role::Roles,
sync::{
message::{
BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest,
@@ -76,7 +70,6 @@ use sc_network_common::{
SyncState, SyncStatus,
},
};
pub use service::chain_sync::SyncingService;
use sp_arithmetic::traits::Saturating;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
use sp_consensus::{
@@ -90,6 +83,7 @@ use sp_runtime::{
},
EncodedJustification, Justifications,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
iter,
@@ -97,9 +91,21 @@ use std::{
pin::Pin,
sync::Arc,
};
use warp::TargetBlockImportResult;
pub use service::chain_sync::SyncingService;
mod extra_requests;
mod schema;
pub mod block_request_handler;
pub mod blocks;
pub mod engine;
pub mod mock;
pub mod service;
pub mod state;
pub mod state_request_handler;
pub mod warp;
pub mod warp_request_handler;
/// Maximum blocks to request in a single packet.
const MAX_BLOCKS_TO_REQUEST: usize = 64;
@@ -927,9 +933,9 @@ where
match warp_sync.import_target_block(
blocks.pop().expect("`blocks` len checked above."),
) {
TargetBlockImportResult::Success =>
warp::TargetBlockImportResult::Success =>
return Ok(OnBlockData::Continue),
TargetBlockImportResult::BadResponse =>
warp::TargetBlockImportResult::BadResponse =>
return Err(BadPeer(*who, rep::VERIFICATION_FAIL)),
}
} else if blocks.is_empty() {
@@ -3160,7 +3166,7 @@ mod test {
use futures::{executor::block_on, future::poll_fn};
use sc_block_builder::BlockBuilderProvider;
use sc_network_common::{
protocol::role::Role,
role::Role,
sync::message::{BlockData, BlockState, FromBlock},
};
use sp_blockchain::HeaderBackend;
@@ -20,9 +20,9 @@ use futures::{channel::oneshot, Stream};
use libp2p::PeerId;
use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link};
use sc_network_common::{
service::{NetworkBlock, NetworkSyncForkRequest},
sync::{ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider},
use sc_network::{NetworkBlock, NetworkSyncForkRequest};
use sc_network_common::sync::{
ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_runtime::traits::{Block as BlockT, NumberFor};
@@ -18,18 +18,18 @@
use futures::channel::oneshot;
use libp2p::{Multiaddr, PeerId};
use sc_consensus::{BlockImportError, BlockImportStatus};
use sc_network_common::{
use sc_network::{
config::MultiaddrWithPeerId,
protocol::ProtocolName,
request_responses::{IfDisconnected, RequestFailure},
service::{
NetworkNotification, NetworkPeers, NetworkRequest, NetworkSyncForkRequest,
NotificationSender, NotificationSenderError,
},
types::ProtocolName,
NetworkNotification, NetworkPeers, NetworkRequest, NetworkSyncForkRequest,
NotificationSenderError, NotificationSenderT,
};
use sc_peerset::ReputationChange;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::collections::HashSet;
mockall::mock! {
@@ -135,7 +135,7 @@ mockall::mock! {
&self,
target: PeerId,
protocol: ProtocolName,
) -> Result<Box<dyn NotificationSender>, NotificationSenderError>;
) -> Result<Box<dyn NotificationSenderT>, NotificationSenderError>;
fn set_notification_handshake(&self, protocol: ProtocolName, handshake: Vec<u8>);
}
}
@@ -18,13 +18,15 @@
use futures::{channel::oneshot, StreamExt};
use libp2p::PeerId;
use sc_network_common::{
protocol::ProtocolName,
use sc_network::{
request_responses::{IfDisconnected, RequestFailure},
service::{NetworkNotification, NetworkPeers, NetworkRequest},
types::ProtocolName,
NetworkNotification, NetworkPeers, NetworkRequest,
};
use sc_peerset::ReputationChange;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::sync::Arc;
/// Network-related services required by `sc-network-sync`
@@ -18,6 +18,7 @@
//! `crate::request_responses::RequestResponsesBehaviour`.
use crate::schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse};
use codec::{Decode, Encode};
use futures::{
channel::{mpsc, oneshot},
@@ -27,12 +28,14 @@ use libp2p::PeerId;
use log::{debug, trace};
use lru::LruCache;
use prost::Message;
use sc_client_api::{BlockBackend, ProofProvider};
use sc_network_common::{
use sc_network::{
config::ProtocolId,
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
};
use sp_runtime::traits::Block as BlockT;
use std::{
hash::{Hash, Hasher},
num::NonZeroUsize,
@@ -22,14 +22,16 @@ use futures::{
stream::StreamExt,
};
use log::debug;
use sc_network_common::{
use sc_network::{
config::ProtocolId,
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
sync::warp::{EncodedProof, WarpProofRequest, WarpSyncProvider},
};
use sc_network_common::sync::warp::{EncodedProof, WarpProofRequest, WarpSyncProvider};
use sp_runtime::traits::Block as BlockT;
use std::{sync::Arc, time::Duration};
const MAX_RESPONSE_SIZE: u64 = 16 * 1024 * 1024;
+11 -5
View File
@@ -20,6 +20,8 @@
#[cfg(test)]
mod block_import;
#[cfg(test)]
mod service;
#[cfg(test)]
mod sync;
use std::{
@@ -46,14 +48,18 @@ use sc_consensus::{
ForkChoiceStrategy, ImportQueue, ImportResult, JustificationImport, JustificationSyncLink,
LongestChain, Verifier,
};
use sc_network::{Multiaddr, NetworkService, NetworkWorker};
use sc_network_common::{
use sc_network::{
config::{
MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode,
ProtocolId, RequestResponseConfig, Role, SyncMode, TransportConfig,
ProtocolId, Role, SyncMode, TransportConfig,
},
protocol::{role::Roles, ProtocolName},
service::{NetworkBlock, NetworkEventStream, NetworkStateInfo, NetworkSyncForkRequest},
request_responses::ProtocolConfig as RequestResponseConfig,
types::ProtocolName,
Multiaddr, NetworkBlock, NetworkEventStream, NetworkService, NetworkStateInfo,
NetworkSyncForkRequest, NetworkWorker,
};
use sc_network_common::{
role::Roles,
sync::warp::{
AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider,
},
@@ -16,24 +16,228 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{config, service::tests::TestNetworkBuilder, NetworkService};
use futures::prelude::*;
use libp2p::PeerId;
use sc_network_common::{
config::{MultiaddrWithPeerId, NonDefaultSetConfig, SetConfig, TransportConfig},
protocol::event::Event,
service::{NetworkNotification, NetworkPeers, NetworkStateInfo},
use libp2p::{Multiaddr, PeerId};
use sc_consensus::{ImportQueue, Link};
use sc_network::{
config::{self, MultiaddrWithPeerId, ProtocolId, TransportConfig},
event::Event,
NetworkEventStream, NetworkNotification, NetworkPeers, NetworkService, NetworkStateInfo,
NetworkWorker,
};
use sc_network_common::role::Roles;
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_request_handler::BlockRequestHandler,
engine::SyncingEngine,
service::network::{NetworkServiceHandle, NetworkServiceProvider},
state_request_handler::StateRequestHandler,
};
use sp_runtime::traits::Block as BlockT;
use substrate_test_runtime_client::{
runtime::{Block as TestBlock, Hash as TestHash},
TestClientBuilder, TestClientBuilderExt as _,
};
use std::{sync::Arc, time::Duration};
type TestNetworkService = NetworkService<
substrate_test_runtime_client::runtime::Block,
substrate_test_runtime_client::runtime::Hash,
>;
type TestNetworkWorker = NetworkWorker<TestBlock, TestHash>;
type TestNetworkService = NetworkService<TestBlock, TestHash>;
const PROTOCOL_NAME: &str = "/foo";
struct TestNetwork {
network: TestNetworkWorker,
}
impl TestNetwork {
pub fn new(network: TestNetworkWorker) -> Self {
Self { network }
}
pub fn start_network(
self,
) -> (Arc<TestNetworkService>, (impl Stream<Item = Event> + std::marker::Unpin)) {
let worker = self.network;
let service = worker.service().clone();
let event_stream = service.event_stream("test");
tokio::spawn(worker.run());
(service, event_stream)
}
}
struct TestNetworkBuilder {
import_queue: Option<Box<dyn ImportQueue<TestBlock>>>,
link: Option<Box<dyn Link<TestBlock>>>,
client: Option<Arc<substrate_test_runtime_client::TestClient>>,
listen_addresses: Vec<Multiaddr>,
set_config: Option<config::SetConfig>,
chain_sync_network: Option<(NetworkServiceProvider, NetworkServiceHandle)>,
config: Option<config::NetworkConfiguration>,
}
impl TestNetworkBuilder {
pub fn new() -> Self {
Self {
import_queue: None,
link: None,
client: None,
listen_addresses: Vec::new(),
set_config: None,
chain_sync_network: None,
config: None,
}
}
pub fn with_config(mut self, config: config::NetworkConfiguration) -> Self {
self.config = Some(config);
self
}
pub fn with_listen_addresses(mut self, addresses: Vec<Multiaddr>) -> Self {
self.listen_addresses = addresses;
self
}
pub fn with_set_config(mut self, set_config: config::SetConfig) -> Self {
self.set_config = Some(set_config);
self
}
pub fn build(mut self) -> TestNetwork {
let client = self.client.as_mut().map_or(
Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0),
|v| v.clone(),
);
let network_config = self.config.unwrap_or(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: self.set_config.unwrap_or_default(),
}],
listen_addresses: self.listen_addresses,
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
#[derive(Clone)]
struct PassThroughVerifier(bool);
#[async_trait::async_trait]
impl<B: BlockT> sc_consensus::Verifier<B> for PassThroughVerifier {
async fn verify(
&mut self,
mut block: sc_consensus::BlockImportParams<B, ()>,
) -> Result<sc_consensus::BlockImportParams<B, ()>, String> {
block.finalized = self.0;
block.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain);
Ok(block)
}
}
let mut import_queue =
self.import_queue.unwrap_or(Box::new(sc_consensus::BasicQueue::new(
PassThroughVerifier(false),
Box::new(client.clone()),
None,
&sp_core::testing::TaskExecutor::new(),
None,
)));
let protocol_id = ProtocolId::from("test-protocol-name");
let fork_id = Some(String::from("test-fork-id"));
let block_request_protocol_config = {
let (handler, protocol_config) =
BlockRequestHandler::new(&protocol_id, None, client.clone(), 50);
tokio::spawn(handler.run().boxed());
protocol_config
};
let state_request_protocol_config = {
let (handler, protocol_config) =
StateRequestHandler::new(&protocol_id, None, client.clone(), 50);
tokio::spawn(handler.run().boxed());
protocol_config
};
let light_client_request_protocol_config = {
let (handler, protocol_config) =
LightClientRequestHandler::new(&protocol_id, None, client.clone());
tokio::spawn(handler.run().boxed());
protocol_config
};
let (chain_sync_network_provider, chain_sync_network_handle) =
self.chain_sync_network.unwrap_or(NetworkServiceProvider::new());
let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new(
Roles::from(&config::Role::Full),
client.clone(),
None,
&network_config,
protocol_id.clone(),
&None,
Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator),
None,
chain_sync_network_handle,
import_queue.service(),
block_request_protocol_config.name.clone(),
state_request_protocol_config.name.clone(),
None,
)
.unwrap();
let mut link = self.link.unwrap_or(Box::new(chain_sync_service.clone()));
let worker = NetworkWorker::<
substrate_test_runtime_client::runtime::Block,
substrate_test_runtime_client::runtime::Hash,
>::new(config::Params {
block_announce_config,
role: config::Role::Full,
executor: Box::new(|f| {
tokio::spawn(f);
}),
network_config,
chain: client.clone(),
protocol_id,
fork_id,
metrics_registry: None,
request_response_protocol_configs: [
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
]
.to_vec(),
})
.unwrap();
let service = worker.service().clone();
tokio::spawn(async move {
let _ = chain_sync_network_provider.run(service).await;
});
tokio::spawn(async move {
loop {
futures::future::poll_fn(|cx| {
import_queue.poll_actions(cx, &mut *link);
std::task::Poll::Ready(())
})
.await;
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
}
});
let stream = worker.service().event_stream("syncing");
tokio::spawn(engine.run(stream));
TestNetwork::new(worker)
}
}
/// Builds two nodes and their associated events stream.
/// The nodes are connected together and have the `PROTOCOL_NAME` protocol registered.
fn build_nodes_one_proto() -> (
@@ -50,7 +254,7 @@ fn build_nodes_one_proto() -> (
.start_network();
let (node2, events_stream2) = TestNetworkBuilder::new()
.with_set_config(SetConfig {
.with_set_config(config::SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr,
peer_id: node1.local_peer_id(),
@@ -208,7 +412,7 @@ async fn lots_of_incoming_peers_works() {
let (main_node, _) = TestNetworkBuilder::new()
.with_listen_addresses(vec![listen_addr.clone()])
.with_set_config(SetConfig { in_peers: u32::MAX, ..Default::default() })
.with_set_config(config::SetConfig { in_peers: u32::MAX, ..Default::default() })
.build()
.start_network();
@@ -220,7 +424,7 @@ async fn lots_of_incoming_peers_works() {
for _ in 0..32 {
let (_dialing_node, event_stream) = TestNetworkBuilder::new()
.with_set_config(SetConfig {
.with_set_config(config::SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr.clone(),
peer_id: main_node_peer_id,
@@ -289,10 +493,11 @@ async fn notifications_back_pressure() {
while received_notifications < TOTAL_NOTIFS {
match events_stream2.next().await.unwrap() {
Event::NotificationStreamOpened { protocol, .. } =>
Event::NotificationStreamOpened { protocol, .. } => {
if let None = sync_protocol_name {
sync_protocol_name = Some(protocol);
},
}
},
Event::NotificationStreamClosed { protocol, .. } => {
if Some(&protocol) != sync_protocol_name.as_ref() {
panic!()
@@ -344,7 +549,7 @@ async fn fallback_name_working() {
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (node1, mut events_stream1) = TestNetworkBuilder::new()
.with_config(config::NetworkConfiguration {
extra_sets: vec![NonDefaultSetConfig {
extra_sets: vec![config::NonDefaultSetConfig {
notifications_protocol: NEW_PROTOCOL_NAME.into(),
fallback_names: vec![PROTOCOL_NAME.into()],
max_notification_size: 1024 * 1024,
@@ -359,7 +564,7 @@ async fn fallback_name_working() {
.start_network();
let (_, mut events_stream2) = TestNetworkBuilder::new()
.with_set_config(SetConfig {
.with_set_config(config::SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr,
peer_id: node1.local_peer_id(),
@@ -500,7 +705,7 @@ async fn ensure_reserved_node_addresses_consistent_with_transport_memory() {
.with_config(config::NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
transport: TransportConfig::MemoryOnly,
default_peers_set: SetConfig {
default_peers_set: config::SetConfig {
reserved_nodes: vec![reserved_node],
..Default::default()
},
@@ -527,7 +732,7 @@ async fn ensure_reserved_node_addresses_consistent_with_transport_not_memory() {
let _ = TestNetworkBuilder::new()
.with_config(config::NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
default_peers_set: SetConfig {
default_peers_set: config::SetConfig {
reserved_nodes: vec![reserved_node],
..Default::default()
},
@@ -20,6 +20,7 @@ libp2p = "0.50.0"
log = "0.4.17"
pin-project = "1.0.12"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" }
sc-network = { version = "0.10.0-dev", path = "../" }
sc-network-common = { version = "0.10.0-dev", path = "../common" }
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../../utils" }
@@ -27,22 +27,29 @@
//! `Future` that processes transactions.
use crate::config::*;
use codec::{Decode, Encode};
use futures::{prelude::*, stream::FuturesUnordered};
use libp2p::{multiaddr, PeerId};
use log::{debug, trace, warn};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network_common::{
use sc_network::{
config::{NonDefaultSetConfig, NonReservedPeerMode, ProtocolId, SetConfig},
error,
protocol::{event::Event, role::ObservedRole, ProtocolName},
service::{NetworkEventStream, NetworkNotification, NetworkPeers},
sync::{SyncEvent, SyncEventStream},
event::Event,
types::ProtocolName,
utils::{interval, LruHashSet},
NetworkEventStream, NetworkNotification, NetworkPeers,
};
use sc_network_common::{
role::ObservedRole,
sync::{SyncEvent, SyncEventStream},
ExHashT,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{hash_map::Entry, HashMap},
iter,
+1
View File
@@ -29,6 +29,7 @@ rand = "0.8.5"
threadpool = "1.7"
tracing = "0.1.29"
sc-client-api = { version = "4.0.0-dev", path = "../api" }
sc-network = { version = "0.10.0-dev", path = "../network" }
sc-network-common = { version = "0.10.0-dev", path = "../network/common" }
sc-peerset = { version = "4.0.0-dev", path = "../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
+2 -4
View File
@@ -326,10 +326,8 @@ mod tests {
use super::*;
use libp2p::PeerId;
use sc_client_db::offchain::LocalStorage;
use sc_network_common::{
config::MultiaddrWithPeerId,
protocol::ProtocolName,
service::{NetworkPeers, NetworkStateInfo},
use sc_network::{
config::MultiaddrWithPeerId, types::ProtocolName, NetworkPeers, NetworkStateInfo,
};
use sc_peerset::ReputationChange;
use sp_core::offchain::{DbExternalities, Externalities};
+2 -2
View File
@@ -42,7 +42,7 @@ use futures::{
prelude::*,
};
use parking_lot::Mutex;
use sc_network_common::service::{NetworkPeers, NetworkStateInfo};
use sc_network::{NetworkPeers, NetworkStateInfo};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_core::{offchain, traits::SpawnNamed, ExecutionContext};
use sp_runtime::traits::{self, Header};
@@ -246,7 +246,7 @@ mod tests {
use libp2p::{Multiaddr, PeerId};
use sc_block_builder::BlockBuilderProvider as _;
use sc_client_api::Backend as _;
use sc_network_common::{config::MultiaddrWithPeerId, protocol::ProtocolName};
use sc_network::{config::MultiaddrWithPeerId, types::ProtocolName};
use sc_peerset::ReputationChange;
use sc_transaction_pool::{BasicPool, FullChainApi};
use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
+1 -1
View File
@@ -99,7 +99,7 @@ fn api<T: Into<Option<Status>>>(sync: T) -> RpcModule<System<Block>> {
);
},
Request::NetworkAddReservedPeer(peer, sender) => {
let _ = match sc_network_common::config::parse_str_addr(&peer) {
let _ = match sc_network::config::parse_str_addr(&peer) {
Ok(_) => sender.send(Ok(())),
Err(s) =>
sender.send(Err(error::Error::MalformattedPeerArg(s.to_string()))),
+4 -7
View File
@@ -38,14 +38,11 @@ use sc_client_db::{Backend, DatabaseSettings};
use sc_consensus::import_queue::ImportQueue;
use sc_executor::RuntimeVersionOf;
use sc_keystore::LocalKeystore;
use sc_network::NetworkService;
use sc_network_bitswap::BitswapRequestHandler;
use sc_network_common::{
config::SyncMode,
protocol::role::Roles,
service::{NetworkEventStream, NetworkStateInfo, NetworkStatusProvider},
sync::warp::WarpSyncParams,
use sc_network::{
config::SyncMode, NetworkEventStream, NetworkService, NetworkStateInfo, NetworkStatusProvider,
};
use sc_network_bitswap::BitswapRequestHandler;
use sc_network_common::{role::Roles, sync::warp::WarpSyncParams};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_request_handler::BlockRequestHandler, engine::SyncingEngine,
+3 -7
View File
@@ -22,22 +22,18 @@ pub use sc_client_api::execution_extensions::{ExecutionStrategies, ExecutionStra
pub use sc_client_db::{BlocksPruning, Database, DatabaseSource, PruningMode};
pub use sc_executor::{WasmExecutionMethod, WasmtimeInstantiationStrategy};
pub use sc_network::{
config::{NetworkConfiguration, Role},
Multiaddr,
};
pub use sc_network_common::{
config::{
MultiaddrWithPeerId, NodeKeyConfig, NonDefaultSetConfig, ProtocolId, SetConfig,
TransportConfig,
MultiaddrWithPeerId, NetworkConfiguration, NodeKeyConfig, NonDefaultSetConfig, ProtocolId,
Role, SetConfig, SyncMode, TransportConfig,
},
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
Multiaddr,
};
use prometheus_endpoint::Registry;
use sc_chain_spec::ChainSpec;
use sc_network_common::config::SyncMode;
pub use sc_telemetry::TelemetryEndpoints;
pub use sc_transaction_pool::Options as TransactionPoolOptions;
use sp_core::crypto::SecretString;
+1 -1
View File
@@ -40,7 +40,7 @@ pub enum Error {
Consensus(#[from] sp_consensus::Error),
#[error(transparent)]
Network(#[from] sc_network_common::error::Error),
Network(#[from] sc_network::error::Error),
#[error(transparent)]
Keystore(#[from] sc_keystore::Error),
+2 -4
View File
@@ -41,10 +41,8 @@ use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
use jsonrpsee::{core::Error as JsonRpseeError, RpcModule};
use log::{debug, error, warn};
use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
use sc_network::{NetworkStateInfo, PeerId};
use sc_network_common::{
config::MultiaddrWithPeerId,
service::{NetworkBlock, NetworkPeers},
use sc_network::{
config::MultiaddrWithPeerId, NetworkBlock, NetworkPeers, NetworkStateInfo, PeerId,
};
use sc_network_sync::SyncingService;
use sc_utils::mpsc::TracingUnboundedReceiver;
+2 -5
View File
@@ -22,11 +22,8 @@ use crate::config::Configuration;
use futures_timer::Delay;
use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_client_api::{ClientInfo, UsageProvider};
use sc_network::config::Role;
use sc_network_common::{
service::{NetworkStatus, NetworkStatusProvider},
sync::{SyncStatus, SyncStatusProvider},
};
use sc_network::{config::Role, NetworkStatus, NetworkStatusProvider};
use sc_network_common::sync::{SyncStatus, SyncStatusProvider};
use sc_telemetry::{telemetry, TelemetryHandle, SUBSTRATE_INFO};
use sc_transaction_pool_api::{MaintainedTransactionPool, PoolStatus};
use sc_utils::metrics::register_globals;
+3 -4
View File
@@ -22,10 +22,9 @@ use futures::{task::Poll, Future, TryFutureExt as _};
use log::{debug, info};
use parking_lot::Mutex;
use sc_client_api::{Backend, CallExecutor};
use sc_network::{config::NetworkConfiguration, multiaddr};
use sc_network_common::{
config::{MultiaddrWithPeerId, TransportConfig},
service::{NetworkBlock, NetworkPeers, NetworkStateInfo},
use sc_network::{
config::{MultiaddrWithPeerId, NetworkConfiguration, TransportConfig},
multiaddr, NetworkBlock, NetworkPeers, NetworkStateInfo,
};
use sc_network_sync::SyncingService;
use sc_service::{