mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 10:31:03 +00:00
Add ProtocolBehaviour (#2922)
* Add ProtocolBehaviour * Fix tests * Line widths * Address concerns * Apply suggestions from code review Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com> * Remove TODO
This commit is contained in:
committed by
Gavin Wood
parent
48aa32bece
commit
e919d03331
@@ -125,9 +125,10 @@ pub(crate) fn global_topic<B: BlockT>(set_id: u64) -> B::Hash {
|
|||||||
<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-GLOBAL", set_id).as_bytes())
|
<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-GLOBAL", set_id).as_bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B, S> Network<B> for Arc<NetworkService<B, S>> where
|
impl<B, S, H> Network<B> for Arc<NetworkService<B, S, H>> where
|
||||||
B: BlockT,
|
B: BlockT,
|
||||||
S: network::specialization::NetworkSpecialization<B>,
|
S: network::specialization::NetworkSpecialization<B>,
|
||||||
|
H: network::ExHashT,
|
||||||
{
|
{
|
||||||
type In = NetworkStream;
|
type In = NetworkStream;
|
||||||
|
|
||||||
|
|||||||
@@ -169,6 +169,7 @@ mod discovery;
|
|||||||
mod on_demand_layer;
|
mod on_demand_layer;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
mod protocol;
|
mod protocol;
|
||||||
|
mod protocol_behaviour;
|
||||||
mod service;
|
mod service;
|
||||||
mod transport;
|
mod transport;
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,463 @@
|
|||||||
|
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Substrate.
|
||||||
|
|
||||||
|
// Substrate is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Substrate is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Implementation of libp2p's `NetworkBehaviour` trait that handles everything Substrate-specific.
|
||||||
|
|
||||||
|
use crate::{ExHashT, DiscoveryNetBehaviour, ProtocolId};
|
||||||
|
use crate::custom_proto::{CustomProto, CustomProtoOut};
|
||||||
|
use crate::chain::{Client, FinalityProofProvider};
|
||||||
|
use crate::protocol::{self, CustomMessageOutcome, Protocol, ProtocolConfig, sync::SyncState};
|
||||||
|
use crate::protocol::{PeerInfo, NetworkOut, message::Message, on_demand::RequestData};
|
||||||
|
use crate::protocol::consensus_gossip::MessageRecipient as GossipMessageRecipient;
|
||||||
|
use crate::protocol::specialization::NetworkSpecialization;
|
||||||
|
use crate::service::TransactionPool;
|
||||||
|
|
||||||
|
use client::light::fetcher::FetchChecker;
|
||||||
|
use futures::prelude::*;
|
||||||
|
use consensus::import_queue::SharedFinalityProofRequestBuilder;
|
||||||
|
use log::debug;
|
||||||
|
use libp2p::{PeerId, Multiaddr};
|
||||||
|
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
||||||
|
use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox};
|
||||||
|
use libp2p::core::protocols_handler::{ProtocolsHandler, IntoProtocolsHandler};
|
||||||
|
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
/// Implementation of `NetworkBehaviour` that handles everything related to Substrate and Polkadot.
|
||||||
|
pub struct ProtocolBehaviour<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
|
||||||
|
/// Handles opening the unique substream and sending and receiving raw messages.
|
||||||
|
behaviour: CustomProto<Message<B>, Substream<StreamMuxerBox>>,
|
||||||
|
/// Handles the logic behind the raw messages that we receive.
|
||||||
|
protocol: Protocol<B, S, H>,
|
||||||
|
/// Used to report reputation changes.
|
||||||
|
peerset_handle: peerset::PeersetHandle,
|
||||||
|
transaction_pool: Arc<dyn TransactionPool<H, B>>,
|
||||||
|
/// When asked for a proof of finality, we use this struct to build one.
|
||||||
|
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> ProtocolBehaviour<B, S, H> {
|
||||||
|
/// Builds a new `ProtocolBehaviour`.
|
||||||
|
pub fn new(
|
||||||
|
config: ProtocolConfig,
|
||||||
|
chain: Arc<dyn Client<B>>,
|
||||||
|
checker: Arc<dyn FetchChecker<B>>,
|
||||||
|
specialization: S,
|
||||||
|
transaction_pool: Arc<dyn TransactionPool<H, B>>,
|
||||||
|
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
|
||||||
|
protocol_id: ProtocolId,
|
||||||
|
versions: &[u8],
|
||||||
|
peerset: peerset::Peerset,
|
||||||
|
peerset_handle: peerset::PeersetHandle,
|
||||||
|
) -> crate::error::Result<Self> {
|
||||||
|
let protocol = Protocol::new(config, chain, checker, specialization)?;
|
||||||
|
let behaviour = CustomProto::new(protocol_id, versions, peerset);
|
||||||
|
|
||||||
|
Ok(ProtocolBehaviour {
|
||||||
|
protocol,
|
||||||
|
behaviour,
|
||||||
|
peerset_handle,
|
||||||
|
transaction_pool,
|
||||||
|
finality_proof_provider,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the list of all the peers we have an open channel to.
|
||||||
|
pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||||
|
self.behaviour.open_peers()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if we have a channel open with this node.
|
||||||
|
pub fn is_open(&self, peer_id: &PeerId) -> bool {
|
||||||
|
self.behaviour.is_open(peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Disconnects the given peer if we are connected to it.
|
||||||
|
pub fn disconnect_peer(&mut self, peer_id: &PeerId) {
|
||||||
|
self.behaviour.disconnect_peer(peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adjusts the reputation of a node.
|
||||||
|
pub fn report_peer(&mut self, who: PeerId, reputation: i32) {
|
||||||
|
self.peerset_handle.report_peer(who, reputation)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if we try to open protocols with the given peer.
|
||||||
|
pub fn is_enabled(&self, peer_id: &PeerId) -> bool {
|
||||||
|
self.behaviour.is_enabled(peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends a message to a peer.
|
||||||
|
///
|
||||||
|
/// Has no effect if the custom protocol is not open with the given peer.
|
||||||
|
///
|
||||||
|
/// Also note that even we have a valid open substream, it may in fact be already closed
|
||||||
|
/// without us knowing, in which case the packet will not be received.
|
||||||
|
pub fn send_packet(&mut self, target: &PeerId, message: Message<B>) {
|
||||||
|
self.behaviour.send_packet(target, message)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the state of the peerset manager, for debugging purposes.
|
||||||
|
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
|
||||||
|
self.behaviour.peerset_debug_info()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the number of peers we're connected to.
|
||||||
|
pub fn num_connected_peers(&self) -> usize {
|
||||||
|
self.protocol.num_connected_peers()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the number of peers we're connected to and that are being queried.
|
||||||
|
pub fn num_active_peers(&self) -> usize {
|
||||||
|
self.protocol.num_active_peers()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Current global sync state.
|
||||||
|
pub fn sync_state(&self) -> SyncState {
|
||||||
|
self.protocol.sync_state()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Target sync block number.
|
||||||
|
pub fn best_seen_block(&self) -> Option<NumberFor<B>> {
|
||||||
|
self.protocol.best_seen_block()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Number of peers participating in syncing.
|
||||||
|
pub fn num_sync_peers(&self) -> u32 {
|
||||||
|
self.protocol.num_sync_peers()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Starts a new data demand request.
|
||||||
|
///
|
||||||
|
/// The parameter contains a `Sender` where the result, once received, must be sent.
|
||||||
|
pub(crate) fn add_on_demand_request(&mut self, rq: RequestData<B>) {
|
||||||
|
self.protocol.add_on_demand_request(
|
||||||
|
&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle },
|
||||||
|
rq
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns information about all the peers we are connected to after the handshake message.
|
||||||
|
pub fn peers_info(&self) -> impl Iterator<Item = (&PeerId, &PeerInfo<B>)> {
|
||||||
|
self.protocol.peers_info()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Locks `self` and gives access to the protocol and a context that can be used in order to
|
||||||
|
/// use `consensus_gossip_lock` or `specialization_lock`.
|
||||||
|
///
|
||||||
|
/// **Important**: ONLY USE THIS FUNCTION TO CALL `consensus_gossip_lock` or `specialization_lock`.
|
||||||
|
/// This function is a very bad API.
|
||||||
|
pub fn protocol_context_lock<'a>(
|
||||||
|
&'a mut self,
|
||||||
|
) -> (&'a mut Protocol<B, S, H>, LocalNetworkOut<'a, B>) {
|
||||||
|
let net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle };
|
||||||
|
(&mut self.protocol, net_out)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gossip a consensus message to the network.
|
||||||
|
pub fn gossip_consensus_message(
|
||||||
|
&mut self,
|
||||||
|
topic: B::Hash,
|
||||||
|
engine_id: ConsensusEngineId,
|
||||||
|
message: Vec<u8>,
|
||||||
|
recipient: GossipMessageRecipient,
|
||||||
|
) {
|
||||||
|
self.protocol.gossip_consensus_message(
|
||||||
|
&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle },
|
||||||
|
topic,
|
||||||
|
engine_id,
|
||||||
|
message,
|
||||||
|
recipient
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Call when we must propagate ready extrinsics to peers.
|
||||||
|
pub fn propagate_extrinsics(&mut self) {
|
||||||
|
self.protocol.propagate_extrinsics(
|
||||||
|
&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle },
|
||||||
|
&*self.transaction_pool
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Make sure an important block is propagated to peers.
|
||||||
|
///
|
||||||
|
/// In chain-based consensus, we often need to make sure non-best forks are
|
||||||
|
/// at least temporarily synced.
|
||||||
|
pub fn announce_block(&mut self, hash: B::Hash) {
|
||||||
|
self.protocol.announce_block(
|
||||||
|
&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle },
|
||||||
|
hash
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Call this when a block has been imported in the import queue and we should announce it on
|
||||||
|
/// the network.
|
||||||
|
pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) {
|
||||||
|
self.protocol.on_block_imported(
|
||||||
|
&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle },
|
||||||
|
hash,
|
||||||
|
header
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Call this when a block has been finalized. The sync layer may have some additional
|
||||||
|
/// requesting to perform.
|
||||||
|
pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) {
|
||||||
|
self.protocol.on_block_finalized(
|
||||||
|
&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle },
|
||||||
|
hash,
|
||||||
|
header
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Request a justification for the given block.
|
||||||
|
///
|
||||||
|
/// Uses `protocol` to queue a new justification request and tries to dispatch all pending
|
||||||
|
/// requests.
|
||||||
|
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
||||||
|
self.protocol.request_justification(
|
||||||
|
&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle },
|
||||||
|
hash,
|
||||||
|
number
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clears all pending justification requests.
|
||||||
|
pub fn clear_justification_requests(&mut self) {
|
||||||
|
self.protocol.clear_justification_requests()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A batch of blocks have been processed, with or without errors.
|
||||||
|
/// Call this when a batch of blocks have been processed by the import queue, with or without
|
||||||
|
/// errors.
|
||||||
|
pub fn blocks_processed(
|
||||||
|
&mut self,
|
||||||
|
processed_blocks: Vec<B::Hash>,
|
||||||
|
has_error: bool,
|
||||||
|
) {
|
||||||
|
self.protocol.blocks_processed(
|
||||||
|
&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle },
|
||||||
|
processed_blocks,
|
||||||
|
has_error,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Restart the sync process.
|
||||||
|
pub fn restart(&mut self) {
|
||||||
|
let mut net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle };
|
||||||
|
self.protocol.restart(&mut net_out);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Notify about successful import of the given block.
|
||||||
|
pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
||||||
|
self.protocol.block_imported(hash, number)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder<B>) {
|
||||||
|
self.protocol.set_finality_proof_request_builder(request_builder)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Call this when a justification has been processed by the import queue, with or without
|
||||||
|
/// errors.
|
||||||
|
pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
|
||||||
|
self.protocol.justification_import_result(hash, number, success)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Request a finality proof for the given block.
|
||||||
|
///
|
||||||
|
/// Queues a new finality proof request and tries to dispatch all pending requests.
|
||||||
|
pub fn request_finality_proof(
|
||||||
|
&mut self,
|
||||||
|
hash: &B::Hash,
|
||||||
|
number: NumberFor<B>,
|
||||||
|
) {
|
||||||
|
self.protocol.request_finality_proof(
|
||||||
|
&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle },
|
||||||
|
&hash,
|
||||||
|
number,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn finality_proof_import_result(
|
||||||
|
&mut self,
|
||||||
|
request_block: (B::Hash, NumberFor<B>),
|
||||||
|
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
|
||||||
|
) {
|
||||||
|
self.protocol.finality_proof_import_result(request_block, finalization_result)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tick(&mut self) {
|
||||||
|
self.protocol.tick(&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviour for
|
||||||
|
ProtocolBehaviour<B, S, H> {
|
||||||
|
type ProtocolsHandler = <CustomProto<Message<B>, Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler;
|
||||||
|
type OutEvent = CustomMessageOutcome<B>;
|
||||||
|
|
||||||
|
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||||
|
self.behaviour.new_handler()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||||
|
self.behaviour.addresses_of_peer(peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
|
||||||
|
self.behaviour.inject_connected(peer_id, endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
|
||||||
|
self.behaviour.inject_disconnected(peer_id, endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_node_event(
|
||||||
|
&mut self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
|
||||||
|
) {
|
||||||
|
self.behaviour.inject_node_event(peer_id, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
&mut self,
|
||||||
|
params: &mut PollParameters,
|
||||||
|
) -> Async<
|
||||||
|
NetworkBehaviourAction<
|
||||||
|
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
|
||||||
|
Self::OutEvent
|
||||||
|
>
|
||||||
|
> {
|
||||||
|
let mut net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle };
|
||||||
|
match self.protocol.poll(&mut net_out, &*self.transaction_pool) {
|
||||||
|
Ok(Async::Ready(v)) => void::unreachable(v),
|
||||||
|
Ok(Async::NotReady) => {}
|
||||||
|
Err(err) => void::unreachable(err),
|
||||||
|
}
|
||||||
|
|
||||||
|
let event = match self.behaviour.poll(params) {
|
||||||
|
Async::NotReady => return Async::NotReady,
|
||||||
|
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev,
|
||||||
|
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::DialAddress { address }),
|
||||||
|
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
|
||||||
|
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }),
|
||||||
|
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut network_out = LocalNetworkOut {
|
||||||
|
inner: &mut self.behaviour,
|
||||||
|
peerset_handle: &self.peerset_handle,
|
||||||
|
};
|
||||||
|
|
||||||
|
let outcome = match event {
|
||||||
|
CustomProtoOut::CustomProtocolOpen { peer_id, version, .. } => {
|
||||||
|
debug_assert!(
|
||||||
|
version <= protocol::CURRENT_VERSION as u8
|
||||||
|
&& version >= protocol::MIN_VERSION as u8
|
||||||
|
);
|
||||||
|
self.protocol.on_peer_connected(&mut network_out, peer_id);
|
||||||
|
CustomMessageOutcome::None
|
||||||
|
}
|
||||||
|
CustomProtoOut::CustomProtocolClosed { peer_id, .. } => {
|
||||||
|
self.protocol.on_peer_disconnected(&mut network_out, peer_id);
|
||||||
|
CustomMessageOutcome::None
|
||||||
|
},
|
||||||
|
CustomProtoOut::CustomMessage { peer_id, message } =>
|
||||||
|
self.protocol.on_custom_message(
|
||||||
|
&mut network_out,
|
||||||
|
&*self.transaction_pool,
|
||||||
|
peer_id,
|
||||||
|
message,
|
||||||
|
self.finality_proof_provider.as_ref().map(|p| &**p)
|
||||||
|
),
|
||||||
|
CustomProtoOut::Clogged { peer_id, messages } => {
|
||||||
|
debug!(target: "sync", "{} clogging messages:", messages.len());
|
||||||
|
for msg in messages.into_iter().take(5) {
|
||||||
|
debug!(target: "sync", "{:?}", msg);
|
||||||
|
self.protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg));
|
||||||
|
}
|
||||||
|
CustomMessageOutcome::None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let CustomMessageOutcome::None = outcome {
|
||||||
|
Async::NotReady
|
||||||
|
} else {
|
||||||
|
Async::Ready(NetworkBehaviourAction::GenerateEvent(outcome))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
|
||||||
|
self.behaviour.inject_replaced(peer_id, closed_endpoint, new_endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_addr_reach_failure(
|
||||||
|
&mut self,
|
||||||
|
peer_id: Option<&PeerId>,
|
||||||
|
addr: &Multiaddr,
|
||||||
|
error: &dyn std::error::Error
|
||||||
|
) {
|
||||||
|
self.behaviour.inject_addr_reach_failure(peer_id, addr, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
|
||||||
|
self.behaviour.inject_dial_failure(peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
||||||
|
self.behaviour.inject_new_listen_addr(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
||||||
|
self.behaviour.inject_expired_listen_addr(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||||
|
self.behaviour.inject_new_external_addr(addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> DiscoveryNetBehaviour
|
||||||
|
for ProtocolBehaviour<B, S, H> {
|
||||||
|
fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) {
|
||||||
|
self.behaviour.add_discovered_nodes(peer_ids)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Has to be public for stupid API reasons. This should be made private again ASAP.
|
||||||
|
pub struct LocalNetworkOut<'a, B: BlockT> {
|
||||||
|
inner: &'a mut CustomProto<Message<B>, Substream<StreamMuxerBox>>,
|
||||||
|
peerset_handle: &'a peerset::PeersetHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, B: BlockT> NetworkOut<B> for LocalNetworkOut<'a, B> {
|
||||||
|
fn report_peer(&mut self, who: PeerId, reputation: i32) {
|
||||||
|
self.peerset_handle.report_peer(who, reputation)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disconnect_peer(&mut self, who: PeerId) {
|
||||||
|
self.inner.disconnect_peer(&who)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_message(&mut self, who: PeerId, message: Message<B>) {
|
||||||
|
self.inner.send_packet(&who, message)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -20,26 +20,24 @@ use std::sync::Arc;
|
|||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use log::{warn, debug, error, info};
|
use log::{warn, error, info};
|
||||||
use libp2p::core::swarm::NetworkBehaviour;
|
use libp2p::core::swarm::NetworkBehaviour;
|
||||||
use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
|
use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
|
||||||
use futures::{prelude::*, sync::oneshot, sync::mpsc};
|
use futures::{prelude::*, sync::oneshot, sync::mpsc};
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::{Mutex, RwLock};
|
||||||
use crate::custom_proto::{CustomProto, CustomProtoOut};
|
use crate::protocol_behaviour::ProtocolBehaviour;
|
||||||
use crate::{behaviour::Behaviour, parse_str_addr, ProtocolId};
|
use crate::{behaviour::Behaviour, parse_str_addr};
|
||||||
use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer};
|
use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer};
|
||||||
use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode, config::NetworkConfiguration};
|
use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode};
|
||||||
use peerset::PeersetHandle;
|
use peerset::PeersetHandle;
|
||||||
use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder};
|
use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder};
|
||||||
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
|
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
|
||||||
|
|
||||||
use crate::AlwaysBadChecker;
|
use crate::AlwaysBadChecker;
|
||||||
use crate::chain::FinalityProofProvider;
|
|
||||||
use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
|
use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
|
||||||
use crate::protocol::message::Message;
|
use crate::protocol::message::Message;
|
||||||
use crate::protocol::on_demand::RequestData;
|
use crate::protocol::on_demand::RequestData;
|
||||||
use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer};
|
use crate::protocol::{self, Context, CustomMessageOutcome, ConnectedPeer, PeerInfo};
|
||||||
use crate::protocol::{PeerInfo, NetworkOut};
|
|
||||||
use crate::protocol::sync::SyncState;
|
use crate::protocol::sync::SyncState;
|
||||||
use crate::config::Params;
|
use crate::config::Params;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
@@ -88,7 +86,7 @@ impl ReportHandle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Substrate network service. Handles network IO and manages connectivity.
|
/// Substrate network service. Handles network IO and manages connectivity.
|
||||||
pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>> {
|
pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> {
|
||||||
/// Are we connected to any peer?
|
/// Are we connected to any peer?
|
||||||
is_offline: Arc<AtomicBool>,
|
is_offline: Arc<AtomicBool>,
|
||||||
/// Are we actively catching up with the chain?
|
/// Are we actively catching up with the chain?
|
||||||
@@ -98,7 +96,7 @@ pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>> {
|
|||||||
/// Channel for networking messages processed by the background thread.
|
/// Channel for networking messages processed by the background thread.
|
||||||
network_chan: mpsc::UnboundedSender<NetworkMsg<B>>,
|
network_chan: mpsc::UnboundedSender<NetworkMsg<B>>,
|
||||||
/// Network service
|
/// Network service
|
||||||
network: Arc<Mutex<Swarm<B>>>,
|
network: Arc<Mutex<Swarm<B, S, H>>>,
|
||||||
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
|
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
|
||||||
bandwidth: Arc<transport::BandwidthSinks>,
|
bandwidth: Arc<transport::BandwidthSinks>,
|
||||||
/// Peerset manager (PSM); manages the reputation of nodes and indicates the network which
|
/// Peerset manager (PSM); manages the reputation of nodes and indicates the network which
|
||||||
@@ -120,28 +118,106 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
|
|||||||
let (network_chan, network_port) = mpsc::unbounded();
|
let (network_chan, network_port) = mpsc::unbounded();
|
||||||
let (protocol_sender, protocol_rx) = mpsc::unbounded();
|
let (protocol_sender, protocol_rx) = mpsc::unbounded();
|
||||||
|
|
||||||
|
if let Some(ref path) = params.network_config.net_config_path {
|
||||||
|
fs::create_dir_all(Path::new(path))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// List of multiaddresses that we know in the network.
|
||||||
|
let mut known_addresses = Vec::new();
|
||||||
|
let mut bootnodes = Vec::new();
|
||||||
|
let mut reserved_nodes = Vec::new();
|
||||||
|
|
||||||
|
// Process the bootnodes.
|
||||||
|
for bootnode in params.network_config.boot_nodes.iter() {
|
||||||
|
match parse_str_addr(bootnode) {
|
||||||
|
Ok((peer_id, addr)) => {
|
||||||
|
bootnodes.push(peer_id.clone());
|
||||||
|
known_addresses.push((peer_id, addr));
|
||||||
|
},
|
||||||
|
Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the reserved peers.
|
||||||
|
for reserved in params.network_config.reserved_nodes.iter() {
|
||||||
|
if let Ok((peer_id, addr)) = parse_str_addr(reserved) {
|
||||||
|
reserved_nodes.push(peer_id.clone());
|
||||||
|
known_addresses.push((peer_id, addr));
|
||||||
|
} else {
|
||||||
|
warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the peerset.
|
||||||
|
let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset::PeersetConfig {
|
||||||
|
in_peers: params.network_config.in_peers,
|
||||||
|
out_peers: params.network_config.out_peers,
|
||||||
|
bootnodes,
|
||||||
|
reserved_only: params.network_config.non_reserved_mode == NonReservedPeerMode::Deny,
|
||||||
|
reserved_nodes,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Private and public keys configuration.
|
||||||
|
if let NodeKeyConfig::Secp256k1(_) = params.network_config.node_key {
|
||||||
|
warn!(target: "sub-libp2p", "Secp256k1 keys are deprecated in favour of ed25519");
|
||||||
|
}
|
||||||
|
let local_identity = params.network_config.node_key.clone().into_keypair()?;
|
||||||
|
let local_public = local_identity.public();
|
||||||
|
let local_peer_id = local_public.clone().into_peer_id();
|
||||||
|
info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58());
|
||||||
|
|
||||||
// Start in off-line mode, since we're not connected to any nodes yet.
|
// Start in off-line mode, since we're not connected to any nodes yet.
|
||||||
let is_offline = Arc::new(AtomicBool::new(true));
|
let is_offline = Arc::new(AtomicBool::new(true));
|
||||||
let is_major_syncing = Arc::new(AtomicBool::new(false));
|
let is_major_syncing = Arc::new(AtomicBool::new(false));
|
||||||
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>> = Arc::new(Default::default());
|
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>> = Arc::new(Default::default());
|
||||||
let protocol = Protocol::new(
|
let protocol = ProtocolBehaviour::new(
|
||||||
protocol::ProtocolConfig { roles: params.roles },
|
protocol::ProtocolConfig { roles: params.roles },
|
||||||
params.chain,
|
params.chain,
|
||||||
params.on_demand.as_ref().map(|od| od.checker().clone())
|
params.on_demand.as_ref().map(|od| od.checker().clone())
|
||||||
.unwrap_or(Arc::new(AlwaysBadChecker)),
|
.unwrap_or(Arc::new(AlwaysBadChecker)),
|
||||||
params.specialization,
|
params.specialization,
|
||||||
|
params.transaction_pool,
|
||||||
|
params.finality_proof_provider,
|
||||||
|
params.protocol_id,
|
||||||
|
&((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect::<Vec<u8>>(),
|
||||||
|
peerset,
|
||||||
|
peerset_handle.clone(),
|
||||||
)?;
|
)?;
|
||||||
let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect();
|
|
||||||
|
|
||||||
// Start the main service.
|
// Build the swarm.
|
||||||
let (network, bandwidth, peerset) =
|
let (mut swarm, bandwidth) = {
|
||||||
match start_service::<B, _>(params.network_config, params.protocol_id, &versions) {
|
let user_agent = format!(
|
||||||
Ok((network, bandwidth, peerset)) => (Arc::new(Mutex::new(network)), bandwidth, peerset),
|
"{} ({})",
|
||||||
Err(err) => {
|
params.network_config.client_version,
|
||||||
warn!("Error starting network: {}", err);
|
params.network_config.node_name
|
||||||
return Err(err.into())
|
);
|
||||||
},
|
let behaviour = Behaviour::new(
|
||||||
};
|
protocol,
|
||||||
|
user_agent,
|
||||||
|
local_public,
|
||||||
|
known_addresses,
|
||||||
|
params.network_config.enable_mdns
|
||||||
|
);
|
||||||
|
let (transport, bandwidth) = transport::build_transport(
|
||||||
|
local_identity,
|
||||||
|
params.network_config.wasm_external_transport
|
||||||
|
);
|
||||||
|
(Swarm::<B, S, H>::new(transport, behaviour, local_peer_id.clone()), bandwidth)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Listen on multiaddresses.
|
||||||
|
for addr in ¶ms.network_config.listen_addresses {
|
||||||
|
if let Err(err) = Swarm::<B, S, H>::listen_on(&mut swarm, addr.clone()) {
|
||||||
|
warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add external addresses.
|
||||||
|
for addr in ¶ms.network_config.public_addresses {
|
||||||
|
Swarm::<B, S, H>::add_external_address(&mut swarm, addr.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
let network = Arc::new(Mutex::new(swarm));
|
||||||
|
|
||||||
let service = Arc::new(NetworkService {
|
let service = Arc::new(NetworkService {
|
||||||
bandwidth,
|
bandwidth,
|
||||||
@@ -149,7 +225,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
|
|||||||
is_major_syncing: is_major_syncing.clone(),
|
is_major_syncing: is_major_syncing.clone(),
|
||||||
network_chan,
|
network_chan,
|
||||||
peers: peers.clone(),
|
peers: peers.clone(),
|
||||||
peerset: peerset.clone(),
|
peerset: peerset_handle.clone(),
|
||||||
network: network.clone(),
|
network: network.clone(),
|
||||||
protocol_sender: protocol_sender.clone(),
|
protocol_sender: protocol_sender.clone(),
|
||||||
});
|
});
|
||||||
@@ -158,13 +234,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
|
|||||||
is_offline,
|
is_offline,
|
||||||
is_major_syncing,
|
is_major_syncing,
|
||||||
network_service: network,
|
network_service: network,
|
||||||
peerset,
|
peerset: peerset_handle,
|
||||||
service,
|
service,
|
||||||
protocol,
|
|
||||||
peers,
|
peers,
|
||||||
import_queue: params.import_queue,
|
import_queue: params.import_queue,
|
||||||
transaction_pool: params.transaction_pool,
|
|
||||||
finality_proof_provider: params.finality_proof_provider,
|
|
||||||
network_port,
|
network_port,
|
||||||
protocol_rx,
|
protocol_rx,
|
||||||
on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()),
|
on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()),
|
||||||
@@ -184,40 +257,40 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
|
|||||||
|
|
||||||
/// Returns the number of peers we're connected to.
|
/// Returns the number of peers we're connected to.
|
||||||
pub fn num_connected_peers(&self) -> usize {
|
pub fn num_connected_peers(&self) -> usize {
|
||||||
self.protocol.num_connected_peers()
|
self.network_service.lock().user_protocol_mut().num_connected_peers()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of peers we're connected to and that are being queried.
|
/// Returns the number of peers we're connected to and that are being queried.
|
||||||
pub fn num_active_peers(&self) -> usize {
|
pub fn num_active_peers(&self) -> usize {
|
||||||
self.protocol.num_active_peers()
|
self.network_service.lock().user_protocol_mut().num_active_peers()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Current global sync state.
|
/// Current global sync state.
|
||||||
pub fn sync_state(&self) -> SyncState {
|
pub fn sync_state(&self) -> SyncState {
|
||||||
self.protocol.sync_state()
|
self.network_service.lock().user_protocol_mut().sync_state()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Target sync block number.
|
/// Target sync block number.
|
||||||
pub fn best_seen_block(&self) -> Option<NumberFor<B>> {
|
pub fn best_seen_block(&self) -> Option<NumberFor<B>> {
|
||||||
self.protocol.best_seen_block()
|
self.network_service.lock().user_protocol_mut().best_seen_block()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Number of peers participating in syncing.
|
/// Number of peers participating in syncing.
|
||||||
pub fn num_sync_peers(&self) -> u32 {
|
pub fn num_sync_peers(&self) -> u32 {
|
||||||
self.protocol.num_sync_peers()
|
self.network_service.lock().user_protocol_mut().num_sync_peers()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return a `NetworkService` that can be shared through the code base and can be used to
|
/// Return a `NetworkService` that can be shared through the code base and can be used to
|
||||||
/// manipulate the worker.
|
/// manipulate the worker.
|
||||||
pub fn service(&self) -> &Arc<NetworkService<B, S>> {
|
pub fn service(&self) -> &Arc<NetworkService<B, S, H>> {
|
||||||
&self.service
|
&self.service
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
|
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkService<B, S, H> {
|
||||||
/// Returns the network identity of the node.
|
/// Returns the network identity of the node.
|
||||||
pub fn local_peer_id(&self) -> PeerId {
|
pub fn local_peer_id(&self) -> PeerId {
|
||||||
Swarm::<B>::local_peer_id(&*self.network.lock()).clone()
|
Swarm::<B, S, H>::local_peer_id(&*self.network.lock()).clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Called when a new block is imported by the client.
|
/// Called when a new block is imported by the client.
|
||||||
@@ -275,13 +348,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
|
|||||||
let _ = self.network_chan.unbounded_send(NetworkMsg::DisconnectPeer(who));
|
let _ = self.network_chan.unbounded_send(NetworkMsg::DisconnectPeer(who));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a message to the given peer. Has no effect if we're not connected to this peer.
|
|
||||||
///
|
|
||||||
/// This method is extremely poor in terms of API and should be eventually removed.
|
|
||||||
pub fn send_request(&self, who: PeerId, message: Message<B>) {
|
|
||||||
let _ = self.network_chan.unbounded_send(NetworkMsg::Outgoing(who, message));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Execute a closure with the chain-specific network specialization.
|
/// Execute a closure with the chain-specific network specialization.
|
||||||
pub fn with_spec<F>(&self, f: F)
|
pub fn with_spec<F>(&self, f: F)
|
||||||
where F: FnOnce(&mut S, &mut dyn Context<B>) + Send + 'static
|
where F: FnOnce(&mut S, &mut dyn Context<B>) + Send + 'static
|
||||||
@@ -304,7 +370,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
|
|||||||
pub fn is_major_syncing(&self) -> bool {
|
pub fn is_major_syncing(&self) -> bool {
|
||||||
self.is_major_syncing.load(Ordering::Relaxed)
|
self.is_major_syncing.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkService<B, S, H> {
|
||||||
/// Get network state.
|
/// Get network state.
|
||||||
pub fn network_state(&self) -> NetworkState {
|
pub fn network_state(&self) -> NetworkState {
|
||||||
let mut swarm = self.network.lock();
|
let mut swarm = self.network.lock();
|
||||||
@@ -326,7 +394,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
|
|||||||
|
|
||||||
Some((peer_id.to_base58(), NetworkStatePeer {
|
Some((peer_id.to_base58(), NetworkStatePeer {
|
||||||
endpoint,
|
endpoint,
|
||||||
version_string: swarm.node(peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(),
|
version_string: swarm.node(peer_id)
|
||||||
|
.and_then(|i| i.client_version().map(|s| s.to_owned())).clone(),
|
||||||
latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()),
|
latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()),
|
||||||
enabled: swarm.user_protocol().is_enabled(&peer_id),
|
enabled: swarm.user_protocol().is_enabled(&peer_id),
|
||||||
open: swarm.user_protocol().is_open(&peer_id),
|
open: swarm.user_protocol().is_open(&peer_id),
|
||||||
@@ -341,7 +410,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
|
|||||||
.cloned().collect::<Vec<_>>();
|
.cloned().collect::<Vec<_>>();
|
||||||
list.into_iter().map(move |peer_id| {
|
list.into_iter().map(move |peer_id| {
|
||||||
(peer_id.to_base58(), NetworkStateNotConnectedPeer {
|
(peer_id.to_base58(), NetworkStateNotConnectedPeer {
|
||||||
version_string: swarm.node(&peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(),
|
version_string: swarm.node(&peer_id)
|
||||||
|
.and_then(|i| i.client_version().map(|s| s.to_owned())).clone(),
|
||||||
latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()),
|
latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()),
|
||||||
known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id)
|
known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id)
|
||||||
.into_iter().collect(),
|
.into_iter().collect(),
|
||||||
@@ -350,9 +420,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
NetworkState {
|
NetworkState {
|
||||||
peer_id: Swarm::<B>::local_peer_id(&swarm).to_base58(),
|
peer_id: Swarm::<B, S, H>::local_peer_id(&swarm).to_base58(),
|
||||||
listened_addresses: Swarm::<B>::listeners(&swarm).cloned().collect(),
|
listened_addresses: Swarm::<B, S, H>::listeners(&swarm).cloned().collect(),
|
||||||
external_addresses: Swarm::<B>::external_addresses(&swarm).cloned().collect(),
|
external_addresses: Swarm::<B, S, H>::external_addresses(&swarm).cloned().collect(),
|
||||||
average_download_per_sec: self.bandwidth.average_download_per_sec(),
|
average_download_per_sec: self.bandwidth.average_download_per_sec(),
|
||||||
average_upload_per_sec: self.bandwidth.average_upload_per_sec(),
|
average_upload_per_sec: self.bandwidth.average_upload_per_sec(),
|
||||||
connected_peers,
|
connected_peers,
|
||||||
@@ -371,7 +441,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ::consensus::SyncOracle for NetworkService<B, S> {
|
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>
|
||||||
|
::consensus::SyncOracle for NetworkService<B, S, H> {
|
||||||
fn is_major_syncing(&self) -> bool {
|
fn is_major_syncing(&self) -> bool {
|
||||||
self.is_major_syncing()
|
self.is_major_syncing()
|
||||||
}
|
}
|
||||||
@@ -393,7 +464,7 @@ pub trait ManageNetwork {
|
|||||||
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
|
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ManageNetwork for NetworkService<B, S> {
|
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> ManageNetwork for NetworkService<B, S, H> {
|
||||||
fn accept_unreserved_peers(&self) {
|
fn accept_unreserved_peers(&self) {
|
||||||
self.peerset.set_reserved_only(false);
|
self.peerset.set_reserved_only(false);
|
||||||
}
|
}
|
||||||
@@ -498,14 +569,11 @@ impl<B: BlockT, F: FnOnce(&mut ConsensusGossip<B>, &mut dyn Context<B>)> GossipT
|
|||||||
pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> {
|
pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> {
|
||||||
is_offline: Arc<AtomicBool>,
|
is_offline: Arc<AtomicBool>,
|
||||||
is_major_syncing: Arc<AtomicBool>,
|
is_major_syncing: Arc<AtomicBool>,
|
||||||
protocol: Protocol<B, S, H>,
|
|
||||||
/// The network service that can be extracted and shared through the codebase.
|
/// The network service that can be extracted and shared through the codebase.
|
||||||
service: Arc<NetworkService<B, S>>,
|
service: Arc<NetworkService<B, S, H>>,
|
||||||
network_service: Arc<Mutex<Swarm<B>>>,
|
network_service: Arc<Mutex<Swarm<B, S, H>>>,
|
||||||
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
|
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
|
||||||
import_queue: Box<dyn ImportQueue<B>>,
|
import_queue: Box<dyn ImportQueue<B>>,
|
||||||
transaction_pool: Arc<dyn TransactionPool<H, B>>,
|
|
||||||
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
|
|
||||||
network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>,
|
network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>,
|
||||||
protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
|
protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
|
||||||
peerset: PeersetHandle,
|
peerset: PeersetHandle,
|
||||||
@@ -520,48 +588,33 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
|
|||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
// Implementation of `protocol::NetworkOut` trait using the available local variables.
|
|
||||||
struct Context<'a, B: BlockT>(&'a mut Swarm<B>, &'a PeersetHandle);
|
|
||||||
impl<'a, B: BlockT> NetworkOut<B> for Context<'a, B> {
|
|
||||||
fn report_peer(&mut self, who: PeerId, reputation: i32) {
|
|
||||||
self.1.report_peer(who, reputation)
|
|
||||||
}
|
|
||||||
fn disconnect_peer(&mut self, who: PeerId) {
|
|
||||||
self.0.user_protocol_mut().disconnect_peer(&who)
|
|
||||||
}
|
|
||||||
fn send_message(&mut self, who: PeerId, message: Message<B>) {
|
|
||||||
self.0.user_protocol_mut().send_packet(&who, message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implementation of `import_queue::Link` trait using the available local variables.
|
// Implementation of `import_queue::Link` trait using the available local variables.
|
||||||
struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
|
struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
|
||||||
protocol: &'a mut Protocol<B, S, H>,
|
protocol: &'a mut Swarm<B, S, H>,
|
||||||
context: Context<'a, B>,
|
|
||||||
}
|
}
|
||||||
impl<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Link<B> for NetworkLink<'a, B, S, H> {
|
impl<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Link<B> for NetworkLink<'a, B, S, H> {
|
||||||
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
||||||
self.protocol.block_imported(&hash, number)
|
self.protocol.user_protocol_mut().block_imported(&hash, number)
|
||||||
}
|
}
|
||||||
fn blocks_processed(&mut self, hashes: Vec<B::Hash>, has_error: bool) {
|
fn blocks_processed(&mut self, hashes: Vec<B::Hash>, has_error: bool) {
|
||||||
self.protocol.blocks_processed(&mut self.context, hashes, has_error)
|
self.protocol.user_protocol_mut().blocks_processed(hashes, has_error)
|
||||||
}
|
}
|
||||||
fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
|
fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
|
||||||
self.protocol.justification_import_result(hash.clone(), number, success);
|
self.protocol.user_protocol_mut().justification_import_result(hash.clone(), number, success);
|
||||||
if !success {
|
if !success {
|
||||||
info!("Invalid justification provided by {} for #{}", who, hash);
|
info!("Invalid justification provided by {} for #{}", who, hash);
|
||||||
self.context.0.user_protocol_mut().disconnect_peer(&who);
|
self.protocol.user_protocol_mut().disconnect_peer(&who);
|
||||||
self.context.1.report_peer(who, i32::min_value());
|
self.protocol.user_protocol_mut().report_peer(who, i32::min_value());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn clear_justification_requests(&mut self) {
|
fn clear_justification_requests(&mut self) {
|
||||||
self.protocol.clear_justification_requests()
|
self.protocol.user_protocol_mut().clear_justification_requests()
|
||||||
}
|
}
|
||||||
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
||||||
self.protocol.request_justification(&mut self.context, hash, number)
|
self.protocol.user_protocol_mut().request_justification(hash, number)
|
||||||
}
|
}
|
||||||
fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
||||||
self.protocol.request_finality_proof(&mut self.context, hash, number)
|
self.protocol.user_protocol_mut().request_finality_proof(hash, number)
|
||||||
}
|
}
|
||||||
fn finality_proof_imported(
|
fn finality_proof_imported(
|
||||||
&mut self,
|
&mut self,
|
||||||
@@ -570,54 +623,45 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
|
|||||||
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
|
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
|
||||||
) {
|
) {
|
||||||
let success = finalization_result.is_ok();
|
let success = finalization_result.is_ok();
|
||||||
self.protocol.finality_proof_import_result(request_block, finalization_result);
|
self.protocol.user_protocol_mut().finality_proof_import_result(request_block, finalization_result);
|
||||||
if !success {
|
if !success {
|
||||||
info!("Invalid finality proof provided by {} for #{}", who, request_block.0);
|
info!("Invalid finality proof provided by {} for #{}", who, request_block.0);
|
||||||
self.context.0.user_protocol_mut().disconnect_peer(&who);
|
self.protocol.user_protocol_mut().disconnect_peer(&who);
|
||||||
self.context.1.report_peer(who, i32::min_value());
|
self.protocol.user_protocol_mut().report_peer(who, i32::min_value());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn report_peer(&mut self, who: PeerId, reputation_change: i32) {
|
fn report_peer(&mut self, who: PeerId, reputation_change: i32) {
|
||||||
self.context.1.report_peer(who, reputation_change)
|
self.protocol.user_protocol_mut().report_peer(who, reputation_change)
|
||||||
}
|
}
|
||||||
fn restart(&mut self) {
|
fn restart(&mut self) {
|
||||||
self.protocol.restart(&mut self.context)
|
self.protocol.user_protocol_mut().restart()
|
||||||
}
|
}
|
||||||
fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder<B>) {
|
fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder<B>) {
|
||||||
self.protocol.set_finality_proof_request_builder(builder)
|
self.protocol.user_protocol_mut().set_finality_proof_request_builder(builder)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut network_service = self.network_service.lock();
|
let mut network_service = self.network_service.lock();
|
||||||
let mut link = NetworkLink {
|
let mut link = NetworkLink {
|
||||||
protocol: &mut self.protocol,
|
protocol: &mut network_service,
|
||||||
context: Context(&mut network_service, &self.peerset),
|
|
||||||
};
|
};
|
||||||
self.import_queue.poll_actions(&mut link);
|
self.import_queue.poll_actions(&mut link);
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Ok(Async::Ready(_)) = self.connected_peers_interval.poll() {
|
while let Ok(Async::Ready(_)) = self.connected_peers_interval.poll() {
|
||||||
let infos = self.protocol.peers_info().map(|(id, info)| {
|
let mut network_service = self.network_service.lock();
|
||||||
|
let infos = network_service.user_protocol_mut().peers_info().map(|(id, info)| {
|
||||||
(id.clone(), ConnectedPeer { peer_info: info.clone() })
|
(id.clone(), ConnectedPeer { peer_info: info.clone() })
|
||||||
}).collect();
|
}).collect();
|
||||||
*self.peers.write() = infos;
|
*self.peers.write() = infos;
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
|
||||||
let mut network_service = self.network_service.lock();
|
|
||||||
let mut ctxt = Context(&mut *network_service, &self.peerset);
|
|
||||||
match self.protocol.poll(&mut ctxt, &*self.transaction_pool) {
|
|
||||||
Ok(Async::Ready(v)) => void::unreachable(v),
|
|
||||||
Ok(Async::NotReady) => {}
|
|
||||||
Err(err) => void::unreachable(err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for new incoming on-demand requests.
|
// Check for new incoming on-demand requests.
|
||||||
if let Some(on_demand_in) = self.on_demand_in.as_mut() {
|
if let Some(on_demand_in) = self.on_demand_in.as_mut() {
|
||||||
while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() {
|
while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() {
|
||||||
self.protocol.add_on_demand_request(&mut Context(&mut self.network_service.lock(), &self.peerset), rq);
|
let mut network_service = self.network_service.lock();
|
||||||
|
network_service.user_protocol_mut().add_on_demand_request(rq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -646,47 +690,49 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut network_service = self.network_service.lock();
|
let mut network_service = self.network_service.lock();
|
||||||
let mut network_out = Context(&mut network_service, &self.peerset);
|
|
||||||
|
|
||||||
match msg {
|
match msg {
|
||||||
ProtocolMsg::BlockImported(hash, header) =>
|
ProtocolMsg::BlockImported(hash, header) =>
|
||||||
self.protocol.on_block_imported(&mut network_out, hash, &header),
|
network_service.user_protocol_mut().on_block_imported(hash, &header),
|
||||||
ProtocolMsg::BlockFinalized(hash, header) =>
|
ProtocolMsg::BlockFinalized(hash, header) =>
|
||||||
self.protocol.on_block_finalized(&mut network_out, hash, &header),
|
network_service.user_protocol_mut().on_block_finalized(hash, &header),
|
||||||
ProtocolMsg::ExecuteWithSpec(task) => {
|
ProtocolMsg::ExecuteWithSpec(task) => {
|
||||||
let (mut context, spec) = self.protocol.specialization_lock(&mut network_out);
|
let (protocol, mut net_out) = network_service.user_protocol_mut().protocol_context_lock();
|
||||||
|
let (mut context, spec) = protocol.specialization_lock(&mut net_out);
|
||||||
task.call_box(spec, &mut context);
|
task.call_box(spec, &mut context);
|
||||||
},
|
},
|
||||||
ProtocolMsg::ExecuteWithGossip(task) => {
|
ProtocolMsg::ExecuteWithGossip(task) => {
|
||||||
let (mut context, gossip) = self.protocol.consensus_gossip_lock(&mut network_out);
|
let (protocol, mut net_out) = network_service.user_protocol_mut().protocol_context_lock();
|
||||||
|
let (mut context, gossip) = protocol.consensus_gossip_lock(&mut net_out);
|
||||||
task.call_box(gossip, &mut context);
|
task.call_box(gossip, &mut context);
|
||||||
}
|
}
|
||||||
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) =>
|
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) =>
|
||||||
self.protocol.gossip_consensus_message(&mut network_out, topic, engine_id, message, recipient),
|
network_service.user_protocol_mut().gossip_consensus_message(topic, engine_id, message, recipient),
|
||||||
ProtocolMsg::BlocksProcessed(hashes, has_error) =>
|
ProtocolMsg::BlocksProcessed(hashes, has_error) =>
|
||||||
self.protocol.blocks_processed(&mut network_out, hashes, has_error),
|
network_service.user_protocol_mut().blocks_processed(hashes, has_error),
|
||||||
ProtocolMsg::RestartSync =>
|
ProtocolMsg::RestartSync =>
|
||||||
self.protocol.restart(&mut network_out),
|
network_service.user_protocol_mut().restart(),
|
||||||
ProtocolMsg::AnnounceBlock(hash) =>
|
ProtocolMsg::AnnounceBlock(hash) =>
|
||||||
self.protocol.announce_block(&mut network_out, hash),
|
network_service.user_protocol_mut().announce_block(hash),
|
||||||
ProtocolMsg::BlockImportedSync(hash, number) =>
|
ProtocolMsg::BlockImportedSync(hash, number) =>
|
||||||
self.protocol.block_imported(&hash, number),
|
network_service.user_protocol_mut().block_imported(&hash, number),
|
||||||
ProtocolMsg::ClearJustificationRequests =>
|
ProtocolMsg::ClearJustificationRequests =>
|
||||||
self.protocol.clear_justification_requests(),
|
network_service.user_protocol_mut().clear_justification_requests(),
|
||||||
ProtocolMsg::RequestJustification(hash, number) =>
|
ProtocolMsg::RequestJustification(hash, number) =>
|
||||||
self.protocol.request_justification(&mut network_out, &hash, number),
|
network_service.user_protocol_mut().request_justification(&hash, number),
|
||||||
ProtocolMsg::JustificationImportResult(hash, number, success) =>
|
ProtocolMsg::JustificationImportResult(hash, number, success) =>
|
||||||
self.protocol.justification_import_result(hash, number, success),
|
network_service.user_protocol_mut().justification_import_result(hash, number, success),
|
||||||
ProtocolMsg::SetFinalityProofRequestBuilder(builder) =>
|
ProtocolMsg::SetFinalityProofRequestBuilder(builder) =>
|
||||||
self.protocol.set_finality_proof_request_builder(builder),
|
network_service.user_protocol_mut().set_finality_proof_request_builder(builder),
|
||||||
ProtocolMsg::RequestFinalityProof(hash, number) =>
|
ProtocolMsg::RequestFinalityProof(hash, number) =>
|
||||||
self.protocol.request_finality_proof(&mut network_out, &hash, number),
|
network_service.user_protocol_mut().request_finality_proof(&hash, number),
|
||||||
ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) =>
|
ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) =>
|
||||||
self.protocol.finality_proof_import_result(requested_block, finalziation_result),
|
network_service.user_protocol_mut()
|
||||||
|
.finality_proof_import_result(requested_block, finalziation_result),
|
||||||
ProtocolMsg::PropagateExtrinsics =>
|
ProtocolMsg::PropagateExtrinsics =>
|
||||||
self.protocol.propagate_extrinsics(&mut network_out, &*self.transaction_pool),
|
network_service.user_protocol_mut().propagate_extrinsics(),
|
||||||
#[cfg(any(test, feature = "test-helpers"))]
|
#[cfg(any(test, feature = "test-helpers"))]
|
||||||
ProtocolMsg::Tick => self.protocol.tick(&mut network_out),
|
ProtocolMsg::Tick => network_service.user_protocol_mut().tick(),
|
||||||
#[cfg(any(test, feature = "test-helpers"))]
|
#[cfg(any(test, feature = "test-helpers"))]
|
||||||
ProtocolMsg::Synchronize => {},
|
ProtocolMsg::Synchronize => {},
|
||||||
}
|
}
|
||||||
@@ -695,39 +741,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
|
|||||||
loop {
|
loop {
|
||||||
let mut network_service = self.network_service.lock();
|
let mut network_service = self.network_service.lock();
|
||||||
let poll_value = network_service.poll();
|
let poll_value = network_service.poll();
|
||||||
let mut network_out = Context(&mut network_service, &self.peerset);
|
|
||||||
|
|
||||||
let outcome = match poll_value {
|
let outcome = match poll_value {
|
||||||
Ok(Async::NotReady) => break,
|
Ok(Async::NotReady) => break,
|
||||||
Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { peer_id, version, .. }))) => {
|
Ok(Async::Ready(Some(outcome))) => outcome,
|
||||||
debug_assert!(
|
Ok(Async::Ready(None)) => CustomMessageOutcome::None,
|
||||||
version <= protocol::CURRENT_VERSION as u8
|
|
||||||
&& version >= protocol::MIN_VERSION as u8
|
|
||||||
);
|
|
||||||
self.protocol.on_peer_connected(&mut network_out, peer_id);
|
|
||||||
CustomMessageOutcome::None
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { peer_id, .. }))) => {
|
|
||||||
self.protocol.on_peer_disconnected(&mut network_out, peer_id);
|
|
||||||
CustomMessageOutcome::None
|
|
||||||
},
|
|
||||||
Ok(Async::Ready(Some(CustomProtoOut::CustomMessage { peer_id, message }))) =>
|
|
||||||
self.protocol.on_custom_message(
|
|
||||||
&mut network_out,
|
|
||||||
&*self.transaction_pool,
|
|
||||||
peer_id,
|
|
||||||
message,
|
|
||||||
self.finality_proof_provider.as_ref().map(|p| &**p)
|
|
||||||
),
|
|
||||||
Ok(Async::Ready(Some(CustomProtoOut::Clogged { peer_id, messages, .. }))) => {
|
|
||||||
debug!(target: "sync", "{} clogging messages:", messages.len());
|
|
||||||
for msg in messages.into_iter().take(5) {
|
|
||||||
debug!(target: "sync", "{:?}", msg);
|
|
||||||
self.protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg));
|
|
||||||
}
|
|
||||||
CustomMessageOutcome::None
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(target: "sync", "Error in the network: {:?}", err);
|
error!(target: "sync", "Error in the network: {:?}", err);
|
||||||
return Err(err)
|
return Err(err)
|
||||||
@@ -745,8 +763,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.is_offline.store(self.protocol.num_connected_peers() == 0, Ordering::Relaxed);
|
let mut network_service = self.network_service.lock();
|
||||||
self.is_major_syncing.store(match self.protocol.sync_state() {
|
self.is_offline.store(network_service.user_protocol_mut().num_connected_peers() == 0, Ordering::Relaxed);
|
||||||
|
self.is_major_syncing.store(match network_service.user_protocol_mut().sync_state() {
|
||||||
SyncState::Idle => false,
|
SyncState::Idle => false,
|
||||||
SyncState::Downloading => true,
|
SyncState::Downloading => true,
|
||||||
}, Ordering::Relaxed);
|
}, Ordering::Relaxed);
|
||||||
@@ -756,91 +775,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// The libp2p swarm, customized for our needs.
|
/// The libp2p swarm, customized for our needs.
|
||||||
type Swarm<B> = libp2p::core::Swarm<
|
type Swarm<B, S, H> = libp2p::core::Swarm<
|
||||||
Boxed<(PeerId, StreamMuxerBox), io::Error>,
|
Boxed<(PeerId, StreamMuxerBox), io::Error>,
|
||||||
Behaviour<CustomProto<Message<B>, Substream<StreamMuxerBox>>, CustomProtoOut<Message<B>>, Substream<StreamMuxerBox>>
|
Behaviour<ProtocolBehaviour<B, S, H>, CustomMessageOutcome<B>, Substream<StreamMuxerBox>>
|
||||||
>;
|
>;
|
||||||
|
|
||||||
/// Starts the substrate libp2p service.
|
|
||||||
///
|
|
||||||
/// Returns a stream that must be polled regularly in order for the networking to function.
|
|
||||||
fn start_service<B: BlockT, Pid: Into<ProtocolId>>(
|
|
||||||
config: NetworkConfiguration,
|
|
||||||
protocol_id: Pid,
|
|
||||||
versions: &[u8],
|
|
||||||
) -> Result<(Swarm<B>, Arc<transport::BandwidthSinks>, peerset::PeersetHandle), io::Error> {
|
|
||||||
|
|
||||||
if let Some(ref path) = config.net_config_path {
|
|
||||||
fs::create_dir_all(Path::new(path))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// List of multiaddresses that we know in the network.
|
|
||||||
let mut known_addresses = Vec::new();
|
|
||||||
let mut bootnodes = Vec::new();
|
|
||||||
let mut reserved_nodes = Vec::new();
|
|
||||||
|
|
||||||
// Process the bootnodes.
|
|
||||||
for bootnode in config.boot_nodes.iter() {
|
|
||||||
match parse_str_addr(bootnode) {
|
|
||||||
Ok((peer_id, addr)) => {
|
|
||||||
bootnodes.push(peer_id.clone());
|
|
||||||
known_addresses.push((peer_id, addr));
|
|
||||||
},
|
|
||||||
Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize the reserved peers.
|
|
||||||
for reserved in config.reserved_nodes.iter() {
|
|
||||||
if let Ok((peer_id, addr)) = parse_str_addr(reserved) {
|
|
||||||
reserved_nodes.push(peer_id.clone());
|
|
||||||
known_addresses.push((peer_id, addr));
|
|
||||||
} else {
|
|
||||||
warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build the peerset.
|
|
||||||
let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset::PeersetConfig {
|
|
||||||
in_peers: config.in_peers,
|
|
||||||
out_peers: config.out_peers,
|
|
||||||
bootnodes,
|
|
||||||
reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny,
|
|
||||||
reserved_nodes,
|
|
||||||
});
|
|
||||||
|
|
||||||
// Private and public keys configuration.
|
|
||||||
if let NodeKeyConfig::Secp256k1(_) = config.node_key {
|
|
||||||
warn!(target: "sub-libp2p", "Secp256k1 keys are deprecated in favour of ed25519");
|
|
||||||
}
|
|
||||||
let local_identity = config.node_key.clone().into_keypair()?;
|
|
||||||
let local_public = local_identity.public();
|
|
||||||
let local_peer_id = local_public.clone().into_peer_id();
|
|
||||||
info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58());
|
|
||||||
|
|
||||||
// Build the swarm.
|
|
||||||
let (mut swarm, bandwidth) = {
|
|
||||||
let user_agent = format!("{} ({})", config.client_version, config.node_name);
|
|
||||||
let proto = CustomProto::new(protocol_id, versions, peerset);
|
|
||||||
let behaviour = Behaviour::new(proto, user_agent, local_public, known_addresses, config.enable_mdns);
|
|
||||||
let (transport, bandwidth) = transport::build_transport(
|
|
||||||
local_identity,
|
|
||||||
config.wasm_external_transport
|
|
||||||
);
|
|
||||||
(Swarm::<B>::new(transport, behaviour, local_peer_id.clone()), bandwidth)
|
|
||||||
};
|
|
||||||
|
|
||||||
// Listen on multiaddresses.
|
|
||||||
for addr in &config.listen_addresses {
|
|
||||||
if let Err(err) = Swarm::<B>::listen_on(&mut swarm, addr.clone()) {
|
|
||||||
warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add external addresses.
|
|
||||||
for addr in &config.public_addresses {
|
|
||||||
Swarm::<B>::add_external_address(&mut swarm, addr.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((swarm, bandwidth, peerset_handle))
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -38,9 +38,13 @@ use futures::sync::mpsc;
|
|||||||
|
|
||||||
// Type aliases.
|
// Type aliases.
|
||||||
// These exist mainly to avoid typing `<F as Factory>::Foo` all over the code.
|
// These exist mainly to avoid typing `<F as Factory>::Foo` all over the code.
|
||||||
/// Network service type for a factory.
|
|
||||||
pub type NetworkService<F> =
|
/// Network service type for `Components`.
|
||||||
network::NetworkService<<F as ServiceFactory>::Block, <F as ServiceFactory>::NetworkProtocol>;
|
pub type NetworkService<C> = network::NetworkService<
|
||||||
|
ComponentBlock<C>,
|
||||||
|
<<C as Components>::Factory as ServiceFactory>::NetworkProtocol,
|
||||||
|
ComponentExHash<C>
|
||||||
|
>;
|
||||||
|
|
||||||
/// Code executor type for a factory.
|
/// Code executor type for a factory.
|
||||||
pub type CodeExecutor<F> = NativeExecutor<<F as ServiceFactory>::RuntimeDispatch>;
|
pub type CodeExecutor<F> = NativeExecutor<<F as ServiceFactory>::RuntimeDispatch>;
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ const DEFAULT_PROTOCOL_ID: &str = "sup";
|
|||||||
pub struct Service<Components: components::Components> {
|
pub struct Service<Components: components::Components> {
|
||||||
client: Arc<ComponentClient<Components>>,
|
client: Arc<ComponentClient<Components>>,
|
||||||
select_chain: Option<<Components as components::Components>::SelectChain>,
|
select_chain: Option<<Components as components::Components>::SelectChain>,
|
||||||
network: Arc<components::NetworkService<Components::Factory>>,
|
network: Arc<components::NetworkService<Components>>,
|
||||||
/// Sinks to propagate network status updates.
|
/// Sinks to propagate network status updates.
|
||||||
network_status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<NetworkStatus<ComponentBlock<Components>>>>>>,
|
network_status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<NetworkStatus<ComponentBlock<Components>>>>>>,
|
||||||
transaction_pool: Arc<TransactionPool<Components::TransactionPoolApi>>,
|
transaction_pool: Arc<TransactionPool<Components::TransactionPoolApi>>,
|
||||||
@@ -498,7 +498,7 @@ impl<Components> Service<Components> where Components: components::Components {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get shared network instance.
|
/// Get shared network instance.
|
||||||
pub fn network(&self) -> Arc<components::NetworkService<Components::Factory>> {
|
pub fn network(&self) -> Arc<components::NetworkService<Components>> {
|
||||||
self.network.clone()
|
self.network.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -684,7 +684,7 @@ impl<C: Components> network::TransactionPool<ComponentExHash<C>, ComponentBlock<
|
|||||||
|
|
||||||
/// Builds a never-ending `Future` that answers the RPC requests coming on the receiver.
|
/// Builds a never-ending `Future` that answers the RPC requests coming on the receiver.
|
||||||
fn build_system_rpc_handler<Components: components::Components>(
|
fn build_system_rpc_handler<Components: components::Components>(
|
||||||
network: Arc<NetworkService<Components::Factory>>,
|
network: Arc<NetworkService<Components>>,
|
||||||
rx: mpsc::UnboundedReceiver<rpc::apis::system::Request<ComponentBlock<Components>>>,
|
rx: mpsc::UnboundedReceiver<rpc::apis::system::Request<ComponentBlock<Components>>>,
|
||||||
should_have_peers: bool,
|
should_have_peers: bool,
|
||||||
) -> impl Future<Item = (), Error = ()> {
|
) -> impl Future<Item = (), Error = ()> {
|
||||||
|
|||||||
Reference in New Issue
Block a user