Remove NetworkSpecialization (#4665)

* remove networkspecialization

* Fix most of the fallout

* get all tests compiling

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
Robert Habermeier
2020-02-21 05:02:12 -08:00
committed by GitHub
parent e8000e7429
commit 0090fe979b
20 changed files with 136 additions and 503 deletions
+16 -17
View File
@@ -16,9 +16,8 @@
use crate::{
debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour,
Event, protocol::event::DhtEvent
Event, protocol::event::DhtEvent, ExHashT,
};
use crate::{ExHashT, specialization::NetworkSpecialization};
use crate::protocol::{self, light_client_handler, CustomMessageOutcome, Protocol};
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
@@ -33,9 +32,9 @@ use void;
/// General behaviour of the network. Combines all protocols together.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll")]
pub struct Behaviour<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
pub struct Behaviour<B: BlockT, H: ExHashT> {
/// All the substrate-specific protocols.
substrate: Protocol<B, S, H>,
substrate: Protocol<B, H>,
/// Periodically pings and identifies the nodes we are connected to, and store information in a
/// cache.
debug_info: debug_info::DebugInfoBehaviour,
@@ -58,10 +57,10 @@ pub enum BehaviourOut<B: BlockT> {
Event(Event),
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> {
impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
/// Builds a new `Behaviour`.
pub async fn new(
substrate: Protocol<B, S, H>,
substrate: Protocol<B, H>,
user_agent: String,
local_public_key: PublicKey,
known_addresses: Vec<(PeerId, Multiaddr)>,
@@ -107,12 +106,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> {
}
/// Returns a shared reference to the user protocol.
pub fn user_protocol(&self) -> &Protocol<B, S, H> {
pub fn user_protocol(&self) -> &Protocol<B, H> {
&self.substrate
}
/// Returns a mutable reference to the user protocol.
pub fn user_protocol_mut(&mut self) -> &mut Protocol<B, S, H> {
pub fn user_protocol_mut(&mut self) -> &mut Protocol<B, H> {
&mut self.substrate
}
@@ -133,15 +132,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> {
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<void::Void> for
Behaviour<B, S, H> {
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<void::Void> for
Behaviour<B, H> {
fn inject_event(&mut self, event: void::Void) {
void::unreachable(event)
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for
Behaviour<B, S, H> {
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for
Behaviour<B, H> {
fn inject_event(&mut self, event: CustomMessageOutcome<B>) {
match event {
CustomMessageOutcome::BlockImport(origin, blocks) =>
@@ -174,8 +173,8 @@ Behaviour<B, S, H> {
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<debug_info::DebugInfoEvent>
for Behaviour<B, S, H> {
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<debug_info::DebugInfoEvent>
for Behaviour<B, H> {
fn inject_event(&mut self, event: debug_info::DebugInfoEvent) {
let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event;
if info.listen_addrs.len() > 30 {
@@ -192,8 +191,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventPr
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
for Behaviour<B, S, H> {
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
for Behaviour<B, H> {
fn inject_event(&mut self, out: DiscoveryOut) {
match out {
DiscoveryOut::UnroutablePeer(_peer_id) => {
@@ -221,7 +220,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventPr
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> {
impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
fn poll<TEv>(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
if !self.events.is_empty() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
+1 -4
View File
@@ -43,7 +43,7 @@ use std::{error::Error, fs, io::{self, Write}, net::Ipv4Addr, path::{Path, PathB
use zeroize::Zeroize;
/// Network initialization parameters.
pub struct Params<B: BlockT, S, H: ExHashT> {
pub struct Params<B: BlockT, H: ExHashT> {
/// Assigned roles for our node (full, light, ...).
pub roles: Roles,
@@ -88,9 +88,6 @@ pub struct Params<B: BlockT, S, H: ExHashT> {
/// valid.
pub import_queue: Box<dyn ImportQueue<B>>,
/// Customization of the network. Use this to plug additional networking capabilities.
pub specialization: S,
/// Type to check incoming block announcements.
pub block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
}
+1 -9
View File
@@ -136,10 +136,6 @@
//! - Light-client requests. When a light client requires information, a random node we have a
//! substream open with is chosen, and the information is requested from it.
//! - Gossiping. Used for example by grandpa.
//! - Network specialization. The network protocol can be specialized through a template parameter
//! of the network service. This specialization is free to send and receive messages with the
//! remote. This is meant to be used by the chain that is being built on top of Substrate
//! (eg. Polkadot).
//!
//! It is intended that in the future each of these components gets more isolated, so that they
//! are free to open and close their own substreams, and so that syncing and light client requests
@@ -180,7 +176,7 @@ pub mod error;
pub mod network_state;
pub use service::{NetworkService, NetworkStateInfo, NetworkWorker, ExHashT, ReportHandle};
pub use protocol::{PeerInfo, Context, specialization};
pub use protocol::PeerInfo;
pub use protocol::event::{Event, DhtEvent};
pub use protocol::sync::SyncState;
pub use libp2p::{Multiaddr, PeerId};
@@ -196,10 +192,6 @@ pub use protocol::message::Status as StatusMessage;
pub use sc_peerset::ReputationChange;
// Used by the `construct_simple_protocol!` macro.
#[doc(hidden)]
pub use sp_runtime::traits::Block as BlockT;
/// Extension trait for `NetworkBehaviour` that also accepts discovering nodes.
trait DiscoveryNetBehaviour {
/// Notify the protocol that we have learned about the existence of nodes.
+8 -89
View File
@@ -38,7 +38,6 @@ use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
use message::generic::Message as GenericMessage;
use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
use specialization::NetworkSpecialization;
use sync::{ChainSync, SyncState};
use crate::service::{TransactionPool, ExHashT};
use crate::config::{BoxFinalityProofRequestBuilder, Roles};
@@ -73,7 +72,6 @@ pub mod message;
pub mod event;
pub mod light_client_handler;
pub mod light_dispatch;
pub mod specialization;
pub mod sync;
pub use block_requests::BlockRequests;
@@ -136,7 +134,7 @@ mod rep {
}
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
pub struct Protocol<B: BlockT, H: ExHashT> {
/// Interval at which we call `tick`.
tick_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Interval at which we call `propagate_extrinsics`.
@@ -146,7 +144,6 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
light_dispatch: LightDispatch<B>,
genesis_hash: B::Hash,
sync: ChainSync<B>,
specialization: S,
context_data: ContextData<B, H>,
/// List of nodes for which we perform additional logging because they are important for the
/// user.
@@ -335,55 +332,6 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a> {
}
}
/// Context for a network-specific handler.
pub trait Context<B: BlockT> {
/// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or
/// irresponsible or appeared lazy.
fn report_peer(&mut self, who: PeerId, reputation: sc_peerset::ReputationChange);
/// Force disconnecting from a peer. Use this when a peer misbehaved.
fn disconnect_peer(&mut self, who: PeerId);
/// Send a chain-specific message to a peer.
fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>);
}
/// Protocol context.
struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
behaviour: &'a mut GenericProto,
context_data: &'a mut ContextData<B, H>,
peerset_handle: &'a sc_peerset::PeersetHandle,
}
impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> {
fn new(
context_data: &'a mut ContextData<B, H>,
behaviour: &'a mut GenericProto,
peerset_handle: &'a sc_peerset::PeersetHandle,
) -> Self {
ProtocolContext { context_data, peerset_handle, behaviour }
}
}
impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, H> {
fn report_peer(&mut self, who: PeerId, reputation: sc_peerset::ReputationChange) {
self.peerset_handle.report_peer(who, reputation)
}
fn disconnect_peer(&mut self, who: PeerId) {
self.behaviour.disconnect_peer(&who)
}
fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>) {
send_message::<B> (
self.behaviour,
&mut self.context_data.stats,
&who,
GenericMessage::ChainSpecific(message)
)
}
}
/// Data necessary to create a context.
struct ContextData<B: BlockT, H: ExHashT> {
// All connected peers
@@ -410,20 +358,19 @@ impl Default for ProtocolConfig {
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
impl<B: BlockT, H: ExHashT> Protocol<B, H> {
/// Create a new instance.
pub fn new(
config: ProtocolConfig,
chain: Arc<dyn Client<B>>,
checker: Arc<dyn FetchChecker<B>>,
specialization: S,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
protocol_id: ProtocolId,
peerset_config: sc_peerset::PeersetConfig,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
) -> error::Result<(Protocol<B, S, H>, sc_peerset::PeersetHandle)> {
) -> error::Result<(Protocol<B, H>, sc_peerset::PeersetHandle)> {
let info = chain.info();
let sync = ChainSync::new(
config.roles,
@@ -459,7 +406,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
light_dispatch: LightDispatch::new(checker),
genesis_hash: info.genesis_hash,
sync,
specialization,
handshaking_peers: HashMap::new(),
important_peers,
transaction_pool,
@@ -681,11 +627,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
CustomMessageOutcome::None
};
},
GenericMessage::ChainSpecific(msg) => self.specialization.on_message(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who,
msg,
),
}
CustomMessageOutcome::None
@@ -710,14 +651,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
);
}
/// Locks `self` and returns a context plus the network specialization.
pub fn specialization_lock<'a>(
&'a mut self,
) -> (impl Context<B> + 'a, &'a mut S) {
let context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
(context, &mut self.specialization)
}
/// Called when a new peer is connected
pub fn on_peer_connected(&mut self, who: PeerId) {
trace!(target: "sync", "Connecting {}", who);
@@ -739,9 +672,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
self.context_data.peers.remove(&peer)
};
if let Some(_peer_data) = removed {
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
self.sync.peer_disconnected(peer.clone());
self.specialization.on_disconnect(&mut context, peer.clone());
self.light_dispatch.on_disconnect(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
@@ -940,9 +871,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
self.specialization.maintain_peers(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)
);
for p in aborting {
self.behaviour.disconnect_peer(&p);
self.peerset_handle.report_peer(p, rep::TIMEOUT);
@@ -1058,9 +986,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
self.specialization.on_connect(&mut context, who.clone(), status);
// Notify all the notification protocols as open.
CustomMessageOutcome::NotificationStreamOpened {
remote: who,
@@ -1292,7 +1217,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
roles: self.config.roles.into(),
best_number: info.best_number,
best_hash: info.best_hash,
chain_status: self.specialization.status(),
chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
};
self.send_message(&who, GenericMessage::Status(status))
@@ -1361,15 +1286,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Call this when a block has been imported in the import queue and we should announce it on
/// the network.
pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header, data: Vec<u8>, is_best: bool) {
pub fn on_block_imported(&mut self, header: &B::Header, data: Vec<u8>, is_best: bool) {
if is_best {
self.sync.update_chain_info(header);
}
self.specialization.on_block_imported(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
hash.clone(),
header,
);
// blocks are not announced by light clients
if self.config.roles.is_light() {
@@ -1872,8 +1792,7 @@ fn send_message<B: BlockT>(
behaviour.send_packet(who, encoded);
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviour for
Protocol<B, S, H> {
impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
type ProtocolsHandler = <GenericProto as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = CustomMessageOutcome<B>;
@@ -2030,13 +1949,13 @@ Protocol<B, S, H> {
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> DiscoveryNetBehaviour for Protocol<B, S, H> {
impl<B: BlockT, H: ExHashT> DiscoveryNetBehaviour for Protocol<B, H> {
fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) {
self.behaviour.add_discovered_nodes(peer_ids)
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Drop for Protocol<B, S, H> {
impl<B: BlockT, H: ExHashT> Drop for Protocol<B, H> {
fn drop(&mut self) {
debug!(target: "sync", "Network stats:\n{}", self.format_stats());
}
@@ -25,7 +25,8 @@ use libp2p::swarm::{PollParameters, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::{PeerId, Multiaddr, Transport};
use rand::seq::SliceRandom;
use std::{error, io, task::Context, task::Poll, time::Duration};
use crate::message::Message;
use std::collections::HashSet;
use crate::message::{generic::BlockResponse, Message};
use crate::protocol::generic_proto::{GenericProto, GenericProtoOut};
use sp_test_primitives::Block;
@@ -227,7 +228,10 @@ fn two_nodes_transfer_lots_of_packets() {
for n in 0 .. NUM_PACKETS {
service1.send_packet(
&peer_id,
Message::<Block>::ChainSpecific(vec![(n % 256) as u8]).encode()
Message::<Block>::BlockResponse(BlockResponse {
id: n as _,
blocks: Vec::new(),
}).encode()
);
}
},
@@ -243,8 +247,8 @@ fn two_nodes_transfer_lots_of_packets() {
Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
Some(GenericProtoOut::CustomMessage { message, .. }) => {
match Message::<Block>::decode(&mut &message[..]).unwrap() {
Message::<Block>::ChainSpecific(message) => {
assert_eq!(message.len(), 1);
Message::<Block>::BlockResponse(BlockResponse { id: _, blocks }) => {
assert!(blocks.is_empty());
packet_counter += 1;
if packet_counter == NUM_PACKETS {
return Poll::Ready(())
@@ -270,9 +274,21 @@ fn basic_two_nodes_requests_in_parallel() {
// Generate random messages with or without a request id.
let mut to_send = {
let mut to_send = Vec::new();
let mut existing_ids = HashSet::new();
for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode.
let msg = (0..10).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
to_send.push(Message::<Block>::ChainSpecific(msg));
let req_id = loop {
let req_id = rand::random::<u64>();
// ensure uniqueness - odds of randomly sampling collisions
// is unlikely, but possible to cause spurious test failures.
if existing_ids.insert(req_id) {
break req_id;
}
};
to_send.push(Message::<Block>::BlockResponse(
BlockResponse { id: req_id, blocks: Vec::new() }
));
}
to_send
};
@@ -219,9 +219,6 @@ pub mod generic {
FinalityProofResponse(FinalityProofResponse<Hash>),
/// Batch of consensus protocol messages.
ConsensusBatch(Vec<ConsensusMessage>),
/// Chain-specific message.
#[codec(index = "255")]
ChainSpecific(Vec<u8>),
}
impl<Header, Hash, Number, Extrinsic> Message<Header, Hash, Number, Extrinsic> {
@@ -246,7 +243,6 @@ pub mod generic {
Message::FinalityProofRequest(_) => "FinalityProofRequest",
Message::FinalityProofResponse(_) => "FinalityProofResponse",
Message::ConsensusBatch(_) => "ConsensusBatch",
Message::ChainSpecific(_) => "ChainSpecific",
}
}
}
@@ -1,171 +0,0 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Specializations of the substrate network protocol to allow more complex forms of communication.
pub use crate::protocol::event::{DhtEvent, Event};
use crate::protocol::Context;
use libp2p::PeerId;
use sp_runtime::traits::Block as BlockT;
/// A specialization of the substrate network protocol. Handles events and sends messages.
pub trait NetworkSpecialization<B: BlockT>: Send + Sync + 'static {
/// Get the current specialization-status.
fn status(&self) -> Vec<u8>;
/// Called when a peer successfully handshakes.
fn on_connect(&mut self, ctx: &mut dyn Context<B>, who: PeerId, status: crate::protocol::message::Status<B>);
/// Called when a peer is disconnected. If the peer ID is unknown, it should be ignored.
fn on_disconnect(&mut self, ctx: &mut dyn Context<B>, who: PeerId);
/// Called when a network-specific message arrives.
fn on_message(
&mut self,
ctx: &mut dyn Context<B>,
who: PeerId,
message: Vec<u8>
);
/// Called periodically to maintain peers and handle timeouts.
fn maintain_peers(&mut self, _ctx: &mut dyn Context<B>) { }
/// Called when a block is _imported_ at the head of the chain (not during major sync).
/// Not guaranteed to be called for every block, but will be most of the after major sync.
fn on_block_imported(&mut self, _ctx: &mut dyn Context<B>, _hash: B::Hash, _header: &B::Header) { }
}
/// A specialization that does nothing.
#[derive(Clone)]
pub struct DummySpecialization;
impl<B: BlockT> NetworkSpecialization<B> for DummySpecialization {
fn status(&self) -> Vec<u8> {
vec![]
}
fn on_connect(
&mut self,
_ctx: &mut dyn Context<B>,
_peer_id: PeerId,
_status: crate::protocol::message::Status<B>
) {}
fn on_disconnect(&mut self, _ctx: &mut dyn Context<B>, _peer_id: PeerId) {}
fn on_message(
&mut self,
_ctx: &mut dyn Context<B>,
_peer_id: PeerId,
_message: Vec<u8>,
) {}
}
/// Construct a simple protocol that is composed of several sub protocols.
/// Each "sub protocol" needs to implement `Specialization` and needs to provide a `new()` function.
/// For more fine grained implementations, this macro is not usable.
///
/// # Example
///
/// ```nocompile
/// construct_simple_protocol! {
/// pub struct MyProtocol where Block = MyBlock {
/// consensus_gossip: ConsensusGossip<MyBlock>,
/// other_protocol: MyCoolStuff,
/// }
/// }
/// ```
///
/// You can also provide an optional parameter after `where Block = MyBlock`, so it looks like
/// `where Block = MyBlock, Status = consensus_gossip`. This will instruct the implementation to
/// use the `status()` function from the `ConsensusGossip` protocol. By default, `status()` returns
/// an empty vector.
#[macro_export]
macro_rules! construct_simple_protocol {
(
$( #[ $attr:meta ] )*
pub struct $protocol:ident where
Block = $block:ident
$( , Status = $status_protocol_name:ident )*
{
$( $sub_protocol_name:ident : $sub_protocol:ident $( <$protocol_block:ty> )*, )*
}
) => {
$( #[$attr] )*
pub struct $protocol {
$( $sub_protocol_name: $sub_protocol $( <$protocol_block> )*, )*
}
impl $protocol {
/// Instantiate a node protocol handler.
pub fn new() -> Self {
Self {
$( $sub_protocol_name: $sub_protocol::new(), )*
}
}
}
impl $crate::specialization::NetworkSpecialization<$block> for $protocol {
fn status(&self) -> Vec<u8> {
$(
let status = self.$status_protocol_name.status();
if !status.is_empty() {
return status;
}
)*
Vec::new()
}
fn on_connect(
&mut self,
_ctx: &mut $crate::Context<$block>,
_who: $crate::PeerId,
_status: $crate::StatusMessage<$block>
) {
$( self.$sub_protocol_name.on_connect(_ctx, _who, _status); )*
}
fn on_disconnect(&mut self, _ctx: &mut $crate::Context<$block>, _who: $crate::PeerId) {
$( self.$sub_protocol_name.on_disconnect(_ctx, _who); )*
}
fn on_message(
&mut self,
_ctx: &mut $crate::Context<$block>,
_who: $crate::PeerId,
_message: Vec<u8>,
) {
$( self.$sub_protocol_name.on_message(_ctx, _who, _message); )*
}
fn maintain_peers(&mut self, _ctx: &mut $crate::Context<$block>) {
$( self.$sub_protocol_name.maintain_peers(_ctx); )*
}
fn on_block_imported(
&mut self,
_ctx: &mut $crate::Context<$block>,
_hash: <$block as $crate::BlockT>::Hash,
_header: &<$block as $crate::BlockT>::Header
) {
$( self.$sub_protocol_name.on_block_imported(_ctx, _hash, _header); )*
}
}
}
}
+32 -52
View File
@@ -45,9 +45,8 @@ use crate::{transport, config::NonReservedPeerMode, ReputationChange};
use crate::config::{Params, TransportConfig};
use crate::error::Error;
use crate::network_state::{NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer};
use crate::protocol::{self, Protocol, Context, PeerInfo};
use crate::protocol::{self, Protocol, PeerInfo};
use crate::protocol::{event::Event, light_dispatch::{AlwaysBadChecker, RequestData}};
use crate::protocol::specialization::NetworkSpecialization;
use crate::protocol::sync::SyncState;
/// Minimum Requirements for a Hash within Networking
@@ -101,7 +100,7 @@ impl ReportHandle {
}
/// Substrate network service. Handles network IO and manages connectivity.
pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> {
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
/// Number of peers we're connected to.
num_connected: Arc<AtomicUsize>,
/// The local external addresses.
@@ -116,19 +115,19 @@ pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>, H: E
/// nodes it should be connected to or not.
peerset: PeersetHandle,
/// Channel that sends messages to the actual worker.
to_worker: mpsc::UnboundedSender<ServiceToWorkerMsg<B, H, S>>,
to_worker: mpsc::UnboundedSender<ServiceToWorkerMsg<B, H>>,
/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
/// compatibility.
_marker: PhantomData<H>,
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker<B, S, H> {
impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
/// Creates the network service.
///
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new(params: Params<B, S, H>) -> Result<NetworkWorker<B, S, H>, Error> {
pub fn new(params: Params<B, H>) -> Result<NetworkWorker<B, H>, Error> {
let (to_worker, from_worker) = mpsc::unbounded();
if let Some(ref path) = params.network_config.net_config_path {
@@ -205,7 +204,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
},
params.chain.clone(),
checker.clone(),
params.specialization,
params.transaction_pool,
params.finality_proof_provider.clone(),
params.finality_proof_request_builder,
@@ -215,7 +213,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
)?;
// Build the swarm.
let (mut swarm, bandwidth): (Swarm::<B, S, H>, _) = {
let (mut swarm, bandwidth): (Swarm::<B, H>, _) = {
let user_agent = format!(
"{} ({})",
params.network_config.client_version,
@@ -263,14 +261,14 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
// Listen on multiaddresses.
for addr in &params.network_config.listen_addresses {
if let Err(err) = Swarm::<B, S, H>::listen_on(&mut swarm, addr.clone()) {
if let Err(err) = Swarm::<B, H>::listen_on(&mut swarm, addr.clone()) {
warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
}
}
// Add external addresses.
for addr in &params.network_config.public_addresses {
Swarm::<B, S, H>::add_external_address(&mut swarm, addr.clone());
Swarm::<B, H>::add_external_address(&mut swarm, addr.clone());
}
let external_addresses = Arc::new(Mutex::new(Vec::new()));
@@ -351,13 +349,13 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
/// Return a `NetworkService` that can be shared through the code base and can be used to
/// manipulate the worker.
pub fn service(&self) -> &Arc<NetworkService<B, S, H>> {
pub fn service(&self) -> &Arc<NetworkService<B, H>> {
&self.service
}
/// You must call this when a new block is imported by the client.
pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header, data: Vec<u8>, is_best: bool) {
self.network_service.user_protocol_mut().on_block_imported(hash, &header, data, is_best);
pub fn on_block_imported(&mut self, header: B::Header, data: Vec<u8>, is_best: bool) {
self.network_service.user_protocol_mut().on_block_imported(&header, data, is_best);
}
/// You must call this when a new block is finalized by the client.
@@ -415,9 +413,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
};
NetworkState {
peer_id: Swarm::<B, S, H>::local_peer_id(&swarm).to_base58(),
listened_addresses: Swarm::<B, S, H>::listeners(&swarm).cloned().collect(),
external_addresses: Swarm::<B, S, H>::external_addresses(&swarm).cloned().collect(),
peer_id: Swarm::<B, H>::local_peer_id(&swarm).to_base58(),
listened_addresses: Swarm::<B, H>::listeners(&swarm).cloned().collect(),
external_addresses: Swarm::<B, H>::external_addresses(&swarm).cloned().collect(),
average_download_per_sec: self.service.bandwidth.average_download_per_sec(),
average_upload_per_sec: self.service.bandwidth.average_upload_per_sec(),
connected_peers,
@@ -446,7 +444,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkService<B, S, H> {
impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// Writes a message on an open notifications channel. Has no effect if the notifications
/// channel with this protocol name is closed.
///
@@ -545,15 +543,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
.unbounded_send(ServiceToWorkerMsg::RequestJustification(hash.clone(), number));
}
/// Execute a closure with the chain-specific network specialization.
pub fn with_spec<F>(&self, f: F)
where F: FnOnce(&mut S, &mut dyn Context<B>) + Send + 'static
{
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::ExecuteWithSpec(Box::new(f)));
}
/// Are we in the process of downloading the chain?
pub fn is_major_syncing(&self) -> bool {
self.is_major_syncing.load(Ordering::Relaxed)
@@ -641,8 +630,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> sp_consensus::SyncOracle
for NetworkService<B, S, H>
impl<B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle
for NetworkService<B, H>
{
fn is_major_syncing(&mut self) -> bool {
NetworkService::is_major_syncing(self)
@@ -653,8 +642,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> sp_consensus:
}
}
impl<'a, B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> sp_consensus::SyncOracle
for &'a NetworkService<B, S, H>
impl<'a, B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle
for &'a NetworkService<B, H>
{
fn is_major_syncing(&mut self) -> bool {
NetworkService::is_major_syncing(self)
@@ -674,10 +663,9 @@ pub trait NetworkStateInfo {
fn local_peer_id(&self) -> PeerId;
}
impl<B, S, H> NetworkStateInfo for NetworkService<B, S, H>
impl<B, H> NetworkStateInfo for NetworkService<B, H>
where
B: sp_runtime::traits::Block,
S: NetworkSpecialization<B>,
H: ExHashT,
{
/// Returns the local external addresses.
@@ -694,12 +682,11 @@ impl<B, S, H> NetworkStateInfo for NetworkService<B, S, H>
/// Messages sent from the `NetworkService` to the `NetworkWorker`.
///
/// Each entry corresponds to a method of `NetworkService`.
enum ServiceToWorkerMsg<B: BlockT, H: ExHashT, S: NetworkSpecialization<B>> {
enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
PropagateExtrinsic(H),
PropagateExtrinsics,
RequestJustification(B::Hash, NumberFor<B>),
AnnounceBlock(B::Hash, Vec<u8>),
ExecuteWithSpec(Box<dyn FnOnce(&mut S, &mut dyn Context<B>) + Send>),
GetValue(record::Key),
PutValue(record::Key, Vec<u8>),
AddKnownAddress(PeerId, Multiaddr),
@@ -721,7 +708,7 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT, S: NetworkSpecialization<B>> {
///
/// You are encouraged to poll this in a separate background thread or task.
#[must_use = "The NetworkWorker must be polled in order for the network to work"]
pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> {
pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
@@ -729,20 +716,20 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
is_major_syncing: Arc<AtomicBool>,
/// The network service that can be extracted and shared through the codebase.
service: Arc<NetworkService<B, S, H>>,
service: Arc<NetworkService<B, H>>,
/// The *actual* network.
network_service: Swarm<B, S, H>,
network_service: Swarm<B, H>,
/// The import queue that was passed as initialization.
import_queue: Box<dyn ImportQueue<B>>,
/// Messages from the `NetworkService` and that must be processed.
from_worker: mpsc::UnboundedReceiver<ServiceToWorkerMsg<B, H, S>>,
from_worker: mpsc::UnboundedReceiver<ServiceToWorkerMsg<B, H>>,
/// Receiver for queries from the light client that must be processed.
light_client_rqs: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
/// Senders for events that happen on the network.
event_streams: Vec<mpsc::UnboundedSender<Event>>,
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for NetworkWorker<B, S, H> {
impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
type Output = Result<(), io::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
@@ -769,11 +756,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
};
match msg {
ServiceToWorkerMsg::ExecuteWithSpec(task) => {
let protocol = this.network_service.user_protocol_mut();
let (mut context, spec) = protocol.specialization_lock();
task(spec, &mut context);
},
ServiceToWorkerMsg::AnnounceBlock(hash, data) =>
this.network_service.user_protocol_mut().announce_block(hash, data),
ServiceToWorkerMsg::RequestJustification(hash, number) =>
@@ -839,7 +821,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
// Update the variables shared with the `NetworkService`.
this.num_connected.store(this.network_service.user_protocol_mut().num_connected_peers(), Ordering::Relaxed);
{
let external_addresses = Swarm::<B, S, H>::external_addresses(&this.network_service).cloned().collect();
let external_addresses = Swarm::<B, H>::external_addresses(&this.network_service).cloned().collect();
*this.external_addresses.lock() = external_addresses;
}
this.is_major_syncing.store(match this.network_service.user_protocol_mut().sync_state() {
@@ -851,20 +833,18 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Unpin for NetworkWorker<B, S, H> {
impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {
}
/// The libp2p swarm, customized for our needs.
type Swarm<B, S, H> = libp2p::swarm::Swarm<
Behaviour<B, S, H>
>;
type Swarm<B, H> = libp2p::swarm::Swarm<Behaviour<B, H>>;
// Implementation of `import_queue::Link` trait using the available local variables.
struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
protocol: &'a mut Swarm<B, S, H>,
struct NetworkLink<'a, B: BlockT, H: ExHashT> {
protocol: &'a mut Swarm<B, H>,
}
impl<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Link<B> for NetworkLink<'a, B, S, H> {
impl<'a, B: BlockT, H: ExHashT> Link<B> for NetworkLink<'a, B, H> {
fn blocks_processed(
&mut self,
imported: usize,