mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 04:01:10 +00:00
Remove the NetworkChan (#2577)
* Remove the NetworkChan from the API * Remove the NetworkChan altogether * Address review * Fix line widths * More line width fixes * Remove pub visibility from entire world * Fix tests
This commit is contained in:
@@ -31,13 +31,12 @@ use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId
|
||||
|
||||
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
|
||||
use crate::message::Message;
|
||||
use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer, ProtocolStatus, PeerInfo};
|
||||
use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer};
|
||||
use crate::protocol::{ProtocolStatus, PeerInfo, NetworkOut};
|
||||
use crate::config::Params;
|
||||
use crate::error::Error;
|
||||
use crate::specialization::NetworkSpecialization;
|
||||
|
||||
use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError};
|
||||
use tokio::prelude::task::AtomicTask;
|
||||
use tokio::runtime::Builder as RuntimeBuilder;
|
||||
|
||||
/// Interval at which we send status updates on the SyncProvider status stream.
|
||||
@@ -86,7 +85,7 @@ pub struct NetworkLink<B: BlockT, S: NetworkSpecialization<B>> {
|
||||
/// The protocol sender
|
||||
pub(crate) protocol_sender: mpsc::UnboundedSender<ProtocolMsg<B, S>>,
|
||||
/// The network sender
|
||||
pub(crate) network_sender: NetworkChan<B>,
|
||||
pub(crate) network_sender: mpsc::UnboundedSender<NetworkMsg<B>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
|
||||
@@ -102,8 +101,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
|
||||
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success));
|
||||
if !success {
|
||||
info!("Invalid justification provided by {} for #{}", who, hash);
|
||||
let _ = self.network_sender.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value()));
|
||||
let _ = self.network_sender.send(NetworkMsg::DisconnectPeer(who.clone()));
|
||||
let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value()));
|
||||
let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,13 +134,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
|
||||
));
|
||||
if !success {
|
||||
info!("Invalid finality proof provided by {} for #{}", who, request_block.0);
|
||||
let _ = self.network_sender.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value()));
|
||||
let _ = self.network_sender.send(NetworkMsg::DisconnectPeer(who.clone()));
|
||||
let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value()));
|
||||
let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
fn report_peer(&self, who: PeerId, reputation_change: i32) {
|
||||
self.network_sender.send(NetworkMsg::ReportPeer(who, reputation_change));
|
||||
let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who, reputation_change));
|
||||
}
|
||||
|
||||
fn restart(&self) {
|
||||
@@ -178,7 +177,7 @@ pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
|
||||
/// Peers whom we are connected with.
|
||||
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
|
||||
/// Channel for networking messages processed by the background thread.
|
||||
network_chan: NetworkChan<B>,
|
||||
network_chan: mpsc::UnboundedSender<NetworkMsg<B>>,
|
||||
/// Network service
|
||||
network: Arc<Mutex<NetworkService<Message<B>>>>,
|
||||
/// Peerset manager (PSM); manages the reputation of nodes and indicates the network which
|
||||
@@ -199,7 +198,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
protocol_id: ProtocolId,
|
||||
import_queue: Box<ImportQueue<B>>,
|
||||
) -> Result<Arc<Service<B, S>>, Error> {
|
||||
let (network_chan, network_port) = network_channel();
|
||||
let (network_chan, network_port) = mpsc::unbounded();
|
||||
let (protocol_sender, protocol_rx) = mpsc::unbounded();
|
||||
let status_sinks = Arc::new(Mutex::new(Vec::new()));
|
||||
// Start in off-line mode, since we're not connected to any nodes yet.
|
||||
@@ -208,7 +207,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>> = Arc::new(Default::default());
|
||||
let protocol = Protocol::new(
|
||||
peers.clone(),
|
||||
network_chan.clone(),
|
||||
params.config,
|
||||
params.chain,
|
||||
params.finality_proof_provider,
|
||||
@@ -322,14 +320,14 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
///
|
||||
/// This method is extremely poor in terms of API and should be eventually removed.
|
||||
pub fn disconnect_peer(&self, who: PeerId) {
|
||||
let _ = self.network_chan.send(NetworkMsg::DisconnectPeer(who));
|
||||
let _ = self.network_chan.unbounded_send(NetworkMsg::DisconnectPeer(who));
|
||||
}
|
||||
|
||||
/// Send a message to the given peer. Has no effect if we're not connected to this peer.
|
||||
///
|
||||
/// This method is extremely poor in terms of API and should be eventually removed.
|
||||
pub fn send_request(&self, who: PeerId, message: Message<B>) {
|
||||
let _ = self.network_chan.send(NetworkMsg::Outgoing(who, message));
|
||||
let _ = self.network_chan.unbounded_send(NetworkMsg::Outgoing(who, message));
|
||||
}
|
||||
|
||||
/// Execute a closure with the chain-specific network specialization.
|
||||
@@ -433,81 +431,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ManageNetwork for Service
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Create a NetworkPort/Chan pair.
|
||||
pub fn network_channel<B: BlockT + 'static>() -> (NetworkChan<B>, NetworkPort<B>) {
|
||||
let (network_sender, network_receiver) = channel::unbounded();
|
||||
let task_notify = Arc::new(AtomicTask::new());
|
||||
let network_port = NetworkPort::new(network_receiver, task_notify.clone());
|
||||
let network_chan = NetworkChan::new(network_sender, task_notify);
|
||||
(network_chan, network_port)
|
||||
}
|
||||
|
||||
|
||||
/// A sender of NetworkMsg that notifies a task when a message has been sent.
|
||||
#[derive(Clone)]
|
||||
pub struct NetworkChan<B: BlockT + 'static> {
|
||||
sender: Sender<NetworkMsg<B>>,
|
||||
task_notify: Arc<AtomicTask>,
|
||||
}
|
||||
|
||||
impl<B: BlockT + 'static> NetworkChan<B> {
|
||||
/// Create a new network chan.
|
||||
pub fn new(sender: Sender<NetworkMsg<B>>, task_notify: Arc<AtomicTask>) -> Self {
|
||||
NetworkChan {
|
||||
sender,
|
||||
task_notify,
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a messaging, to be handled on a stream. Notify the task handling the stream.
|
||||
pub fn send(&self, msg: NetworkMsg<B>) {
|
||||
let _ = self.sender.send(msg);
|
||||
self.task_notify.notify();
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT + 'static> Drop for NetworkChan<B> {
|
||||
/// Notifying the task when a sender is dropped(when all are dropped, the stream is finished).
|
||||
fn drop(&mut self) {
|
||||
self.task_notify.notify();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// A receiver of NetworkMsg that makes the protocol-id available with each message.
|
||||
pub struct NetworkPort<B: BlockT + 'static> {
|
||||
receiver: Receiver<NetworkMsg<B>>,
|
||||
task_notify: Arc<AtomicTask>,
|
||||
}
|
||||
|
||||
impl<B: BlockT + 'static> NetworkPort<B> {
|
||||
/// Create a new network port for a given protocol-id.
|
||||
pub fn new(receiver: Receiver<NetworkMsg<B>>, task_notify: Arc<AtomicTask>) -> Self {
|
||||
Self {
|
||||
receiver,
|
||||
task_notify,
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive a message, if any is currently-enqueued.
|
||||
/// Register the current tokio task for notification when a new message is available.
|
||||
pub fn take_one_message(&self) -> Result<Option<NetworkMsg<B>>, ()> {
|
||||
self.task_notify.register();
|
||||
match self.receiver.try_recv() {
|
||||
Ok(msg) => Ok(Some(msg)),
|
||||
Err(TryRecvError::Empty) => Ok(None),
|
||||
Err(TryRecvError::Disconnected) => Err(()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a reference to the underlying crossbeam receiver.
|
||||
#[cfg(any(test, feature = "test-helpers"))]
|
||||
pub fn receiver(&self) -> &Receiver<NetworkMsg<B>> {
|
||||
&self.receiver
|
||||
}
|
||||
}
|
||||
|
||||
/// Messages to be handled by NetworkService.
|
||||
#[derive(Debug)]
|
||||
pub enum NetworkMsg<B: BlockT + 'static> {
|
||||
@@ -592,7 +515,7 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
|
||||
is_major_syncing: Arc<AtomicBool>,
|
||||
protocol: Protocol<B, S, H>,
|
||||
import_queue: Box<ImportQueue<B>>,
|
||||
network_port: NetworkPort<B>,
|
||||
network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>,
|
||||
protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
|
||||
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
|
||||
config: NetworkConfiguration,
|
||||
@@ -645,11 +568,25 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
|
||||
mut protocol: Protocol<B, S, H>,
|
||||
network_service: Arc<Mutex<NetworkService<Message<B>>>>,
|
||||
import_queue: Box<ImportQueue<B>>,
|
||||
network_port: NetworkPort<B>,
|
||||
mut network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>,
|
||||
mut protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
|
||||
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
|
||||
peerset: PeersetHandle,
|
||||
) -> impl Future<Item = (), Error = io::Error> {
|
||||
// Implementation of `protocol::NetworkOut` using the available local variables.
|
||||
struct Ctxt<'a, B: BlockT>(&'a mut NetworkService<Message<B>>, &'a PeersetHandle);
|
||||
impl<'a, B: BlockT> NetworkOut<B> for Ctxt<'a, B> {
|
||||
fn report_peer(&mut self, who: PeerId, reputation: i32) {
|
||||
self.1.report_peer(who, reputation)
|
||||
}
|
||||
fn disconnect_peer(&mut self, who: PeerId) {
|
||||
self.0.drop_node(&who)
|
||||
}
|
||||
fn send_message(&mut self, who: PeerId, message: Message<B>) {
|
||||
self.0.send_custom_message(&who, message)
|
||||
}
|
||||
}
|
||||
|
||||
// Interval at which we send status updates on the `status_sinks`.
|
||||
let mut status_interval = tokio::timer::Interval::new_interval(STATUS_INTERVAL);
|
||||
|
||||
@@ -659,25 +596,26 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
|
||||
status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok());
|
||||
}
|
||||
|
||||
match protocol.poll() {
|
||||
Ok(Async::Ready(())) => return Ok(Async::Ready(())),
|
||||
match protocol.poll(&mut Ctxt(&mut network_service.lock(), &peerset)) {
|
||||
Ok(Async::Ready(v)) => void::unreachable(v),
|
||||
Ok(Async::NotReady) => {}
|
||||
Err(err) => void::unreachable(err),
|
||||
}
|
||||
|
||||
loop {
|
||||
match network_port.take_one_message() {
|
||||
Ok(None) => break,
|
||||
Ok(Some(NetworkMsg::Outgoing(who, outgoing_message))) =>
|
||||
match network_port.poll() {
|
||||
Ok(Async::NotReady) => break,
|
||||
Ok(Async::Ready(Some(NetworkMsg::Outgoing(who, outgoing_message)))) =>
|
||||
network_service.lock().send_custom_message(&who, outgoing_message),
|
||||
Ok(Some(NetworkMsg::ReportPeer(who, reputation))) =>
|
||||
Ok(Async::Ready(Some(NetworkMsg::ReportPeer(who, reputation)))) =>
|
||||
peerset.report_peer(who, reputation),
|
||||
Ok(Some(NetworkMsg::DisconnectPeer(who))) =>
|
||||
Ok(Async::Ready(Some(NetworkMsg::DisconnectPeer(who)))) =>
|
||||
network_service.lock().drop_node(&who),
|
||||
#[cfg(any(test, feature = "test-helpers"))]
|
||||
Ok(Some(NetworkMsg::Synchronized)) => {}
|
||||
|
||||
Err(_) => return Ok(Async::Ready(())),
|
||||
#[cfg(any(test, feature = "test-helpers"))]
|
||||
Ok(Async::Ready(Some(NetworkMsg::Synchronized))) => {}
|
||||
|
||||
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -688,71 +626,78 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
|
||||
Ok(Async::NotReady) => break,
|
||||
};
|
||||
|
||||
let mut network_service = network_service.lock();
|
||||
let mut network_out = Ctxt(&mut network_service, &peerset);
|
||||
|
||||
match msg {
|
||||
ProtocolMsg::BlockImported(hash, header) =>
|
||||
protocol.on_block_imported(hash, &header),
|
||||
protocol.on_block_imported(&mut network_out, hash, &header),
|
||||
ProtocolMsg::BlockFinalized(hash, header) =>
|
||||
protocol.on_block_finalized(hash, &header),
|
||||
protocol.on_block_finalized(&mut network_out, hash, &header),
|
||||
ProtocolMsg::ExecuteWithSpec(task) => {
|
||||
let (mut context, spec) = protocol.specialization_lock();
|
||||
let (mut context, spec) = protocol.specialization_lock(&mut network_out);
|
||||
task.call_box(spec, &mut context);
|
||||
},
|
||||
ProtocolMsg::ExecuteWithGossip(task) => {
|
||||
let (mut context, gossip) = protocol.consensus_gossip_lock();
|
||||
let (mut context, gossip) = protocol.consensus_gossip_lock(&mut network_out);
|
||||
task.call_box(gossip, &mut context);
|
||||
}
|
||||
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) =>
|
||||
protocol.gossip_consensus_message(topic, engine_id, message, recipient),
|
||||
protocol.gossip_consensus_message(&mut network_out, topic, engine_id, message, recipient),
|
||||
ProtocolMsg::BlocksProcessed(hashes, has_error) =>
|
||||
protocol.blocks_processed(hashes, has_error),
|
||||
protocol.blocks_processed(&mut network_out, hashes, has_error),
|
||||
ProtocolMsg::RestartSync =>
|
||||
protocol.restart(),
|
||||
protocol.restart(&mut network_out),
|
||||
ProtocolMsg::AnnounceBlock(hash) =>
|
||||
protocol.announce_block(hash),
|
||||
protocol.announce_block(&mut network_out, hash),
|
||||
ProtocolMsg::BlockImportedSync(hash, number) =>
|
||||
protocol.block_imported(&hash, number),
|
||||
ProtocolMsg::ClearJustificationRequests =>
|
||||
protocol.clear_justification_requests(),
|
||||
ProtocolMsg::RequestJustification(hash, number) =>
|
||||
protocol.request_justification(&hash, number),
|
||||
protocol.request_justification(&mut network_out, &hash, number),
|
||||
ProtocolMsg::JustificationImportResult(hash, number, success) =>
|
||||
protocol.justification_import_result(hash, number, success),
|
||||
ProtocolMsg::SetFinalityProofRequestBuilder(builder) =>
|
||||
protocol.set_finality_proof_request_builder(builder),
|
||||
ProtocolMsg::RequestFinalityProof(hash, number) =>
|
||||
protocol.request_finality_proof(&hash, number),
|
||||
protocol.request_finality_proof(&mut network_out, &hash, number),
|
||||
ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) =>
|
||||
protocol.finality_proof_import_result(requested_block, finalziation_result),
|
||||
ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(),
|
||||
ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(&mut network_out),
|
||||
#[cfg(any(test, feature = "test-helpers"))]
|
||||
ProtocolMsg::Tick => protocol.tick(),
|
||||
ProtocolMsg::Tick => protocol.tick(&mut network_out),
|
||||
#[cfg(any(test, feature = "test-helpers"))]
|
||||
ProtocolMsg::Synchronize => protocol.synchronize(),
|
||||
ProtocolMsg::Synchronize => {},
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let outcome = match network_service.lock().poll() {
|
||||
let mut network_service = network_service.lock();
|
||||
let poll_value = network_service.poll();
|
||||
let mut network_out = Ctxt(&mut network_service, &peerset);
|
||||
|
||||
let outcome = match poll_value {
|
||||
Ok(Async::NotReady) => break,
|
||||
Ok(Async::Ready(Some(NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => {
|
||||
debug_assert!(
|
||||
version <= protocol::CURRENT_VERSION as u8
|
||||
&& version >= protocol::MIN_VERSION as u8
|
||||
);
|
||||
protocol.on_peer_connected(peer_id, debug_info);
|
||||
protocol.on_peer_connected(&mut network_out, peer_id, debug_info);
|
||||
CustomMessageOutcome::None
|
||||
}
|
||||
Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => {
|
||||
protocol.on_peer_disconnected(peer_id, debug_info);
|
||||
protocol.on_peer_disconnected(&mut network_out, peer_id, debug_info);
|
||||
CustomMessageOutcome::None
|
||||
},
|
||||
Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) =>
|
||||
protocol.on_custom_message(peer_id, message),
|
||||
protocol.on_custom_message(&mut network_out, peer_id, message),
|
||||
Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => {
|
||||
debug!(target: "sync", "{} clogging messages:", messages.len());
|
||||
for msg in messages.into_iter().take(5) {
|
||||
debug!(target: "sync", "{:?}", msg);
|
||||
protocol.on_clogged_peer(peer_id.clone(), Some(msg));
|
||||
protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg));
|
||||
}
|
||||
CustomMessageOutcome::None
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user