Network crate cleanups (#3049)

* Remove useless internal messages

* Remove NetworkService::disconnect_peer

* Remove NetworkMsg altogether

* Rename ProtocolMsg ServerToWorkerMsg

* Remove useless code

* Add example for parse_str_addr

* Move parse_str_addr and ProtocolId to config

* Don't reexport the content of config

* Rework the imports

* More reexports rework

* Add documentation

* Move finalization report to network future

* Move on_block_imported to worker

* get_value/put_value no longer locking

* local_peer_id() no longer locks

* Remove FetchFuture

* Service imports cleanup

* Produce the network state in the network task

* Merge network task and RPC network task

* Move network methods to NetworkWorker

* Remove Arc peers system from network

* add_reserved_peer now goes through the channel

* Remove Mutex around network swarm

* Remove the FnOnce alias traits

* Replace is_offline with num_connected

* Improve style of poll()

* Fix network tests

* Some doc in service module

* Remove macro export

* Minor doc changes

* Remove the synchronized() method of the import queue

* Line width

* Line widths

* Fix import queue tests

* Fix CLI tests
This commit is contained in:
Pierre Krieger
2019-07-08 15:33:29 +02:00
committed by Gavin Wood
parent 7df8e52cfe
commit 1e126eab2f
24 changed files with 598 additions and 814 deletions
+303 -427
View File
@@ -14,43 +14,38 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::{fs, io, path::Path};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
//! Main entry point of the substrate-network crate.
//!
//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
//! The [`NetworkWorker`] *is* the network and implements the `Future` trait. It must be polled in
//! order fo the network to advance.
//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
//! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
//!
//! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
//! which is then processed by [`NetworkWorker::poll`].
use std::{collections::HashMap, fs, marker::PhantomData, io, path::Path};
use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}};
use log::{warn, error, info};
use libp2p::core::swarm::NetworkBehaviour;
use libp2p::core::{transport::boxed::Boxed, muxing::StreamMuxerBox};
use libp2p::{Multiaddr, multihash::Multihash};
use futures::{prelude::*, sync::oneshot, sync::mpsc};
use parking_lot::{Mutex, RwLock};
use crate::protocol::Protocol;
use crate::{behaviour::{Behaviour, BehaviourOut}, parse_str_addr};
use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer};
use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode};
use peerset::PeersetHandle;
use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder};
use futures::{prelude::*, sync::mpsc};
use log::{warn, error, info};
use libp2p::core::{swarm::NetworkBehaviour, transport::boxed::Boxed, muxing::StreamMuxerBox};
use libp2p::{PeerId, Multiaddr, multihash::Multihash};
use peerset::PeersetHandle;
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
use crate::AlwaysBadChecker;
use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::protocol::{event::Event, message::Message};
use crate::protocol::on_demand::RequestData;
use crate::protocol::{self, Context, CustomMessageOutcome, ConnectedPeer, PeerInfo};
use crate::protocol::sync::SyncState;
use crate::{behaviour::{Behaviour, BehaviourOut}, config::parse_str_addr};
use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer};
use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode};
use crate::config::{Params, TransportConfig};
use crate::error::Error;
use crate::protocol::{self, Protocol, Context, CustomMessageOutcome, PeerInfo};
use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::protocol::{event::Event, on_demand::{AlwaysBadChecker, RequestData}};
use crate::protocol::specialization::NetworkSpecialization;
/// Interval at which we update the `peers` field on the main thread.
const CONNECTED_PEERS_INTERVAL: Duration = Duration::from_millis(500);
pub use libp2p::PeerId;
/// Type that represents fetch completion future.
pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
use crate::protocol::sync::SyncState;
/// Minimum Requirements for a Hash within Networking
pub trait ExHashT:
@@ -88,23 +83,22 @@ impl ReportHandle {
/// Substrate network service. Handles network IO and manages connectivity.
pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> {
/// Are we connected to any peer?
is_offline: Arc<AtomicBool>,
/// Number of peers we're connected to.
num_connected: Arc<AtomicUsize>,
/// Are we actively catching up with the chain?
is_major_syncing: Arc<AtomicBool>,
/// Peers whom we are connected with.
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
/// Channel for networking messages processed by the background thread.
network_chan: mpsc::UnboundedSender<NetworkMsg<B>>,
/// Network service
network: Arc<Mutex<Swarm<B, S, H>>>,
/// Local copy of the `PeerId` of the local node.
local_peer_id: PeerId,
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
bandwidth: Arc<transport::BandwidthSinks>,
/// Peerset manager (PSM); manages the reputation of nodes and indicates the network which
/// nodes it should be connected to or not.
peerset: PeersetHandle,
/// Protocol sender
protocol_sender: mpsc::UnboundedSender<ProtocolMsg<B, S>>,
/// Channel that sends messages to the actual worker.
to_worker: mpsc::UnboundedSender<ServerToWorkerMsg<B, S>>,
/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
/// compatibility.
_marker: PhantomData<H>,
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker<B, S, H> {
@@ -116,8 +110,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
pub fn new(
params: Params<B, S, H>,
) -> Result<NetworkWorker<B, S, H>, Error> {
let (network_chan, network_port) = mpsc::unbounded();
let (protocol_sender, protocol_rx) = mpsc::unbounded();
let (to_worker, from_worker) = mpsc::unbounded();
if let Some(ref path) = params.network_config.net_config_path {
fs::create_dir_all(Path::new(path))?;
@@ -166,10 +159,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
let local_peer_id = local_public.clone().into_peer_id();
info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58());
// Start in off-line mode, since we're not connected to any nodes yet.
let is_offline = Arc::new(AtomicBool::new(true));
let num_connected = Arc::new(AtomicUsize::new(0));
let is_major_syncing = Arc::new(AtomicBool::new(false));
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>> = Arc::new(Default::default());
let (protocol, peerset_handle) = Protocol::new(
protocol::ProtocolConfig { roles: params.roles },
params.chain,
@@ -222,31 +213,24 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
Swarm::<B, S, H>::add_external_address(&mut swarm, addr.clone());
}
let network = Arc::new(Mutex::new(swarm));
let service = Arc::new(NetworkService {
bandwidth,
is_offline: is_offline.clone(),
num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(),
network_chan,
peers: peers.clone(),
peerset: peerset_handle.clone(),
network: network.clone(),
protocol_sender: protocol_sender.clone(),
peerset: peerset_handle,
local_peer_id,
to_worker: to_worker.clone(),
_marker: PhantomData,
});
Ok(NetworkWorker {
is_offline,
num_connected,
is_major_syncing,
network_service: network,
peerset: peerset_handle,
network_service: swarm,
service,
peers,
import_queue: params.import_queue,
network_port,
protocol_rx,
from_worker,
on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()),
connected_peers_interval: tokio_timer::Interval::new_interval(CONNECTED_PEERS_INTERVAL),
})
}
@@ -262,32 +246,32 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
/// Returns the number of peers we're connected to.
pub fn num_connected_peers(&self) -> usize {
self.network_service.lock().user_protocol_mut().num_connected_peers()
self.network_service.user_protocol().num_connected_peers()
}
/// Returns the number of peers we're connected to and that are being queried.
pub fn num_active_peers(&self) -> usize {
self.network_service.lock().user_protocol_mut().num_active_peers()
self.network_service.user_protocol().num_active_peers()
}
/// Current global sync state.
pub fn sync_state(&self) -> SyncState {
self.network_service.lock().user_protocol_mut().sync_state()
self.network_service.user_protocol().sync_state()
}
/// Target sync block number.
pub fn best_seen_block(&self) -> Option<NumberFor<B>> {
self.network_service.lock().user_protocol_mut().best_seen_block()
self.network_service.user_protocol().best_seen_block()
}
/// Number of peers participating in syncing.
pub fn num_sync_peers(&self) -> u32 {
self.network_service.lock().user_protocol_mut().num_sync_peers()
self.network_service.user_protocol().num_sync_peers()
}
/// Adds an address for a node.
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.network_service.lock().add_known_address(peer_id, addr);
self.network_service.add_known_address(peer_id, addr);
}
/// Return a `NetworkService` that can be shared through the code base and can be used to
@@ -295,114 +279,23 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
pub fn service(&self) -> &Arc<NetworkService<B, S, H>> {
&self.service
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkService<B, S, H> {
/// Returns the network identity of the node.
pub fn local_peer_id(&self) -> PeerId {
Swarm::<B, S, H>::local_peer_id(&*self.network.lock()).clone()
/// You must call this when a new block is imported by the client.
pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header) {
self.network_service.user_protocol_mut().on_block_imported(hash, &header);
}
/// Called when a new block is imported by the client.
pub fn on_block_imported(&self, hash: B::Hash, header: B::Header) {
let _ = self
.protocol_sender
.unbounded_send(ProtocolMsg::BlockImported(hash, header));
/// You must call this when a new block is finalized by the client.
pub fn on_block_finalized(&mut self, hash: B::Hash, header: B::Header) {
self.network_service.user_protocol_mut().on_block_finalized(hash, &header);
}
/// Called when a new block is finalized by the client.
pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
let _ = self
.protocol_sender
.unbounded_send(ProtocolMsg::BlockFinalized(hash, header));
}
/// Called when new transactons are imported by the client.
pub fn trigger_repropagate(&self) {
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::PropagateExtrinsics);
}
/// Make sure an important block is propagated to peers.
///
/// In chain-based consensus, we often need to make sure non-best forks are
/// at least temporarily synced.
pub fn announce_block(&self, hash: B::Hash) {
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::AnnounceBlock(hash));
}
/// Send a consensus message through the gossip
pub fn gossip_consensus_message(
&self,
topic: B::Hash,
engine_id: ConsensusEngineId,
message: Vec<u8>,
recipient: GossipMessageRecipient,
) {
let _ = self
.protocol_sender
.unbounded_send(ProtocolMsg::GossipConsensusMessage(
topic, engine_id, message, recipient,
));
}
/// Report a given peer as either beneficial (+) or costly (-) according to the
/// given scalar.
pub fn report_peer(&self, who: PeerId, cost_benefit: i32) {
self.peerset.report_peer(who, cost_benefit);
}
/// Send a message to the given peer. Has no effect if we're not connected to this peer.
///
/// This method is extremely poor in terms of API and should be eventually removed.
pub fn disconnect_peer(&self, who: PeerId) {
let _ = self.network_chan.unbounded_send(NetworkMsg::DisconnectPeer(who));
}
/// Request a justification for the given block.
pub fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self
.protocol_sender
.unbounded_send(ProtocolMsg::RequestJustification(hash.clone(), number));
}
/// Execute a closure with the chain-specific network specialization.
pub fn with_spec<F>(&self, f: F)
where F: FnOnce(&mut S, &mut dyn Context<B>) + Send + 'static
{
let _ = self
.protocol_sender
.unbounded_send(ProtocolMsg::ExecuteWithSpec(Box::new(f)));
}
/// Execute a closure with the consensus gossip.
pub fn with_gossip<F>(&self, f: F)
where F: FnOnce(&mut ConsensusGossip<B>, &mut dyn Context<B>) + Send + 'static
{
let _ = self
.protocol_sender
.unbounded_send(ProtocolMsg::ExecuteWithGossip(Box::new(f)));
}
/// Are we in the process of downloading the chain?
pub fn is_major_syncing(&self) -> bool {
self.is_major_syncing.load(Ordering::Relaxed)
}
/// Get a value.
pub fn get_value(&mut self, key: &Multihash) {
self.network.lock().get_value(key);
}
/// Put a value.
pub fn put_value(&mut self, key: Multihash, value: Vec<u8>) {
self.network.lock().put_value(key, value);
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkService<B, S, H> {
/// Get network state.
pub fn network_state(&self) -> NetworkState {
let mut swarm = self.network.lock();
///
/// **Note**: Use this only for debugging. This API is unstable. There are warnings literaly
/// everywhere about this. Please don't use this function to retreive actual information.
pub fn network_state(&mut self) -> NetworkState {
let swarm = &mut self.network_service;
let open = swarm.user_protocol().open_peers().cloned().collect::<Vec<_>>();
let connected_peers = {
@@ -450,8 +343,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
peer_id: Swarm::<B, S, H>::local_peer_id(&swarm).to_base58(),
listened_addresses: Swarm::<B, S, H>::listeners(&swarm).cloned().collect(),
external_addresses: Swarm::<B, S, H>::external_addresses(&swarm).cloned().collect(),
average_download_per_sec: self.bandwidth.average_download_per_sec(),
average_upload_per_sec: self.bandwidth.average_upload_per_sec(),
average_download_per_sec: self.service.bandwidth.average_download_per_sec(),
average_upload_per_sec: self.service.bandwidth.average_upload_per_sec(),
connected_peers,
not_connected_peers,
peerset: swarm.user_protocol_mut().peerset_debug_info(),
@@ -459,12 +352,138 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
}
/// Get currently connected peers.
pub fn peers_debug_info(&mut self) -> Vec<(PeerId, PeerInfo<B>)> {
self.network_service.user_protocol_mut()
.peers_info()
.map(|(id, info)| (id.clone(), info.clone()))
.collect()
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkService<B, S, H> {
/// Returns the network identity of the node.
pub fn local_peer_id(&self) -> PeerId {
self.local_peer_id.clone()
}
/// You must call this when new transactons are imported by the transaction pool.
///
/// > **Warning**: This method can return outdated information and should only ever be used
/// > when obtaining outdated information is acceptable.
pub fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo<B>)> {
let peers = (*self.peers.read()).clone();
peers.into_iter().map(|(idx, connected)| (idx, connected.peer_info)).collect()
/// The latest transactions will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration.
pub fn trigger_repropagate(&self) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::PropagateExtrinsics);
}
/// Make sure an important block is propagated to peers.
///
/// In chain-based consensus, we often need to make sure non-best forks are
/// at least temporarily synced. This function forces such an announcement.
pub fn announce_block(&self, hash: B::Hash) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::AnnounceBlock(hash));
}
/// Send a consensus message through the gossip
pub fn gossip_consensus_message(
&self,
topic: B::Hash,
engine_id: ConsensusEngineId,
message: Vec<u8>,
recipient: GossipMessageRecipient,
) {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::GossipConsensusMessage(
topic, engine_id, message, recipient,
));
}
/// Report a given peer as either beneficial (+) or costly (-) according to the
/// given scalar.
pub fn report_peer(&self, who: PeerId, cost_benefit: i32) {
self.peerset.report_peer(who, cost_benefit);
}
/// Request a justification for the given block from the network.
///
/// On success, the justification will be passed to the import queue that was part at
/// initialization as part of the configuration.
pub fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::RequestJustification(hash.clone(), number));
}
/// Execute a closure with the chain-specific network specialization.
pub fn with_spec<F>(&self, f: F)
where F: FnOnce(&mut S, &mut dyn Context<B>) + Send + 'static
{
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::ExecuteWithSpec(Box::new(f)));
}
/// Execute a closure with the consensus gossip.
pub fn with_gossip<F>(&self, f: F)
where F: FnOnce(&mut ConsensusGossip<B>, &mut dyn Context<B>) + Send + 'static
{
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::ExecuteWithGossip(Box::new(f)));
}
/// Are we in the process of downloading the chain?
pub fn is_major_syncing(&self) -> bool {
self.is_major_syncing.load(Ordering::Relaxed)
}
/// Start getting a value from the DHT.
///
/// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it to
/// `on_event` on the network specialization.
pub fn get_value(&mut self, key: &Multihash) {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::GetValue(key.clone()));
}
/// Start putting a value in the DHT.
///
/// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it to
/// `on_event` on the network specialization.
pub fn put_value(&mut self, key: Multihash, value: Vec<u8>) {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::PutValue(key, value));
}
/// Connect to unreserved peers and allow unreserved peers to connect.
pub fn accept_unreserved_peers(&self) {
self.peerset.set_reserved_only(false);
}
/// Disconnect from unreserved peers and deny new unreserved peers to connect.
pub fn deny_unreserved_peers(&self) {
self.peerset.set_reserved_only(true);
}
/// Removes a `PeerId` from the list of reserved peers.
pub fn remove_reserved_peer(&self, peer: PeerId) {
self.peerset.remove_reserved_peer(peer);
}
/// Adds a `PeerId` and its address as reserved.
pub fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
let (peer_id, addr) = parse_str_addr(&peer).map_err(|e| format!("{:?}", e))?;
self.peerset.add_reserved_peer(peer_id.clone());
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::AddKnownAddress(peer_id, addr));
Ok(())
}
/// Returns the number of peers we're connected to.
pub fn num_connected(&self) -> usize {
self.num_connected.load(Ordering::Relaxed)
}
}
@@ -475,139 +494,44 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>
}
fn is_offline(&self) -> bool {
self.is_offline.load(Ordering::Relaxed)
self.num_connected.load(Ordering::Relaxed) == 0
}
}
/// Trait for managing network
pub trait ManageNetwork {
/// Set to allow unreserved peers to connect
fn accept_unreserved_peers(&self);
/// Set to deny unreserved peers to connect
fn deny_unreserved_peers(&self);
/// Remove reservation for the peer
fn remove_reserved_peer(&self, peer: PeerId);
/// Add reserved peer
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> ManageNetwork for NetworkService<B, S, H> {
fn accept_unreserved_peers(&self) {
self.peerset.set_reserved_only(false);
}
fn deny_unreserved_peers(&self) {
self.peerset.set_reserved_only(true);
}
fn remove_reserved_peer(&self, peer: PeerId) {
self.peerset.remove_reserved_peer(peer);
}
fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
let (peer_id, addr) = parse_str_addr(&peer).map_err(|e| format!("{:?}", e))?;
self.peerset.add_reserved_peer(peer_id.clone());
self.network.lock().add_known_address(peer_id, addr);
Ok(())
}
}
/// Messages to be handled by Libp2pNetService.
#[derive(Debug)]
pub enum NetworkMsg<B: BlockT + 'static> {
/// Send an outgoing custom message.
Outgoing(PeerId, Message<B>),
/// Disconnect a peer we're connected to, or do nothing if we're not connected.
DisconnectPeer(PeerId),
/// Performs a reputation adjustement on a peer.
ReportPeer(PeerId, i32),
/// Synchronization response.
#[cfg(any(test, feature = "test-helpers"))]
Synchronized,
}
/// Messages sent to Protocol from elsewhere inside the system.
pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
/// A batch of blocks has been processed, with or without errors.
BlocksProcessed(Vec<B::Hash>, bool),
/// Tell protocol to restart sync.
RestartSync,
/// Tell protocol to propagate extrinsics.
/// Messages sent from the `NetworkService` to the `NetworkWorker`.
///
/// Each entry corresponds to a method of `NetworkService`.
enum ServerToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> {
PropagateExtrinsics,
/// Tell protocol that a block was imported (sent by the import-queue).
BlockImportedSync(B::Hash, NumberFor<B>),
/// Tell protocol to clear all pending justification requests.
ClearJustificationRequests,
/// Tell protocol to request justification for a block.
RequestJustification(B::Hash, NumberFor<B>),
/// Inform protocol whether a justification was successfully imported.
JustificationImportResult(B::Hash, NumberFor<B>, bool),
/// Set finality proof request builder.
SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder<B>),
/// Tell protocol to request finality proof for a block.
RequestFinalityProof(B::Hash, NumberFor<B>),
/// Inform protocol whether a finality proof was successfully imported.
FinalityProofImportResult((B::Hash, NumberFor<B>), Result<(B::Hash, NumberFor<B>), ()>),
/// Propagate a block to peers.
AnnounceBlock(B::Hash),
/// A block has been imported (sent by the client).
BlockImported(B::Hash, B::Header),
/// A block has been finalized (sent by the client).
BlockFinalized(B::Hash, B::Header),
/// Execute a closure with the chain-specific network specialization.
ExecuteWithSpec(Box<dyn SpecTask<B, S> + Send + 'static>),
/// Execute a closure with the consensus gossip.
ExecuteWithGossip(Box<dyn GossipTask<B> + Send + 'static>),
/// Incoming gossip consensus message.
ExecuteWithSpec(Box<dyn FnOnce(&mut S, &mut dyn Context<B>) + Send>),
ExecuteWithGossip(Box<dyn FnOnce(&mut ConsensusGossip<B>, &mut dyn Context<B>) + Send>),
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, GossipMessageRecipient),
/// Tell protocol to perform regular maintenance.
#[cfg(any(test, feature = "test-helpers"))]
Tick,
/// Synchronization request.
#[cfg(any(test, feature = "test-helpers"))]
Synchronize,
GetValue(Multihash),
PutValue(Multihash, Vec<u8>),
AddKnownAddress(PeerId, Multiaddr),
}
/// A task, consisting of a user-provided closure, to be executed on the Protocol thread.
pub trait SpecTask<B: BlockT, S: NetworkSpecialization<B>> {
fn call_box(self: Box<Self>, spec: &mut S, context: &mut dyn Context<B>);
}
impl<B: BlockT, S: NetworkSpecialization<B>, F: FnOnce(&mut S, &mut dyn Context<B>)> SpecTask<B, S> for F {
fn call_box(self: Box<F>, spec: &mut S, context: &mut dyn Context<B>) {
(*self)(spec, context)
}
}
/// A task, consisting of a user-provided closure, to be executed on the Protocol thread.
pub trait GossipTask<B: BlockT> {
fn call_box(self: Box<Self>, gossip: &mut ConsensusGossip<B>, context: &mut dyn Context<B>);
}
impl<B: BlockT, F: FnOnce(&mut ConsensusGossip<B>, &mut dyn Context<B>)> GossipTask<B> for F {
fn call_box(self: Box<F>, gossip: &mut ConsensusGossip<B>, context: &mut dyn Context<B>) {
(*self)(gossip, context)
}
}
/// Future tied to the `Network` service and that must be polled in order for the network to
/// advance.
/// Main network worker. Must be polled in order for the network to advance.
///
/// You are encouraged to poll this in a separate background thread or task.
#[must_use = "The NetworkWorker must be polled in order for the network to work"]
pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> {
is_offline: Arc<AtomicBool>,
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
num_connected: Arc<AtomicUsize>,
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
is_major_syncing: Arc<AtomicBool>,
/// The network service that can be extracted and shared through the codebase.
service: Arc<NetworkService<B, S, H>>,
network_service: Arc<Mutex<Swarm<B, S, H>>>,
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
/// The *actual* network.
network_service: Swarm<B, S, H>,
/// The import queue that was passed as initialization.
import_queue: Box<dyn ImportQueue<B>>,
network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>,
protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
peerset: PeersetHandle,
/// Messages from the `NetworkService` and that must be processed.
from_worker: mpsc::UnboundedReceiver<ServerToWorkerMsg<B, S>>,
/// Receiver for queries from the on-demand that must be processed.
on_demand_in: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
/// Interval at which we update the `connected_peers` Arc.
connected_peers_interval: tokio_timer::Interval,
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for NetworkWorker<B, S, H> {
@@ -615,165 +539,63 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Implementation of `import_queue::Link` trait using the available local variables.
struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
protocol: &'a mut Swarm<B, S, H>,
}
impl<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Link<B> for NetworkLink<'a, B, S, H> {
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.protocol.user_protocol_mut().block_imported(&hash, number)
}
fn blocks_processed(&mut self, hashes: Vec<B::Hash>, has_error: bool) {
self.protocol.user_protocol_mut().blocks_processed(hashes, has_error)
}
fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
self.protocol.user_protocol_mut().justification_import_result(hash.clone(), number, success);
if !success {
info!("Invalid justification provided by {} for #{}", who, hash);
self.protocol.user_protocol_mut().disconnect_peer(&who);
self.protocol.user_protocol_mut().report_peer(who, i32::min_value());
}
}
fn clear_justification_requests(&mut self) {
self.protocol.user_protocol_mut().clear_justification_requests()
}
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.protocol.user_protocol_mut().request_justification(hash, number)
}
fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.protocol.user_protocol_mut().request_finality_proof(hash, number)
}
fn finality_proof_imported(
&mut self,
who: PeerId,
request_block: (B::Hash, NumberFor<B>),
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
) {
let success = finalization_result.is_ok();
self.protocol.user_protocol_mut().finality_proof_import_result(request_block, finalization_result);
if !success {
info!("Invalid finality proof provided by {} for #{}", who, request_block.0);
self.protocol.user_protocol_mut().disconnect_peer(&who);
self.protocol.user_protocol_mut().report_peer(who, i32::min_value());
}
}
fn report_peer(&mut self, who: PeerId, reputation_change: i32) {
self.protocol.user_protocol_mut().report_peer(who, reputation_change)
}
fn restart(&mut self) {
self.protocol.user_protocol_mut().restart()
}
fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder<B>) {
self.protocol.user_protocol_mut().set_finality_proof_request_builder(builder)
}
}
{
let mut network_service = self.network_service.lock();
let mut link = NetworkLink {
protocol: &mut network_service,
};
self.import_queue.poll_actions(&mut link);
}
while let Ok(Async::Ready(_)) = self.connected_peers_interval.poll() {
let mut network_service = self.network_service.lock();
let infos = network_service.user_protocol_mut().peers_info().map(|(id, info)| {
(id.clone(), ConnectedPeer { peer_info: info.clone() })
}).collect();
*self.peers.write() = infos;
}
// Poll the import queue for actions to perform.
self.import_queue.poll_actions(&mut NetworkLink {
protocol: &mut self.network_service,
});
// Check for new incoming on-demand requests.
if let Some(on_demand_in) = self.on_demand_in.as_mut() {
while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() {
let mut network_service = self.network_service.lock();
network_service.user_protocol_mut().add_on_demand_request(rq);
self.network_service.user_protocol_mut().add_on_demand_request(rq);
}
}
loop {
match self.network_port.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(NetworkMsg::Outgoing(who, outgoing_message)))) =>
self.network_service.lock().user_protocol_mut().send_packet(&who, outgoing_message),
Ok(Async::Ready(Some(NetworkMsg::ReportPeer(who, reputation)))) =>
self.peerset.report_peer(who, reputation),
Ok(Async::Ready(Some(NetworkMsg::DisconnectPeer(who)))) =>
self.network_service.lock().user_protocol_mut().disconnect_peer(&who),
#[cfg(any(test, feature = "test-helpers"))]
Ok(Async::Ready(Some(NetworkMsg::Synchronized))) => {}
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
}
}
loop {
let msg = match self.protocol_rx.poll() {
// Process the next message coming from the `NetworkService`.
let msg = match self.from_worker.poll() {
Ok(Async::Ready(Some(msg))) => msg,
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => break,
};
let mut network_service = self.network_service.lock();
match msg {
ProtocolMsg::BlockImported(hash, header) =>
network_service.user_protocol_mut().on_block_imported(hash, &header),
ProtocolMsg::BlockFinalized(hash, header) =>
network_service.user_protocol_mut().on_block_finalized(hash, &header),
ProtocolMsg::ExecuteWithSpec(task) => {
let protocol = network_service.user_protocol_mut();
ServerToWorkerMsg::ExecuteWithSpec(task) => {
let protocol = self.network_service.user_protocol_mut();
let (mut context, spec) = protocol.specialization_lock();
task.call_box(spec, &mut context);
task(spec, &mut context);
},
ProtocolMsg::ExecuteWithGossip(task) => {
let protocol = network_service.user_protocol_mut();
ServerToWorkerMsg::ExecuteWithGossip(task) => {
let protocol = self.network_service.user_protocol_mut();
let (mut context, gossip) = protocol.consensus_gossip_lock();
task.call_box(gossip, &mut context);
task(gossip, &mut context);
}
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) =>
network_service.user_protocol_mut().gossip_consensus_message(topic, engine_id, message, recipient),
ProtocolMsg::BlocksProcessed(hashes, has_error) =>
network_service.user_protocol_mut().blocks_processed(hashes, has_error),
ProtocolMsg::RestartSync =>
network_service.user_protocol_mut().restart(),
ProtocolMsg::AnnounceBlock(hash) =>
network_service.user_protocol_mut().announce_block(hash),
ProtocolMsg::BlockImportedSync(hash, number) =>
network_service.user_protocol_mut().block_imported(&hash, number),
ProtocolMsg::ClearJustificationRequests =>
network_service.user_protocol_mut().clear_justification_requests(),
ProtocolMsg::RequestJustification(hash, number) =>
network_service.user_protocol_mut().request_justification(&hash, number),
ProtocolMsg::JustificationImportResult(hash, number, success) =>
network_service.user_protocol_mut().justification_import_result(hash, number, success),
ProtocolMsg::SetFinalityProofRequestBuilder(builder) =>
network_service.user_protocol_mut().set_finality_proof_request_builder(builder),
ProtocolMsg::RequestFinalityProof(hash, number) =>
network_service.user_protocol_mut().request_finality_proof(&hash, number),
ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) =>
network_service.user_protocol_mut()
.finality_proof_import_result(requested_block, finalziation_result),
ProtocolMsg::PropagateExtrinsics =>
network_service.user_protocol_mut().propagate_extrinsics(),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Tick => network_service.user_protocol_mut().tick(),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Synchronize => {},
ServerToWorkerMsg::GossipConsensusMessage(topic, engine_id, message, recipient) =>
self.network_service.user_protocol_mut().gossip_consensus_message(topic, engine_id, message, recipient),
ServerToWorkerMsg::AnnounceBlock(hash) =>
self.network_service.user_protocol_mut().announce_block(hash),
ServerToWorkerMsg::RequestJustification(hash, number) =>
self.network_service.user_protocol_mut().request_justification(&hash, number),
ServerToWorkerMsg::PropagateExtrinsics =>
self.network_service.user_protocol_mut().propagate_extrinsics(),
ServerToWorkerMsg::GetValue(key) =>
self.network_service.get_value(&key),
ServerToWorkerMsg::PutValue(key, value) =>
self.network_service.put_value(key, value),
ServerToWorkerMsg::AddKnownAddress(peer_id, addr) =>
self.network_service.add_known_address(peer_id, addr),
}
}
loop {
let mut network_service = self.network_service.lock();
let poll_value = network_service.poll();
// Process the next action coming from the network.
let poll_value = self.network_service.poll();
let outcome = match poll_value {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome,
Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => {
network_service.user_protocol_mut()
self.network_service.user_protocol_mut()
.on_event(Event::Dht(ev));
CustomMessageOutcome::None
},
@@ -795,9 +617,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
}
}
let mut network_service = self.network_service.lock();
self.is_offline.store(network_service.user_protocol_mut().num_connected_peers() == 0, Ordering::Relaxed);
self.is_major_syncing.store(match network_service.user_protocol_mut().sync_state() {
// Update the variables shared with the `NetworkService`.
self.num_connected.store(self.network_service.user_protocol_mut().num_connected_peers(), Ordering::Relaxed);
self.is_major_syncing.store(match self.network_service.user_protocol_mut().sync_state() {
SyncState::Idle => false,
SyncState::Downloading => true,
}, Ordering::Relaxed);
@@ -811,3 +633,57 @@ type Swarm<B, S, H> = libp2p::core::Swarm<
Boxed<(PeerId, StreamMuxerBox), io::Error>,
Behaviour<B, S, H>
>;
// Implementation of `import_queue::Link` trait using the available local variables.
struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
protocol: &'a mut Swarm<B, S, H>,
}
impl<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Link<B> for NetworkLink<'a, B, S, H> {
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.protocol.user_protocol_mut().block_imported(&hash, number)
}
fn blocks_processed(&mut self, hashes: Vec<B::Hash>, has_error: bool) {
self.protocol.user_protocol_mut().blocks_processed(hashes, has_error)
}
fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
self.protocol.user_protocol_mut().justification_import_result(hash.clone(), number, success);
if !success {
info!("Invalid justification provided by {} for #{}", who, hash);
self.protocol.user_protocol_mut().disconnect_peer(&who);
self.protocol.user_protocol_mut().report_peer(who, i32::min_value());
}
}
fn clear_justification_requests(&mut self) {
self.protocol.user_protocol_mut().clear_justification_requests()
}
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.protocol.user_protocol_mut().request_justification(hash, number)
}
fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.protocol.user_protocol_mut().request_finality_proof(hash, number)
}
fn finality_proof_imported(
&mut self,
who: PeerId,
request_block: (B::Hash, NumberFor<B>),
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
) {
let success = finalization_result.is_ok();
self.protocol.user_protocol_mut().finality_proof_import_result(request_block, finalization_result);
if !success {
info!("Invalid finality proof provided by {} for #{}", who, request_block.0);
self.protocol.user_protocol_mut().disconnect_peer(&who);
self.protocol.user_protocol_mut().report_peer(who, i32::min_value());
}
}
fn report_peer(&mut self, who: PeerId, reputation_change: i32) {
self.protocol.user_protocol_mut().report_peer(who, reputation_change)
}
fn restart(&mut self) {
self.protocol.user_protocol_mut().restart()
}
fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder<B>) {
self.protocol.user_protocol_mut().set_finality_proof_request_builder(builder)
}
}