mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 05:11:09 +00:00
generalize some import_queue params
This commit is contained in:
@@ -892,7 +892,9 @@ impl<B, E, Block> Client<B, E, Block> where
|
|||||||
/// TODO [snd] possibly implement this on blockchain::Backend and just redirect here
|
/// TODO [snd] possibly implement this on blockchain::Backend and just redirect here
|
||||||
/// Returns `Ok(None)` if `target_hash` is not found in search space.
|
/// Returns `Ok(None)` if `target_hash` is not found in search space.
|
||||||
/// TODO [snd] write down time complexity
|
/// TODO [snd] write down time complexity
|
||||||
pub fn best_containing(&self, target_hash: Block::Hash, maybe_max_number: Option<NumberFor<Block>>) -> error::Result<Option<Block::Hash>> {
|
pub fn best_containing(&self, target_hash: Block::Hash, maybe_max_number: Option<NumberFor<Block>>)
|
||||||
|
-> error::Result<Option<Block::Hash>>
|
||||||
|
{
|
||||||
let target_header = {
|
let target_header = {
|
||||||
match self.backend.blockchain().header(BlockId::Hash(target_hash))? {
|
match self.backend.blockchain().header(BlockId::Hash(target_hash))? {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
|
|||||||
@@ -65,15 +65,9 @@ pub trait ImportQueue<B: BlockT>: Send + Sync {
|
|||||||
///
|
///
|
||||||
/// This is called automatically by the network service when synchronization
|
/// This is called automatically by the network service when synchronization
|
||||||
/// begins.
|
/// begins.
|
||||||
|
fn start<L>(&self, _link: L) -> Result<(), Error> where
|
||||||
fn start<E>(
|
|
||||||
&self,
|
|
||||||
_sync: Weak<RwLock<ChainSync<B>>>,
|
|
||||||
_service: Weak<E>,
|
|
||||||
_chain: Weak<Client<B>>
|
|
||||||
) -> Result<(), Error> where
|
|
||||||
Self: Sized,
|
Self: Sized,
|
||||||
E: 'static + ExecuteInContext<B>,
|
L: 'static + Link<B>,
|
||||||
{
|
{
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -138,18 +132,16 @@ impl<B: BlockT> AsyncImportQueueData<B> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> {
|
impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> {
|
||||||
fn start<E: 'static + ExecuteInContext<B>>(
|
fn start<L: 'static + Link<B>>(
|
||||||
&self,
|
&self,
|
||||||
sync: Weak<RwLock<ChainSync<B>>>,
|
link: L,
|
||||||
service: Weak<E>,
|
|
||||||
chain: Weak<Client<B>>
|
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
debug_assert!(self.handle.lock().is_none());
|
debug_assert!(self.handle.lock().is_none());
|
||||||
|
|
||||||
let qdata = self.data.clone();
|
let qdata = self.data.clone();
|
||||||
let verifier = self.verifier.clone();
|
let verifier = self.verifier.clone();
|
||||||
*self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || {
|
*self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || {
|
||||||
import_thread(sync, service, chain, qdata, verifier)
|
import_thread(link, qdata, verifier)
|
||||||
}).map_err(|err| Error::from(ErrorKind::Io(err)))?);
|
}).map_err(|err| Error::from(ErrorKind::Io(err)))?);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -211,10 +203,8 @@ impl<B: BlockT, V: 'static + Verifier<B>> Drop for BasicQueue<B, V> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Blocks import thread.
|
/// Blocks import thread.
|
||||||
fn import_thread<B: BlockT, E: ExecuteInContext<B>, V: Verifier<B>>(
|
fn import_thread<B: BlockT, L: Link<B>, V: Verifier<B>>(
|
||||||
sync: Weak<RwLock<ChainSync<B>>>,
|
link: L,
|
||||||
service: Weak<E>,
|
|
||||||
chain: Weak<Client<B>>,
|
|
||||||
qdata: Arc<AsyncImportQueueData<B>>,
|
qdata: Arc<AsyncImportQueueData<B>>,
|
||||||
verifier: Arc<V>
|
verifier: Arc<V>
|
||||||
) {
|
) {
|
||||||
@@ -236,91 +226,89 @@ fn import_thread<B: BlockT, E: ExecuteInContext<B>, V: Verifier<B>>(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
match (sync.upgrade(), service.upgrade(), chain.upgrade()) {
|
let blocks_hashes: Vec<B::Hash> = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect();
|
||||||
(Some(sync), Some(service), Some(chain)) => {
|
if !import_many_blocks(
|
||||||
let blocks_hashes: Vec<B::Hash> = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect();
|
&link,
|
||||||
if !import_many_blocks(
|
Some(&*qdata),
|
||||||
&mut SyncLink{chain: &sync, client: &*chain, context: &*service},
|
new_blocks,
|
||||||
Some(&*qdata),
|
verifier.clone(),
|
||||||
new_blocks,
|
) {
|
||||||
verifier.clone(),
|
break;
|
||||||
) {
|
}
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut queue_blocks = qdata.queue_blocks.write();
|
let mut queue_blocks = qdata.queue_blocks.write();
|
||||||
for blocks_hash in blocks_hashes {
|
for blocks_hash in blocks_hashes {
|
||||||
queue_blocks.remove(&blocks_hash);
|
queue_blocks.remove(&blocks_hash);
|
||||||
}
|
|
||||||
},
|
|
||||||
_ => break,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!(target: "sync", "Stopping import thread");
|
trace!(target: "sync", "Stopping import thread");
|
||||||
}
|
}
|
||||||
/// ChainSync link trait.
|
|
||||||
trait SyncLinkApi<B: BlockT> {
|
/// Hooks that the verification queue can use to influence the synchronization
|
||||||
|
/// algorithm.
|
||||||
|
pub trait Link<B: BlockT>: Send {
|
||||||
/// Get chain reference.
|
/// Get chain reference.
|
||||||
fn chain(&self) -> &Client<B>;
|
fn chain(&self) -> &Client<B>;
|
||||||
/// Block imported.
|
/// Block imported.
|
||||||
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>);
|
fn block_imported(&self, hash: &B::Hash, number: NumberFor<B>);
|
||||||
/// Maintain sync.
|
/// Maintain sync.
|
||||||
fn maintain_sync(&mut self);
|
fn maintain_sync(&self);
|
||||||
/// Disconnect from peer.
|
/// Disconnect from peer.
|
||||||
fn useless_peer(&mut self, who: NodeIndex, reason: &str);
|
fn useless_peer(&self, who: NodeIndex, reason: &str);
|
||||||
/// Disconnect from peer and restart sync.
|
/// Disconnect from peer and restart sync.
|
||||||
fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str);
|
fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str);
|
||||||
/// Restart sync.
|
/// Restart sync.
|
||||||
fn restart(&mut self);
|
fn restart(&self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A link implementation that connects to the network.
|
||||||
/// Link with the ChainSync service.
|
pub struct NetworkLink<B: BlockT, E: ExecuteInContext<B>> {
|
||||||
struct SyncLink<'a, B: 'a + BlockT, E: 'a + ExecuteInContext<B>> {
|
/// The client handle.
|
||||||
pub chain: &'a RwLock<ChainSync<B>>,
|
pub(crate) client: Arc<Client<B>>,
|
||||||
pub client: &'a Client<B>,
|
/// The chain-sync handle
|
||||||
pub context: &'a E,
|
pub(crate) sync: Weak<RwLock<ChainSync<B>>>,
|
||||||
|
/// Network context.
|
||||||
|
pub(crate) context: Weak<E>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext<B>> SyncLink<'a, B, E> {
|
impl<B: BlockT, E: ExecuteInContext<B>> NetworkLink<B, E> {
|
||||||
/// Execute closure with locked ChainSync.
|
/// Execute closure with locked ChainSync.
|
||||||
fn with_sync<F: Fn(&mut ChainSync<B>, &mut Context<B>)>(&mut self, closure: F) {
|
fn with_sync<F: Fn(&mut ChainSync<B>, &mut Context<B>)>(&self, closure: F) {
|
||||||
let service = self.context;
|
if let (Some(sync), Some(service)) = (self.sync.upgrade(), self.context.upgrade()) {
|
||||||
let sync = self.chain;
|
service.execute_in_context(move |protocol| {
|
||||||
service.execute_in_context(move |protocol| {
|
let mut sync = sync.write();
|
||||||
let mut sync = sync.write();
|
closure(&mut *sync, protocol)
|
||||||
closure(&mut *sync, protocol)
|
});
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext<B>> SyncLinkApi<B> for SyncLink<'a, B, E> {
|
impl<B: BlockT, E: ExecuteInContext<B>> Link<B> for NetworkLink<B, E> {
|
||||||
|
|
||||||
fn chain(&self) -> &Client<B> {
|
fn chain(&self) -> &Client<B> {
|
||||||
self.client
|
&*self.client
|
||||||
}
|
}
|
||||||
|
|
||||||
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
fn block_imported(&self, hash: &B::Hash, number: NumberFor<B>) {
|
||||||
self.with_sync(|sync, _| sync.block_imported(&hash, number))
|
self.with_sync(|sync, _| sync.block_imported(&hash, number))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn maintain_sync(&mut self) {
|
fn maintain_sync(&self) {
|
||||||
self.with_sync(|sync, protocol| sync.maintain_sync(protocol))
|
self.with_sync(|sync, protocol| sync.maintain_sync(protocol))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn useless_peer(&mut self, who: NodeIndex, reason: &str) {
|
fn useless_peer(&self, who: NodeIndex, reason: &str) {
|
||||||
self.with_sync(|_, protocol| protocol.report_peer(who, Severity::Useless(reason)))
|
self.with_sync(|_, protocol| protocol.report_peer(who, Severity::Useless(reason)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str) {
|
fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str) {
|
||||||
self.with_sync(|sync, protocol| {
|
self.with_sync(|sync, protocol| {
|
||||||
protocol.report_peer(who, Severity::Useless(reason)); // is this actually malign or just useless?
|
protocol.report_peer(who, Severity::Useless(reason)); // is this actually malign or just useless?
|
||||||
sync.restart(protocol);
|
sync.restart(protocol);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn restart(&mut self) {
|
fn restart(&self) {
|
||||||
self.with_sync(|sync, protocol| sync.restart(protocol))
|
self.with_sync(|sync, protocol| sync.restart(protocol))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -352,8 +340,8 @@ enum BlockImportError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Import a bunch of blocks.
|
/// Import a bunch of blocks.
|
||||||
fn import_many_blocks<'a, B: BlockT, V: Verifier<B>>(
|
fn import_many_blocks<'a, B: BlockT, L: Link<B>, V: Verifier<B>>(
|
||||||
link: &mut SyncLinkApi<B>,
|
link: &L,
|
||||||
qdata: Option<&AsyncImportQueueData<B>>,
|
qdata: Option<&AsyncImportQueueData<B>>,
|
||||||
blocks: (BlockOrigin, Vec<BlockData<B>>),
|
blocks: (BlockOrigin, Vec<BlockData<B>>),
|
||||||
verifier: Arc<V>
|
verifier: Arc<V>
|
||||||
@@ -473,7 +461,7 @@ fn import_single_block<B: BlockT, V: Verifier<B>>(
|
|||||||
|
|
||||||
/// Process single block import result.
|
/// Process single block import result.
|
||||||
fn process_import_result<'a, B: BlockT>(
|
fn process_import_result<'a, B: BlockT>(
|
||||||
link: &mut SyncLinkApi<B>,
|
link: &Link<B>,
|
||||||
result: Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
|
result: Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
|
||||||
) -> usize
|
) -> usize
|
||||||
{
|
{
|
||||||
@@ -545,7 +533,7 @@ unsafe impl<B: BlockT> Sync for ImportCB<B> {}
|
|||||||
|
|
||||||
#[cfg(any(test, feature = "test-helpers"))]
|
#[cfg(any(test, feature = "test-helpers"))]
|
||||||
/// A Verifier that accepts all blocks and passes them on with the configured
|
/// A Verifier that accepts all blocks and passes them on with the configured
|
||||||
/// finality to be imported.
|
/// finality to be imported.
|
||||||
pub struct PassThroughVerifier(pub bool);
|
pub struct PassThroughVerifier(pub bool);
|
||||||
|
|
||||||
#[cfg(any(test, feature = "test-helpers"))]
|
#[cfg(any(test, feature = "test-helpers"))]
|
||||||
@@ -585,25 +573,19 @@ impl<B: BlockT, V: Verifier<B>> SyncImportQueue<B, V> {
|
|||||||
#[cfg(any(test, feature = "test-helpers"))]
|
#[cfg(any(test, feature = "test-helpers"))]
|
||||||
impl<B: 'static + BlockT, V: 'static + Verifier<B>> ImportQueue<B> for SyncImportQueue<B, V>
|
impl<B: 'static + BlockT, V: 'static + Verifier<B>> ImportQueue<B> for SyncImportQueue<B, V>
|
||||||
{
|
{
|
||||||
fn start<E: 'static + ExecuteInContext<B>>(
|
fn start<L: 'static + Link<B>>(
|
||||||
&self,
|
&self,
|
||||||
sync: Weak<RwLock<ChainSync<B>>>,
|
link: L,
|
||||||
service: Weak<E>,
|
|
||||||
chain: Weak<Client<B>>
|
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let v = self.0.clone();
|
let v = self.0.clone();
|
||||||
self.1.set(Box::new(move | origin, new_blocks | {
|
self.1.set(Box::new(move |origin, new_blocks| {
|
||||||
let verifier = v.clone();
|
let verifier = v.clone();
|
||||||
match (sync.upgrade(), service.upgrade(), chain.upgrade()) {
|
import_many_blocks(
|
||||||
(Some(sync), Some(service), Some(chain)) =>
|
&link,
|
||||||
import_many_blocks(
|
None,
|
||||||
&mut SyncLink{chain: &sync, client: &*chain, context: &*service},
|
(origin, new_blocks),
|
||||||
None,
|
verifier,
|
||||||
(origin, new_blocks),
|
)
|
||||||
verifier,
|
|
||||||
),
|
|
||||||
_ => false
|
|
||||||
}
|
|
||||||
}));
|
}));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -635,40 +617,51 @@ pub mod tests {
|
|||||||
use test_client::runtime::{Block, Hash};
|
use test_client::runtime::{Block, Hash};
|
||||||
use on_demand::tests::DummyExecutor;
|
use on_demand::tests::DummyExecutor;
|
||||||
use runtime_primitives::generic::BlockId;
|
use runtime_primitives::generic::BlockId;
|
||||||
|
use std::cell::Cell;
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
|
||||||
struct TestLink {
|
struct TestLink {
|
||||||
chain: Arc<Client<Block>>,
|
chain: Arc<Client<Block>>,
|
||||||
imported: usize,
|
imported: Cell<usize>,
|
||||||
maintains: usize,
|
maintains: Cell<usize>,
|
||||||
disconnects: usize,
|
disconnects: Cell<usize>,
|
||||||
restarts: usize,
|
restarts: Cell<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TestLink {
|
impl TestLink {
|
||||||
fn new() -> TestLink {
|
fn new() -> TestLink {
|
||||||
TestLink {
|
TestLink {
|
||||||
chain: Arc::new(test_client::new()),
|
chain: Arc::new(test_client::new()),
|
||||||
imported: 0,
|
imported: Cell::new(0),
|
||||||
maintains: 0,
|
maintains: Cell::new(0),
|
||||||
disconnects: 0,
|
disconnects: Cell::new(0),
|
||||||
restarts: 0,
|
restarts: Cell::new(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn total(&self) -> usize {
|
fn total(&self) -> usize {
|
||||||
self.imported + self.maintains + self.disconnects + self.restarts
|
self.imported.get() + self.maintains.get() + self.disconnects.get() + self.restarts.get()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SyncLinkApi<Block> for TestLink {
|
impl Link<Block> for TestLink {
|
||||||
fn chain(&self) -> &Client<Block> { &*self.chain }
|
fn chain(&self) -> &Client<Block> { &*self.chain }
|
||||||
fn block_imported(&mut self, _hash: &Hash, _number: NumberFor<Block>) { self.imported += 1; }
|
fn block_imported(&self, _hash: &Hash, _number: NumberFor<Block>) {
|
||||||
fn maintain_sync(&mut self) { self.maintains += 1; }
|
self.imported.set(self.imported.get() + 1);
|
||||||
fn useless_peer(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; }
|
}
|
||||||
fn note_useless_and_restart_sync(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; self.restarts += 1; }
|
fn maintain_sync(&self) {
|
||||||
fn restart(&mut self) { self.restarts += 1; }
|
self.maintains.set(self.maintains.get() + 1);
|
||||||
|
}
|
||||||
|
fn useless_peer(&self, _: NodeIndex, _: &str) {
|
||||||
|
self.disconnects.set(self.disconnects.get() + 1);
|
||||||
|
}
|
||||||
|
fn note_useless_and_restart_sync(&self, id: NodeIndex, r: &str) {
|
||||||
|
self.useless_peer(id, r);
|
||||||
|
self.restart();
|
||||||
|
}
|
||||||
|
fn restart(&self) {
|
||||||
|
self.restarts.set(self.restarts.get() + 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn prepare_good_block() -> (client::Client<test_client::Backend, test_client::Executor, Block>, Hash, u64, BlockData<Block>) {
|
fn prepare_good_block() -> (client::Client<test_client::Backend, test_client::Executor, Block>, Hash, u64, BlockData<Block>) {
|
||||||
@@ -729,39 +722,39 @@ pub mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn process_import_result_works() {
|
fn process_import_result_works() {
|
||||||
let mut link = TestLink::new();
|
let link = TestLink::new();
|
||||||
assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1);
|
assert_eq!(process_import_result::<Block>(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1);
|
||||||
assert_eq!(link.total(), 1);
|
assert_eq!(link.total(), 1);
|
||||||
|
|
||||||
let mut link = TestLink::new();
|
let link = TestLink::new();
|
||||||
assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1);
|
assert_eq!(process_import_result::<Block>(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1);
|
||||||
assert_eq!(link.total(), 1);
|
assert_eq!(link.total(), 1);
|
||||||
assert_eq!(link.imported, 1);
|
assert_eq!(link.imported.get(), 1);
|
||||||
|
|
||||||
let mut link = TestLink::new();
|
let link = TestLink::new();
|
||||||
assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1);
|
assert_eq!(process_import_result::<Block>(&link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1);
|
||||||
assert_eq!(link.total(), 1);
|
assert_eq!(link.total(), 1);
|
||||||
assert_eq!(link.imported, 1);
|
assert_eq!(link.imported.get(), 1);
|
||||||
|
|
||||||
let mut link = TestLink::new();
|
let link = TestLink::new();
|
||||||
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::IncompleteHeader(Some(0)))), 0);
|
assert_eq!(process_import_result::<Block>(&link, Err(BlockImportError::IncompleteHeader(Some(0)))), 0);
|
||||||
assert_eq!(link.total(), 1);
|
assert_eq!(link.total(), 1);
|
||||||
assert_eq!(link.disconnects, 1);
|
assert_eq!(link.disconnects.get(), 1);
|
||||||
|
|
||||||
let mut link = TestLink::new();
|
let link = TestLink::new();
|
||||||
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::IncompleteJustification(Some(0)))), 0);
|
assert_eq!(process_import_result::<Block>(&link, Err(BlockImportError::IncompleteJustification(Some(0)))), 0);
|
||||||
assert_eq!(link.total(), 1);
|
assert_eq!(link.total(), 1);
|
||||||
assert_eq!(link.disconnects, 1);
|
assert_eq!(link.disconnects.get(), 1);
|
||||||
|
|
||||||
let mut link = TestLink::new();
|
let link = TestLink::new();
|
||||||
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::UnknownParent)), 0);
|
assert_eq!(process_import_result::<Block>(&link, Err(BlockImportError::UnknownParent)), 0);
|
||||||
assert_eq!(link.total(), 1);
|
assert_eq!(link.total(), 1);
|
||||||
assert_eq!(link.restarts, 1);
|
assert_eq!(link.restarts.get(), 1);
|
||||||
|
|
||||||
let mut link = TestLink::new();
|
let link = TestLink::new();
|
||||||
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::Error)), 0);
|
assert_eq!(process_import_result::<Block>(&link, Err(BlockImportError::Error)), 0);
|
||||||
assert_eq!(link.total(), 1);
|
assert_eq!(link.total(), 1);
|
||||||
assert_eq!(link.restarts, 1);
|
assert_eq!(link.restarts.get(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -782,9 +775,7 @@ pub mod tests {
|
|||||||
fn async_import_queue_drops() {
|
fn async_import_queue_drops() {
|
||||||
let verifier = Arc::new(PassThroughVerifier(true));
|
let verifier = Arc::new(PassThroughVerifier(true));
|
||||||
let queue = BasicQueue::new(verifier);
|
let queue = BasicQueue::new(verifier);
|
||||||
let service = Arc::new(DummyExecutor);
|
queue.start(TestLink::new()).unwrap();
|
||||||
let chain = Arc::new(test_client::new());
|
|
||||||
queue.start(Weak::new(), Arc::downgrade(&service), Arc::downgrade(&chain) as Weak<Client<Block>>).unwrap();
|
|
||||||
drop(queue);
|
drop(queue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -129,7 +129,7 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
|
|||||||
params: Params<B, S, H>,
|
params: Params<B, S, H>,
|
||||||
protocol_id: ProtocolId,
|
protocol_id: ProtocolId,
|
||||||
import_queue: I,
|
import_queue: I,
|
||||||
) -> Result<Arc<Service<B, S, H>>, Error> {
|
) -> Result<Arc<Service<B, S, H>>, Error> {
|
||||||
let chain = params.chain.clone();
|
let chain = params.chain.clone();
|
||||||
let import_queue = Arc::new(import_queue);
|
let import_queue = Arc::new(import_queue);
|
||||||
let handler = Arc::new(Protocol::new(
|
let handler = Arc::new(Protocol::new(
|
||||||
@@ -144,20 +144,23 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
|
|||||||
let registered = RegisteredProtocol::new(protocol_id, &versions[..]);
|
let registered = RegisteredProtocol::new(protocol_id, &versions[..]);
|
||||||
let (thread, network) = start_thread(params.network_config, handler.clone(), registered)?;
|
let (thread, network) = start_thread(params.network_config, handler.clone(), registered)?;
|
||||||
|
|
||||||
let sync = Arc::new(Service {
|
let service = Arc::new(Service {
|
||||||
network,
|
network,
|
||||||
protocol_id,
|
protocol_id,
|
||||||
handler,
|
handler,
|
||||||
bg_thread: Some(thread),
|
bg_thread: Some(thread),
|
||||||
});
|
});
|
||||||
|
|
||||||
import_queue.start(
|
// connect the import-queue to the network service.
|
||||||
Arc::downgrade(sync.handler.sync()),
|
let link = ::import_queue::NetworkLink {
|
||||||
Arc::downgrade(&sync),
|
client: chain,
|
||||||
Arc::downgrade(&chain)
|
sync: Arc::downgrade(service.handler.sync()),
|
||||||
)?;
|
context: Arc::downgrade(&service),
|
||||||
|
};
|
||||||
|
|
||||||
Ok(sync)
|
import_queue.start(link)?;
|
||||||
|
|
||||||
|
Ok(service)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Called when a new block is imported by the client.
|
/// Called when a new block is imported by the client.
|
||||||
|
|||||||
@@ -158,10 +158,13 @@ impl Peer {
|
|||||||
// Update the sync state to the latest chain state.
|
// Update the sync state to the latest chain state.
|
||||||
let info = self.client.info().expect("In-mem client does not fail");
|
let info = self.client.info().expect("In-mem client does not fail");
|
||||||
let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
|
let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
|
||||||
self.import_queue.start(
|
let network_link = ::import_queue::NetworkLink {
|
||||||
Arc::downgrade(&self.sync.sync()),
|
client: self.sync.context_data().chain.clone(),
|
||||||
Arc::downgrade(&self.executor),
|
sync: Arc::downgrade(self.sync.sync()),
|
||||||
Arc::downgrade(&self.sync.context_data().chain)).expect("Test ImportQueue always starts");
|
context: Arc::downgrade(&self.executor),
|
||||||
|
};
|
||||||
|
|
||||||
|
self.import_queue.start(network_link).expect("Test ImportQueue always starts");
|
||||||
self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header);
|
self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user