Remove blocking sync -> import-queue operations (#1818)

* remove blocking sync -> import-queue operations

add specialization to testnet

remove add peer default impl on TestNetFactory

* remove empty brackets from dummy specialization

* nits

* make mut_peers take an fn once

* add SpecializationFactory trait in test

* remove add_peer imple in grandpa, fix typo

* use cmp::max for best importing number comparison

* remove import of non-existent create_peer

* add sender to start message
This commit is contained in:
Gregory Terzian
2019-02-28 19:11:18 +08:00
committed by Bastian Köcher
parent 04fed82940
commit 8a72abffdd
7 changed files with 190 additions and 187 deletions
+5 -4
View File
@@ -699,11 +699,12 @@ mod tests {
const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50);
pub struct AuraTestNet {
peers: Vec<Arc<Peer<()>>>,
peers: Vec<Arc<Peer<(), DummySpecialization>>>,
started: bool,
}
impl TestNetFactory for AuraTestNet {
type Specialization = DummySpecialization;
type Verifier = AuraVerifier<PeersClient, NothingExtra>;
type PeerData = ();
@@ -734,15 +735,15 @@ mod tests {
})
}
fn peer(&self, i: usize) -> &Peer<Self::PeerData> {
fn peer(&self, i: usize) -> &Peer<Self::PeerData, DummySpecialization> {
&self.peers[i]
}
fn peers(&self) -> &Vec<Arc<Peer<Self::PeerData>>> {
fn peers(&self) -> &Vec<Arc<Peer<Self::PeerData, DummySpecialization>>> {
&self.peers
}
fn mut_peers<F: Fn(&mut Vec<Arc<Peer<Self::PeerData>>>)>(&mut self, closure: F) {
fn mut_peers<F: FnOnce(&mut Vec<Arc<Peer<Self::PeerData, DummySpecialization>>>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
@@ -27,12 +27,11 @@
use crate::block_import::{ImportBlock, BlockImport, JustificationImport, ImportResult, BlockOrigin};
use crossbeam_channel::{self as channel, Receiver, Sender};
use std::collections::HashSet;
use std::sync::Arc;
use std::thread;
use runtime_primitives::traits::{
AuthorityIdFor, Block as BlockT, Header as HeaderT, NumberFor, Zero,
AuthorityIdFor, Block as BlockT, Header as HeaderT, NumberFor
};
use runtime_primitives::Justification;
@@ -85,14 +84,8 @@ pub trait ImportQueue<B: BlockT>: Send + Sync + ImportQueueClone<B> {
fn start(&self, _link: Box<Link<B>>) -> Result<(), std::io::Error> {
Ok(())
}
/// Clear the queue when sync is restarting.
fn clear(&self);
/// Clears the import queue and stops importing.
fn stop(&self);
/// Get queue status.
fn status(&self) -> ImportQueueStatus<B>;
/// Is block with given hash currently in the queue.
fn is_importing(&self, hash: &B::Hash) -> bool;
/// Import bunch of blocks.
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
/// Import a block justification.
@@ -109,15 +102,6 @@ impl<B: BlockT> Clone for Box<ImportQueue<B>> {
}
}
/// Import queue status. It isn't completely accurate.
#[derive(Debug)]
pub struct ImportQueueStatus<B: BlockT> {
/// Number of blocks that are currently in the queue.
pub importing_count: usize,
/// The number of the best block that was ever in the queue since start/last failure.
pub best_importing_number: NumberFor<B>,
}
/// Interface to a basic block import queue that is importing blocks sequentially in a separate thread,
/// with pluggable verification.
#[derive(Clone)]
@@ -165,18 +149,12 @@ impl<B: BlockT> BasicQueue<B> {
impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
fn start(&self, link: Box<Link<B>>) -> Result<(), std::io::Error> {
let (sender, port) = channel::unbounded();
let _ = self
.sender
.send(BlockImportMsg::Start(link))
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
Ok(())
}
fn clear(&self) {
let _ = self
.sender
.send(BlockImportMsg::Clear)
.send(BlockImportMsg::Start(link, sender))
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed")
}
fn stop(&self) {
@@ -186,24 +164,6 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
}
fn status(&self) -> ImportQueueStatus<B> {
let (sender, port) = channel::unbounded();
let _ = self
.sender
.send(BlockImportMsg::Status(sender))
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed")
}
fn is_importing(&self, hash: &B::Hash) -> bool {
let (sender, port) = channel::unbounded();
let _ = self
.sender
.send(BlockImportMsg::IsImporting(hash.clone(), sender))
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed")
}
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
if blocks.is_empty() {
return;
@@ -224,11 +184,8 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
pub enum BlockImportMsg<B: BlockT> {
ImportBlocks(BlockOrigin, Vec<IncomingBlock<B>>),
Clear,
Status(Sender<ImportQueueStatus<B>>),
IsImporting(B::Hash, Sender<bool>),
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
Start(Box<Link<B>>),
Start(Box<Link<B>>, Sender<Result<(), std::io::Error>>),
Stop,
}
@@ -251,8 +208,6 @@ struct BlockImporter<B: BlockT> {
port: Receiver<BlockImportMsg<B>>,
result_port: Receiver<BlockImportWorkerMsg<B>>,
worker_sender: Sender<BlockImportWorkerMsg<B>>,
queue_blocks: HashSet<B::Hash>,
best_importing_number: NumberFor<B>,
link: Option<Box<dyn Link<B>>>,
justification_import: Option<SharedJustificationImport<B>>,
}
@@ -271,8 +226,6 @@ impl<B: BlockT> BlockImporter<B> {
port,
result_port,
worker_sender,
queue_blocks: HashSet::new(),
best_importing_number: Zero::zero(),
link: None,
justification_import,
};
@@ -310,25 +263,18 @@ impl<B: BlockT> BlockImporter<B> {
match msg {
BlockImportMsg::ImportBlocks(origin, incoming_blocks) => {
self.handle_import_blocks(origin, incoming_blocks)
}
BlockImportMsg::Clear => self.handle_clear(),
BlockImportMsg::Status(reply_sender) => self.handle_status(reply_sender),
BlockImportMsg::IsImporting(hash, reply_sender) => {
self.handle_is_importing(hash, reply_sender)
}
},
BlockImportMsg::ImportJustification(who, hash, number, justification) => {
self.handle_import_justification(who, hash, number, justification)
}
BlockImportMsg::Start(link) => {
},
BlockImportMsg::Start(link, sender) => {
if let Some(justification_import) = self.justification_import.as_ref() {
justification_import.on_start(&*link);
}
self.link = Some(link);
}
BlockImportMsg::Stop => {
self.handle_clear();
return false;
}
let _ = sender.send(Ok(()));
},
BlockImportMsg::Stop => return false,
}
true
}
@@ -339,15 +285,15 @@ impl<B: BlockT> BlockImporter<B> {
_ => unreachable!("Import Worker does not send ImportBlocks message; qed"),
};
let mut has_error = false;
let mut hashes = vec![];
for (result, hash) in results {
self.queue_blocks.remove(&hash);
hashes.push(hash);
if has_error {
continue;
}
if result.is_err() {
self.best_importing_number = Zero::zero();
has_error = true;
}
@@ -389,28 +335,11 @@ impl<B: BlockT> BlockImporter<B> {
};
}
if let Some(link) = self.link.as_ref() {
link.maintain_sync();
link.blocks_processed(hashes, has_error);
}
true
}
fn handle_clear(&mut self) {
self.queue_blocks.clear();
self.best_importing_number = Zero::zero();
}
fn handle_status(&self, reply_sender: Sender<ImportQueueStatus<B>>) {
let status = ImportQueueStatus {
importing_count: self.queue_blocks.len(),
best_importing_number: self.best_importing_number,
};
let _ = reply_sender.send(status);
}
fn handle_is_importing(&self, hash: B::Hash, reply_sender: Sender<bool>) {
let _ = reply_sender.send(self.queue_blocks.contains(&hash));
}
fn handle_import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, justification: Justification) {
let success = self.justification_import.as_ref().map(|justification_import| {
justification_import.import_justification(hash, number, justification)
@@ -426,16 +355,6 @@ impl<B: BlockT> BlockImporter<B> {
fn handle_import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
trace!(target:"sync", "Scheduling {} blocks for import", blocks.len());
let new_best_importing_number = blocks
.last()
.and_then(|b| b.header.as_ref().map(|h| h.number().clone()))
.unwrap_or_else(|| Zero::zero());
self.queue_blocks
.extend(blocks.iter().map(|b| b.hash.clone()));
if new_best_importing_number > self.best_importing_number {
self.best_importing_number = new_best_importing_number;
}
self.worker_sender
.send(BlockImportWorkerMsg::ImportBlocks(origin, blocks))
.expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed");
@@ -530,12 +449,12 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
pub trait Link<B: BlockT>: Send {
/// Block imported.
fn block_imported(&self, _hash: &B::Hash, _number: NumberFor<B>) { }
/// Batch of blocks imported, with or without error.
fn blocks_processed(&self, _processed_blocks: Vec<B::Hash>, _has_error: bool) {}
/// Justification import result.
fn justification_imported(&self, _who: Origin, _hash: &B::Hash, _number: NumberFor<B>, _success: bool) { }
/// Request a justification for the given block.
fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) { }
/// Maintain sync.
fn maintain_sync(&self) {}
/// Disconnect from peer.
fn useless_peer(&self, _who: Origin, _reason: &str) {}
/// Disconnect from peer and restart sync.
@@ -674,8 +593,6 @@ mod tests {
fn block_imported(&self, _hash: &Hash, _number: NumberFor<Block>) {
let _ = self.sender.send(LinkMsg::BlockImported);
}
fn maintain_sync(&self) {
}
fn useless_peer(&self, _: Origin, _: &str) {
let _ = self.sender.send(LinkMsg::Disconnected);
}
@@ -695,12 +612,11 @@ mod tests {
let (link_sender, link_port) = channel::unbounded();
let importer_sender = BlockImporter::<Block>::new(result_port, worker_sender, None);
let link = TestLink::new(link_sender);
let _ = importer_sender.send(BlockImportMsg::Start(Box::new(link.clone())));
let (ack_sender, start_ack_port) = channel::bounded(4);
let _ = importer_sender.send(BlockImportMsg::Start(Box::new(link.clone()), ack_sender));
// Ensure the importer handles Start before any result messages.
let (ack_sender, ack_port) = channel::unbounded();
let _ = importer_sender.send(BlockImportMsg::Status(ack_sender));
let _ = ack_port.recv();
let _ = start_ack_port.recv();
// Send a known
let results = vec![(Ok(BlockImportResult::ImportedKnown(Default::default())), Default::default())];
+6 -7
View File
@@ -17,7 +17,7 @@
//! Tests and test helpers for GRANDPA.
use super::*;
use network::test::{Block, Hash, TestNetFactory, Peer, PeersClient};
use network::test::{Block, DummySpecialization, Hash, TestNetFactory, Peer, PeersClient};
use network::test::{PassThroughVerifier};
use network::config::{ProtocolConfig, Roles};
use parking_lot::Mutex;
@@ -52,12 +52,12 @@ type PeerData =
>
>
>;
type GrandpaPeer = Peer<PeerData>;
type GrandpaPeer = Peer<PeerData, DummySpecialization>;
struct GrandpaTestNet {
peers: Vec<Arc<GrandpaPeer>>,
test_config: TestApi,
started: bool
started: bool,
}
impl GrandpaTestNet {
@@ -68,16 +68,15 @@ impl GrandpaTestNet {
test_config,
};
let config = Self::default_config();
for _ in 0..n_peers {
net.add_peer(&config);
}
net
}
}
impl TestNetFactory for GrandpaTestNet {
type Specialization = DummySpecialization;
type Verifier = PassThroughVerifier;
type PeerData = PeerData;
@@ -86,7 +85,7 @@ impl TestNetFactory for GrandpaTestNet {
GrandpaTestNet {
peers: Vec::new(),
test_config: Default::default(),
started: false
started: false,
}
}
@@ -122,7 +121,7 @@ impl TestNetFactory for GrandpaTestNet {
&self.peers
}
fn mut_peers<F: Fn(&mut Vec<Arc<GrandpaPeer>>)>(&mut self, closure: F) {
fn mut_peers<F: FnOnce(&mut Vec<Arc<GrandpaPeer>>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
+6 -14
View File
@@ -206,8 +206,8 @@ impl<B: BlockT, F: FnOnce(&mut ConsensusGossip<B>, &mut Context<B>)> GossipTask<
/// Messages sent to Protocol from elsewhere inside the system.
pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
/// Tell protocol to maintain sync.
MaintainSync,
/// A batch of blocks has been processed, with or without errors.
BlocksProcessed(Vec<B::Hash>, bool),
/// Tell protocol to restart sync.
RestartSync,
/// Ask the protocol for its status.
@@ -367,11 +367,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message) => {
self.gossip_consensus_message(topic, engine_id, message)
}
ProtocolMsg::MaintainSync => {
ProtocolMsg::BlocksProcessed(hashes, has_error) => {
self.sync.blocks_processed(hashes, has_error);
let mut context =
ProtocolContext::new(&mut self.context_data, &self.network_chan);
self.sync.maintain_sync(&mut context);
}
},
ProtocolMsg::RestartSync => {
let mut context =
ProtocolContext::new(&mut self.context_data, &self.network_chan);
@@ -624,16 +625,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
response,
);
} else {
// import_queue.import_blocks also acquires sync.write();
// Break the cycle by doing these separately from the outside;
let new_blocks = {
self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response)
};
if let Some((origin, new_blocks)) = new_blocks {
let import_queue = self.sync.import_queue();
import_queue.import_blocks(origin, new_blocks);
}
self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response);
}
}
+5 -4
View File
@@ -71,6 +71,7 @@ pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
}
/// A link implementation that connects to the network.
#[derive(Clone)]
pub struct NetworkLink<B: BlockT, S: NetworkSpecialization<B>> {
/// The protocol sender
pub(crate) protocol_sender: Sender<ProtocolMsg<B, S>>,
@@ -83,6 +84,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
let _ = self.protocol_sender.send(ProtocolMsg::BlockImportedSync(hash.clone(), number));
}
fn blocks_processed(&self, processed_blocks: Vec<B::Hash>, has_error: bool) {
let _ = self.protocol_sender.send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error));
}
fn justification_imported(&self, who: NodeIndex, hash: &B::Hash, number: NumberFor<B>, success: bool) {
let _ = self.protocol_sender.send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success));
if !success {
@@ -95,10 +100,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
let _ = self.protocol_sender.send(ProtocolMsg::RequestJustification(hash.clone(), number));
}
fn maintain_sync(&self) {
let _ = self.protocol_sender.send(ProtocolMsg::MaintainSync);
}
fn useless_peer(&self, who: NodeIndex, reason: &str) {
trace!(target:"sync", "Useless peer {}, {}", who, reason);
self.network_sender.send(NetworkMsg::ReportPeer(who, Severity::Useless(reason.to_string())));
+38 -20
View File
@@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::cmp::max;
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
use log::{debug, trace, warn};
@@ -26,10 +27,11 @@ use consensus::import_queue::{ImportQueue, IncomingBlock};
use client::error::Error as ClientError;
use crate::blocks::BlockCollection;
use runtime_primitives::Justification;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero};
use runtime_primitives::generic::BlockId;
use crate::message::{self, generic::Message as GenericMessage};
use crate::config::Roles;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -303,6 +305,8 @@ pub struct ChainSync<B: BlockT> {
required_block_attributes: message::BlockAttributes,
justifications: PendingJustifications<B>,
import_queue: Box<ImportQueue<B>>,
queue_blocks: HashSet<B::Hash>,
best_importing_number: NumberFor<B>,
is_stopping: AtomicBool,
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
@@ -367,6 +371,8 @@ impl<B: BlockT> ChainSync<B> {
justifications: PendingJustifications::new(),
required_block_attributes,
import_queue,
queue_blocks: Default::default(),
best_importing_number: Zero::zero(),
is_stopping: Default::default(),
is_offline,
is_major_syncing,
@@ -377,11 +383,6 @@ impl<B: BlockT> ChainSync<B> {
self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number)
}
/// Returns import queue reference.
pub(crate) fn import_queue(&self) -> Box<ImportQueue<B>> {
self.import_queue.clone()
}
fn state(&self, best_seen: &Option<NumberFor<B>>) -> SyncState {
match best_seen {
&Some(n) if n > self.best_queued_number && n - self.best_queued_number > As::sa(5) => SyncState::Downloading,
@@ -410,7 +411,7 @@ impl<B: BlockT> ChainSync<B> {
let previous_state = self.state(&previous_best_seen);
if let Some(info) = protocol.peer_info(who) {
match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) {
match (block_status(&*protocol.client(), &self.queue_blocks, info.best_hash), info.best_number) {
(Err(e), _) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
let reason = format!("Error legimimately reading blockchain status: {:?}", e);
@@ -424,7 +425,7 @@ impl<B: BlockT> ChainSync<B> {
let reason = format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
protocol.report_peer(who, Severity::Bad(reason));
},
(Ok(BlockStatus::Unknown), _) if self.import_queue.status().importing_count > MAJOR_SYNC_BLOCKS => {
(Ok(BlockStatus::Unknown), _) if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS => {
// when actively syncing the common point moves too fast.
debug!(target:"sync", "New peer with unknown best hash {} ({}), assuming common block.", self.best_queued_hash, self.best_queued_number);
self.peers.insert(who, PeerSync {
@@ -498,7 +499,7 @@ impl<B: BlockT> ChainSync<B> {
who: NodeIndex,
request: message::BlockRequest<B>,
response: message::BlockResponse<B>
) -> Option<(BlockOrigin, Vec<IncomingBlock<B>>)> {
) {
let new_blocks: Vec<IncomingBlock<B>> = if let Some(ref mut peer) = self.peers.get_mut(&who) {
let mut blocks = response.blocks;
if request.direction == message::Direction::Descending {
@@ -553,24 +554,24 @@ impl<B: BlockT> ChainSync<B> {
let n = n - As::sa(1);
peer.state = PeerSyncState::AncestorSearch(n);
Self::request_ancestry(protocol, who, n);
return None;
return;
},
Ok(_) => { // genesis mismatch
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer".to_string()));
return None;
return;
},
Err(e) => {
let reason = format!("Error answering legitimate blockchain query: {:?}", e);
protocol.report_peer(who, Severity::Useless(reason));
return None;
return;
}
}
},
None => {
trace!(target:"sync", "Invalid response when searching for ancestor from {}", who);
protocol.report_peer(who, Severity::Bad("Invalid response when searching for ancestor".to_string()));
return None;
return;
}
}
},
@@ -593,7 +594,14 @@ impl<B: BlockT> ChainSync<B> {
self.block_queued(&hash, number);
}
self.maintain_sync(protocol);
Some((origin, new_blocks))
let new_best_importing_number = new_blocks
.last()
.and_then(|b| b.header.as_ref().map(|h| h.number().clone()))
.unwrap_or_else(|| Zero::zero());
self.queue_blocks
.extend(new_blocks.iter().map(|b| b.hash.clone()));
self.best_importing_number = max(new_best_importing_number, self.best_importing_number);
self.import_queue.import_blocks(origin, new_blocks);
}
/// Handle new justification data.
@@ -644,6 +652,16 @@ impl<B: BlockT> ChainSync<B> {
self.maintain_sync(protocol);
}
/// A batch of blocks have been processed, with or without errors.
pub fn blocks_processed(&mut self, processed_blocks: Vec<B::Hash>, has_error: bool) {
for hash in processed_blocks {
self.queue_blocks.remove(&hash);
}
if has_error {
self.best_importing_number = Zero::zero();
}
}
/// Maintain the sync process (download new blocks, fetch justifications).
pub fn maintain_sync(&mut self, protocol: &mut Context<B>) {
if self.is_stopping.load(Ordering::SeqCst) {
@@ -787,7 +805,7 @@ impl<B: BlockT> ChainSync<B> {
}
fn is_known(&self, protocol: &mut Context<B>, hash: &B::Hash) -> bool {
block_status(&*protocol.client(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
block_status(&*protocol.client(), &self.queue_blocks, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
}
/// Handle disconnected peer.
@@ -813,7 +831,8 @@ impl<B: BlockT> ChainSync<B> {
/// Restart the sync process.
pub(crate) fn restart(&mut self, protocol: &mut Context<B>) {
self.import_queue.clear();
self.queue_blocks.clear();
self.best_importing_number = Zero::zero();
self.blocks.clear();
match protocol.client().info() {
Ok(info) => {
@@ -884,9 +903,8 @@ impl<B: BlockT> ChainSync<B> {
// Issue a request for a peer to download new blocks, if any are available
fn download_new(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
let import_status = self.import_queue.status();
// when there are too many blocks in the queue => do not try to download new blocks
if import_status.importing_count > MAX_IMPORTING_BLOCKS {
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
trace!(target: "sync", "Too many blocks in the queue.");
return;
}
@@ -931,10 +949,10 @@ impl<B: BlockT> ChainSync<B> {
/// Get block status, taking into account import queue.
fn block_status<B: BlockT>(
chain: &crate::chain::Client<B>,
queue: &ImportQueue<B>,
queue_blocks: &HashSet<B::Hash>,
hash: B::Hash) -> Result<BlockStatus, ClientError>
{
if queue.is_importing(&hash) {
if queue_blocks.contains(&hash) {
return Ok(BlockStatus::Queued);
}
+111 -35
View File
@@ -93,7 +93,8 @@ pub struct NoopLink { }
impl<B: BlockT> Link<B> for NoopLink { }
/// The test specialization.
pub struct DummySpecialization { }
#[derive(Clone)]
pub struct DummySpecialization;
impl NetworkSpecialization<Block> for DummySpecialization {
fn status(&self) -> Vec<u8> {
@@ -117,23 +118,92 @@ impl NetworkSpecialization<Block> for DummySpecialization {
pub type PeersClient = client::Client<test_client::Backend, test_client::Executor, Block, test_client::runtime::RuntimeApi>;
pub struct Peer<D> {
#[derive(Clone)]
/// A Link that can wait for a block to have been imported.
pub struct TestLink<S: NetworkSpecialization<Block> + Clone> {
import_done: Arc<AtomicBool>,
hash: Arc<Mutex<Hash>>,
link: NetworkLink<Block, S>,
}
impl<S: NetworkSpecialization<Block> + Clone> TestLink<S> {
fn new(
protocol_sender: Sender<ProtocolMsg<Block, S>>,
network_sender: NetworkChan<Block>
) -> TestLink<S> {
TestLink {
import_done: Arc::new(AtomicBool::new(false)),
hash: Arc::new(Mutex::new(Default::default())),
link: NetworkLink {
protocol_sender,
network_sender,
}
}
}
/// Set the hash which will be awaited for import.
fn with_hash(&self, hash: Hash) {
self.import_done.store(false, Ordering::SeqCst);
*self.hash.lock() = hash;
}
/// Simulate a synchronous import.
fn wait_for_import(&self) {
while !self.import_done.load(Ordering::SeqCst) {
thread::sleep(Duration::from_millis(20));
}
}
}
impl<S: NetworkSpecialization<Block> + Clone> Link<Block> for TestLink<S> {
fn block_imported(&self, hash: &Hash, number: NumberFor<Block>) {
if hash == &*self.hash.lock() {
self.import_done.store(true, Ordering::SeqCst);
}
self.link.block_imported(hash, number);
}
fn blocks_processed(&self, processed_blocks: Vec<Hash>, has_error: bool) {
self.link.blocks_processed(processed_blocks, has_error);
}
fn justification_imported(&self, who: NodeIndex, hash: &Hash, number:NumberFor<Block>, success: bool) {
self.link.justification_imported(who, hash, number, success);
}
fn request_justification(&self, hash: &Hash, number: NumberFor<Block>) {
self.link.request_justification(hash, number);
}
fn useless_peer(&self, who: NodeIndex, reason: &str) {
self.link.useless_peer(who, reason);
}
fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str) {
self.link.note_useless_and_restart_sync(who, reason);
}
fn restart(&self) {
self.link.restart();
}
}
pub struct Peer<D, S: NetworkSpecialization<Block> + Clone> {
pub is_offline: Arc<AtomicBool>,
pub is_major_syncing: Arc<AtomicBool>,
pub peers: Arc<RwLock<HashMap<NodeIndex, ConnectedPeer<Block>>>>,
client: Arc<PeersClient>,
network_to_protocol_sender: Sender<FromNetworkMsg<Block>>,
pub protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
network_port: Mutex<NetworkPort<Block>>,
pub protocol_sender: Sender<ProtocolMsg<Block, S>>,
network_link: TestLink<S>,
network_port: Arc<Mutex<NetworkPort<Block>>>,
pub import_queue: Box<ImportQueue<Block>>,
network_sender: NetworkChan<Block>,
pub data: D,
best_hash: Mutex<Option<H256>>,
finalized_hash: Mutex<Option<H256>>,
}
impl<D> Peer<D> {
impl<D, S: NetworkSpecialization<Block> + Clone> Peer<D, S> {
fn new(
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
@@ -141,12 +211,14 @@ impl<D> Peer<D> {
client: Arc<PeersClient>,
import_queue: Box<ImportQueue<Block>>,
network_to_protocol_sender: Sender<FromNetworkMsg<Block>>,
protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
protocol_sender: Sender<ProtocolMsg<Block, S>>,
network_sender: NetworkChan<Block>,
network_port: NetworkPort<Block>,
data: D,
) -> Self {
let network_port = Mutex::new(network_port);
let network_port = Arc::new(Mutex::new(network_port));
let network_link = TestLink::new(protocol_sender.clone(), network_sender.clone());
import_queue.start(Box::new(network_link.clone())).expect("Test ImportQueue always starts");
Peer {
is_offline,
is_major_syncing,
@@ -155,7 +227,7 @@ impl<D> Peer<D> {
network_to_protocol_sender,
protocol_sender,
import_queue,
network_sender,
network_link,
network_port,
data,
best_hash: Mutex::new(None),
@@ -171,12 +243,6 @@ impl<D> Peer<D> {
.header(&BlockId::Hash(info.chain.best_hash))
.unwrap()
.unwrap();
let network_link = NetworkLink {
protocol_sender: self.protocol_sender.clone(),
network_sender: self.network_sender.clone(),
};
self.import_queue.start(Box::new(network_link)).expect("Test ImportQueue always starts");
let _ = self
.protocol_sender
.send(ProtocolMsg::BlockImported(info.chain.best_hash, header));
@@ -237,8 +303,7 @@ impl<D> Peer<D> {
/// Whether this peer is done syncing (has no messages to send).
fn is_done(&self) -> bool {
self.import_queue.status().importing_count == 0 &&
self.network_port.lock().receiver().is_empty()
self.network_port.lock().receiver().is_empty()
}
/// Execute a "sync step". This is called for each peer after it sends a packet.
@@ -362,7 +427,7 @@ impl<D> Peer<D> {
);
let header = block.header.clone();
at = hash;
self.network_link.with_hash(hash);
self.import_queue.import_blocks(
origin,
vec![IncomingBlock {
@@ -373,10 +438,8 @@ impl<D> Peer<D> {
justification: None,
}],
);
// Simulate a synchronous import.
while self.import_queue.status().importing_count > 0 {
thread::sleep(Duration::from_millis(20));
}
// Simulate a sync import.
self.network_link.wait_for_import();
}
at
}
@@ -436,7 +499,18 @@ impl TransactionPool<Hash, Block> for EmptyTransactionPool {
fn on_broadcasted(&self, _: HashMap<Hash, Vec<String>>) {}
}
pub trait SpecializationFactory {
fn create() -> Self;
}
impl SpecializationFactory for DummySpecialization {
fn create() -> DummySpecialization {
DummySpecialization
}
}
pub trait TestNetFactory: Sized {
type Specialization: NetworkSpecialization<Block> + Clone + SpecializationFactory;
type Verifier: 'static + Verifier<Block>;
type PeerData: Default;
@@ -445,9 +519,9 @@ pub trait TestNetFactory: Sized {
fn make_verifier(&self, client: Arc<PeersClient>, config: &ProtocolConfig) -> Arc<Self::Verifier>;
/// Get reference to peer.
fn peer(&self, i: usize) -> &Peer<Self::PeerData>;
fn peers(&self) -> &Vec<Arc<Peer<Self::PeerData>>>;
fn mut_peers<F: Fn(&mut Vec<Arc<Peer<Self::PeerData>>>)>(&mut self, closure: F);
fn peer(&self, i: usize) -> &Peer<Self::PeerData, Self::Specialization>;
fn peers(&self) -> &Vec<Arc<Peer<Self::PeerData, Self::Specialization>>>;
fn mut_peers<F: FnOnce(&mut Vec<Arc<Peer<Self::PeerData, Self::Specialization>>>)>(&mut self, closure: F);
fn started(&self) -> bool;
fn set_started(&mut self, now: bool);
@@ -483,9 +557,9 @@ pub trait TestNetFactory: Sized {
let (network_sender, network_port) = network_channel(ProtocolId::default());
let import_queue = Box::new(BasicQueue::new(verifier, block_import, justification_import));
let specialization = DummySpecialization {};
let is_offline = Arc::new(AtomicBool::new(true));
let is_major_syncing = Arc::new(AtomicBool::new(false));
let specialization = self::SpecializationFactory::create();
let peers: Arc<RwLock<HashMap<NodeIndex, ConnectedPeer<Block>>>> = Arc::new(Default::default());
let (protocol_sender, network_to_protocol_sender) = Protocol::new(
is_offline.clone(),
@@ -514,7 +588,7 @@ pub trait TestNetFactory: Sized {
));
self.mut_peers(|peers| {
peers.push(peer.clone())
peers.push(peer)
});
}
@@ -666,11 +740,12 @@ pub trait TestNetFactory: Sized {
}
pub struct TestNet {
peers: Vec<Arc<Peer<()>>>,
peers: Vec<Arc<Peer<(), DummySpecialization>>>,
started: bool,
}
impl TestNetFactory for TestNet {
type Specialization = DummySpecialization;
type Verifier = PassThroughVerifier;
type PeerData = ();
@@ -688,15 +763,15 @@ impl TestNetFactory for TestNet {
Arc::new(PassThroughVerifier(false))
}
fn peer(&self, i: usize) -> &Peer<()> {
fn peer(&self, i: usize) -> &Peer<(), Self::Specialization> {
&self.peers[i]
}
fn peers(&self) -> &Vec<Arc<Peer<()>>> {
fn peers(&self) -> &Vec<Arc<Peer<(), Self::Specialization>>> {
&self.peers
}
fn mut_peers<F: Fn(&mut Vec<Arc<Peer<()>>>)>(&mut self, closure: F) {
fn mut_peers<F: FnOnce(&mut Vec<Arc<Peer<(), Self::Specialization>>>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
@@ -728,6 +803,7 @@ impl JustificationImport<Block> for ForceFinalized {
pub struct JustificationTestNet(TestNet);
impl TestNetFactory for JustificationTestNet {
type Specialization = DummySpecialization;
type Verifier = PassThroughVerifier;
type PeerData = ();
@@ -741,15 +817,15 @@ impl TestNetFactory for JustificationTestNet {
self.0.make_verifier(client, config)
}
fn peer(&self, i: usize) -> &Peer<Self::PeerData> {
fn peer(&self, i: usize) -> &Peer<Self::PeerData, Self::Specialization> {
self.0.peer(i)
}
fn peers(&self) -> &Vec<Arc<Peer<Self::PeerData>>> {
fn peers(&self) -> &Vec<Arc<Peer<Self::PeerData, Self::Specialization>>> {
self.0.peers()
}
fn mut_peers<F: Fn(&mut Vec<Arc<Peer<Self::PeerData>>>)>(&mut self, closure: F ) {
fn mut_peers<F: FnOnce(&mut Vec<Arc<Peer<Self::PeerData, Self::Specialization>>>)>(&mut self, closure: F ) {
self.0.mut_peers(closure)
}