Clean up the public API of sc-network-gossip (#5001)

This commit is contained in:
Pierre Krieger
2020-02-20 13:27:36 +01:00
committed by GitHub
parent 8cdf98c773
commit 5bf644b768
5 changed files with 13 additions and 24 deletions
@@ -20,8 +20,7 @@ use crate::state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENA
use sc_network::message::generic::ConsensusMessage;
use sc_network::{Event, ReputationChange};
use futures::{prelude::*, channel::mpsc, compat::Compat01As03};
use futures01::stream::Stream as Stream01;
use futures::{prelude::*, channel::mpsc};
use libp2p::PeerId;
use parking_lot::Mutex;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
@@ -38,7 +37,7 @@ struct GossipEngineInner<B: BlockT> {
state_machine: ConsensusGossip<B>,
network: Box<dyn Network<B> + Send>,
periodic_maintenance_interval: futures_timer::Delay,
network_event_stream: Compat01As03<Box<dyn Stream01<Error = (), Item = Event> + Send>>,
network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
engine_id: ConsensusEngineId,
}
@@ -64,7 +63,7 @@ impl<B: BlockT> GossipEngine<B> {
state_machine,
network: Box::new(network),
periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
network_event_stream: Compat01As03::new(network_event_stream),
network_event_stream,
engine_id,
}));
@@ -178,7 +177,7 @@ impl<B: BlockT> Future for GossipEngineInner<B> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = &mut *self;
while let Poll::Ready(Some(Ok(event))) = this.network_event_stream.poll_next_unpin(cx) {
while let Poll::Ready(Some(event)) = this.network_event_stream.poll_next_unpin(cx) {
match event {
Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => {
if msg_engine_id != this.engine_id {
+4 -4
View File
@@ -61,7 +61,7 @@ pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext
use futures::prelude::*;
use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::sync::Arc;
use std::{pin::Pin, sync::Arc};
mod bridge;
mod state_machine;
@@ -70,7 +70,7 @@ mod validator;
/// Abstraction over a network.
pub trait Network<B: BlockT> {
/// Returns a stream of events representing what happens on the network.
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = Event, Error = ()> + Send>;
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>>;
/// Adjust the reputation of a node.
fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange);
@@ -97,8 +97,8 @@ pub trait Network<B: BlockT> {
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Network<B> for Arc<NetworkService<B, S, H>> {
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = Event, Error = ()> + Send> {
Box::new(NetworkService::event_stream(self).map(|v| Ok::<_, ()>(v)).compat())
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
Box::pin(NetworkService::event_stream(self))
}
fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) {