mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 04:51:09 +00:00
Merge the protocol thread with the network thread (#2475)
* Remove the Incoming enum * Use tokio in the protocol background thread * Some internal protocol logic simplifications * Merge the protocol thread with the network thread * More the status_sinks logic to Service * Remove FromNetworkMsg
This commit is contained in:
@@ -17,7 +17,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::{io, thread};
|
||||
use std::{io, thread, time::Duration};
|
||||
|
||||
use log::{warn, debug, error, info};
|
||||
use futures::{Async, Future, Stream, sync::oneshot, sync::mpsc};
|
||||
@@ -31,7 +31,7 @@ use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId
|
||||
|
||||
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
|
||||
use crate::message::Message;
|
||||
use crate::protocol::{self, Context, FromNetworkMsg, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo};
|
||||
use crate::protocol::{self, Context, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo};
|
||||
use crate::config::Params;
|
||||
use crate::error::Error;
|
||||
use crate::specialization::NetworkSpecialization;
|
||||
@@ -40,6 +40,9 @@ use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError};
|
||||
use tokio::prelude::task::AtomicTask;
|
||||
use tokio::runtime::Builder as RuntimeBuilder;
|
||||
|
||||
/// Interval at which we send status updates on the SyncProvider status stream.
|
||||
const STATUS_INTERVAL: Duration = Duration::from_millis(5000);
|
||||
|
||||
pub use network_libp2p::PeerId;
|
||||
|
||||
/// Type that represents fetch completion future.
|
||||
@@ -81,22 +84,22 @@ pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
|
||||
#[derive(Clone)]
|
||||
pub struct NetworkLink<B: BlockT, S: NetworkSpecialization<B>> {
|
||||
/// The protocol sender
|
||||
pub(crate) protocol_sender: Sender<ProtocolMsg<B, S>>,
|
||||
pub(crate) protocol_sender: mpsc::UnboundedSender<ProtocolMsg<B, S>>,
|
||||
/// The network sender
|
||||
pub(crate) network_sender: NetworkChan<B>,
|
||||
}
|
||||
|
||||
impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
|
||||
fn block_imported(&self, hash: &B::Hash, number: NumberFor<B>) {
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::BlockImportedSync(hash.clone(), number));
|
||||
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlockImportedSync(hash.clone(), number));
|
||||
}
|
||||
|
||||
fn blocks_processed(&self, processed_blocks: Vec<B::Hash>, has_error: bool) {
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error));
|
||||
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error));
|
||||
}
|
||||
|
||||
fn justification_imported(&self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success));
|
||||
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success));
|
||||
if !success {
|
||||
info!("Invalid justification provided by {} for #{}", who, hash);
|
||||
let _ = self.network_sender.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value()));
|
||||
@@ -105,11 +108,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
|
||||
}
|
||||
|
||||
fn clear_justification_requests(&self) {
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::ClearJustificationRequests);
|
||||
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::ClearJustificationRequests);
|
||||
}
|
||||
|
||||
fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::RequestJustification(hash.clone(), number));
|
||||
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RequestJustification(hash.clone(), number));
|
||||
}
|
||||
|
||||
fn report_peer(&self, who: PeerId, reputation_change: i32) {
|
||||
@@ -117,7 +120,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
|
||||
}
|
||||
|
||||
fn restart(&self) {
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::RestartSync);
|
||||
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RestartSync);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,7 +154,7 @@ pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
|
||||
/// nodes it should be connected to or not.
|
||||
peerset: PeersetHandle,
|
||||
/// Protocol sender
|
||||
protocol_sender: Sender<ProtocolMsg<B, S>>,
|
||||
protocol_sender: mpsc::UnboundedSender<ProtocolMsg<B, S>>,
|
||||
/// Sender for messages to the background service task, and handle for the background thread.
|
||||
/// Dropping the sender should close the task and the thread.
|
||||
/// This is an `Option` because we need to extract it in the destructor.
|
||||
@@ -171,8 +174,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
let is_offline = Arc::new(AtomicBool::new(true));
|
||||
let is_major_syncing = Arc::new(AtomicBool::new(false));
|
||||
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>> = Arc::new(Default::default());
|
||||
let (protocol_sender, network_to_protocol_sender) = Protocol::new(
|
||||
status_sinks.clone(),
|
||||
let (protocol, protocol_sender) = Protocol::new(
|
||||
is_offline.clone(),
|
||||
is_major_syncing.clone(),
|
||||
peers.clone(),
|
||||
@@ -187,8 +189,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect();
|
||||
let registered = RegisteredProtocol::new(protocol_id, &versions);
|
||||
let (thread, network, peerset) = start_thread(
|
||||
network_to_protocol_sender,
|
||||
protocol,
|
||||
network_port,
|
||||
status_sinks.clone(),
|
||||
params.network_config,
|
||||
registered,
|
||||
)?;
|
||||
@@ -236,19 +239,19 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
pub fn on_block_imported(&self, hash: B::Hash, header: B::Header) {
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::BlockImported(hash, header));
|
||||
.unbounded_send(ProtocolMsg::BlockImported(hash, header));
|
||||
}
|
||||
|
||||
/// Called when a new block is finalized by the client.
|
||||
pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::BlockFinalized(hash, header));
|
||||
.unbounded_send(ProtocolMsg::BlockFinalized(hash, header));
|
||||
}
|
||||
|
||||
/// Called when new transactons are imported by the client.
|
||||
pub fn trigger_repropagate(&self) {
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::PropagateExtrinsics);
|
||||
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::PropagateExtrinsics);
|
||||
}
|
||||
|
||||
/// Make sure an important block is propagated to peers.
|
||||
@@ -256,7 +259,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
/// In chain-based consensus, we often need to make sure non-best forks are
|
||||
/// at least temporarily synced.
|
||||
pub fn announce_block(&self, hash: B::Hash) {
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::AnnounceBlock(hash));
|
||||
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::AnnounceBlock(hash));
|
||||
}
|
||||
|
||||
/// Send a consensus message through the gossip
|
||||
@@ -269,7 +272,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
) {
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::GossipConsensusMessage(
|
||||
.unbounded_send(ProtocolMsg::GossipConsensusMessage(
|
||||
topic, engine_id, message, recipient,
|
||||
));
|
||||
}
|
||||
@@ -286,7 +289,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
{
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::ExecuteWithSpec(Box::new(f)));
|
||||
.unbounded_send(ProtocolMsg::ExecuteWithSpec(Box::new(f)));
|
||||
}
|
||||
|
||||
/// Execute a closure with the consensus gossip.
|
||||
@@ -295,7 +298,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
{
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::ExecuteWithGossip(Box::new(f)));
|
||||
.unbounded_send(ProtocolMsg::ExecuteWithGossip(Box::new(f)));
|
||||
}
|
||||
|
||||
/// Are we in the process of downloading the chain?
|
||||
@@ -471,9 +474,10 @@ pub enum NetworkMsg<B: BlockT + 'static> {
|
||||
}
|
||||
|
||||
/// Starts the background thread that handles the networking.
|
||||
fn start_thread<B: BlockT + 'static>(
|
||||
protocol_sender: Sender<FromNetworkMsg<B>>,
|
||||
fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
|
||||
protocol: Protocol<B, S, H>,
|
||||
network_port: NetworkPort<B>,
|
||||
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
|
||||
config: NetworkConfiguration,
|
||||
registered: RegisteredProtocol<Message<B>>,
|
||||
) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService<Message<B>>>>, PeersetHandle), Error> {
|
||||
@@ -491,7 +495,7 @@ fn start_thread<B: BlockT + 'static>(
|
||||
let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?;
|
||||
let peerset_clone = peerset.clone();
|
||||
let thread = thread::Builder::new().name("network".to_string()).spawn(move || {
|
||||
let fut = run_thread(protocol_sender, service_clone, network_port, peerset_clone)
|
||||
let fut = run_thread(protocol, service_clone, network_port, status_sinks, peerset_clone)
|
||||
.select(close_rx.then(|_| Ok(())))
|
||||
.map(|(val, _)| val)
|
||||
.map_err(|(err,_ )| err);
|
||||
@@ -508,14 +512,28 @@ fn start_thread<B: BlockT + 'static>(
|
||||
}
|
||||
|
||||
/// Runs the background thread that handles the networking.
|
||||
fn run_thread<B: BlockT + 'static>(
|
||||
protocol_sender: Sender<FromNetworkMsg<B>>,
|
||||
fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
|
||||
mut protocol: Protocol<B, S, H>,
|
||||
network_service: Arc<Mutex<NetworkService<Message<B>>>>,
|
||||
network_port: NetworkPort<B>,
|
||||
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
|
||||
peerset: PeersetHandle,
|
||||
) -> impl Future<Item = (), Error = io::Error> {
|
||||
// Interval at which we send status updates on the `status_sinks`.
|
||||
let mut status_interval = tokio::timer::Interval::new_interval(STATUS_INTERVAL);
|
||||
|
||||
futures::future::poll_fn(move || {
|
||||
while let Ok(Async::Ready(_)) = status_interval.poll() {
|
||||
let status = protocol.status();
|
||||
status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok());
|
||||
}
|
||||
|
||||
match protocol.poll() {
|
||||
Ok(Async::Ready(())) => return Ok(Async::Ready(())),
|
||||
Ok(Async::NotReady) => {}
|
||||
Err(err) => void::unreachable(err),
|
||||
}
|
||||
|
||||
loop {
|
||||
match network_port.take_one_message() {
|
||||
Ok(None) => break,
|
||||
@@ -540,19 +558,17 @@ fn run_thread<B: BlockT + 'static>(
|
||||
version <= protocol::CURRENT_VERSION as u8
|
||||
&& version >= protocol::MIN_VERSION as u8
|
||||
);
|
||||
let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, debug_info));
|
||||
}
|
||||
Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => {
|
||||
let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(peer_id, debug_info));
|
||||
}
|
||||
Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) => {
|
||||
let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(peer_id, message));
|
||||
protocol.on_peer_connected(peer_id, debug_info);
|
||||
}
|
||||
Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) =>
|
||||
protocol.on_peer_disconnected(peer_id, debug_info),
|
||||
Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) =>
|
||||
protocol.on_custom_message(peer_id, message),
|
||||
Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => {
|
||||
debug!(target: "sync", "{} clogging messages:", messages.len());
|
||||
for msg in messages.into_iter().take(5) {
|
||||
debug!(target: "sync", "{:?}", msg);
|
||||
let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(peer_id.clone(), Some(msg)));
|
||||
protocol.on_clogged_peer(peer_id.clone(), Some(msg));
|
||||
}
|
||||
}
|
||||
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
|
||||
|
||||
Reference in New Issue
Block a user