diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index a40f588d29..5d32c8882b 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -3999,6 +3999,7 @@ dependencies = [ "libp2p 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 2.0.0", "sr-std 2.0.0", "sr-version 2.0.0", diff --git a/substrate/core/consensus/common/Cargo.toml b/substrate/core/consensus/common/Cargo.toml index 5e55d555dd..a64ae120f4 100644 --- a/substrate/core/consensus/common/Cargo.toml +++ b/substrate/core/consensus/common/Cargo.toml @@ -18,6 +18,7 @@ runtime_version = { package = "sr-version", path = "../../sr-version" } runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } tokio-timer = "0.2" parity-codec = { version = "3.3", features = ["derive"] } +parking_lot = "0.7.1" [dev-dependencies] test_client = { package = "substrate-test-client", path = "../../test-client" } diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 4d64d799b1..26ae181b51 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -31,6 +31,7 @@ use crate::block_import::{ }; use crossbeam_channel::{self as channel, Receiver, Sender}; use parity_codec::Encode; +use parking_lot::Mutex; use std::sync::Arc; use std::thread; @@ -97,7 +98,7 @@ pub trait Verifier: Send + Sync { } /// Blocks import queue API. -pub trait ImportQueue: Send + Sync + ImportQueueClone { +pub trait ImportQueue: Send + Sync { /// Start background work for the queue as necessary. /// /// This is called automatically by the network service when synchronization @@ -105,8 +106,6 @@ pub trait ImportQueue: Send + Sync + ImportQueueClone { fn start(&self, _link: Box>) -> Result<(), std::io::Error> { Ok(()) } - /// Clears the import queue and stops importing. - fn stop(&self); /// Import bunch of blocks. fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>); /// Import a block justification. @@ -115,13 +114,96 @@ pub trait ImportQueue: Send + Sync + ImportQueueClone { fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec); } -pub trait ImportQueueClone { - fn clone_box(&self) -> Box>; +/// Basic block import queue that performs import in the caller thread. +pub struct BasicSyncQueue> { + data: Arc>, } -impl Clone for Box> { - fn clone(&self) -> Box> { - self.clone_box() +struct BasicSyncQueueData> { + link: Mutex>>>, + block_import: SharedBlockImport, + verifier: Arc, + justification_import: Option>, + finality_proof_import: Option>, +} + +impl> BasicSyncQueue { + pub fn new( + block_import: SharedBlockImport, + verifier: Arc, + justification_import: Option>, + finality_proof_import: Option>, + ) -> Self { + BasicSyncQueue { + data: Arc::new(BasicSyncQueueData { + link: Mutex::new(None), + block_import, + verifier, + justification_import, + finality_proof_import, + }), + } + } +} + +impl> ImportQueue for BasicSyncQueue { + fn start(&self, link: Box>) -> Result<(), std::io::Error> { + if let Some(justification_import) = self.data.justification_import.as_ref() { + justification_import.on_start(&*link); + } + *self.data.link.lock() = Some(link); + Ok(()) + } + + fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { + if blocks.is_empty() { + return; + } + + let (imported, count, results) = import_many_blocks( + &*self.data.block_import, + origin, + blocks, + self.data.verifier.clone(), + ); + + let link_ref = self.data.link.lock(); + let link = match link_ref.as_ref() { + Some(link) => link, + None => { + trace!(target: "sync", "Trying to import blocks before starting import queue"); + return; + }, + }; + + process_import_results(&**link, results); + + trace!(target: "sync", "Imported {} of {}", imported, count); + } + + fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification) { + import_single_justification( + &*self.data.link.lock(), + &self.data.justification_import, + who, + hash, + number, + justification, + ) + } + + fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { + let result = import_single_finality_proof( + &self.data.finality_proof_import, + &*self.data.verifier, + &who, + hash, + number, + finality_proof, + ); + if let Some(link) = self.data.link.lock().as_ref() { + link.finality_proof_imported(who, (hash, number), result); + } } } @@ -129,16 +211,20 @@ impl Clone for Box> { /// sequentially in a separate thread, with pluggable verification. #[derive(Clone)] pub struct BasicQueue { - sender: Sender>, + sender: Option>>, } -impl ImportQueueClone for BasicQueue { - fn clone_box(&self) -> Box> { - Box::new(self.clone()) +impl Drop for BasicQueue { + fn drop(&mut self) { + if let Some(sender) = self.sender.take() { + let (shutdown_sender, shutdown_receiver) = channel::unbounded(); + if sender.send(BlockImportMsg::Shutdown(shutdown_sender)).is_ok() { + let _ = shutdown_receiver.recv(); + } + } } } - /// "BasicQueue" is a wrapper around a channel sender to the "BlockImporter". /// "BasicQueue" itself does not keep any state or do any importing work, and /// can therefore be send to other threads. @@ -184,7 +270,7 @@ impl BasicQueue { ); Self { - sender: importer_sender, + sender: Some(importer_sender), } } @@ -194,52 +280,47 @@ impl BasicQueue { /// has synchronized with ImportQueue. #[cfg(any(test, feature = "test-helpers"))] pub fn synchronize(&self) { - self - .sender - .send(BlockImportMsg::Synchronize) - .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); + if let Some(ref sender) = self.sender { + let _ = sender.send(BlockImportMsg::Synchronize); + } } } impl ImportQueue for BasicQueue { fn start(&self, link: Box>) -> Result<(), std::io::Error> { - let (sender, port) = channel::unbounded(); - let _ = self - .sender - .send(BlockImportMsg::Start(link, sender)) - .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); - port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed") - } - - fn stop(&self) { - let _ = self - .sender - .send(BlockImportMsg::Stop) - .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); + let connect_err = || Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to connect import queue threads", + )); + if let Some(ref sender) = self.sender { + let (start_sender, start_port) = channel::unbounded(); + let _ = sender.send(BlockImportMsg::Start(link, start_sender)); + start_port.recv().unwrap_or_else(|_| connect_err()) + } else { + connect_err() + } } fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { if blocks.is_empty() { return; } - let _ = self - .sender - .send(BlockImportMsg::ImportBlocks(origin, blocks)) - .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); + + if let Some(ref sender) = self.sender { + let _ = sender.send(BlockImportMsg::ImportBlocks(origin, blocks)); + } } fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification) { - let _ = self - .sender - .send(BlockImportMsg::ImportJustification(who.clone(), hash, number, justification)) - .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); + if let Some(ref sender) = self.sender { + let _ = sender.send(BlockImportMsg::ImportJustification(who.clone(), hash, number, justification)); + } } fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { - let _ = self - .sender - .send(BlockImportMsg::ImportFinalityProof(who, hash, number, finality_proof)) - .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); + if let Some(ref sender) = self.sender { + let _ = sender.send(BlockImportMsg::ImportFinalityProof(who, hash, number, finality_proof)); + } } } @@ -248,12 +329,12 @@ pub enum BlockImportMsg { ImportJustification(Origin, B::Hash, NumberFor, Justification), ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), Start(Box>, Sender>), - Stop, + Shutdown(Sender<()>), #[cfg(any(test, feature = "test-helpers"))] Synchronize, } -#[cfg_attr(test, derive(Debug, PartialEq))] +#[cfg_attr(test, derive(Debug))] pub enum BlockImportWorkerMsg { ImportBlocks(BlockOrigin, Vec>), ImportedBlocks( @@ -264,6 +345,7 @@ pub enum BlockImportWorkerMsg { ), ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), ImportedFinalityProof(Origin, (B::Hash, NumberFor), Result<(B::Hash, NumberFor), ()>), + Shutdown(Sender<()>), #[cfg(any(test, feature = "test-helpers"))] Synchronize, } @@ -276,7 +358,7 @@ enum ImportMsgType { struct BlockImporter { port: Receiver>, result_port: Receiver>, - worker_sender: Sender>, + worker_sender: Option>>, link: Option>>, verifier: Arc>, justification_import: Option>, @@ -301,7 +383,7 @@ impl BlockImporter { let mut importer = BlockImporter { port, result_port, - worker_sender, + worker_sender: Some(worker_sender), link: None, verifier, justification_import, @@ -345,7 +427,14 @@ impl BlockImporter { self.handle_import_blocks(origin, incoming_blocks) }, BlockImportMsg::ImportJustification(who, hash, number, justification) => { - self.handle_import_justification(who, hash, number, justification) + import_single_justification( + &self.link, + &self.justification_import, + who, + hash, + number, + justification, + ); }, BlockImportMsg::ImportFinalityProof(who, hash, number, finality_proof) => { self.handle_import_finality_proof(who, hash, number, finality_proof) @@ -363,13 +452,24 @@ impl BlockImporter { self.link = Some(link); let _ = sender.send(Ok(())); }, - BlockImportMsg::Stop => return false, + BlockImportMsg::Shutdown(result_sender) => { + // stop worker thread + if let Some(worker_sender) = self.worker_sender.take() { + let (sender, receiver) = channel::unbounded(); + if worker_sender.send(BlockImportWorkerMsg::Shutdown(sender)).is_ok() { + let _ = receiver.recv(); + } + } + // send shutdown notification + let _ = result_sender.send(()); + return false; + }, #[cfg(any(test, feature = "test-helpers"))] BlockImportMsg::Synchronize => { trace!(target: "sync", "Received synchronization message"); - self.worker_sender - .send(BlockImportWorkerMsg::Synchronize) - .expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed"); + if let Some(ref worker_sender) = self.worker_sender { + let _ = worker_sender.send(BlockImportWorkerMsg::Synchronize); + } }, } true @@ -398,106 +498,26 @@ impl BlockImporter { }, BlockImportWorkerMsg::ImportBlocks(_, _) | BlockImportWorkerMsg::ImportFinalityProof(_, _, _, _) - => unreachable!("Import Worker does not send Import* message; qed"), + | BlockImportWorkerMsg::Shutdown(_) + => unreachable!("Import Worker does not send Import*/Shutdown messages; qed"), }; - let mut has_error = false; - let mut hashes = vec![]; - for (result, hash) in results { - hashes.push(hash); - if has_error { - continue; - } - - if result.is_err() { - has_error = true; - } - - match result { - Ok(BlockImportResult::ImportedKnown(number)) => link.block_imported(&hash, number), - Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => { - link.block_imported(&hash, number); - - if aux.clear_justification_requests { - trace!(target: "sync", "Block imported clears all pending justification requests {}: {:?}", number, hash); - link.clear_justification_requests(); - } - - if aux.needs_justification { - trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash); - link.request_justification(&hash, number); - } - - if aux.bad_justification { - if let Some(peer) = who { - info!("Sent block with bad justification to import"); - link.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE); - } - } - - if aux.needs_finality_proof { - trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash); - link.request_finality_proof(&hash, number); - } - }, - Err(BlockImportError::IncompleteHeader(who)) => { - if let Some(peer) = who { - info!("Peer sent block with incomplete header to import"); - link.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE); - link.restart(); - } - }, - Err(BlockImportError::VerificationFailed(who, e)) => { - if let Some(peer) = who { - info!("Verification failed from peer: {}", e); - link.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE); - link.restart(); - } - }, - Err(BlockImportError::BadBlock(who)) => { - if let Some(peer) = who { - info!("Bad block"); - link.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE); - link.restart(); - } - }, - Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { - link.restart(); - }, - }; - } - if let Some(link) = self.link.as_ref() { - link.blocks_processed(hashes, has_error); - } + process_import_results(&**link, results); true } - fn handle_import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification) { - let success = self.justification_import.as_ref().map(|justification_import| { - justification_import.import_justification(hash, number, justification) - .map_err(|e| { - debug!(target: "sync", "Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", e, hash, number, who); - e - }).is_ok() - }).unwrap_or(false); - - if let Some(link) = self.link.as_ref() { - link.justification_imported(who, &hash, number, success); + fn handle_import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { + if let Some(ref worker_sender) = self.worker_sender { + trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); + let _ = worker_sender.send(BlockImportWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)); } } - fn handle_import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { - trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); - self.worker_sender - .send(BlockImportWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)) - .expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed"); - } - fn handle_import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { - trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); - self.worker_sender - .send(BlockImportWorkerMsg::ImportBlocks(origin, blocks)) - .expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed"); + if let Some(ref worker_sender) = self.worker_sender { + trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); + let _ = worker_sender.send(BlockImportWorkerMsg::ImportBlocks(origin, blocks)); + } } } @@ -534,6 +554,10 @@ impl> BlockImportWorker { BlockImportWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { worker.import_finality_proof(who, hash, number, proof); }, + BlockImportWorkerMsg::Shutdown(result_sender) => { + let _ = result_sender.send(()); + break; + }, #[cfg(any(test, feature = "test-helpers"))] BlockImportWorkerMsg::Synchronize => { trace!(target: "sync", "Sending sync message"); @@ -550,44 +574,12 @@ impl> BlockImportWorker { } fn import_a_batch_of_blocks(&self, origin: BlockOrigin, blocks: Vec>) { - let count = blocks.len(); - let mut imported = 0; - - let blocks_range = match ( - blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), - blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), - ) { - (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), - (Some(first), Some(_)) => format!(" ({})", first), - _ => Default::default(), - }; - - trace!(target: "sync", "Starting import of {} blocks {}", count, blocks_range); - - let mut results = vec![]; - - let mut has_error = false; - - // Blocks in the response/drain should be in ascending order. - for block in blocks { - let import_result = if has_error { - Err(BlockImportError::Error) - } else { - import_single_block( - &*self.block_import, - origin.clone(), - block.clone(), - self.verifier.clone(), - ) - }; - let was_ok = import_result.is_ok(); - results.push((import_result, block.hash)); - if was_ok { - imported += 1; - } else { - has_error = true; - } - } + let (imported, count, results) = import_many_blocks( + &*self.block_import, + origin, + blocks, + self.verifier.clone(), + ); let _ = self .result_sender @@ -597,24 +589,18 @@ impl> BlockImportWorker { } fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { - let result = self.finality_proof_import.as_ref().map(|finality_proof_import| { - finality_proof_import.import_finality_proof(hash, number, finality_proof, &*self.verifier) - .map_err(|e| { - debug!( - "Finality proof import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", - e, - hash, - number, - who, - ); - }) - }).unwrap_or(Err(())); + let result = import_single_finality_proof( + &self.finality_proof_import, + &*self.verifier, + &who, + hash, + number, + finality_proof, + ); let _ = self .result_sender .send(BlockImportWorkerMsg::ImportedFinalityProof(who, (hash, number), result)); - - trace!(target: "sync", "Imported finality proof for {}/{}", number, hash); } } @@ -679,6 +665,193 @@ pub enum BlockImportError { Error, } +/// Imports single notification and send notification to the link (if provided). +fn import_single_justification( + link: &Option>>, + justification_import: &Option>, + who: Origin, + hash: B::Hash, + number: NumberFor, + justification: Justification, +) { + let success = justification_import.as_ref().map(|justification_import| { + justification_import.import_justification(hash, number, justification) + .map_err(|e| { + debug!( + target: "sync", + "Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", + e, + hash, + number, + who, + ); + e + }).is_ok() + }).unwrap_or(false); + + if let Some(ref link) = link { + link.justification_imported(who, &hash, number, success); + } +} + +/// Imports single finality_proof. +fn import_single_finality_proof>( + finality_proof_import: &Option>, + verifier: &V, + who: &Origin, + hash: B::Hash, + number: NumberFor, + finality_proof: Vec, +) -> Result<(B::Hash, NumberFor), ()> { + let result = finality_proof_import.as_ref().map(|finality_proof_import| { + finality_proof_import.import_finality_proof(hash, number, finality_proof, verifier) + .map_err(|e| { + debug!( + "Finality proof import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", + e, + hash, + number, + who, + ); + }) + }).unwrap_or(Err(())); + + trace!(target: "sync", "Imported finality proof for {}/{}", number, hash); + + result +} + +/// Process result of block(s) import. +fn process_import_results( + link: &Link, + results: Vec<( + Result>, BlockImportError>, + B::Hash, + )>, +) +{ + let mut has_error = false; + let mut hashes = vec![]; + for (result, hash) in results { + hashes.push(hash); + + if has_error { + continue; + } + + if result.is_err() { + has_error = true; + } + + match result { + Ok(BlockImportResult::ImportedKnown(number)) => link.block_imported(&hash, number), + Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => { + link.block_imported(&hash, number); + + if aux.clear_justification_requests { + trace!(target: "sync", "Block imported clears all pending justification requests {}: {:?}", number, hash); + link.clear_justification_requests(); + } + + if aux.needs_justification { + trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash); + link.request_justification(&hash, number); + } + + if aux.bad_justification { + if let Some(peer) = who { + info!("Sent block with bad justification to import"); + link.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE); + } + } + + if aux.needs_finality_proof { + trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash); + link.request_finality_proof(&hash, number); + } + }, + Err(BlockImportError::IncompleteHeader(who)) => { + if let Some(peer) = who { + info!("Peer sent block with incomplete header to import"); + link.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE); + link.restart(); + } + }, + Err(BlockImportError::VerificationFailed(who, e)) => { + if let Some(peer) = who { + info!("Verification failed from peer: {}", e); + link.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE); + link.restart(); + } + }, + Err(BlockImportError::BadBlock(who)) => { + if let Some(peer) = who { + info!("Bad block"); + link.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE); + link.restart(); + } + }, + Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { + link.restart(); + }, + }; + } + link.blocks_processed(hashes, has_error); +} + +/// Import several blocks at once, returning import result for each block. +fn import_many_blocks>( + import_handle: &BlockImport, + blocks_origin: BlockOrigin, + blocks: Vec>, + verifier: Arc, +) -> (usize, usize, Vec<( + Result>, BlockImportError>, + B::Hash, +)>) { + let count = blocks.len(); + let mut imported = 0; + + let blocks_range = match ( + blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), + blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), + ) { + (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), + (Some(first), Some(_)) => format!(" ({})", first), + _ => Default::default(), + }; + + trace!(target: "sync", "Starting import of {} blocks {}", count, blocks_range); + + let mut results = vec![]; + + let mut has_error = false; + + // Blocks in the response/drain should be in ascending order. + for block in blocks { + let block_hash = block.hash; + let import_result = if has_error { + Err(BlockImportError::Error) + } else { + import_single_block( + import_handle, + blocks_origin.clone(), + block, + verifier.clone(), + ) + }; + let was_ok = import_result.is_ok(); + results.push((import_result, block_hash)); + if was_ok { + imported += 1; + } else { + has_error = true; + } + } + + (imported, count, results) +} + /// Single block import function. pub fn import_single_block>( import_handle: &BlockImport, @@ -909,12 +1082,19 @@ mod tests { )).unwrap(); // Wait until this request is redirected to the BlockImportWorker - assert_eq!(worker_receiver.recv(), Ok(BlockImportWorkerMsg::ImportFinalityProof( - who.clone(), - Default::default(), - 1, - vec![42], - ))); + match worker_receiver.recv().unwrap() { + BlockImportWorkerMsg::ImportFinalityProof( + cwho, + chash, + 1, + cproof, + ) => { + assert_eq!(cwho, who); + assert_eq!(chash, Default::default()); + assert_eq!(cproof, vec![42]); + }, + _ => unreachable!("Unexpected work request received"), + } // Send ack of proof import from BlockImportWorker to BlockImporter result_sender.send(BlockImportWorkerMsg::ImportedFinalityProof( diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index b2e56a83bd..c8679c96f7 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -211,6 +211,14 @@ impl> Service { let (network_chan, network_port) = mpsc::unbounded(); let (protocol_sender, protocol_rx) = mpsc::unbounded(); let status_sinks = Arc::new(Mutex::new(Vec::new())); + + // connect the import-queue to the network service. + let link = NetworkLink { + protocol_sender: protocol_sender.clone(), + network_sender: network_chan.clone(), + }; + import_queue.start(Box::new(link))?; + // Start in off-line mode, since we're not connected to any nodes yet. let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); @@ -229,7 +237,7 @@ impl> Service { is_major_syncing.clone(), protocol, peers.clone(), - import_queue.clone(), + import_queue, params.transaction_pool, params.finality_proof_provider, network_port, @@ -244,22 +252,14 @@ impl> Service { status_sinks, is_offline, is_major_syncing, - network_chan: network_chan.clone(), + network_chan, peers, peerset, network, - protocol_sender: protocol_sender.clone(), + protocol_sender, bg_thread: Some(thread), }); - // connect the import-queue to the network service. - let link = NetworkLink { - protocol_sender, - network_sender: network_chan, - }; - - import_queue.start(Box::new(link))?; - Ok(service) }