client/network-gossip: Integrate GossipEngine tasks into Future impl (#4767)

`GossipEngine` spawns two tasks, one for a periodic tick, one to forward
messages from the network to subscribers. These tasks hold an `Arc` to a
`GossipEngineInner`.

To reduce the amount of shared ownership (locking) this patch integrates
the two tasks into a `Future` implementation on the `GossipEngine`
struct. This `Future` implementation can now be called from a single
owner, e.g. the `finality-grandpa` `NetworkBridge`.

As a side effect this removes the requirement on the `network-gossip`
crate to spawn tasks and thereby removes the requirement on the
`finality-grandpa` crate to spawn any tasks.

This is part of a greater effort to reduce the number of owners of
components within `finality-grandpa`, `network` and `network-gossip` as
well as to reduce the amount of unbounded channels. For details see
d9837d7dd, 5f80929dc and 597c0a6c4.
This commit is contained in:
Max Inden
2020-02-12 13:15:26 +01:00
committed by GitHub
parent 13971fe2a7
commit 3f3910ccaf
9 changed files with 121 additions and 155 deletions
+66 -71
View File
@@ -15,16 +15,17 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{Network, Validator};
use crate::state_machine::{ConsensusGossip, TopicNotification};
use crate::state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENANCE_INTERVAL};
use sc_network::message::generic::ConsensusMessage;
use sc_network::{Event, ReputationChange};
use futures::{prelude::*, channel::mpsc, compat::Compat01As03, task::SpawnExt as _};
use futures::{prelude::*, channel::mpsc, compat::Compat01As03};
use futures01::stream::Stream as Stream01;
use libp2p::PeerId;
use parking_lot::Mutex;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::{sync::Arc, time::Duration};
use std::{pin::Pin, sync::Arc, task::{Context, Poll}};
/// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on
/// top of it.
@@ -36,13 +37,17 @@ pub struct GossipEngine<B: BlockT> {
struct GossipEngineInner<B: BlockT> {
state_machine: ConsensusGossip<B>,
network: Box<dyn Network<B> + Send>,
periodic_maintenance_interval: futures_timer::Delay,
network_event_stream: Compat01As03<Box<dyn Stream01<Error = (), Item = Event> + Send>>,
engine_id: ConsensusEngineId,
}
impl<B: BlockT> Unpin for GossipEngineInner<B> {}
impl<B: BlockT> GossipEngine<B> {
/// Create a new instance.
pub fn new<N: Network<B> + Send + Clone + 'static>(
mut network: N,
executor: &impl futures::task::Spawn,
engine_id: ConsensusEngineId,
validator: Arc<dyn Validator<B>>,
) -> Self where B: 'static {
@@ -50,7 +55,7 @@ impl<B: BlockT> GossipEngine<B> {
// We grab the event stream before registering the notifications protocol, otherwise we
// might miss events.
let event_stream = network.event_stream();
let network_event_stream = network.event_stream();
network.register_notifications_protocol(engine_id);
state_machine.register_validator(&mut network, engine_id, validator);
@@ -58,6 +63,9 @@ impl<B: BlockT> GossipEngine<B> {
let inner = Arc::new(Mutex::new(GossipEngineInner {
state_machine,
network: Box::new(network),
periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
network_event_stream: Compat01As03::new(network_event_stream),
engine_id,
}));
let gossip_engine = GossipEngine {
@@ -65,72 +73,6 @@ impl<B: BlockT> GossipEngine<B> {
engine_id,
};
let res = executor.spawn({
let inner = Arc::downgrade(&inner);
async move {
loop {
let _ = futures_timer::Delay::new(Duration::from_millis(1100)).await;
if let Some(inner) = inner.upgrade() {
let mut inner = inner.lock();
let inner = &mut *inner;
inner.state_machine.tick(&mut *inner.network);
} else {
// We reach this branch if the `Arc<GossipEngineInner>` has no reference
// left. We can now let the task end.
break;
}
}
}
});
// Note: we consider the chances of an error to spawn a background task almost null.
if res.is_err() {
log::error!(target: "gossip", "Failed to spawn background task");
}
let res = executor.spawn(async move {
let mut stream = Compat01As03::new(event_stream);
while let Some(Ok(event)) = stream.next().await {
match event {
Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => {
if msg_engine_id != engine_id {
continue;
}
let mut inner = inner.lock();
let inner = &mut *inner;
inner.state_machine.new_peer(&mut *inner.network, remote, roles);
}
Event::NotificationStreamClosed { remote, engine_id: msg_engine_id } => {
if msg_engine_id != engine_id {
continue;
}
let mut inner = inner.lock();
let inner = &mut *inner;
inner.state_machine.peer_disconnected(&mut *inner.network, remote);
},
Event::NotificationsReceived { remote, messages } => {
let mut inner = inner.lock();
let inner = &mut *inner;
inner.state_machine.on_incoming(
&mut *inner.network,
remote,
messages.into_iter()
.filter_map(|(engine, data)| if engine == engine_id {
Some(ConsensusMessage { engine_id: engine, data: data.to_vec() })
} else { None })
.collect()
);
},
Event::Dht(_) => {}
}
}
});
// Note: we consider the chances of an error to spawn a background task almost null.
if res.is_err() {
log::error!(target: "gossip", "Failed to spawn background task");
}
gossip_engine
}
@@ -222,6 +164,59 @@ impl<B: BlockT> GossipEngine<B> {
}
}
impl<B: BlockT> Future for GossipEngine<B> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.inner.lock().poll_unpin(cx)
}
}
impl<B: BlockT> Future for GossipEngineInner<B> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = &mut *self;
while let Poll::Ready(Some(Ok(event))) = this.network_event_stream.poll_next_unpin(cx) {
match event {
Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => {
if msg_engine_id != this.engine_id {
continue;
}
this.state_machine.new_peer(&mut *this.network, remote, roles);
}
Event::NotificationStreamClosed { remote, engine_id: msg_engine_id } => {
if msg_engine_id != this.engine_id {
continue;
}
this.state_machine.peer_disconnected(&mut *this.network, remote);
},
Event::NotificationsReceived { remote, messages } => {
let engine_id = this.engine_id.clone();
this.state_machine.on_incoming(
&mut *this.network,
remote,
messages.into_iter()
.filter_map(|(engine, data)| if engine == engine_id {
Some(ConsensusMessage { engine_id: engine, data: data.to_vec() })
} else { None })
.collect()
);
},
Event::Dht(_) => {}
}
}
while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL);
this.state_machine.tick(&mut *this.network);
}
Poll::Pending
}
}
impl<B: BlockT> Clone for GossipEngine<B> {
fn clone(&self) -> Self {
GossipEngine {
@@ -35,6 +35,8 @@ const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096;
const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30);
pub(crate) const PERIODIC_MAINTENANCE_INTERVAL: time::Duration = time::Duration::from_millis(1100);
mod rep {
use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sends us a gossip message that we didn't know about.