Use bounded channels for network -> sync -> import (#1874)

* use bounded channels for network -> sync -> import

* bound at 4

* indent

* use return value of handle_network_msg
This commit is contained in:
Gregory Terzian
2019-02-27 16:19:57 +08:00
committed by Gav Wood
parent 64685c0536
commit 5282615416
4 changed files with 100 additions and 61 deletions
+75 -40
View File
@@ -58,6 +58,7 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;
pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
network_chan: NetworkChan<B>,
port: Receiver<ProtocolMsg<B, S>>,
from_network_port: Receiver<FromNetworkMsg<B>>,
config: ProtocolConfig,
on_demand: Option<Arc<OnDemandService<B>>>,
genesis_hash: B::Hash,
@@ -69,6 +70,7 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
handshaking_peers: HashMap<NodeIndex, time::Instant>,
transaction_pool: Arc<TransactionPool<H, B>>,
}
/// Syncing status and statistics
#[derive(Clone)]
pub struct ProtocolStatus<B: BlockT> {
@@ -183,18 +185,28 @@ impl<B: BlockT, F: FnOnce(&mut ConsensusGossip<B>, &mut Context<B>)> GossipTask<
}
}
/// Messages sent to Protocol.
pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
/// A peer connected, with debug info.
PeerConnected(NodeIndex, String),
/// A peer disconnected, with debug info.
PeerDisconnected(NodeIndex, String),
/// A custom message from another peer.
CustomMessage(NodeIndex, Message<B>),
/// Messages sent to Protocol from elsewhere inside the system.
pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
/// Tell protocol to maintain sync.
MaintainSync,
/// Tell protocol to restart sync.
RestartSync,
/// Ask the protocol for its status.
Status(Sender<ProtocolStatus<B>>),
/// Tell protocol to propagate extrinsics.
PropagateExtrinsics,
/// Tell protocol that a block was imported (sent by the import-queue).
BlockImportedSync(B::Hash, NumberFor<B>),
/// Tell protocol to request justification for a block.
RequestJustification(B::Hash, NumberFor<B>),
/// Inform protocol whether a justification was successfully imported.
JustificationImportResult(B::Hash, NumberFor<B>, bool),
/// Propagate a block to peers.
AnnounceBlock(B::Hash),
/// A block has been imported (sent by the client).
BlockImported(B::Hash, B::Header),
/// A block has been finalized (sent by the client).
BlockFinalized(B::Hash, B::Header),
/// Execute a closure with the chain-specific network specialization.
ExecuteWithSpec(Box<SpecTask<B, S> + Send + 'static>),
/// Execute a closure with the consensus gossip.
@@ -203,24 +215,6 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>),
/// Return a list of peers currently known to protocol.
Peers(Sender<Vec<(NodeIndex, PeerInfo<B>)>>),
/// Let protocol know a peer is currenlty clogged.
PeerClogged(NodeIndex, Option<Message<B>>),
/// Tell protocol to maintain sync.
MaintainSync,
/// Tell protocol to restart sync.
RestartSync,
/// Propagate a block to peers.
AnnounceBlock(B::Hash),
/// Tell protocol that a block was imported (sent by the import-queue).
BlockImportedSync(B::Hash, NumberFor<B>),
/// Tell protocol to request justification for a block.
RequestJustification(B::Hash, NumberFor<B>),
/// Inform protocol whether a justification was successfully imported.
JustificationImportResult(B::Hash, NumberFor<B>, bool),
/// A block has been imported (sent by the client).
BlockImported(B::Hash, B::Header),
/// A block has been finalized (sent by the client).
BlockFinalized(B::Hash, B::Header),
/// Tell protocol to abort sync (does not stop protocol).
/// Only used in tests.
#[cfg(any(test, feature = "test-helpers"))]
@@ -231,6 +225,23 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
Tick,
}
/// Messages sent to Protocol from Network-libp2p.
pub enum FromNetworkMsg<B: BlockT> {
/// A peer connected, with debug info.
PeerConnected(NodeIndex, String),
/// A peer disconnected, with debug info.
PeerDisconnected(NodeIndex, String),
/// A custom message from another peer.
CustomMessage(NodeIndex, Message<B>),
/// Let protocol know a peer is currenlty clogged.
PeerClogged(NodeIndex, Option<Message<B>>),
}
enum Incoming<B: BlockT, S: NetworkSpecialization<B>> {
FromNetwork(FromNetworkMsg<B>),
FromClient(ProtocolMsg<B, S>)
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Create a new instance.
pub fn new(
@@ -243,8 +254,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
on_demand: Option<Arc<OnDemandService<B>>>,
transaction_pool: Arc<TransactionPool<H, B>>,
specialization: S,
) -> error::Result<Sender<ProtocolMsg<B, S>>> {
let (sender, port) = channel::unbounded();
) -> error::Result<(Sender<ProtocolMsg<B, S>>, Sender<FromNetworkMsg<B>>)> {
let (protocol_sender, port) = channel::unbounded();
let (from_network_sender, from_network_port) = channel::bounded(4);
let info = chain.info()?;
let sync = ChainSync::new(is_offline, is_major_syncing, config.roles, &info, import_queue);
let _ = thread::Builder::new()
@@ -252,6 +264,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
.spawn(move || {
let mut protocol = Protocol {
network_chan,
from_network_port,
port,
config: config,
context_data: ContextData {
@@ -273,7 +286,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
})
.expect("Protocol thread spawning failed");
Ok(sender)
Ok((protocol_sender, from_network_sender))
}
fn run(
@@ -284,35 +297,45 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let msg = select! {
recv(self.port) -> event => {
match event {
Ok(msg) => msg,
Ok(msg) => Incoming::FromClient(msg),
// Our sender has been dropped, quit.
Err(_) => {
ProtocolMsg::Stop
Incoming::FromClient(ProtocolMsg::Stop)
},
}
},
recv(self.from_network_port) -> event => {
match event {
Ok(msg) => Incoming::FromNetwork(msg),
// Our sender has been dropped, quit.
Err(_) => {
Incoming::FromClient(ProtocolMsg::Stop)
},
}
},
recv(tick_timeout) -> _ => {
ProtocolMsg::Tick
Incoming::FromClient(ProtocolMsg::Tick)
},
recv(propagate_timeout) -> _ => {
ProtocolMsg::PropagateExtrinsics
Incoming::FromClient(ProtocolMsg::PropagateExtrinsics)
},
};
self.handle_msg(msg)
}
fn handle_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool {
fn handle_msg(&mut self, msg: Incoming<B, S>) -> bool {
match msg {
Incoming::FromNetwork(msg) => self.handle_network_msg(msg),
Incoming::FromClient(msg) => self.handle_client_msg(msg),
}
}
fn handle_client_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool {
match msg {
ProtocolMsg::Peers(sender) => {
let peers = self.context_data.peers.iter().map(|(idx, p)| (*idx, p.info.clone())).collect();
let _ = sender.send(peers);
},
ProtocolMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info),
ProtocolMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info),
ProtocolMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message),
ProtocolMsg::CustomMessage(who, message) => {
self.on_custom_message(who, message)
},
ProtocolMsg::Status(sender) => self.status(sender),
ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header),
ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header),
@@ -359,6 +382,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
true
}
fn handle_network_msg(&mut self, msg: FromNetworkMsg<B>) -> bool {
match msg {
FromNetworkMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info),
FromNetworkMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info),
FromNetworkMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message),
FromNetworkMsg::CustomMessage(who, message) => {
self.on_custom_message(who, message)
},
}
true
}
fn handle_response(&mut self, who: NodeIndex, response: &message::BlockResponse<B>) -> Option<message::BlockRequest<B>> {
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
if let Some(_) = peer.obsolete_requests.remove(&response.id) {