Use async/await in build_network_future (#6533)

* Use async/await in build_network_future

* Address concerns

* Fix test
This commit is contained in:
Pierre Krieger
2020-07-07 12:32:30 +02:00
committed by GitHub
parent cccf57a79f
commit 36cf6e7dc4
6 changed files with 257 additions and 170 deletions
-1
View File
@@ -6564,7 +6564,6 @@ dependencies = [
"futures 0.3.5",
"log",
"parity-util-mem",
"parking_lot 0.10.2",
"sc-client-api",
"sc-network",
"sp-blockchain",
+1 -2
View File
@@ -16,11 +16,10 @@ ansi_term = "0.12.1"
futures = "0.3.4"
log = "0.4.8"
parity-util-mem = { version = "0.6.1", default-features = false, features = ["primitive-types"] }
wasm-timer = "0.2"
sc-client-api = { version = "2.0.0-rc4", path = "../api" }
sc-network = { version = "0.8.0-rc4", path = "../network" }
sp-blockchain = { version = "2.0.0-rc4", path = "../../primitives/blockchain" }
sp-runtime = { version = "2.0.0-rc4", path = "../../primitives/runtime" }
sp-utils = { version = "2.0.0-rc2", path = "../../primitives/utils" }
sp-transaction-pool = { version = "2.0.0-rc2", path = "../../primitives/transaction-pool" }
parking_lot = "0.10.2"
wasm-timer = "0.2"
+2 -3
View File
@@ -29,7 +29,6 @@ use sp_runtime::traits::{Block as BlockT, Header};
use sp_transaction_pool::TransactionPool;
use sp_utils::{status_sinks, mpsc::tracing_unbounded};
use std::{fmt::Display, sync::Arc, time::Duration, collections::VecDeque};
use parking_lot::Mutex;
mod display;
@@ -82,7 +81,7 @@ impl<T: TransactionPool + MallocSizeOf> TransactionPoolAndMaybeMallogSizeOf for
/// Builds the informant and returns a `Future` that drives the informant.
pub fn build<B: BlockT, C>(
client: Arc<C>,
network_status_sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>>,
network_status_sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>,
pool: Arc<impl TransactionPoolAndMaybeMallogSizeOf>,
format: OutputFormat,
) -> impl futures::Future<Output = ()>
@@ -94,7 +93,7 @@ where
let client_1 = client.clone();
let (network_status_sink, network_status_stream) = tracing_unbounded("mpsc_network_status");
network_status_sinks.lock().push(Duration::from_millis(5000), network_status_sink);
network_status_sinks.push(Duration::from_millis(5000), network_status_sink);
let display_notifications = network_status_stream
.for_each(move |(net_status, _)| {
+6 -6
View File
@@ -1157,7 +1157,7 @@ async fn telemetry_periodic_send<TBl, TExPool, TCl>(
client: Arc<TCl>,
transaction_pool: Arc<TExPool>,
mut metrics_service: MetricsService,
network_status_sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>>
network_status_sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>
)
where
TBl: BlockT,
@@ -1165,7 +1165,7 @@ async fn telemetry_periodic_send<TBl, TExPool, TCl>(
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>,
{
let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1");
network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx);
network_status_sinks.push(std::time::Duration::from_millis(5000), state_tx);
state_rx.for_each(move |(net_status, _)| {
let info = client.usage_info();
metrics_service.tick(
@@ -1178,11 +1178,11 @@ async fn telemetry_periodic_send<TBl, TExPool, TCl>(
}
async fn telemetry_periodic_network_state<TBl: BlockT>(
network_status_sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>>
network_status_sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>
) {
// Periodically send the network state to the telemetry.
let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2");
network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx);
network_status_sinks.push(std::time::Duration::from_secs(30), netstat_tx);
netstat_rx.for_each(move |(_, network_state)| {
telemetry!(
SUBSTRATE_INFO;
@@ -1347,7 +1347,7 @@ fn build_network<TBl, TExPool, TImpQu, TCl>(
) -> Result<
(
Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>>,
Arc<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>,
Pin<Box<dyn Future<Output = ()> + Send>>
),
Error
@@ -1407,7 +1407,7 @@ fn build_network<TBl, TExPool, TImpQu, TCl>(
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new()));
let network_status_sinks = Arc::new(status_sinks::StatusSinks::new());
let future = build_network_future(
config.role.clone(),
+138 -116
View File
@@ -20,7 +20,7 @@
//! Manages communication between them.
#![warn(missing_docs)]
#![recursion_limit="128"]
#![recursion_limit = "1024"]
pub mod config;
pub mod chain_ops;
@@ -42,7 +42,7 @@ use wasm_timer::Instant;
use std::task::Poll;
use parking_lot::Mutex;
use futures::{Future, FutureExt, Stream, StreamExt, compat::*};
use futures::{Future, FutureExt, Stream, StreamExt, stream, compat::*};
use sc_network::{NetworkStatus, network_state::NetworkState, PeerId};
use log::{log, warn, debug, error, Level};
use codec::{Encode, Decode};
@@ -118,12 +118,12 @@ impl RpcHandlers {
/// Sinks to propagate network status updates.
/// For each element, every time the `Interval` fires we push an element on the sender.
pub struct NetworkStatusSinks<Block: BlockT>(
Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<Block>, NetworkState)>>>,
Arc<status_sinks::StatusSinks<(NetworkStatus<Block>, NetworkState)>>,
);
impl<Block: BlockT> NetworkStatusSinks<Block> {
fn new(
sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<Block>, NetworkState)>>>
sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<Block>, NetworkState)>>
) -> Self {
Self(sinks)
}
@@ -132,7 +132,7 @@ impl<Block: BlockT> NetworkStatusSinks<Block> {
pub fn network_status(&self, interval: Duration)
-> TracingUnboundedReceiver<(NetworkStatus<Block>, NetworkState)> {
let (sink, stream) = tracing_unbounded("mpsc_network_status");
self.0.lock().push(interval, sink);
self.0.push(interval, sink);
stream
}
}
@@ -181,7 +181,7 @@ pub struct ServiceComponents<TBl: BlockT, TBackend: Backend<TBl>, TSc, TExPool,
/// Builds a never-ending future that continuously polls the network.
///
/// The `status_sink` contain a list of senders to send a periodic network status to.
fn build_network_future<
async fn build_network_future<
B: BlockT,
C: BlockchainEvents<B>,
H: sc_network::ExHashT
@@ -189,126 +189,150 @@ fn build_network_future<
role: Role,
mut network: sc_network::NetworkWorker<B, H>,
client: Arc<C>,
status_sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>>,
status_sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>,
mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
should_have_peers: bool,
announce_imported_blocks: bool,
) -> impl Future<Output = ()> {
) {
let mut imported_blocks_stream = client.import_notification_stream().fuse();
let mut finality_notification_stream = client.finality_notification_stream().fuse();
futures::future::poll_fn(move |cx| {
// Stream of finalized blocks reported by the client.
let mut finality_notification_stream = {
let mut finality_notification_stream = client.finality_notification_stream().fuse();
// We tweak the `Stream` in order to merge together multiple items if they happen to be
// ready. This way, we only get the latest finalized block.
stream::poll_fn(move |cx| {
let mut last = None;
while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) {
last = Some(item);
}
if let Some(last) = last {
Poll::Ready(Some(last))
} else {
Poll::Pending
}
}).fuse()
};
loop {
let before_polling = Instant::now();
// We poll `imported_blocks_stream`.
while let Poll::Ready(Some(notification)) = Pin::new(&mut imported_blocks_stream).poll_next(cx) {
if announce_imported_blocks {
network.service().announce_block(notification.hash, Vec::new());
}
futures::select!{
// List of blocks that the client has imported.
notification = imported_blocks_stream.next() => {
let notification = match notification {
Some(n) => n,
// If this stream is shut down, that means the client has shut down, and the
// most appropriate thing to do for the network future is to shut down too.
None => return,
};
if let sp_consensus::BlockOrigin::Own = notification.origin {
network.service().own_block_imported(
notification.hash,
notification.header.number().clone(),
);
}
}
// We poll `finality_notification_stream`, but we only take the last event.
let mut last = None;
while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) {
last = Some(item);
}
if let Some(notification) = last {
network.on_block_finalized(notification.hash, notification.header);
}
// Poll the RPC requests and answer them.
while let Poll::Ready(Some(request)) = Pin::new(&mut rpc_rx).poll_next(cx) {
match request {
sc_rpc::system::Request::Health(sender) => {
let _ = sender.send(sc_rpc::system::Health {
peers: network.peers_debug_info().len(),
is_syncing: network.service().is_major_syncing(),
should_have_peers,
});
},
sc_rpc::system::Request::LocalPeerId(sender) => {
let _ = sender.send(network.local_peer_id().to_base58());
},
sc_rpc::system::Request::LocalListenAddresses(sender) => {
let peer_id = network.local_peer_id().clone().into();
let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
let addresses = network.listen_addresses()
.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
.collect();
let _ = sender.send(addresses);
},
sc_rpc::system::Request::Peers(sender) => {
let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)|
sc_rpc::system::PeerInfo {
peer_id: peer_id.to_base58(),
roles: format!("{:?}", p.roles),
protocol_version: p.protocol_version,
best_hash: p.best_hash,
best_number: p.best_number,
}
).collect());
if announce_imported_blocks {
network.service().announce_block(notification.hash, Vec::new());
}
sc_rpc::system::Request::NetworkState(sender) => {
if let Some(network_state) = serde_json::to_value(&network.network_state()).ok() {
let _ = sender.send(network_state);
if let sp_consensus::BlockOrigin::Own = notification.origin {
network.service().own_block_imported(
notification.hash,
notification.header.number().clone(),
);
}
}
// List of blocks that the client has finalized.
notification = finality_notification_stream.select_next_some() => {
network.on_block_finalized(notification.hash, notification.header);
}
// Answer incoming RPC requests.
request = rpc_rx.select_next_some() => {
match request {
sc_rpc::system::Request::Health(sender) => {
let _ = sender.send(sc_rpc::system::Health {
peers: network.peers_debug_info().len(),
is_syncing: network.service().is_major_syncing(),
should_have_peers,
});
},
sc_rpc::system::Request::LocalPeerId(sender) => {
let _ = sender.send(network.local_peer_id().to_base58());
},
sc_rpc::system::Request::LocalListenAddresses(sender) => {
let peer_id = network.local_peer_id().clone().into();
let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
let addresses = network.listen_addresses()
.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
.collect();
let _ = sender.send(addresses);
},
sc_rpc::system::Request::Peers(sender) => {
let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)|
sc_rpc::system::PeerInfo {
peer_id: peer_id.to_base58(),
roles: format!("{:?}", p.roles),
protocol_version: p.protocol_version,
best_hash: p.best_hash,
best_number: p.best_number,
}
).collect());
}
sc_rpc::system::Request::NetworkState(sender) => {
if let Some(network_state) = serde_json::to_value(&network.network_state()).ok() {
let _ = sender.send(network_state);
}
}
sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
let x = network.add_reserved_peer(peer_addr)
.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
let _ = sender.send(x);
}
sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
let _ = match peer_id.parse::<PeerId>() {
Ok(peer_id) => {
network.remove_reserved_peer(peer_id);
sender.send(Ok(()))
}
Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg(
e.to_string(),
))),
};
}
sc_rpc::system::Request::NodeRoles(sender) => {
use sc_rpc::system::NodeRole;
let node_role = match role {
Role::Authority { .. } => NodeRole::Authority,
Role::Light => NodeRole::LightClient,
Role::Full => NodeRole::Full,
Role::Sentry { .. } => NodeRole::Sentry,
};
let _ = sender.send(vec![node_role]);
}
}
sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
let x = network.add_reserved_peer(peer_addr)
.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
let _ = sender.send(x);
}
sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
let _ = match peer_id.parse::<PeerId>() {
Ok(peer_id) => {
network.remove_reserved_peer(peer_id);
sender.send(Ok(()))
}
Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg(
e.to_string(),
))),
};
}
sc_rpc::system::Request::NodeRoles(sender) => {
use sc_rpc::system::NodeRole;
}
let node_role = match role {
Role::Authority { .. } => NodeRole::Authority,
Role::Light => NodeRole::LightClient,
Role::Full => NodeRole::Full,
Role::Sentry { .. } => NodeRole::Sentry,
};
// The network worker has done something. Nothing special to do, but could be
// used in the future to perform actions in response of things that happened on
// the network.
_ = (&mut network).fuse() => {}
let _ = sender.send(vec![node_role]);
}
};
}
// Interval report for the external API.
status_sinks.lock().poll(cx, || {
let status = NetworkStatus {
sync_state: network.sync_state(),
best_seen_block: network.best_seen_block(),
num_sync_peers: network.num_sync_peers(),
num_connected_peers: network.num_connected_peers(),
num_active_peers: network.num_active_peers(),
average_download_per_sec: network.average_download_per_sec(),
average_upload_per_sec: network.average_upload_per_sec(),
};
let state = network.network_state();
(status, state)
});
// Main network polling.
if let Poll::Ready(()) = network.poll_unpin(cx) {
return Poll::Ready(());
// At a regular interval, we send the state of the network on what is called
// the "status sinks".
ready_sink = status_sinks.next().fuse() => {
let status = NetworkStatus {
sync_state: network.sync_state(),
best_seen_block: network.best_seen_block(),
num_sync_peers: network.num_sync_peers(),
num_connected_peers: network.num_connected_peers(),
num_active_peers: network.num_active_peers(),
average_download_per_sec: network.average_download_per_sec(),
average_upload_per_sec: network.average_upload_per_sec(),
};
let state = network.network_state();
ready_sink.send((status, state));
}
}
// Now some diagnostic for performances.
@@ -319,9 +343,7 @@ fn build_network_future<
"⚠️ Polling the network future took {:?}",
polling_dur
);
Poll::Pending
})
}
}
#[cfg(not(target_os = "unknown"))]
+110 -42
View File
@@ -14,19 +14,27 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use futures::{Stream, stream::futures_unordered::FuturesUnordered};
use std::time::Duration;
use std::pin::Pin;
use std::task::{Poll, Context};
use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use futures::{prelude::*, lock::Mutex};
use futures_timer::Delay;
use crate::mpsc::TracingUnboundedSender;
use std::{pin::Pin, task::{Poll, Context}, time::Duration};
/// Holds a list of `UnboundedSender`s, each associated with a certain time period. Every time the
/// period elapses, we push an element on the sender.
///
/// Senders are removed only when they are closed.
pub struct StatusSinks<T> {
entries: FuturesUnordered<YieldAfter<T>>,
/// Should only be locked by `next`.
inner: Mutex<Inner<T>>,
/// Sending side of `Inner::entries_rx`.
entries_tx: TracingUnboundedSender<YieldAfter<T>>,
}
struct Inner<T> {
/// The actual entries of the list.
entries: stream::FuturesUnordered<YieldAfter<T>>,
/// Receives new entries and puts them in `entries`.
entries_rx: TracingUnboundedReceiver<YieldAfter<T>>,
}
struct YieldAfter<T> {
@@ -38,56 +46,114 @@ struct YieldAfter<T> {
impl<T> StatusSinks<T> {
/// Builds a new empty collection.
pub fn new() -> StatusSinks<T> {
let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries");
StatusSinks {
entries: FuturesUnordered::new(),
inner: Mutex::new(Inner {
entries: stream::FuturesUnordered::new(),
entries_rx,
}),
entries_tx,
}
}
/// Adds a sender to the collection.
///
/// The `interval` is the time period between two pushes on the sender.
pub fn push(&mut self, interval: Duration, sender: TracingUnboundedSender<T>) {
self.entries.push(YieldAfter {
pub fn push(&self, interval: Duration, sender: TracingUnboundedSender<T>) {
let _ = self.entries_tx.unbounded_send(YieldAfter {
delay: Delay::new(interval),
interval,
sender: Some(sender),
})
});
}
/// Processes all the senders. If any sender is ready, calls the `status_grab` function and
/// pushes what it returns to the sender.
/// Waits until one of the sinks is ready, then returns an object that can be used to send
/// an element on said sink.
///
/// This function doesn't return anything, but it should be treated as if it implicitly
/// returns `Poll::Pending`. In particular, it should be called again when the task
/// is waken up.
///
/// # Panic
///
/// Panics if not called within the context of a task.
pub fn poll(&mut self, cx: &mut Context, mut status_grab: impl FnMut() -> T) {
/// If the object isn't used to send an element, the slot is skipped.
pub async fn next(&self) -> ReadySinkEvent<'_, T> {
// This is only ever locked by `next`, which means that one `next` at a time can run.
let mut inner = self.inner.lock().await;
let inner = &mut *inner;
loop {
match Pin::new(&mut self.entries).poll_next(cx) {
Poll::Ready(Some((sender, interval))) => {
let status = status_grab();
if sender.unbounded_send(status).is_ok() {
self.entries.push(YieldAfter {
// Note that since there's a small delay between the moment a task is
// waken up and the moment it is polled, the period is actually not
// `interval` but `interval + <delay>`. We ignore this problem in
// practice.
delay: Delay::new(interval),
interval,
sender: Some(sender),
});
// Future that produces the next ready entry in `entries`, or doesn't produce anything if
// the list is empty.
let next_ready_entry = {
let entries = &mut inner.entries;
async move {
if let Some(v) = entries.next().await {
v
} else {
loop {
futures::pending!()
}
}
}
};
futures::select!{
new_entry = inner.entries_rx.next() => {
if let Some(new_entry) = new_entry {
inner.entries.push(new_entry);
}
},
(sender, interval) = next_ready_entry.fuse() => {
return ReadySinkEvent {
sinks: self,
sender: Some(sender),
interval,
}
}
Poll::Ready(None) |
Poll::Pending => break,
}
}
}
}
/// One of the sinks is ready.
#[must_use]
pub struct ReadySinkEvent<'a, T> {
sinks: &'a StatusSinks<T>,
sender: Option<TracingUnboundedSender<T>>,
interval: Duration,
}
impl<'a, T> ReadySinkEvent<'a, T> {
/// Sends an element on the sender.
pub fn send(mut self, element: T) {
if let Some(sender) = self.sender.take() {
if sender.unbounded_send(element).is_ok() {
let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
// Note that since there's a small delay between the moment a task is
// woken up and the moment it is polled, the period is actually not
// `interval` but `interval + <delay>`. We ignore this problem in
// practice.
delay: Delay::new(self.interval),
interval: self.interval,
sender: Some(sender),
});
}
}
}
}
impl<'a, T> Drop for ReadySinkEvent<'a, T> {
fn drop(&mut self) {
if let Some(sender) = self.sender.take() {
if sender.is_closed() {
return;
}
let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
delay: Delay::new(self.interval),
interval: self.interval,
sender: Some(sender),
});
}
}
}
impl<T> futures::Future for YieldAfter<T> {
type Output = (TracingUnboundedSender<T>, Duration);
@@ -107,28 +173,30 @@ impl<T> futures::Future for YieldAfter<T> {
#[cfg(test)]
mod tests {
use crate::mpsc::tracing_unbounded;
use super::StatusSinks;
use futures::prelude::*;
use crate::mpsc::tracing_unbounded;
use std::time::Duration;
use std::task::Poll;
#[test]
fn works() {
// We're not testing that the `StatusSink` properly enforces an order in the intervals, as
// this easily causes test failures on busy CPUs.
let mut status_sinks = StatusSinks::new();
let status_sinks = StatusSinks::new();
let (tx, rx) = tracing_unbounded("status_sink_test");
let (tx, rx) = tracing_unbounded("test");
status_sinks.push(Duration::from_millis(100), tx);
let mut val_order = 5;
futures::executor::block_on(futures::future::select(
futures::future::poll_fn(move |cx| {
status_sinks.poll(cx, || { val_order += 1; val_order });
Poll::<()>::Pending
Box::pin(async move {
loop {
let ev = status_sinks.next().await;
val_order += 1;
ev.send(val_order);
}
}),
Box::pin(async {
let items: Vec<i32> = rx.take(3).collect().await;