Use async/await instead of manual polling of NetworkWorker (#13219)

* Convert `NetworkWorker::poll()` into async `next_action()`

* Use `NetworkWorker::next_action` instead of `poll` in `sc-network-test`

* Revert "Use `NetworkWorker::next_action` instead of `poll` in `sc-network-test`"

This reverts commit 4b5d851ec864f78f9d083a18a618fbe117c896d2.

* Fix `sc-network-test` to poll `NetworkWorker::next_action`

* Fix `sc_network::service` tests to poll `NetworkWorker::next_action`

* Fix docs

* kick CI

* Factor out `next_worker_message()` & `next_swarm_event()`

* Error handling: replace `futures::pending!()` with `expect()`

* Simplify stream polling in `select!`

* Replace `NetworkWorker::next_action()` with `run()`

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* minor: comment

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Print debug log when network future is shut down

* Evaluate `NetworkWorker::run()` future once before the loop

* Fix client code to match new `NetworkService` interfaces

* Make clippy happy

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Revert "Apply suggestions from code review"

This reverts commit 9fa646d0ed613e5f8623d3d37d1d59ec0a535850.

* Make `NetworkWorker::run()` consume `self`

* Terminate system RPC future if RPC rx stream has terminated.

* Rewrite with let-else

* Fix comments

* Get `best_seen_block` and call `on_block_finalized` via `ChainSync` instead of `NetworkService`

* rustfmt

* make clippy happy

* Tests: schedule wake if `next_action()` returned true

* minor: comment

* minor: fix `NetworkWorker` rustdoc

* minor: amend the rustdoc

* Fix bug that caused `on_demand_beefy_justification_sync` test to hang

* rustfmt

* Apply review suggestions

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Dmitry Markin
2023-02-20 15:08:02 +03:00
committed by GitHub
parent fdd5203add
commit 8d033b6dfb
12 changed files with 861 additions and 747 deletions
@@ -184,6 +184,10 @@ impl NetworkStateInfo for TestNetwork {
fn external_addresses(&self) -> Vec<Multiaddr> { fn external_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses.clone() self.external_addresses.clone()
} }
fn listen_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses.clone()
}
} }
struct TestSigner<'a> { struct TestSigner<'a> {
@@ -180,13 +180,13 @@ pub trait NetworkPeers {
/// purposes. /// purposes.
fn deny_unreserved_peers(&self); fn deny_unreserved_peers(&self);
/// Adds a `PeerId` and its `Multiaddr` as reserved. /// Adds a `PeerId` and its `Multiaddr` as reserved for a sync protocol (default peer set).
/// ///
/// Returns an `Err` if the given string is not a valid multiaddress /// Returns an `Err` if the given string is not a valid multiaddress
/// or contains an invalid peer ID (which includes the local peer ID). /// or contains an invalid peer ID (which includes the local peer ID).
fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String>; fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String>;
/// Removes a `PeerId` from the list of reserved peers. /// Removes a `PeerId` from the list of reserved peers for a sync protocol (default peer set).
fn remove_reserved_peer(&self, peer_id: PeerId); fn remove_reserved_peer(&self, peer_id: PeerId);
/// Sets the reserved set of a protocol to the given set of peers. /// Sets the reserved set of a protocol to the given set of peers.
@@ -359,6 +359,9 @@ pub trait NetworkStateInfo {
/// Returns the local external addresses. /// Returns the local external addresses.
fn external_addresses(&self) -> Vec<Multiaddr>; fn external_addresses(&self) -> Vec<Multiaddr>;
/// Returns the listening addresses (without trailing `/p2p/` with our `PeerId`).
fn listen_addresses(&self) -> Vec<Multiaddr>;
/// Returns the local Peer ID. /// Returns the local Peer ID.
fn local_peer_id(&self) -> PeerId; fn local_peer_id(&self) -> PeerId;
} }
@@ -372,6 +375,10 @@ where
T::external_addresses(self) T::external_addresses(self)
} }
fn listen_addresses(&self) -> Vec<Multiaddr> {
T::listen_addresses(self)
}
fn local_peer_id(&self) -> PeerId { fn local_peer_id(&self) -> PeerId {
T::local_peer_id(self) T::local_peer_id(self)
} }
+251 -241
View File
@@ -19,13 +19,13 @@
//! Main entry point of the sc-network crate. //! Main entry point of the sc-network crate.
//! //!
//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`]. //! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
//! The [`NetworkWorker`] *is* the network and implements the `Future` trait. It must be polled in //! The [`NetworkWorker`] *is* the network. Network is driven by [`NetworkWorker::run`] future that
//! order for the network to advance. //! terminates only when all instances of the control handles [`NetworkService`] were dropped.
//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an //! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
//! `Arc<NetworkService>` by calling [`NetworkWorker::service`]. //! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
//! //!
//! The methods of the [`NetworkService`] are implemented by sending a message over a channel, //! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
//! which is then processed by [`NetworkWorker::poll`]. //! which is then processed by [`NetworkWorker::next_action`].
use crate::{ use crate::{
behaviour::{self, Behaviour, BehaviourOut}, behaviour::{self, Behaviour, BehaviourOut},
@@ -46,8 +46,9 @@ use libp2p::{
multiaddr, multiaddr,
ping::Failure as PingFailure, ping::Failure as PingFailure,
swarm::{ swarm::{
AddressScore, ConnectionError, ConnectionLimits, DialError, Executor, NetworkBehaviour, AddressScore, ConnectionError, ConnectionHandler, ConnectionLimits, DialError, Executor,
PendingConnectionError, Swarm, SwarmBuilder, SwarmEvent, IntoConnectionHandler, NetworkBehaviour, PendingConnectionError, Swarm, SwarmBuilder,
SwarmEvent,
}, },
Multiaddr, PeerId, Multiaddr, PeerId,
}; };
@@ -87,7 +88,6 @@ use std::{
atomic::{AtomicBool, AtomicUsize, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Arc,
}, },
task::Poll,
}; };
pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
@@ -100,12 +100,20 @@ mod tests;
pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey}; pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey};
use sc_network_common::service::{NetworkBlock, NetworkRequest}; use sc_network_common::service::{NetworkBlock, NetworkRequest};
/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
/// Used as a template parameter of [`SwarmEvent`] below.
type ConnectionHandlerErr<TBehaviour> =
<<<TBehaviour as NetworkBehaviour>::ConnectionHandler as IntoConnectionHandler>
::Handler as ConnectionHandler>::Error;
/// Substrate network service. Handles network IO and manages connectivity. /// Substrate network service. Handles network IO and manages connectivity.
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> { pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
/// Number of peers we're connected to. /// Number of peers we're connected to.
num_connected: Arc<AtomicUsize>, num_connected: Arc<AtomicUsize>,
/// The local external addresses. /// The local external addresses.
external_addresses: Arc<Mutex<Vec<Multiaddr>>>, external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
/// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`.
listen_addresses: Arc<Mutex<Vec<Multiaddr>>>,
/// Are we actively catching up with the chain? /// Are we actively catching up with the chain?
is_major_syncing: Arc<AtomicBool>, is_major_syncing: Arc<AtomicBool>,
/// Local copy of the `PeerId` of the local node. /// Local copy of the `PeerId` of the local node.
@@ -434,11 +442,13 @@ where
} }
let external_addresses = Arc::new(Mutex::new(Vec::new())); let external_addresses = Arc::new(Mutex::new(Vec::new()));
let listen_addresses = Arc::new(Mutex::new(Vec::new()));
let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new())); let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new()));
let service = Arc::new(NetworkService { let service = Arc::new(NetworkService {
bandwidth, bandwidth,
external_addresses: external_addresses.clone(), external_addresses: external_addresses.clone(),
listen_addresses: listen_addresses.clone(),
num_connected: num_connected.clone(), num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(), is_major_syncing: is_major_syncing.clone(),
peerset: peerset_handle, peerset: peerset_handle,
@@ -455,6 +465,7 @@ where
Ok(NetworkWorker { Ok(NetworkWorker {
external_addresses, external_addresses,
listen_addresses,
num_connected, num_connected,
is_major_syncing, is_major_syncing,
network_service: swarm, network_service: swarm,
@@ -711,6 +722,34 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
} }
} }
/// Get connected peers debug information.
///
/// Returns an error if the `NetworkWorker` is no longer running.
pub async fn peers_debug_info(&self) -> Result<Vec<(PeerId, PeerInfo<B>)>, ()> {
let (tx, rx) = oneshot::channel();
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::PeersDebugInfo { pending_response: tx });
// The channel can only be closed if the network worker no longer exists.
rx.await.map_err(|_| ())
}
/// Get the list of reserved peers.
///
/// Returns an error if the `NetworkWorker` is no longer running.
pub async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
let (tx, rx) = oneshot::channel();
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::ReservedPeers { pending_response: tx });
// The channel can only be closed if the network worker no longer exists.
rx.await.map_err(|_| ())
}
/// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates. /// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates.
/// ///
/// Returns an `Err` if one of the given addresses is invalid or contains an /// Returns an `Err` if one of the given addresses is invalid or contains an
@@ -774,6 +813,11 @@ where
self.external_addresses.lock().clone() self.external_addresses.lock().clone()
} }
/// Returns the listener addresses (without trailing `/p2p/` with our `PeerId`).
fn listen_addresses(&self) -> Vec<Multiaddr> {
self.listen_addresses.lock().clone()
}
/// Returns the local Peer ID. /// Returns the local Peer ID.
fn local_peer_id(&self) -> PeerId { fn local_peer_id(&self) -> PeerId {
self.local_peer_id self.local_peer_id
@@ -1243,6 +1287,12 @@ enum ServiceToWorkerMsg<B: BlockT> {
}, },
DisconnectPeer(PeerId, ProtocolName), DisconnectPeer(PeerId, ProtocolName),
NewBestBlockImported(B::Hash, NumberFor<B>), NewBestBlockImported(B::Hash, NumberFor<B>),
PeersDebugInfo {
pending_response: oneshot::Sender<Vec<(PeerId, PeerInfo<B>)>>,
},
ReservedPeers {
pending_response: oneshot::Sender<Vec<PeerId>>,
},
} }
/// Main network worker. Must be polled in order for the network to advance. /// Main network worker. Must be polled in order for the network to advance.
@@ -1258,6 +1308,8 @@ where
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`. /// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
external_addresses: Arc<Mutex<Vec<Multiaddr>>>, external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`. /// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
listen_addresses: Arc<Mutex<Vec<Multiaddr>>>,
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
num_connected: Arc<AtomicUsize>, num_connected: Arc<AtomicUsize>,
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`. /// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
is_major_syncing: Arc<AtomicBool>, is_major_syncing: Arc<AtomicBool>,
@@ -1281,97 +1333,152 @@ where
_marker: PhantomData<H>, _marker: PhantomData<H>,
} }
impl<B, H, Client> Future for NetworkWorker<B, H, Client> impl<B, H, Client> NetworkWorker<B, H, Client>
where where
B: BlockT + 'static, B: BlockT + 'static,
H: ExHashT, H: ExHashT,
Client: HeaderBackend<B> + 'static, Client: HeaderBackend<B> + 'static,
{ {
type Output = (); /// Run the network.
pub async fn run(mut self) {
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> { while self.next_action().await {}
let this = &mut *self;
// At the time of writing of this comment, due to a high volume of messages, the network
// worker sometimes takes a long time to process the loop below. When that happens, the
// rest of the polling is frozen. In order to avoid negative side-effects caused by this
// freeze, a limit to the number of iterations is enforced below. If the limit is reached,
// the task is interrupted then scheduled again.
//
// This allows for a more even distribution in the time taken by each sub-part of the
// polling.
let mut num_iterations = 0;
loop {
num_iterations += 1;
if num_iterations >= 100 {
cx.waker().wake_by_ref();
break
} }
// Process the next message coming from the `NetworkService`. /// Perform one action on the network.
let msg = match this.from_service.poll_next_unpin(cx) { ///
Poll::Ready(Some(msg)) => msg, /// Returns `false` when the worker should be shutdown.
Poll::Ready(None) => return Poll::Ready(()), /// Use in tests only.
Poll::Pending => break, pub async fn next_action(&mut self) -> bool {
futures::select! {
// Next message from the service.
msg = self.from_service.next() => {
if let Some(msg) = msg {
self.handle_worker_message(msg);
} else {
return false
}
},
// Next event from `Swarm` (the stream guaranteed to never terminate).
event = self.network_service.select_next_some() => {
self.handle_swarm_event(event);
},
}; };
let num_connected_peers =
self.network_service.behaviour_mut().user_protocol_mut().num_connected_peers();
// Update the variables shared with the `NetworkService`.
self.num_connected.store(num_connected_peers, Ordering::Relaxed);
{
let external_addresses =
self.network_service.external_addresses().map(|r| &r.addr).cloned().collect();
*self.external_addresses.lock() = external_addresses;
let listen_addresses =
self.network_service.listeners().map(ToOwned::to_owned).collect();
*self.listen_addresses.lock() = listen_addresses;
}
let is_major_syncing = self
.network_service
.behaviour_mut()
.user_protocol_mut()
.sync_state()
.state
.is_major_syncing();
self.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
if let Some(metrics) = self.metrics.as_ref() {
if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
for (lower_ilog2_bucket_bound, num_entries) in buckets {
metrics
.kbuckets_num_nodes
.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
.set(num_entries as u64);
}
}
if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
metrics.kademlia_records_count.set(num_entries as u64);
}
if let Some(num_entries) =
self.network_service.behaviour_mut().kademlia_records_total_size()
{
metrics.kademlia_records_sizes_total.set(num_entries as u64);
}
metrics
.peerset_num_discovered
.set(self.network_service.behaviour_mut().user_protocol().num_discovered_peers()
as u64);
metrics.pending_connections.set(
Swarm::network_info(&self.network_service).connection_counters().num_pending()
as u64,
);
}
true
}
/// Process the next message coming from the `NetworkService`.
fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg<B>) {
match msg { match msg {
ServiceToWorkerMsg::AnnounceBlock(hash, data) => this ServiceToWorkerMsg::AnnounceBlock(hash, data) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.announce_block(hash, data), .announce_block(hash, data),
ServiceToWorkerMsg::GetValue(key) => ServiceToWorkerMsg::GetValue(key) =>
this.network_service.behaviour_mut().get_value(key), self.network_service.behaviour_mut().get_value(key),
ServiceToWorkerMsg::PutValue(key, value) => ServiceToWorkerMsg::PutValue(key, value) =>
this.network_service.behaviour_mut().put_value(key, value), self.network_service.behaviour_mut().put_value(key, value),
ServiceToWorkerMsg::SetReservedOnly(reserved_only) => this ServiceToWorkerMsg::SetReservedOnly(reserved_only) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.set_reserved_only(reserved_only), .set_reserved_only(reserved_only),
ServiceToWorkerMsg::SetReserved(peers) => this ServiceToWorkerMsg::SetReserved(peers) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.set_reserved_peers(peers), .set_reserved_peers(peers),
ServiceToWorkerMsg::SetPeersetReserved(protocol, peers) => this ServiceToWorkerMsg::SetPeersetReserved(protocol, peers) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.set_reserved_peerset_peers(protocol, peers), .set_reserved_peerset_peers(protocol, peers),
ServiceToWorkerMsg::AddReserved(peer_id) => this ServiceToWorkerMsg::AddReserved(peer_id) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.add_reserved_peer(peer_id), .add_reserved_peer(peer_id),
ServiceToWorkerMsg::RemoveReserved(peer_id) => this ServiceToWorkerMsg::RemoveReserved(peer_id) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.remove_reserved_peer(peer_id), .remove_reserved_peer(peer_id),
ServiceToWorkerMsg::AddSetReserved(protocol, peer_id) => this ServiceToWorkerMsg::AddSetReserved(protocol, peer_id) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.add_set_reserved_peer(protocol, peer_id), .add_set_reserved_peer(protocol, peer_id),
ServiceToWorkerMsg::RemoveSetReserved(protocol, peer_id) => this ServiceToWorkerMsg::RemoveSetReserved(protocol, peer_id) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.remove_set_reserved_peer(protocol, peer_id), .remove_set_reserved_peer(protocol, peer_id),
ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
this.network_service.behaviour_mut().add_known_address(peer_id, addr), self.network_service.behaviour_mut().add_known_address(peer_id, addr),
ServiceToWorkerMsg::AddToPeersSet(protocol, peer_id) => this ServiceToWorkerMsg::AddToPeersSet(protocol, peer_id) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.add_to_peers_set(protocol, peer_id), .add_to_peers_set(protocol, peer_id),
ServiceToWorkerMsg::RemoveFromPeersSet(protocol, peer_id) => this ServiceToWorkerMsg::RemoveFromPeersSet(protocol, peer_id) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.remove_from_peers_set(protocol, peer_id), .remove_from_peers_set(protocol, peer_id),
ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender), ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
ServiceToWorkerMsg::Request { ServiceToWorkerMsg::Request {
target, target,
protocol, protocol,
@@ -1379,7 +1486,7 @@ where
pending_response, pending_response,
connect, connect,
} => { } => {
this.network_service.behaviour_mut().send_request( self.network_service.behaviour_mut().send_request(
&target, &target,
&protocol, &protocol,
request, request,
@@ -1388,47 +1495,39 @@ where
); );
}, },
ServiceToWorkerMsg::NetworkStatus { pending_response } => { ServiceToWorkerMsg::NetworkStatus { pending_response } => {
let _ = pending_response.send(Ok(this.status())); let _ = pending_response.send(Ok(self.status()));
}, },
ServiceToWorkerMsg::NetworkState { pending_response } => { ServiceToWorkerMsg::NetworkState { pending_response } => {
let _ = pending_response.send(Ok(this.network_state())); let _ = pending_response.send(Ok(self.network_state()));
}, },
ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => this ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.disconnect_peer(&who, protocol_name), .disconnect_peer(&who, protocol_name),
ServiceToWorkerMsg::NewBestBlockImported(hash, number) => this ServiceToWorkerMsg::NewBestBlockImported(hash, number) => self
.network_service .network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.new_best_block_imported(hash, number), .new_best_block_imported(hash, number),
ServiceToWorkerMsg::PeersDebugInfo { pending_response } => {
let _ = pending_response.send(self.peers_debug_info());
},
ServiceToWorkerMsg::ReservedPeers { pending_response } => {
let _ =
pending_response.send(self.reserved_peers().map(ToOwned::to_owned).collect());
},
} }
} }
// `num_iterations` serves the same purpose as in the previous loop. /// Process the next event coming from `Swarm`.
// See the previous loop for explanations. fn handle_swarm_event(
let mut num_iterations = 0; &mut self,
loop { event: SwarmEvent<BehaviourOut, ConnectionHandlerErr<Behaviour<B, Client>>>,
num_iterations += 1; ) {
if num_iterations >= 1000 { match event {
cx.waker().wake_by_ref(); SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
break if let Some(metrics) = self.metrics.as_ref() {
}
// Process the next action coming from the network.
let next_event = this.network_service.select_next_some();
futures::pin_mut!(next_event);
let poll_value = next_event.poll_unpin(cx);
match poll_value {
Poll::Pending => break,
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest {
protocol,
result,
..
})) => {
if let Some(metrics) = this.metrics.as_ref() {
match result { match result {
Ok(serve_time) => { Ok(serve_time) => {
metrics metrics
@@ -1438,36 +1537,37 @@ where
}, },
Err(err) => { Err(err) => {
let reason = match err { let reason = match err {
ResponseFailure::Network(InboundFailure::Timeout) => "timeout", ResponseFailure::Network(InboundFailure::Timeout) =>
ResponseFailure::Network( Some("timeout"),
InboundFailure::UnsupportedProtocols, ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
) =>
// `UnsupportedProtocols` is reported for every single // `UnsupportedProtocols` is reported for every single
// inbound request whenever a request with an unsupported // inbound request whenever a request with an unsupported
// protocol is received. This is not reported in order to // protocol is received. This is not reported in order to
// avoid confusions. // avoid confusions.
continue, None,
ResponseFailure::Network(InboundFailure::ResponseOmission) => ResponseFailure::Network(InboundFailure::ResponseOmission) =>
"busy-omitted", Some("busy-omitted"),
ResponseFailure::Network(InboundFailure::ConnectionClosed) => ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
"connection-closed", Some("connection-closed"),
}; };
if let Some(reason) = reason {
metrics metrics
.requests_in_failure_total .requests_in_failure_total
.with_label_values(&[&protocol, reason]) .with_label_values(&[&protocol, reason])
.inc(); .inc();
}
}, },
} }
} }
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
protocol, protocol,
duration, duration,
result, result,
.. ..
})) => }) =>
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
match result { match result {
Ok(_) => { Ok(_) => {
metrics metrics
@@ -1486,9 +1586,8 @@ where
RequestFailure::Network(OutboundFailure::Timeout) => "timeout", RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
RequestFailure::Network(OutboundFailure::ConnectionClosed) => RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
"connection-closed", "connection-closed",
RequestFailure::Network( RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
OutboundFailure::UnsupportedProtocols, "unsupported",
) => "unsupported",
}; };
metrics metrics
@@ -1498,24 +1597,17 @@ where
}, },
} }
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) =>
peer,
changes,
})) =>
for change in changes { for change in changes {
this.network_service.behaviour().user_protocol().report_peer(peer, change); self.network_service.behaviour().user_protocol().report_peer(peer, change);
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::PeerIdentify { SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
peer_id, peer_id,
info: info:
IdentifyInfo { IdentifyInfo {
protocol_version, protocol_version, agent_version, mut listen_addrs, protocols, ..
agent_version,
mut listen_addrs,
protocols,
..
}, },
})) => { }) => {
if listen_addrs.len() > 30 { if listen_addrs.len() > 30 {
debug!( debug!(
target: "sub-libp2p", target: "sub-libp2p",
@@ -1525,57 +1617,57 @@ where
listen_addrs.truncate(30); listen_addrs.truncate(30);
} }
for addr in listen_addrs { for addr in listen_addrs {
this.network_service self.network_service
.behaviour_mut() .behaviour_mut()
.add_self_reported_address_to_dht(&peer_id, &protocols, addr); .add_self_reported_address_to_dht(&peer_id, &protocols, addr);
} }
this.network_service self.network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.add_default_set_discovered_nodes(iter::once(peer_id)); .add_default_set_discovered_nodes(iter::once(peer_id));
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id))) => { SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
this.network_service self.network_service
.behaviour_mut() .behaviour_mut()
.user_protocol_mut() .user_protocol_mut()
.add_default_set_discovered_nodes(iter::once(peer_id)); .add_default_set_discovered_nodes(iter::once(peer_id));
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) => SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) =>
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
metrics.kademlia_random_queries_total.inc(); metrics.kademlia_random_queries_total.inc();
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened { SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
remote, remote,
protocol, protocol,
negotiated_fallback, negotiated_fallback,
notifications_sink, notifications_sink,
role, role,
})) => { }) => {
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
metrics metrics
.notifications_streams_opened_total .notifications_streams_opened_total
.with_label_values(&[&protocol]) .with_label_values(&[&protocol])
.inc(); .inc();
} }
{ {
let mut peers_notifications_sinks = this.peers_notifications_sinks.lock(); let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
let _previous_value = peers_notifications_sinks let _previous_value = peers_notifications_sinks
.insert((remote, protocol.clone()), notifications_sink); .insert((remote, protocol.clone()), notifications_sink);
debug_assert!(_previous_value.is_none()); debug_assert!(_previous_value.is_none());
} }
this.event_streams.send(Event::NotificationStreamOpened { self.event_streams.send(Event::NotificationStreamOpened {
remote, remote,
protocol, protocol,
negotiated_fallback, negotiated_fallback,
role, role,
}); });
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced { SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
remote, remote,
protocol, protocol,
notifications_sink, notifications_sink,
})) => { }) => {
let mut peers_notifications_sinks = this.peers_notifications_sinks.lock(); let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
if let Some(s) = peers_notifications_sinks.get_mut(&(remote, protocol)) { if let Some(s) = peers_notifications_sinks.get_mut(&(remote, protocol)) {
*s = notifications_sink; *s = notifications_sink;
} else { } else {
@@ -1597,41 +1689,33 @@ where
// acceptable, this bug is at the moment intentionally left there and is // acceptable, this bug is at the moment intentionally left there and is
// intended to be fixed at the same time as // intended to be fixed at the same time as
// https://github.com/paritytech/substrate/issues/6403. // https://github.com/paritytech/substrate/issues/6403.
// this.event_streams.send(Event::NotificationStreamClosed { // self.event_streams.send(Event::NotificationStreamClosed {
// remote, // remote,
// protocol, // protocol,
// }); // });
// this.event_streams.send(Event::NotificationStreamOpened { // self.event_streams.send(Event::NotificationStreamOpened {
// remote, // remote,
// protocol, // protocol,
// role, // role,
// }); // });
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, protocol }) => {
remote, if let Some(metrics) = self.metrics.as_ref() {
protocol,
})) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics metrics
.notifications_streams_closed_total .notifications_streams_closed_total
.with_label_values(&[&protocol[..]]) .with_label_values(&[&protocol[..]])
.inc(); .inc();
} }
this.event_streams.send(Event::NotificationStreamClosed { self.event_streams
remote, .send(Event::NotificationStreamClosed { remote, protocol: protocol.clone() });
protocol: protocol.clone(),
});
{ {
let mut peers_notifications_sinks = this.peers_notifications_sinks.lock(); let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
let _previous_value = peers_notifications_sinks.remove(&(remote, protocol)); let _previous_value = peers_notifications_sinks.remove(&(remote, protocol));
debug_assert!(_previous_value.is_some()); debug_assert!(_previous_value.is_some());
} }
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived { SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived { remote, messages }) => {
remote, if let Some(metrics) = self.metrics.as_ref() {
messages,
})) => {
if let Some(metrics) = this.metrics.as_ref() {
for (protocol, message) in &messages { for (protocol, message) in &messages {
metrics metrics
.notifications_sizes .notifications_sizes
@@ -1639,16 +1723,16 @@ where
.observe(message.len() as f64); .observe(message.len() as f64);
} }
} }
this.event_streams.send(Event::NotificationsReceived { remote, messages }); self.event_streams.send(Event::NotificationsReceived { remote, messages });
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::SyncConnected(remote))) => { SwarmEvent::Behaviour(BehaviourOut::SyncConnected(remote)) => {
this.event_streams.send(Event::SyncConnected { remote }); self.event_streams.send(Event::SyncConnected { remote });
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::SyncDisconnected(remote))) => { SwarmEvent::Behaviour(BehaviourOut::SyncDisconnected(remote)) => {
this.event_streams.send(Event::SyncDisconnected { remote }); self.event_streams.send(Event::SyncDisconnected { remote });
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration))) => { SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
let query_type = match event { let query_type = match event {
DhtEvent::ValueFound(_) => "value-found", DhtEvent::ValueFound(_) => "value-found",
DhtEvent::ValueNotFound(_) => "value-not-found", DhtEvent::ValueNotFound(_) => "value-not-found",
@@ -1661,24 +1745,24 @@ where
.observe(duration.as_secs_f64()); .observe(duration.as_secs_f64());
} }
this.event_streams.send(Event::Dht(event)); self.event_streams.send(Event::Dht(event));
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::None)) => { SwarmEvent::Behaviour(BehaviourOut::None) => {
// Ignored event from lower layers. // Ignored event from lower layers.
}, },
Poll::Ready(SwarmEvent::ConnectionEstablished { SwarmEvent::ConnectionEstablished {
peer_id, peer_id,
endpoint, endpoint,
num_established, num_established,
concurrent_dial_errors, concurrent_dial_errors,
}) => { } => {
if let Some(errors) = concurrent_dial_errors { if let Some(errors) = concurrent_dial_errors {
debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors); debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
} else { } else {
debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id); debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
} }
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
let direction = match endpoint { let direction = match endpoint {
ConnectedPoint::Dialer { .. } => "out", ConnectedPoint::Dialer { .. } => "out",
ConnectedPoint::Listener { .. } => "in", ConnectedPoint::Listener { .. } => "in",
@@ -1690,14 +1774,9 @@ where
} }
} }
}, },
Poll::Ready(SwarmEvent::ConnectionClosed { SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, num_established } => {
peer_id,
cause,
endpoint,
num_established,
}) => {
debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause); debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause);
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
let direction = match endpoint { let direction = match endpoint {
ConnectedPoint::Dialer { .. } => "out", ConnectedPoint::Dialer { .. } => "out",
ConnectedPoint::Listener { .. } => "in", ConnectedPoint::Listener { .. } => "in",
@@ -1714,10 +1793,7 @@ where
Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout", Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
None => "actively-closed", None => "actively-closed",
}; };
metrics metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
.connections_closed_total
.with_label_values(&[direction, reason])
.inc();
// `num_established` represents the number of *remaining* connections. // `num_established` represents the number of *remaining* connections.
if num_established == 0 { if num_established == 0 {
@@ -1725,19 +1801,19 @@ where
} }
} }
}, },
Poll::Ready(SwarmEvent::NewListenAddr { address, .. }) => { SwarmEvent::NewListenAddr { address, .. } => {
trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address); trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address);
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
metrics.listeners_local_addresses.inc(); metrics.listeners_local_addresses.inc();
} }
}, },
Poll::Ready(SwarmEvent::ExpiredListenAddr { address, .. }) => { SwarmEvent::ExpiredListenAddr { address, .. } => {
info!(target: "sub-libp2p", "📪 No longer listening on {}", address); info!(target: "sub-libp2p", "📪 No longer listening on {}", address);
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
metrics.listeners_local_addresses.dec(); metrics.listeners_local_addresses.dec();
} }
}, },
Poll::Ready(SwarmEvent::OutgoingConnectionError { peer_id, error }) => { SwarmEvent::OutgoingConnectionError { peer_id, error } => {
if let Some(peer_id) = peer_id { if let Some(peer_id) = peer_id {
trace!( trace!(
target: "sub-libp2p", target: "sub-libp2p",
@@ -1745,11 +1821,9 @@ where
peer_id, error, peer_id, error,
); );
if this.boot_node_ids.contains(&peer_id) { if self.boot_node_ids.contains(&peer_id) {
if let DialError::WrongPeerId { obtained, endpoint } = &error { if let DialError::WrongPeerId { obtained, endpoint } = &error {
if let ConnectedPoint::Dialer { address, role_override: _ } = if let ConnectedPoint::Dialer { address, role_override: _ } = endpoint {
endpoint
{
warn!( warn!(
"💔 The bootnode you want to connect to at `{}` provided a different peer ID `{}` than the one you expect `{}`.", "💔 The bootnode you want to connect to at `{}` provided a different peer ID `{}` than the one you expect `{}`.",
address, address,
@@ -1761,7 +1835,7 @@ where
} }
} }
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
let reason = match error { let reason = match error {
DialError::ConnectionLimit(_) => Some("limit-reached"), DialError::ConnectionLimit(_) => Some("limit-reached"),
DialError::InvalidPeerId(_) => Some("invalid-peer-id"), DialError::InvalidPeerId(_) => Some("invalid-peer-id"),
@@ -1775,39 +1849,32 @@ where
DialError::Aborted => None, // ignore them DialError::Aborted => None, // ignore them
}; };
if let Some(reason) = reason { if let Some(reason) = reason {
metrics metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
.pending_connections_errors_total
.with_label_values(&[reason])
.inc();
} }
} }
}, },
Poll::Ready(SwarmEvent::Dialing(peer_id)) => { SwarmEvent::Dialing(peer_id) => {
trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id) trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id)
}, },
Poll::Ready(SwarmEvent::IncomingConnection { local_addr, send_back_addr }) => { SwarmEvent::IncomingConnection { local_addr, send_back_addr } => {
trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))", trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))",
local_addr, send_back_addr); local_addr, send_back_addr);
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
metrics.incoming_connections_total.inc(); metrics.incoming_connections_total.inc();
} }
}, },
Poll::Ready(SwarmEvent::IncomingConnectionError { SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error } => {
local_addr,
send_back_addr,
error,
}) => {
debug!( debug!(
target: "sub-libp2p", target: "sub-libp2p",
"Libp2p => IncomingConnectionError({},{}): {}", "Libp2p => IncomingConnectionError({},{}): {}",
local_addr, send_back_addr, error, local_addr, send_back_addr, error,
); );
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
let reason = match error { let reason = match error {
PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"), PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"),
PendingConnectionError::WrongPeerId { .. } => Some("invalid-peer-id"), PendingConnectionError::WrongPeerId { .. } => Some("invalid-peer-id"),
PendingConnectionError::Transport(_) | PendingConnectionError::Transport(_) | PendingConnectionError::IO(_) =>
PendingConnectionError::IO(_) => Some("transport-error"), Some("transport-error"),
PendingConnectionError::Aborted => None, // ignore it PendingConnectionError::Aborted => None, // ignore it
}; };
@@ -1819,21 +1886,18 @@ where
} }
} }
}, },
Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => { SwarmEvent::BannedPeer { peer_id, endpoint } => {
debug!( debug!(
target: "sub-libp2p", target: "sub-libp2p",
"Libp2p => BannedPeer({}). Connected via {:?}.", "Libp2p => BannedPeer({}). Connected via {:?}.",
peer_id, endpoint, peer_id, endpoint,
); );
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
metrics metrics.incoming_connections_errors_total.with_label_values(&["banned"]).inc();
.incoming_connections_errors_total
.with_label_values(&["banned"])
.inc();
} }
}, },
Poll::Ready(SwarmEvent::ListenerClosed { reason, addresses, .. }) => { SwarmEvent::ListenerClosed { reason, addresses, .. } => {
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
metrics.listeners_local_addresses.sub(addresses.len() as u64); metrics.listeners_local_addresses.sub(addresses.len() as u64);
} }
let addrs = let addrs =
@@ -1851,67 +1915,13 @@ where
), ),
} }
}, },
Poll::Ready(SwarmEvent::ListenerError { error, .. }) => { SwarmEvent::ListenerError { error, .. } => {
debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error); debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = self.metrics.as_ref() {
metrics.listeners_errors_total.inc(); metrics.listeners_errors_total.inc();
} }
}, },
};
} }
let num_connected_peers =
this.network_service.behaviour_mut().user_protocol_mut().num_connected_peers();
// Update the variables shared with the `NetworkService`.
this.num_connected.store(num_connected_peers, Ordering::Relaxed);
{
let external_addresses =
Swarm::<Behaviour<B, Client>>::external_addresses(&this.network_service)
.map(|r| &r.addr)
.cloned()
.collect();
*this.external_addresses.lock() = external_addresses;
}
let is_major_syncing = this
.network_service
.behaviour_mut()
.user_protocol_mut()
.sync_state()
.state
.is_major_syncing();
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
if let Some(metrics) = this.metrics.as_ref() {
if let Some(buckets) = this.network_service.behaviour_mut().num_entries_per_kbucket() {
for (lower_ilog2_bucket_bound, num_entries) in buckets {
metrics
.kbuckets_num_nodes
.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
.set(num_entries as u64);
}
}
if let Some(num_entries) = this.network_service.behaviour_mut().num_kademlia_records() {
metrics.kademlia_records_count.set(num_entries as u64);
}
if let Some(num_entries) =
this.network_service.behaviour_mut().kademlia_records_total_size()
{
metrics.kademlia_records_sizes_total.set(num_entries as u64);
}
metrics
.peerset_num_discovered
.set(this.network_service.behaviour_mut().user_protocol().num_discovered_peers()
as u64);
metrics.pending_connections.set(
Swarm::network_info(&this.network_service).connection_counters().num_pending()
as u64,
);
}
Poll::Pending
} }
} }
@@ -75,12 +75,8 @@ async fn normal_network_poll_no_peers() {
.with_chain_sync((chain_sync, chain_sync_service)) .with_chain_sync((chain_sync, chain_sync_service))
.build(); .build();
// poll the network once // perform one action on network
futures::future::poll_fn(|cx| { let _ = network.network().next_action().await;
let _ = network.network().poll_unpin(cx);
Poll::Ready(())
})
.await;
} }
#[tokio::test] #[tokio::test]
@@ -110,11 +106,8 @@ async fn request_justification() {
// send "request justifiction" message and poll the network // send "request justifiction" message and poll the network
network.service().request_justification(&hash, number); network.service().request_justification(&hash, number);
futures::future::poll_fn(|cx| { // perform one action on network
let _ = network.network().poll_unpin(cx); let _ = network.network().next_action().await;
Poll::Ready(())
})
.await;
} }
#[tokio::test] #[tokio::test]
@@ -141,11 +134,8 @@ async fn clear_justification_requests() {
// send "request justifiction" message and poll the network // send "request justifiction" message and poll the network
network.service().clear_justification_requests(); network.service().clear_justification_requests();
futures::future::poll_fn(|cx| { // perform one action on network
let _ = network.network().poll_unpin(cx); let _ = network.network().next_action().await;
Poll::Ready(())
})
.await;
} }
#[tokio::test] #[tokio::test]
@@ -180,11 +170,8 @@ async fn set_sync_fork_request() {
// send "set sync fork request" message and poll the network // send "set sync fork request" message and poll the network
network.service().set_sync_fork_request(copy_peers, hash, number); network.service().set_sync_fork_request(copy_peers, hash, number);
futures::future::poll_fn(|cx| { // perform one action on network
let _ = network.network().poll_unpin(cx); let _ = network.network().next_action().await;
Poll::Ready(())
})
.await;
} }
#[tokio::test] #[tokio::test]
@@ -225,11 +212,8 @@ async fn on_block_finalized() {
// send "set sync fork request" message and poll the network // send "set sync fork request" message and poll the network
network.network().on_block_finalized(hash, header); network.network().on_block_finalized(hash, header);
futures::future::poll_fn(|cx| { // perform one action on network
let _ = network.network().poll_unpin(cx); let _ = network.network().next_action().await;
Poll::Ready(())
})
.await;
} }
// report from mock import queue that importing a justification was not successful // report from mock import queue that importing a justification was not successful
@@ -80,10 +80,7 @@ impl TestNetwork {
let service = worker.service().clone(); let service = worker.service().clone();
let event_stream = service.event_stream("test"); let event_stream = service.event_stream("test");
tokio::spawn(async move { tokio::spawn(worker.run());
futures::pin_mut!(worker);
let _ = worker.await;
});
(service, event_stream) (service, event_stream)
} }
+6
View File
@@ -1358,6 +1358,12 @@ where
); );
} }
}, },
ToServiceCommand::BlockFinalized(hash, number) => {
self.on_block_finalized(&hash, number);
},
ToServiceCommand::Status { pending_response } => {
let _ = pending_response.send(self.status());
},
} }
} }
@@ -16,9 +16,10 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use futures::channel::oneshot;
use libp2p::PeerId; use libp2p::PeerId;
use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link}; use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link};
use sc_network_common::service::NetworkSyncForkRequest; use sc_network_common::{service::NetworkSyncForkRequest, sync::SyncStatus};
use sc_utils::mpsc::TracingUnboundedSender; use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::{Block as BlockT, NumberFor}; use sp_runtime::traits::{Block as BlockT, NumberFor};
@@ -34,6 +35,10 @@ pub enum ToServiceCommand<B: BlockT> {
Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>, Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
), ),
JustificationImported(PeerId, B::Hash, NumberFor<B>, bool), JustificationImported(PeerId, B::Hash, NumberFor<B>, bool),
BlockFinalized(B::Hash, NumberFor<B>),
Status {
pending_response: oneshot::Sender<SyncStatus<B>>,
},
} }
/// Handle for communicating with `ChainSync` asynchronously /// Handle for communicating with `ChainSync` asynchronously
@@ -47,6 +52,21 @@ impl<B: BlockT> ChainSyncInterfaceHandle<B> {
pub fn new(tx: TracingUnboundedSender<ToServiceCommand<B>>) -> Self { pub fn new(tx: TracingUnboundedSender<ToServiceCommand<B>>) -> Self {
Self { tx } Self { tx }
} }
/// Notify ChainSync about finalized block
pub fn on_block_finalized(&self, hash: B::Hash, number: NumberFor<B>) {
let _ = self.tx.unbounded_send(ToServiceCommand::BlockFinalized(hash, number));
}
/// Get sync status
///
/// Returns an error if `ChainSync` has terminated.
pub async fn status(&self) -> Result<SyncStatus<B>, ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::Status { pending_response: tx });
rx.await.map_err(|_| ())
}
} }
impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>> impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>>
+13 -4
View File
@@ -31,7 +31,7 @@ use std::{
time::Duration, time::Duration,
}; };
use futures::{channel::oneshot, future::BoxFuture, prelude::*}; use futures::{channel::oneshot, future::BoxFuture, pin_mut, prelude::*};
use libp2p::{build_multiaddr, PeerId}; use libp2p::{build_multiaddr, PeerId};
use log::trace; use log::trace;
use parking_lot::Mutex; use parking_lot::Mutex;
@@ -83,7 +83,7 @@ use sp_runtime::{
}; };
use substrate_test_runtime_client::AccountKeyring; use substrate_test_runtime_client::AccountKeyring;
pub use substrate_test_runtime_client::{ pub use substrate_test_runtime_client::{
runtime::{Block, Extrinsic, Hash, Transfer}, runtime::{Block, Extrinsic, Hash, Header, Transfer},
TestClient, TestClientBuilder, TestClientBuilderExt, TestClient, TestClientBuilder, TestClientBuilderExt,
}; };
use tokio::time::timeout; use tokio::time::timeout;
@@ -1078,8 +1078,17 @@ where
self.mut_peers(|peers| { self.mut_peers(|peers| {
for (i, peer) in peers.iter_mut().enumerate() { for (i, peer) in peers.iter_mut().enumerate() {
trace!(target: "sync", "-- Polling {}: {}", i, peer.id()); trace!(target: "sync", "-- Polling {}: {}", i, peer.id());
if let Poll::Ready(()) = peer.network.poll_unpin(cx) { loop {
panic!("NetworkWorker has terminated unexpectedly.") // The code below is not quite correct, because we are polling a different
// instance of the future every time. But as long as
// `NetworkWorker::next_action()` contains just streams polling not interleaved
// with other `.await`s, dropping the future and recreating it works the same as
// polling a single instance.
let net_poll_future = peer.network.next_action();
pin_mut!(net_poll_future);
if let Poll::Pending = net_poll_future.poll(cx) {
break
}
} }
trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id()); trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id());
+4
View File
@@ -419,6 +419,10 @@ mod tests {
fn local_peer_id(&self) -> PeerId { fn local_peer_id(&self) -> PeerId {
PeerId::random() PeerId::random()
} }
fn listen_addresses(&self) -> Vec<Multiaddr> {
Vec::new()
}
} }
fn offchain_api() -> (Api, AsyncApi) { fn offchain_api() -> (Api, AsyncApi) {
+4
View File
@@ -270,6 +270,10 @@ mod tests {
fn local_peer_id(&self) -> PeerId { fn local_peer_id(&self) -> PeerId {
PeerId::random() PeerId::random()
} }
fn listen_addresses(&self) -> Vec<Multiaddr> {
Vec::new()
}
} }
impl NetworkPeers for TestNetwork { impl NetworkPeers for TestNetwork {
+17 -7
View File
@@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{ use crate::{
build_network_future, build_network_future, build_system_rpc_future,
client::{Client, ClientConfig}, client::{Client, ClientConfig},
config::{Configuration, KeystoreConfig, PrometheusConfig}, config::{Configuration, KeystoreConfig, PrometheusConfig},
error::Error, error::Error,
@@ -963,19 +963,29 @@ where
Some("networking"), Some("networking"),
chain_sync_network_provider.run(network.clone()), chain_sync_network_provider.run(network.clone()),
); );
spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service))); spawn_handle.spawn(
"import-queue",
None,
import_queue.run(Box::new(chain_sync_service.clone())),
);
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
spawn_handle.spawn(
let future = build_network_future( "system-rpc-handler",
Some("networking"),
build_system_rpc_future(
config.role.clone(), config.role.clone(),
network_mut, network_mut.service().clone(),
client, chain_sync_service.clone(),
client.clone(),
system_rpc_rx, system_rpc_rx,
has_bootnodes, has_bootnodes,
config.announce_block, ),
); );
let future =
build_network_future(network_mut, client, chain_sync_service, config.announce_block);
// TODO: Normally, one is supposed to pass a list of notifications protocols supported by the // TODO: Normally, one is supposed to pass a list of notifications protocols supported by the
// node through the `NetworkConfiguration` struct. But because this function doesn't know in // node through the `NetworkConfiguration` struct. But because this function doesn't know in
// advance which components, such as GrandPa or Polkadot, will be plugged on top of the // advance which components, such as GrandPa or Polkadot, will be plugged on top of the
+113 -54
View File
@@ -37,12 +37,16 @@ mod task_manager;
use std::{collections::HashMap, net::SocketAddr}; use std::{collections::HashMap, net::SocketAddr};
use codec::{Decode, Encode}; use codec::{Decode, Encode};
use futures::{channel::mpsc, FutureExt, StreamExt}; use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
use jsonrpsee::{core::Error as JsonRpseeError, RpcModule}; use jsonrpsee::{core::Error as JsonRpseeError, RpcModule};
use log::{debug, error, warn}; use log::{debug, error, warn};
use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider}; use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
use sc_network::PeerId; use sc_network::{NetworkStateInfo, PeerId};
use sc_network_common::{config::MultiaddrWithPeerId, service::NetworkBlock}; use sc_network_common::{
config::MultiaddrWithPeerId,
service::{NetworkBlock, NetworkPeers},
};
use sc_network_sync::service::chain_sync::ChainSyncInterfaceHandle;
use sc_utils::mpsc::TracingUnboundedReceiver; use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_blockchain::HeaderMetadata; use sp_blockchain::HeaderMetadata;
use sp_consensus::SyncOracle; use sp_consensus::SyncOracle;
@@ -138,9 +142,7 @@ pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, Transact
pub other: Other, pub other: Other,
} }
/// Builds a never-ending future that continuously polls the network. /// Builds a future that continuously polls the network.
///
/// The `status_sink` contain a list of senders to send a periodic network status to.
async fn build_network_future< async fn build_network_future<
B: BlockT, B: BlockT,
C: BlockchainEvents<B> C: BlockchainEvents<B>
@@ -153,21 +155,21 @@ async fn build_network_future<
+ 'static, + 'static,
H: sc_network_common::ExHashT, H: sc_network_common::ExHashT,
>( >(
role: Role, network: sc_network::NetworkWorker<B, H, C>,
mut network: sc_network::NetworkWorker<B, H, C>,
client: Arc<C>, client: Arc<C>,
mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>, chain_sync_service: ChainSyncInterfaceHandle<B>,
should_have_peers: bool,
announce_imported_blocks: bool, announce_imported_blocks: bool,
) { ) {
let mut imported_blocks_stream = client.import_notification_stream().fuse(); let mut imported_blocks_stream = client.import_notification_stream().fuse();
// Current best block at initialization, to report to the RPC layer.
let starting_block = client.info().best_number;
// Stream of finalized blocks reported by the client. // Stream of finalized blocks reported by the client.
let mut finality_notification_stream = client.finality_notification_stream().fuse(); let mut finality_notification_stream = client.finality_notification_stream().fuse();
let network_service = network.service().clone();
let network_run = network.run().fuse();
pin_mut!(network_run);
loop { loop {
futures::select! { futures::select! {
// List of blocks that the client has imported. // List of blocks that the client has imported.
@@ -176,15 +178,18 @@ async fn build_network_future<
Some(n) => n, Some(n) => n,
// If this stream is shut down, that means the client has shut down, and the // If this stream is shut down, that means the client has shut down, and the
// most appropriate thing to do for the network future is to shut down too. // most appropriate thing to do for the network future is to shut down too.
None => return, None => {
debug!("Block import stream has terminated, shutting down the network future.");
return
},
}; };
if announce_imported_blocks { if announce_imported_blocks {
network.service().announce_block(notification.hash, None); network_service.announce_block(notification.hash, None);
} }
if notification.is_new_best { if notification.is_new_best {
network.service().new_best_block_imported( network_service.new_best_block_imported(
notification.hash, notification.hash,
*notification.header.number(), *notification.header.number(),
); );
@@ -193,76 +198,131 @@ async fn build_network_future<
// List of blocks that the client has finalized. // List of blocks that the client has finalized.
notification = finality_notification_stream.select_next_some() => { notification = finality_notification_stream.select_next_some() => {
network.on_block_finalized(notification.hash, notification.header); chain_sync_service.on_block_finalized(notification.hash, *notification.header.number());
} }
// Drive the network. Shut down the network future if `NetworkWorker` has terminated.
_ = network_run => {
debug!("`NetworkWorker` has terminated, shutting down the network future.");
return
}
}
}
}
/// Builds a future that processes system RPC requests.
async fn build_system_rpc_future<
B: BlockT,
C: BlockchainEvents<B>
+ HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
H: sc_network_common::ExHashT,
>(
role: Role,
network_service: Arc<sc_network::NetworkService<B, H>>,
chain_sync_service: ChainSyncInterfaceHandle<B>,
client: Arc<C>,
mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
should_have_peers: bool,
) {
// Current best block at initialization, to report to the RPC layer.
let starting_block = client.info().best_number;
loop {
// Answer incoming RPC requests. // Answer incoming RPC requests.
request = rpc_rx.select_next_some() => { let Some(req) = rpc_rx.next().await else {
match request { debug!("RPC requests stream has terminated, shutting down the system RPC future.");
return;
};
match req {
sc_rpc::system::Request::Health(sender) => { sc_rpc::system::Request::Health(sender) => {
let peers = network_service.peers_debug_info().await;
if let Ok(peers) = peers {
let _ = sender.send(sc_rpc::system::Health { let _ = sender.send(sc_rpc::system::Health {
peers: network.peers_debug_info().len(), peers: peers.len(),
is_syncing: network.service().is_major_syncing(), is_syncing: network_service.is_major_syncing(),
should_have_peers, should_have_peers,
}); });
} else {
break
}
}, },
sc_rpc::system::Request::LocalPeerId(sender) => { sc_rpc::system::Request::LocalPeerId(sender) => {
let _ = sender.send(network.local_peer_id().to_base58()); let _ = sender.send(network_service.local_peer_id().to_base58());
}, },
sc_rpc::system::Request::LocalListenAddresses(sender) => { sc_rpc::system::Request::LocalListenAddresses(sender) => {
let peer_id = (*network.local_peer_id()).into(); let peer_id = network_service.local_peer_id().into();
let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id); let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
let addresses = network.listen_addresses() let addresses = network_service
.listen_addresses()
.iter()
.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string()) .map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
.collect(); .collect();
let _ = sender.send(addresses); let _ = sender.send(addresses);
}, },
sc_rpc::system::Request::Peers(sender) => { sc_rpc::system::Request::Peers(sender) => {
let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| let peers = network_service.peers_debug_info().await;
sc_rpc::system::PeerInfo { if let Ok(peers) = peers {
let _ = sender.send(
peers
.into_iter()
.map(|(peer_id, p)| sc_rpc::system::PeerInfo {
peer_id: peer_id.to_base58(), peer_id: peer_id.to_base58(),
roles: format!("{:?}", p.roles), roles: format!("{:?}", p.roles),
best_hash: p.best_hash, best_hash: p.best_hash,
best_number: p.best_number, best_number: p.best_number,
})
.collect(),
);
} else {
break
} }
).collect()); },
}
sc_rpc::system::Request::NetworkState(sender) => { sc_rpc::system::Request::NetworkState(sender) => {
if let Ok(network_state) = serde_json::to_value(&network.network_state()) { let network_state = network_service.network_state().await;
if let Ok(network_state) = network_state {
if let Ok(network_state) = serde_json::to_value(network_state) {
let _ = sender.send(network_state); let _ = sender.send(network_state);
} }
} else {
break
} }
},
sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => { sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
let result = match MultiaddrWithPeerId::try_from(peer_addr) { let result = match MultiaddrWithPeerId::try_from(peer_addr) {
Ok(peer) => { Ok(peer) => network_service.add_reserved_peer(peer),
network.add_reserved_peer(peer) Err(err) => Err(err.to_string()),
},
Err(err) => {
Err(err.to_string())
},
}; };
let x = result.map_err(sc_rpc::system::error::Error::MalformattedPeerArg); let x = result.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
let _ = sender.send(x); let _ = sender.send(x);
} },
sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => { sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
let _ = match peer_id.parse::<PeerId>() { let _ = match peer_id.parse::<PeerId>() {
Ok(peer_id) => { Ok(peer_id) => {
network.remove_reserved_peer(peer_id); network_service.remove_reserved_peer(peer_id);
sender.send(Ok(())) sender.send(Ok(()))
} },
Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg( Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg(
e.to_string(), e.to_string(),
))), ))),
}; };
} },
sc_rpc::system::Request::NetworkReservedPeers(sender) => { sc_rpc::system::Request::NetworkReservedPeers(sender) => {
let reserved_peers = network.reserved_peers(); let reserved_peers = network_service.reserved_peers().await;
let reserved_peers = reserved_peers if let Ok(reserved_peers) = reserved_peers {
.map(|peer_id| peer_id.to_base58()) let reserved_peers =
.collect(); reserved_peers.iter().map(|peer_id| peer_id.to_base58()).collect();
let _ = sender.send(reserved_peers); let _ = sender.send(reserved_peers);
} else {
break
} }
},
sc_rpc::system::Request::NodeRoles(sender) => { sc_rpc::system::Request::NodeRoles(sender) => {
use sc_rpc::system::NodeRole; use sc_rpc::system::NodeRole;
@@ -272,27 +332,26 @@ async fn build_network_future<
}; };
let _ = sender.send(vec![node_role]); let _ = sender.send(vec![node_role]);
} },
sc_rpc::system::Request::SyncState(sender) => { sc_rpc::system::Request::SyncState(sender) => {
use sc_rpc::system::SyncState; use sc_rpc::system::SyncState;
let best_number = client.info().best_number; let best_number = client.info().best_number;
let Ok(status) = chain_sync_service.status().await else {
debug!("`ChainSync` has terminated, shutting down the system RPC future.");
return
};
let _ = sender.send(SyncState { let _ = sender.send(SyncState {
starting_block, starting_block,
current_block: best_number, current_block: best_number,
highest_block: network.best_seen_block().unwrap_or(best_number), highest_block: status.best_seen_block.unwrap_or(best_number),
}); });
},
} }
} }
} debug!("`NetworkWorker` has terminated, shutting down the system RPC future.");
// The network worker has done something. Nothing special to do, but could be
// used in the future to perform actions in response of things that happened on
// the network.
_ = (&mut network).fuse() => {}
}
}
} }
// Wrapper for HTTP and WS servers that makes sure they are properly shut down. // Wrapper for HTTP and WS servers that makes sure they are properly shut down.