diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 355900d970..33e376aab3 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use futures::{prelude::*, sync::mpsc}; +use futures::prelude::*; use network_libp2p::PeerId; use primitives::storage::StorageKey; use consensus::{import_queue::IncomingBlock, import_queue::Origin, BlockOrigin}; @@ -77,7 +77,6 @@ const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12); // Lock must always be taken in order declared here. pub struct Protocol, H: ExHashT> { network_chan: NetworkChan, - port: mpsc::UnboundedReceiver>, /// Interval at which we call `tick`. tick_timeout: tokio::timer::Interval, /// Interval at which we call `propagate_extrinsics`. @@ -187,13 +186,19 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, } fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) { - send_message(&mut self.context_data.peers, &self.network_chan, who, + send_message( + &mut self.context_data.peers, + &self.network_chan, + who, GenericMessage::Consensus(consensus) ) } fn send_chain_specific(&mut self, who: PeerId, message: Vec) { - send_message(&mut self.context_data.peers, &self.network_chan, who, + send_message( + &mut self.context_data.peers, + &self.network_chan, + who, GenericMessage::ChainSpecific(message) ) } @@ -217,13 +222,19 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext for ProtocolContext<'a, } fn send_finality_proof_request(&mut self, who: PeerId, request: FinalityProofRequestMessage) { - send_message(&mut self.context_data.peers, &self.network_chan, who, + send_message( + &mut self.context_data.peers, + &self.network_chan, + who, GenericMessage::FinalityProofRequest(request) ) } fn send_block_request(&mut self, who: PeerId, request: BlockRequestMessage) { - send_message(&mut self.context_data.peers, &self.network_chan, who, + send_message( + &mut self.context_data.peers, + &self.network_chan, + who, GenericMessage::BlockRequest(request) ) } @@ -237,70 +248,6 @@ struct ContextData { pub finality_proof_provider: Option>>, } -/// A task, consisting of a user-provided closure, to be executed on the Protocol thread. -pub trait SpecTask> { - fn call_box(self: Box, spec: &mut S, context: &mut Context); -} - -impl, F: FnOnce(&mut S, &mut Context)> SpecTask for F { - fn call_box(self: Box, spec: &mut S, context: &mut Context) { - (*self)(spec, context) - } -} - -/// A task, consisting of a user-provided closure, to be executed on the Protocol thread. -pub trait GossipTask { - fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut Context); -} - -impl, &mut Context)> GossipTask for F { - fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut Context) { - (*self)(gossip, context) - } -} - -/// Messages sent to Protocol from elsewhere inside the system. -pub enum ProtocolMsg> { - /// A batch of blocks has been processed, with or without errors. - BlocksProcessed(Vec, bool), - /// Tell protocol to restart sync. - RestartSync, - /// Tell protocol to propagate extrinsics. - PropagateExtrinsics, - /// Tell protocol that a block was imported (sent by the import-queue). - BlockImportedSync(B::Hash, NumberFor), - /// Tell protocol to clear all pending justification requests. - ClearJustificationRequests, - /// Tell protocol to request justification for a block. - RequestJustification(B::Hash, NumberFor), - /// Inform protocol whether a justification was successfully imported. - JustificationImportResult(B::Hash, NumberFor, bool), - /// Set finality proof request builder. - SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder), - /// Tell protocol to request finality proof for a block. - RequestFinalityProof(B::Hash, NumberFor), - /// Inform protocol whether a finality proof was successfully imported. - FinalityProofImportResult((B::Hash, NumberFor), Result<(B::Hash, NumberFor), ()>), - /// Propagate a block to peers. - AnnounceBlock(B::Hash), - /// A block has been imported (sent by the client). - BlockImported(B::Hash, B::Header), - /// A block has been finalized (sent by the client). - BlockFinalized(B::Hash, B::Header), - /// Execute a closure with the chain-specific network specialization. - ExecuteWithSpec(Box + Send + 'static>), - /// Execute a closure with the consensus gossip. - ExecuteWithGossip(Box + Send + 'static>), - /// Incoming gossip consensus message. - GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec, GossipMessageRecipient), - /// Tell protocol to perform regular maintenance. - #[cfg(any(test, feature = "test-helpers"))] - Tick, - /// Synchronization request. - #[cfg(any(test, feature = "test-helpers"))] - Synchronize, -} - impl, H: ExHashT> Protocol { /// Create a new instance. pub fn new( @@ -312,13 +259,11 @@ impl, H: ExHashT> Protocol { on_demand: Option>>, transaction_pool: Arc>, specialization: S, - ) -> error::Result<(Protocol, mpsc::UnboundedSender>)> { - let (protocol_sender, port) = mpsc::unbounded(); + ) -> error::Result> { let info = chain.info()?; let sync = ChainSync::new(config.roles, &info); - let protocol = Protocol { + Ok(Protocol { network_chan, - port, tick_timeout: tokio::timer::Interval::new_interval(TICK_TIMEOUT), propagate_timeout: tokio::timer::Interval::new_interval(PROPAGATE_TIMEOUT), config: config, @@ -335,9 +280,7 @@ impl, H: ExHashT> Protocol { handshaking_peers: HashMap::new(), connected_peers, transaction_pool: transaction_pool, - }; - - Ok((protocol, protocol_sender)) + }) } /// Returns an object representing the status of the protocol. @@ -376,84 +319,19 @@ impl, H: ExHashT> Future for Protocol return Ok(Async::Ready(())), - Ok(Async::Ready(Some(msg))) => if !self.handle_client_msg(msg) { - return Ok(Async::Ready(())) - } - Ok(Async::NotReady) => break, - } - } - Ok(Async::NotReady) } } impl, H: ExHashT> Protocol { - fn handle_client_msg(&mut self, msg: ProtocolMsg) -> bool { - match msg { - ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header), - ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header), - ProtocolMsg::ExecuteWithSpec(task) => { - let mut context = - ProtocolContext::new(&mut self.context_data, &self.network_chan); - task.call_box(&mut self.specialization, &mut context); - }, - ProtocolMsg::ExecuteWithGossip(task) => { - let mut context = - ProtocolContext::new(&mut self.context_data, &self.network_chan); - task.call_box(&mut self.consensus_gossip, &mut context); - } - ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => { - self.gossip_consensus_message(topic, engine_id, message, recipient) - } - ProtocolMsg::BlocksProcessed(hashes, has_error) => { - self.sync.blocks_processed(hashes, has_error); - let mut context = - ProtocolContext::new(&mut self.context_data, &self.network_chan); - self.sync.maintain_sync(&mut context); - }, - ProtocolMsg::RestartSync => { - let mut context = - ProtocolContext::new(&mut self.context_data, &self.network_chan); - self.sync.restart(&mut context); - } - ProtocolMsg::AnnounceBlock(hash) => self.announce_block(hash), - ProtocolMsg::BlockImportedSync(hash, number) => self.sync.block_imported(&hash, number), - ProtocolMsg::ClearJustificationRequests => self.sync.clear_justification_requests(), - ProtocolMsg::RequestJustification(hash, number) => { - let mut context = - ProtocolContext::new(&mut self.context_data, &self.network_chan); - self.sync.request_justification(&hash, number, &mut context); - }, - ProtocolMsg::JustificationImportResult(hash, number, success) => self.sync.justification_import_result(hash, number, success), - ProtocolMsg::SetFinalityProofRequestBuilder(builder) => self.sync.set_finality_proof_request_builder(builder), - ProtocolMsg::RequestFinalityProof(hash, number) => { - let mut context = - ProtocolContext::new(&mut self.context_data, &self.network_chan); - self.sync.request_finality_proof(&hash, number, &mut context); - }, - ProtocolMsg::FinalityProofImportResult( - requested_block, - finalziation_result, - ) => self.sync.finality_proof_import_result(requested_block, finalziation_result), - ProtocolMsg::PropagateExtrinsics => self.propagate_extrinsics(), - #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Tick => self.tick(), - #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Synchronize => { - trace!(target: "sync", "handle_client_msg: received Synchronize msg"); - self.network_chan.send(NetworkMsg::Synchronized) - } - } - true - } - - fn handle_response(&mut self, who: PeerId, response: &message::BlockResponse) -> Option> { + fn handle_response( + &mut self, + who: PeerId, + response: &message::BlockResponse + ) -> Option> { if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { if let Some(_) = peer.obsolete_requests.remove(&response.id) { - trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", who, response.id,); + trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", who, response.id); return None; } // Clear the request. If the response is invalid peer will be disconnected anyway. @@ -536,7 +414,20 @@ impl, H: ExHashT> Protocol { ); } - fn gossip_consensus_message( + /// Locks `self` and returns a context plus the `ConsensusGossip` struct. + pub fn consensus_gossip_lock<'a>(&'a mut self) -> (impl Context + 'a, &'a mut ConsensusGossip) { + let context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + (context, &mut self.consensus_gossip) + } + + /// Locks `self` and returns a context plus the network specialization. + pub fn specialization_lock<'a>(&'a mut self) -> (impl Context + 'a, &'a mut S) { + let context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + (context, &mut self.specialization) + } + + /// Gossip a consensus message to the network. + pub fn gossip_consensus_message( &mut self, topic: B::Hash, engine_id: ConsensusEngineId, @@ -684,15 +575,19 @@ impl, H: ExHashT> Protocol { response: message::BlockResponse, ) -> CustomMessageOutcome { let blocks_range = match ( - response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), - response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), - ) { - (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), - (Some(first), Some(_)) => format!(" ({})", first), - _ => Default::default(), - }; + response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), + response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), + ) { + (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), + (Some(first), Some(_)) => format!(" ({})", first), + _ => Default::default(), + }; trace!(target: "sync", "BlockResponse {} from {} with {} blocks {}", - response.id, peer, response.blocks.len(), blocks_range); + response.id, + peer, + response.blocks.len(), + blocks_range + ); // TODO [andre]: move this logic to the import queue so that // justifications are imported asynchronously (#1482) @@ -711,7 +606,12 @@ impl, H: ExHashT> Protocol { } } else { - let outcome = self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response); + let outcome = self.sync.on_block_data( + &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + peer, + request, + response + ); if let Some((origin, blocks)) = outcome { CustomMessageOutcome::BlockImport(origin, blocks) } else { @@ -721,7 +621,9 @@ impl, H: ExHashT> Protocol { } /// Perform time based maintenance. - fn tick(&mut self) { + /// + /// > **Note**: This method normally doesn't have to be called except for testing purposes. + pub fn tick(&mut self) { self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); self.maintain_peers(); self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); @@ -743,7 +645,9 @@ impl, H: ExHashT> Protocol { aborting.push(who.clone()); } } - for (who, _) in self.handshaking_peers.iter().filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC) { + for (who, _) in self.handshaking_peers.iter() + .filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC) + { trace!(target: "sync", "Handshake timeout {}", who); aborting.push(who.clone()); } @@ -795,7 +699,9 @@ impl, H: ExHashT> Protocol { .unwrap_or(0); if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); - self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE)); + self.network_chan.send( + NetworkMsg::ReportPeer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE) + ); self.network_chan.send(NetworkMsg::DisconnectPeer(who)); return; } @@ -867,8 +773,8 @@ impl, H: ExHashT> Protocol { } } - /// Called when we propagate ready extrinsics to peers. - fn propagate_extrinsics(&mut self) { + /// Call when we must propagate ready extrinsics to peers. + pub fn propagate_extrinsics(&mut self) { debug!(target: "sync", "Propagating extrinsics"); // Accept transactions only when fully synced @@ -903,7 +809,7 @@ impl, H: ExHashT> Protocol { /// /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. - fn announce_block(&mut self, hash: B::Hash) { + pub fn announce_block(&mut self, hash: B::Hash) { let header = match self.context_data.chain.header(&BlockId::Hash(hash)) { Ok(Some(header)) => header, Ok(None) => { @@ -961,7 +867,9 @@ impl, H: ExHashT> Protocol { ); } - fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { + /// Call this when a block has been imported in the import queue and we should announce it on + /// the network. + pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { self.sync.update_chain_info(header); self.specialization.on_block_imported( &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), @@ -986,7 +894,9 @@ impl, H: ExHashT> Protocol { } } - fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { + /// Call this when a block has been finalized. The sync layer may have some additional + /// requesting to perform. + pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { self.sync.on_block_finalized( &hash, *header.number(), @@ -999,7 +909,12 @@ impl, H: ExHashT> Protocol { who: PeerId, request: message::RemoteCallRequest, ) { - trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, who, request.method, request.block); + trace!(target: "sync", "Remote call request {} from {} ({} at {})", + request.id, + who, + request.method, + request.block + ); let proof = match self.context_data.chain.execution_proof( &request.block, &request.method, @@ -1008,7 +923,12 @@ impl, H: ExHashT> Protocol { Ok((_, proof)) => proof, Err(error) => { trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}", - request.id, who, request.method, request.block, error); + request.id, + who, + request.method, + request.block, + error + ); self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), RPC_FAILED_REPUTATION_CHANGE)); Default::default() } @@ -1023,6 +943,68 @@ impl, H: ExHashT> Protocol { ); } + /// Request a justification for the given block. + /// + /// Uses `protocol` to queue a new justification request and tries to dispatch all pending + /// requests. + pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + let mut context = + ProtocolContext::new(&mut self.context_data, &self.network_chan); + self.sync.request_justification(&hash, number, &mut context); + } + + /// Clears all pending justification requests. + pub fn clear_justification_requests(&mut self) { + self.sync.clear_justification_requests() + } + + /// A batch of blocks have been processed, with or without errors. + /// Call this when a batch of blocks have been processed by the import queue, with or without + /// errors. + pub fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { + self.sync.blocks_processed(processed_blocks, has_error); + let mut context = + ProtocolContext::new(&mut self.context_data, &self.network_chan); + self.sync.maintain_sync(&mut context); + } + + /// Restart the sync process. + pub fn restart(&mut self) { + let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + self.sync.restart(&mut context); + } + + /// Notify about successful import of the given block. + pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { + self.sync.block_imported(hash, number) + } + + pub fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { + self.sync.set_finality_proof_request_builder(request_builder) + } + + /// Call this when a justification has been processed by the import queue, with or without + /// errors. + pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + self.sync.justification_import_result(hash, number, success) + } + + /// Request a finality proof for the given block. + /// + /// Queues a new finality proof request and tries to dispatch all pending requests. + pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { + let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + self.sync.request_finality_proof(&hash, number, &mut context); + } + + pub fn finality_proof_import_result( + &mut self, + request_block: (B::Hash, NumberFor), + finalization_result: Result<(B::Hash, NumberFor), ()>, + ) { + self.sync.finality_proof_import_result(request_block, finalization_result) + } + fn on_remote_call_response(&mut self, who: PeerId, response: message::RemoteCallResponse) { trace!(target: "sync", "Remote call response {} from {}", response.id, who); self.on_demand @@ -1041,7 +1023,12 @@ impl, H: ExHashT> Protocol { Ok(proof) => proof, Err(error) => { trace!(target: "sync", "Remote read request {} from {} ({} at {}) failed with: {}", - request.id, who, request.key.to_hex::(), request.block, error); + request.id, + who, + request.key.to_hex::(), + request.block, + error + ); Default::default() } }; @@ -1071,7 +1058,11 @@ impl, H: ExHashT> Protocol { Ok((header, proof)) => (Some(header), proof), Err(error) => { trace!(target: "sync", "Remote header proof request {} from {} ({}) failed with: {}", - request.id, who, request.block, error); + request.id, + who, + request.block, + error + ); (Default::default(), Default::default()) } }; @@ -1102,13 +1093,30 @@ impl, H: ExHashT> Protocol { request: message::RemoteChangesRequest, ) { trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})", - request.id, who, request.key.to_hex::(), request.first, request.last); + request.id, + who, + request.key.to_hex::(), + request.first, + request.last + ); let key = StorageKey(request.key); - let proof = match self.context_data.chain.key_changes_proof(request.first, request.last, request.min, request.max, &key) { + let proof = match self.context_data.chain.key_changes_proof( + request.first, + request.last, + request.min, + request.max, + &key + ) { Ok(proof) => proof, Err(error) => { trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{}) failed with: {}", - request.id, who, key.0.to_hex::(), request.first, request.last, error); + request.id, + who, + key.0.to_hex::(), + request.first, + request.last, + error + ); ChangesProof:: { max_block: Zero::zero(), proof: vec![], @@ -1135,7 +1143,10 @@ impl, H: ExHashT> Protocol { response: message::RemoteChangesResponse, B::Hash>, ) { trace!(target: "sync", "Remote changes proof response {} from {} (max={})", - response.id, who, response.max); + response.id, + who, + response.max + ); self.on_demand .as_ref() .map(|s| s.on_remote_changes_response(who, response)); @@ -1149,13 +1160,17 @@ impl, H: ExHashT> Protocol { trace!(target: "sync", "Finality proof request from {} for {}", who, request.block); let finality_proof = self.context_data.finality_proof_provider.as_ref() .ok_or_else(|| String::from("Finality provider is not configured")) - .and_then(|provider| provider.prove_finality(request.block, &request.request) - .map_err(|e| e.to_string())); + .and_then(|provider| + provider.prove_finality(request.block, &request.request).map_err(|e| e.to_string()) + ); let finality_proof = match finality_proof { Ok(finality_proof) => finality_proof, Err(error) => { trace!(target: "sync", "Finality proof request from {} for {} failed with: {}", - who, request.block, error); + who, + request.block, + error + ); None }, }; diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 3b2e0ae9ff..e63ce4037f 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -31,7 +31,7 @@ use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::message::Message; -use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo}; +use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer, ProtocolStatus, PeerInfo}; use crate::config::Params; use crate::error::Error; use crate::specialization::NetworkSpecialization; @@ -200,12 +200,13 @@ impl> Service { import_queue: Box>, ) -> Result>, Error> { let (network_chan, network_port) = network_channel(); + let (protocol_sender, protocol_rx) = mpsc::unbounded(); let status_sinks = Arc::new(Mutex::new(Vec::new())); // Start in off-line mode, since we're not connected to any nodes yet. let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let peers: Arc>>> = Arc::new(Default::default()); - let (protocol, protocol_sender) = Protocol::new( + let protocol = Protocol::new( peers.clone(), network_chan.clone(), params.config, @@ -223,6 +224,7 @@ impl> Service { protocol, import_queue.clone(), network_port, + protocol_rx, status_sinks.clone(), params.network_config, registered, @@ -520,6 +522,70 @@ pub enum NetworkMsg { Synchronized, } +/// Messages sent to Protocol from elsewhere inside the system. +pub enum ProtocolMsg> { + /// A batch of blocks has been processed, with or without errors. + BlocksProcessed(Vec, bool), + /// Tell protocol to restart sync. + RestartSync, + /// Tell protocol to propagate extrinsics. + PropagateExtrinsics, + /// Tell protocol that a block was imported (sent by the import-queue). + BlockImportedSync(B::Hash, NumberFor), + /// Tell protocol to clear all pending justification requests. + ClearJustificationRequests, + /// Tell protocol to request justification for a block. + RequestJustification(B::Hash, NumberFor), + /// Inform protocol whether a justification was successfully imported. + JustificationImportResult(B::Hash, NumberFor, bool), + /// Set finality proof request builder. + SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder), + /// Tell protocol to request finality proof for a block. + RequestFinalityProof(B::Hash, NumberFor), + /// Inform protocol whether a finality proof was successfully imported. + FinalityProofImportResult((B::Hash, NumberFor), Result<(B::Hash, NumberFor), ()>), + /// Propagate a block to peers. + AnnounceBlock(B::Hash), + /// A block has been imported (sent by the client). + BlockImported(B::Hash, B::Header), + /// A block has been finalized (sent by the client). + BlockFinalized(B::Hash, B::Header), + /// Execute a closure with the chain-specific network specialization. + ExecuteWithSpec(Box + Send + 'static>), + /// Execute a closure with the consensus gossip. + ExecuteWithGossip(Box + Send + 'static>), + /// Incoming gossip consensus message. + GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec, GossipMessageRecipient), + /// Tell protocol to perform regular maintenance. + #[cfg(any(test, feature = "test-helpers"))] + Tick, + /// Synchronization request. + #[cfg(any(test, feature = "test-helpers"))] + Synchronize, +} + +/// A task, consisting of a user-provided closure, to be executed on the Protocol thread. +pub trait SpecTask> { + fn call_box(self: Box, spec: &mut S, context: &mut Context); +} + +impl, F: FnOnce(&mut S, &mut Context)> SpecTask for F { + fn call_box(self: Box, spec: &mut S, context: &mut Context) { + (*self)(spec, context) + } +} + +/// A task, consisting of a user-provided closure, to be executed on the Protocol thread. +pub trait GossipTask { + fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut Context); +} + +impl, &mut Context)> GossipTask for F { + fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut Context) { + (*self)(gossip, context) + } +} + /// Starts the background thread that handles the networking. fn start_thread, H: ExHashT>( is_offline: Arc, @@ -527,6 +593,7 @@ fn start_thread, H: ExHashT>( protocol: Protocol, import_queue: Box>, network_port: NetworkPort, + protocol_rx: mpsc::UnboundedReceiver>, status_sinks: Arc>>>>, config: NetworkConfiguration, registered: RegisteredProtocol>, @@ -545,7 +612,17 @@ fn start_thread, H: ExHashT>( let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?; let peerset_clone = peerset.clone(); let thread = thread::Builder::new().name("network".to_string()).spawn(move || { - let fut = run_thread(is_offline, is_major_syncing, protocol, service_clone, import_queue, network_port, status_sinks, peerset_clone) + let fut = run_thread( + is_offline, + is_major_syncing, + protocol, + service_clone, + import_queue, + network_port, + protocol_rx, + status_sinks, + peerset_clone + ) .select(close_rx.then(|_| Ok(()))) .map(|(val, _)| val) .map_err(|(err,_ )| err); @@ -569,6 +646,7 @@ fn run_thread, H: ExHashT>( network_service: Arc>>>, import_queue: Box>, network_port: NetworkPort, + mut protocol_rx: mpsc::UnboundedReceiver>, status_sinks: Arc>>>>, peerset: PeersetHandle, ) -> impl Future { @@ -603,6 +681,56 @@ fn run_thread, H: ExHashT>( } } + loop { + let msg = match protocol_rx.poll() { + Ok(Async::Ready(Some(msg))) => msg, + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => break, + }; + + match msg { + ProtocolMsg::BlockImported(hash, header) => + protocol.on_block_imported(hash, &header), + ProtocolMsg::BlockFinalized(hash, header) => + protocol.on_block_finalized(hash, &header), + ProtocolMsg::ExecuteWithSpec(task) => { + let (mut context, spec) = protocol.specialization_lock(); + task.call_box(spec, &mut context); + }, + ProtocolMsg::ExecuteWithGossip(task) => { + let (mut context, gossip) = protocol.consensus_gossip_lock(); + task.call_box(gossip, &mut context); + } + ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => + protocol.gossip_consensus_message(topic, engine_id, message, recipient), + ProtocolMsg::BlocksProcessed(hashes, has_error) => + protocol.blocks_processed(hashes, has_error), + ProtocolMsg::RestartSync => + protocol.restart(), + ProtocolMsg::AnnounceBlock(hash) => + protocol.announce_block(hash), + ProtocolMsg::BlockImportedSync(hash, number) => + protocol.block_imported(&hash, number), + ProtocolMsg::ClearJustificationRequests => + protocol.clear_justification_requests(), + ProtocolMsg::RequestJustification(hash, number) => + protocol.request_justification(&hash, number), + ProtocolMsg::JustificationImportResult(hash, number, success) => + protocol.justification_import_result(hash, number, success), + ProtocolMsg::SetFinalityProofRequestBuilder(builder) => + protocol.set_finality_proof_request_builder(builder), + ProtocolMsg::RequestFinalityProof(hash, number) => + protocol.request_finality_proof(&hash, number), + ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => + protocol.finality_proof_import_result(requested_block, finalziation_result), + ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(), + #[cfg(any(test, feature = "test-helpers"))] + ProtocolMsg::Tick => protocol.tick(), + #[cfg(any(test, feature = "test-helpers"))] + ProtocolMsg::Synchronize => protocol.synchronize(), + } + } + loop { let outcome = match network_service.lock().poll() { Ok(Async::NotReady) => break, diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index a75cb4d24c..44e0eafd91 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -44,13 +44,11 @@ use crate::message::Message; use network_libp2p::PeerId; use parking_lot::{Mutex, RwLock}; use primitives::{H256, sr25519::Public as AuthorityId, Blake2Hasher}; -use crate::protocol::{ - ConnectedPeer, Context, Protocol, ProtocolStatus, ProtocolMsg, CustomMessageOutcome, -}; +use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolStatus, CustomMessageOutcome}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor}; use runtime_primitives::{Justification, ConsensusEngineId}; -use crate::service::{network_channel, NetworkChan, NetworkLink, NetworkMsg, NetworkPort, TransactionPool}; +use crate::service::{network_channel, NetworkChan, NetworkLink, NetworkMsg, NetworkPort, ProtocolMsg, TransactionPool}; use crate::specialization::NetworkSpecialization; use test_client::{self, AccountKeyring}; @@ -520,7 +518,9 @@ impl> Peer { } let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap(); - self.net_proto_channel.send_from_client(ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone())); + self.net_proto_channel.send_from_client( + ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone()) + ); *finalized_hash = Some(info.chain.finalized_hash); } @@ -748,6 +748,7 @@ pub trait TestNetFactory: Sized { import_queue: Box>, mut protocol: Protocol, mut network_to_protocol_rx: mpsc::UnboundedReceiver>, + mut protocol_rx: mpsc::UnboundedReceiver>, peer: Arc>, ) { std::thread::spawn(move || { @@ -782,12 +783,64 @@ pub trait TestNetFactory: Sized { } } + loop { + let msg = match protocol_rx.poll() { + Ok(Async::Ready(Some(msg))) => msg, + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => break, + }; + + match msg { + ProtocolMsg::BlockImported(hash, header) => + protocol.on_block_imported(hash, &header), + ProtocolMsg::BlockFinalized(hash, header) => + protocol.on_block_finalized(hash, &header), + ProtocolMsg::ExecuteWithSpec(task) => { + let (mut context, spec) = protocol.specialization_lock(); + task.call_box(spec, &mut context); + }, + ProtocolMsg::ExecuteWithGossip(task) => { + let (mut context, gossip) = protocol.consensus_gossip_lock(); + task.call_box(gossip, &mut context); + } + ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => + protocol.gossip_consensus_message(topic, engine_id, message, recipient), + ProtocolMsg::BlocksProcessed(hashes, has_error) => + protocol.blocks_processed(hashes, has_error), + ProtocolMsg::RestartSync => + protocol.restart(), + ProtocolMsg::AnnounceBlock(hash) => + protocol.announce_block(hash), + ProtocolMsg::BlockImportedSync(hash, number) => + protocol.block_imported(&hash, number), + ProtocolMsg::ClearJustificationRequests => + protocol.clear_justification_requests(), + ProtocolMsg::RequestJustification(hash, number) => + protocol.request_justification(&hash, number), + ProtocolMsg::JustificationImportResult(hash, number, success) => + protocol.justification_import_result(hash, number, success), + ProtocolMsg::SetFinalityProofRequestBuilder(builder) => + protocol.set_finality_proof_request_builder(builder), + ProtocolMsg::RequestFinalityProof(hash, number) => + protocol.request_finality_proof(&hash, number), + ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => + protocol.finality_proof_import_result(requested_block, finalziation_result), + ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(), + #[cfg(any(test, feature = "test-helpers"))] + ProtocolMsg::Tick => protocol.tick(), + #[cfg(any(test, feature = "test-helpers"))] + ProtocolMsg::Synchronize => { + trace!(target: "sync", "handle_client_msg: received Synchronize msg"); + protocol.synchronize(); + } + } + } + if let Async::Ready(_) = protocol.poll().unwrap() { return Ok(Async::Ready(())) } *protocol_status.write() = protocol.status(); - Ok(Async::NotReady) })); }); @@ -825,8 +878,9 @@ pub trait TestNetFactory: Sized { let peers: Arc>>> = Arc::new(Default::default()); let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded(); + let (protocol_sender, protocol_rx) = mpsc::unbounded(); - let (protocol, protocol_sender) = Protocol::new( + let protocol = Protocol::new( peers.clone(), network_sender.clone(), config.clone(), @@ -843,6 +897,7 @@ pub trait TestNetFactory: Sized { import_queue.clone(), protocol, network_to_protocol_rx, + protocol_rx, Arc::new(Peer::new( protocol_status, peers, @@ -880,8 +935,9 @@ pub trait TestNetFactory: Sized { let peers: Arc>>> = Arc::new(Default::default()); let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded(); + let (protocol_sender, protocol_rx) = mpsc::unbounded(); - let (protocol, protocol_sender) = Protocol::new( + let protocol = Protocol::new( peers.clone(), network_sender.clone(), config, @@ -898,6 +954,7 @@ pub trait TestNetFactory: Sized { import_queue.clone(), protocol, network_to_protocol_rx, + protocol_rx, Arc::new(Peer::new( protocol_status, peers,