fix race in sync tests (#2259)

This commit is contained in:
Svyatoslav Nikolsky
2019-04-12 10:32:00 +03:00
committed by Bastian Köcher
parent 7f59cdb900
commit 5725e25448
10 changed files with 316 additions and 241 deletions
+1 -1
View File
@@ -950,7 +950,7 @@ mod tests {
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().route_fast();
net.lock().sync_without_disconnects();
Ok(())
})
.map(|_| ())
@@ -20,3 +20,7 @@ parity-codec = { version = "3.3", features = ["derive"] }
[dev-dependencies]
test_client = { package = "substrate-test-client", path = "../../test-client" }
[features]
default = []
test-helpers = []
@@ -149,6 +149,18 @@ impl<B: BlockT> BasicQueue<B> {
sender: importer_sender,
}
}
/// Send synchronization request to the block import channel.
///
/// The caller should wait for Link::synchronized() call to ensure that it has synchronized
/// with ImportQueue.
#[cfg(any(test, feature = "test-helpers"))]
pub fn synchronize(&self) {
self
.sender
.send(BlockImportMsg::Synchronize)
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
}
}
impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
@@ -191,6 +203,8 @@ pub enum BlockImportMsg<B: BlockT> {
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
Start(Box<Link<B>>, Sender<Result<(), std::io::Error>>),
Stop,
#[cfg(any(test, feature = "test-helpers"))]
Synchronize,
}
pub enum BlockImportWorkerMsg<B: BlockT> {
@@ -201,6 +215,8 @@ pub enum BlockImportWorkerMsg<B: BlockT> {
B::Hash,
)>,
),
#[cfg(any(test, feature = "test-helpers"))]
Synchronize,
}
enum ImportMsgType<B: BlockT> {
@@ -279,13 +295,32 @@ impl<B: BlockT> BlockImporter<B> {
let _ = sender.send(Ok(()));
},
BlockImportMsg::Stop => return false,
#[cfg(any(test, feature = "test-helpers"))]
BlockImportMsg::Synchronize => {
self.worker_sender
.send(BlockImportWorkerMsg::Synchronize)
.expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed");
},
}
true
}
fn handle_worker_msg(&mut self, msg: BlockImportWorkerMsg<B>) -> bool {
let link = match self.link.as_ref() {
Some(link) => link,
None => {
trace!(target: "sync", "Received import result while import-queue has no link");
return true;
},
};
let results = match msg {
BlockImportWorkerMsg::Imported(results) => (results),
#[cfg(any(test, feature = "test-helpers"))]
BlockImportWorkerMsg::Synchronize => {
link.synchronized();
return true;
},
_ => unreachable!("Import Worker does not send ImportBlocks message; qed"),
};
let mut has_error = false;
@@ -301,14 +336,6 @@ impl<B: BlockT> BlockImporter<B> {
has_error = true;
}
let link = match self.link.as_ref() {
Some(link) => link,
None => {
trace!(target: "sync", "Received import result for {} while import-queue has no link", hash);
return true;
},
};
match result {
Ok(BlockImportResult::ImportedKnown(number)) => link.block_imported(&hash, number),
Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => {
@@ -403,8 +430,12 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
// Working until all senders have been dropped...
match msg {
BlockImportWorkerMsg::ImportBlocks(origin, blocks) => {
worker.import_a_batch_of_blocks(origin, blocks)
}
worker.import_a_batch_of_blocks(origin, blocks);
},
#[cfg(any(test, feature = "test-helpers"))]
BlockImportWorkerMsg::Synchronize => {
let _ = worker.result_sender.send(BlockImportWorkerMsg::Synchronize);
},
_ => unreachable!("Import Worker does not receive the Imported message; qed"),
}
}
@@ -480,6 +511,9 @@ pub trait Link<B: BlockT>: Send {
fn note_useless_and_restart_sync(&self, _who: Origin, _reason: &str) {}
/// Restart sync.
fn restart(&self) {}
/// Synchronization request has been processed.
#[cfg(any(test, feature = "test-helpers"))]
fn synchronized(&self) {}
}
/// Block import successful result.
@@ -25,6 +25,7 @@ fg_primitives = { package = "substrate-finality-grandpa-primitives", path = "pri
grandpa = { package = "finality-grandpa", version = "0.7.1", features = ["derive-codec"] }
[dev-dependencies]
consensus_common = { package = "substrate-consensus-common", path = "../consensus/common", features = ["test-helpers"] }
network = { package = "substrate-network", path = "../network", features = ["test-helpers"] }
keyring = { package = "substrate-keyring", path = "../keyring" }
test_client = { package = "substrate-test-client", path = "../test-client"}
+5 -5
View File
@@ -419,7 +419,7 @@ fn run_to_completion_with<F: FnOnce()>(
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().send_finality_notifications();
net.lock().route_fast();
net.lock().sync_without_disconnects();
Ok(())
})
.map(|_| ())
@@ -515,7 +515,7 @@ fn finalize_3_voters_1_observer() {
.map_err(|_| ());
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| { net.lock().route_fast(); Ok(()) })
.for_each(move |_| { net.lock().sync_without_disconnects(); Ok(()) })
.map(|_| ())
.map_err(|_| ());
@@ -680,7 +680,7 @@ fn transition_3_voters_twice_1_observer() {
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().send_finality_notifications();
net.lock().route_fast();
net.lock().sync_without_disconnects();
Ok(())
})
.map(|_| ())
@@ -789,7 +789,7 @@ fn sync_justifications_on_change_blocks() {
// the last peer should get the justification by syncing from other peers
while net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() {
net.lock().route_fast();
net.lock().sync_without_disconnects();
}
}
@@ -1198,7 +1198,7 @@ fn voter_persists_its_votes() {
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().send_finality_notifications();
net.lock().route_fast();
net.lock().sync_without_disconnects();
Ok(())
})
.map(|_| ())
+1
View File
@@ -36,6 +36,7 @@ test_client = { package = "substrate-test-client", path = "../../core/test-clien
env_logger = { version = "0.6" }
keyring = { package = "substrate-keyring", path = "../../core/keyring" }
test_client = { package = "substrate-test-client", path = "../../core/test-client" }
consensus = { package = "substrate-consensus-common", path = "../../core/consensus/common", features = ["test-helpers"] }
[features]
default = []
+20 -10
View File
@@ -187,25 +187,25 @@ struct ContextData<B: BlockT, H: ExHashT> {
}
/// A task, consisting of a user-provided closure, to be executed on the Protocol thread.
pub trait SpecTask<B: BlockT, S: NetworkSpecialization<B>> {
fn call_box(self: Box<Self>, spec: &mut S, context: &mut Context<B>);
pub trait SpecTask<B: BlockT, S: NetworkSpecialization<B>> {
fn call_box(self: Box<Self>, spec: &mut S, context: &mut Context<B>);
}
impl<B: BlockT, S: NetworkSpecialization<B>, F: FnOnce(&mut S, &mut Context<B>)> SpecTask<B, S> for F {
fn call_box(self: Box<F>, spec: &mut S, context: &mut Context<B>) {
(*self)(spec, context)
}
fn call_box(self: Box<F>, spec: &mut S, context: &mut Context<B>) {
(*self)(spec, context)
}
}
/// A task, consisting of a user-provided closure, to be executed on the Protocol thread.
pub trait GossipTask<B: BlockT> {
fn call_box(self: Box<Self>, gossip: &mut ConsensusGossip<B>, context: &mut Context<B>);
pub trait GossipTask<B: BlockT> {
fn call_box(self: Box<Self>, gossip: &mut ConsensusGossip<B>, context: &mut Context<B>);
}
impl<B: BlockT, F: FnOnce(&mut ConsensusGossip<B>, &mut Context<B>)> GossipTask<B> for F {
fn call_box(self: Box<F>, gossip: &mut ConsensusGossip<B>, context: &mut Context<B>) {
(*self)(gossip, context)
}
fn call_box(self: Box<F>, gossip: &mut ConsensusGossip<B>, context: &mut Context<B>) {
(*self)(gossip, context)
}
}
/// Messages sent to Protocol from elsewhere inside the system.
@@ -246,6 +246,9 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
Stop,
/// Tell protocol to perform regular maintenance.
Tick,
/// Synchronization request.
#[cfg(any(test, feature = "test-helpers"))]
Synchronize,
}
/// Messages sent to Protocol from Network-libp2p.
@@ -258,6 +261,9 @@ pub enum FromNetworkMsg<B: BlockT> {
CustomMessage(PeerId, Message<B>),
/// Let protocol know a peer is currenlty clogged.
PeerClogged(PeerId, Option<Message<B>>),
/// Synchronization request.
#[cfg(any(test, feature = "test-helpers"))]
Synchronize,
}
enum Incoming<B: BlockT, S: NetworkSpecialization<B>> {
@@ -408,6 +414,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
self.stop();
return false;
},
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Synchronize => self.network_chan.send(NetworkMsg::Synchronized),
}
true
}
@@ -420,6 +428,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
FromNetworkMsg::CustomMessage(who, message) => {
self.on_custom_message(who, message)
},
#[cfg(any(test, feature = "test-helpers"))]
FromNetworkMsg::Synchronize => self.network_chan.send(NetworkMsg::Synchronized),
}
true
}
+5
View File
@@ -466,6 +466,9 @@ pub enum NetworkMsg<B: BlockT + 'static> {
Outgoing(PeerId, Message<B>),
/// Report a peer.
ReportPeer(PeerId, Severity),
/// Synchronization response.
#[cfg(any(test, feature = "test-helpers"))]
Synchronized,
}
/// Starts the background thread that handles the networking.
@@ -548,6 +551,8 @@ fn run_thread<B: BlockT + 'static>(
},
}
},
#[cfg(any(test, feature = "test-helpers"))]
NetworkMsg::Synchronized => (),
}
Ok(())
})
+232 -204
View File
@@ -21,11 +21,9 @@ mod block_import;
#[cfg(test)]
mod sync;
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
use log::trace;
use client;
@@ -36,7 +34,7 @@ use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport
use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind};
use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport};
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient, TopicNotification};
use crossbeam_channel::{self as channel, Sender, select};
use crossbeam_channel::{Sender, RecvError};
use futures::Future;
use futures::sync::{mpsc, oneshot};
use crate::message::Message;
@@ -119,62 +117,28 @@ pub type PeersClient = client::Client<test_client::Backend, test_client::Executo
/// A Link that can wait for a block to have been imported.
pub struct TestLink<S: NetworkSpecialization<Block>> {
import_done: Arc<AtomicBool>,
hash: Arc<Mutex<Hash>>,
link: NetworkLink<Block, S>,
protocol_sender: Sender<ProtocolMsg<Block, S>>,
network_sender: NetworkChan<Block>,
network_to_protocol_sender: Sender<FromNetworkMsg<Block>>,
}
impl<S: NetworkSpecialization<Block>> TestLink<S> {
fn new(
protocol_sender: Sender<ProtocolMsg<Block, S>>,
network_to_protocol_sender: Sender<FromNetworkMsg<Block>>,
network_sender: NetworkChan<Block>
) -> TestLink<S> {
TestLink {
protocol_sender: protocol_sender.clone(),
network_sender: network_sender.clone(),
import_done: Arc::new(AtomicBool::new(false)),
hash: Arc::new(Mutex::new(Default::default())),
network_to_protocol_sender,
link: NetworkLink {
protocol_sender,
network_sender,
}
}
}
fn clone_link(&self) -> Self {
TestLink {
protocol_sender: self.protocol_sender.clone(),
network_sender: self.network_sender.clone(),
import_done: self.import_done.clone(),
hash: self.hash.clone(),
link: NetworkLink {
protocol_sender: self.protocol_sender.clone(),
network_sender: self.network_sender.clone(),
}
}
}
/// Set the hash which will be awaited for import.
fn with_hash(&self, hash: Hash) {
self.import_done.store(false, Ordering::SeqCst);
*self.hash.lock() = hash;
}
/// Simulate a synchronous import.
fn wait_for_import(&self) {
while !self.import_done.load(Ordering::SeqCst) {
thread::sleep(Duration::from_millis(20));
}
}
}
impl<S: NetworkSpecialization<Block>> Link<Block> for TestLink<S> {
fn block_imported(&self, hash: &Hash, number: NumberFor<Block>) {
if hash == &*self.hash.lock() {
self.import_done.store(true, Ordering::SeqCst);
}
self.link.block_imported(hash, number);
}
@@ -201,6 +165,10 @@ impl<S: NetworkSpecialization<Block>> Link<Block> for TestLink<S> {
fn restart(&self) {
self.link.restart();
}
fn synchronized(&self) {
let _ = self.network_to_protocol_sender.send(FromNetworkMsg::Synchronize);
}
}
pub struct Peer<D, S: NetworkSpecialization<Block>> {
@@ -209,43 +177,137 @@ pub struct Peer<D, S: NetworkSpecialization<Block>> {
pub peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>>,
pub peer_id: PeerId,
client: Arc<PeersClient>,
network_to_protocol_sender: Sender<FromNetworkMsg<Block>>,
pub protocol_sender: Sender<ProtocolMsg<Block, S>>,
network_link: TestLink<S>,
network_port: Arc<Mutex<NetworkPort<Block>>>,
pub import_queue: Box<ImportQueue<Block>>,
net_proto_channel: ProtocolChannel<S>,
pub import_queue: Box<BasicQueue<Block>>,
pub data: D,
best_hash: Mutex<Option<H256>>,
finalized_hash: Mutex<Option<H256>>,
}
type MessageFilter = Fn(&NetworkMsg<Block>) -> bool;
struct ProtocolChannel<S: NetworkSpecialization<Block>> {
buffered_messages: Mutex<VecDeque<NetworkMsg<Block>>>,
network_to_protocol_sender: Sender<FromNetworkMsg<Block>>,
client_to_protocol_sender: Sender<ProtocolMsg<Block, S>>,
protocol_to_network_receiver: NetworkPort<Block>,
}
impl<S: NetworkSpecialization<Block>> ProtocolChannel<S> {
/// Create new buffered network port.
pub fn new(
network_to_protocol_sender: Sender<FromNetworkMsg<Block>>,
client_to_protocol_sender: Sender<ProtocolMsg<Block, S>>,
protocol_to_network_receiver: NetworkPort<Block>,
) -> Self {
ProtocolChannel {
buffered_messages: Mutex::new(VecDeque::new()),
network_to_protocol_sender,
client_to_protocol_sender,
protocol_to_network_receiver,
}
}
/// Send message from network to protocol.
pub fn send_from_net(&self, message: FromNetworkMsg<Block>) {
let _ = self.network_to_protocol_sender.send(message);
let _ = self.network_to_protocol_sender.send(FromNetworkMsg::Synchronize);
let _ = self.wait_sync();
}
/// Send message from client to protocol.
pub fn send_from_client(&self, message: ProtocolMsg<Block, S>) {
let _ = self.client_to_protocol_sender.send(message);
let _ = self.client_to_protocol_sender.send(ProtocolMsg::Synchronize);
let _ = self.wait_sync();
}
/// Wait until synchronization response is generated by the protocol.
pub fn wait_sync(&self) -> Result<(), RecvError> {
loop {
match self.protocol_to_network_receiver.receiver().recv() {
Ok(NetworkMsg::Synchronized) => return Ok(()),
Err(error) => return Err(error),
Ok(msg) => self.buffered_messages.lock().push_back(msg),
}
}
}
/// Produce the next pending message to send to another peer.
fn pending_message(&self, message_filter: &MessageFilter) -> Option<NetworkMsg<Block>> {
if let Some(message) = self.buffered_message(message_filter) {
return Some(message);
}
while let Some(message) = self.channel_message() {
if message_filter(&message) {
return Some(message)
} else {
self.buffered_messages.lock().push_back(message);
}
}
None
}
/// Whether this peer is done syncing (has no messages to send).
fn is_done(&self) -> bool {
self.buffered_messages.lock().is_empty()
&& self.protocol_to_network_receiver.receiver().is_empty()
}
/// Return oldest buffered message if it exists.
fn buffered_message(&self, message_filter: &MessageFilter) -> Option<NetworkMsg<Block>> {
let mut buffered_messages = self.buffered_messages.lock();
for i in 0..buffered_messages.len() {
if message_filter(&buffered_messages[i]) {
return buffered_messages.remove(i);
}
}
None
}
/// Receive message from the channel.
fn channel_message(&self) -> Option<NetworkMsg<Block>> {
self.protocol_to_network_receiver.receiver().try_recv().ok()
}
}
impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
fn new(
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>>,
client: Arc<PeersClient>,
import_queue: Box<ImportQueue<Block>>,
import_queue: Box<BasicQueue<Block>>,
network_to_protocol_sender: Sender<FromNetworkMsg<Block>>,
protocol_sender: Sender<ProtocolMsg<Block, S>>,
network_sender: NetworkChan<Block>,
network_port: NetworkPort<Block>,
data: D,
) -> Self {
let network_port = Arc::new(Mutex::new(network_port));
let network_link = TestLink::new(protocol_sender.clone(), network_sender.clone());
import_queue.start(Box::new(network_link.clone_link())).expect("Test ImportQueue always starts");
let net_proto_channel = ProtocolChannel::new(
network_to_protocol_sender.clone(),
protocol_sender.clone(),
network_port,
);
let network_link = TestLink::new(
protocol_sender.clone(),
network_to_protocol_sender.clone(),
network_sender.clone(),
);
import_queue.start(Box::new(network_link)).expect("Test ImportQueue always starts");
Peer {
is_offline,
is_major_syncing,
peers,
peer_id: PeerId::random(),
client,
network_to_protocol_sender,
protocol_sender,
import_queue,
network_link,
network_port,
net_proto_channel,
data,
best_hash: Mutex::new(None),
finalized_hash: Mutex::new(None),
@@ -260,9 +322,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
.header(&BlockId::Hash(info.chain.best_hash))
.unwrap()
.unwrap();
let _ = self
.protocol_sender
.send(ProtocolMsg::BlockImported(info.chain.best_hash, header));
self.net_proto_channel.send_from_client(ProtocolMsg::BlockImported(info.chain.best_hash, header));
}
pub fn on_block_imported(
@@ -270,9 +330,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
hash: <Block as BlockT>::Hash,
header: &<Block as BlockT>::Header,
) {
let _ = self
.protocol_sender
.send(ProtocolMsg::BlockImported(hash, header.clone()));
self.net_proto_channel.send_from_client(ProtocolMsg::BlockImported(hash, header.clone()));
}
// SyncOracle: are we connected to any peer?
@@ -287,45 +345,38 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
/// Called on connection to other indicated peer.
fn on_connect(&self, other: &Self) {
let _ = self.network_to_protocol_sender.send(FromNetworkMsg::PeerConnected(other.peer_id.clone(), String::new()));
self.net_proto_channel.send_from_net(FromNetworkMsg::PeerConnected(other.peer_id.clone(), String::new()));
}
/// Called on disconnect from other indicated peer.
fn on_disconnect(&self, other: &Self) {
let _ = self
.network_to_protocol_sender
.send(FromNetworkMsg::PeerDisconnected(other.peer_id.clone(), String::new()));
self.net_proto_channel.send_from_net(FromNetworkMsg::PeerDisconnected(other.peer_id.clone(), String::new()));
}
/// Receive a message from another peer. Return a set of peers to disconnect.
fn receive_message(&self, from: &Self, msg: Message<Block>) {
let _ = self
.network_to_protocol_sender
.send(FromNetworkMsg::CustomMessage(from.peer_id.clone(), msg));
fn receive_message(&self, from: &PeerId, msg: Message<Block>) {
self.net_proto_channel.send_from_net(FromNetworkMsg::CustomMessage(from.clone(), msg));
}
/// Produce the next pending message to send to another peer.
fn pending_message(&self) -> Option<NetworkMsg<Block>> {
select! {
recv(self.network_port.lock().receiver()) -> msg => return msg.ok(),
// If there are no messages ready, give protocol a change to send one.
recv(channel::after(Duration::from_millis(100))) -> _ => return None,
}
}
/// Produce the next pending message to send to another peer, without waiting.
fn pending_message_fast(&self) -> Option<NetworkMsg<Block>> {
self.network_port.lock().receiver().try_recv().ok()
fn pending_message(&self, message_filter: &MessageFilter) -> Option<NetworkMsg<Block>> {
self.net_proto_channel.pending_message(message_filter)
}
/// Whether this peer is done syncing (has no messages to send).
fn is_done(&self) -> bool {
self.network_port.lock().receiver().is_empty()
self.net_proto_channel.is_done()
}
/// Synchronize with import queue.
fn import_queue_sync(&self) {
self.import_queue.synchronize();
let _ = self.net_proto_channel.wait_sync();
}
/// Execute a "sync step". This is called for each peer after it sends a packet.
fn sync_step(&self) {
let _ = self.protocol_sender.send(ProtocolMsg::Tick);
self.net_proto_channel.send_from_client(ProtocolMsg::Tick);
}
/// Send block import notifications.
@@ -340,10 +391,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
}
let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
let _ = self
.protocol_sender
.send(ProtocolMsg::BlockImported(info.chain.best_hash, header));
self.net_proto_channel.send_from_client(ProtocolMsg::BlockImported(info.chain.best_hash, header));
*best_hash = Some(info.chain.best_hash);
}
@@ -359,16 +407,13 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
}
let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap();
let _ = self
.protocol_sender
.send(ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone()));
self.net_proto_channel.send_from_client(ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone()));
*finalized_hash = Some(info.chain.finalized_hash);
}
/// Restart sync for a peer.
fn restart_sync(&self) {
let _ = self.protocol_sender.send(ProtocolMsg::Abort);
self.net_proto_channel.send_from_client(ProtocolMsg::Abort);
}
/// Push a message into the gossip network and relay to peers.
@@ -380,10 +425,14 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
data: Vec<u8>,
force: bool,
) {
let recipient = if force { GossipMessageRecipient::BroadcastToAll } else { GossipMessageRecipient::BroadcastNew };
let _ = self
.protocol_sender
.send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, recipient));
let recipient = if force {
GossipMessageRecipient::BroadcastToAll
} else {
GossipMessageRecipient::BroadcastNew
};
self.net_proto_channel.send_from_client(
ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, recipient),
);
}
pub fn consensus_gossip_collect_garbage_for_topic(&self, _topic: <Block as BlockT>::Hash) {
@@ -408,22 +457,18 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
pub fn with_gossip<F>(&self, f: F)
where F: FnOnce(&mut ConsensusGossip<Block>, &mut Context<Block>) + Send + 'static
{
let _ = self
.protocol_sender
.send(ProtocolMsg::ExecuteWithGossip(Box::new(f)));
self.net_proto_channel.send_from_client(ProtocolMsg::ExecuteWithGossip(Box::new(f)));
}
/// Announce a block to peers.
pub fn announce_block(&self, block: Hash) {
let _ = self.protocol_sender.send(ProtocolMsg::AnnounceBlock(block));
self.net_proto_channel.send_from_client(ProtocolMsg::AnnounceBlock(block));
}
/// Request a justification for the given block.
#[cfg(test)]
fn request_justification(&self, hash: &::primitives::H256, number: NumberFor<Block>) {
let _ = self
.protocol_sender
.send(ProtocolMsg::RequestJustification(hash.clone(), number));
self.net_proto_channel.send_from_client(ProtocolMsg::RequestJustification(hash.clone(), number));
}
/// Add blocks to the peer -- edit the block before adding
@@ -452,7 +497,6 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
);
let header = block.header.clone();
at = hash;
self.network_link.with_hash(hash);
self.import_queue.import_blocks(
origin,
vec![IncomingBlock {
@@ -463,9 +507,11 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
justification: None,
}],
);
// Simulate a sync import.
self.network_link.wait_for_import();
// make sure block import has completed
self.import_queue_sync();
}
at
}
@@ -525,7 +571,7 @@ impl TransactionPool<Hash, Block> for EmptyTransactionPool {
}
pub trait SpecializationFactory {
fn create() -> Self;
fn create() -> Self;
}
impl SpecializationFactory for DummySpecialization {
@@ -633,86 +679,86 @@ pub trait TestNetFactory: Sized {
}
}
}
self.route(None);
loop {
// we only deliver Status messages during start
let need_continue = self.route_single(true, None, &|msg| match *msg {
NetworkMsg::Outgoing(_, crate::message::generic::Message::Status(_)) => true,
NetworkMsg::Outgoing(_, _) => false,
NetworkMsg::ReportPeer(_, _) | NetworkMsg::Synchronized => true,
});
if !need_continue {
break;
}
}
self.set_started(true);
}
/// Do one step of routing.
fn route(&mut self, disconnected: Option<HashSet<usize>>) {
self.mut_peers(move |peers| {
let mut to_disconnect = HashSet::new();
for (peer_pos, peer) in peers.iter().enumerate() {
let packet = peer.pending_message();
match packet {
None => continue,
Some(NetworkMsg::Outgoing(recipient, packet)) => {
let recipient = peers.iter().position(|p| p.peer_id == recipient).unwrap();
if let Some(disconnected) = disconnected.as_ref() {
let mut current = HashSet::new();
current.insert(peer_pos);
current.insert(recipient);
// Not routing message between "disconnected" nodes.
if disconnected.is_subset(&current) {
continue;
/// Do single round of message routing: single message from every peer is routed.
fn route_single(
&mut self,
disconnect: bool,
disconnected: Option<HashSet<usize>>,
message_filter: &MessageFilter,
) -> bool {
let mut had_messages = false;
let mut to_disconnect = HashSet::new();
let peers = self.peers();
for peer in peers {
if let Some(message) = peer.pending_message(message_filter) {
match message {
NetworkMsg::Outgoing(recipient_id, packet) => {
had_messages = true;
let sender_pos = peers.iter().position(|p| p.peer_id == peer.peer_id).unwrap();
let recipient_pos = peers.iter().position(|p| p.peer_id == recipient_id).unwrap();
if disconnect {
if let Some(ref disconnected) = disconnected {
let mut current = HashSet::new();
current.insert(sender_pos);
current.insert(recipient_pos);
// Not routing message between "disconnected" nodes.
if disconnected.is_subset(&current) {
continue;
}
}
}
peers[recipient].receive_message(peer, packet)
}
Some(NetworkMsg::ReportPeer(who, _)) => {
to_disconnect.insert(who);
}
peers[recipient_pos].receive_message(&peer.peer_id, packet);
},
NetworkMsg::ReportPeer(who, _) => {
if disconnect {
to_disconnect.insert(who);
}
},
_ => (),
}
}
for d in to_disconnect {
if let Some(d) = peers.iter().find(|p| p.peer_id == d) {
for peer in 0..peers.len() {
peers[peer].on_disconnect(d);
}
}
for d in to_disconnect {
if let Some(d) = peers.iter().find(|p| p.peer_id == d) {
for peer in 0..peers.len() {
peers[peer].on_disconnect(d);
}
}
});
}
}
/// Route all pending outgoing messages, without waiting or disconnecting.
fn route_fast(&mut self) {
self.mut_peers(move |peers| {
for peer in 0..peers.len() {
while let Some(NetworkMsg::Outgoing(recipient, packet)) = peers[peer].pending_message_fast() {
if let Some(p) = peers.iter().find(|p| p.peer_id == recipient) {
p.receive_message(&peers[peer], packet)
}
}
}
});
}
// make sure that the protocol(s) has processed all messages that have been queued
self.peers().iter().for_each(|peer| peer.import_queue_sync());
/// Do a step of synchronization.
fn sync_step(&mut self) {
self.route(None);
self.mut_peers(|peers| {
for peer in peers {
peer.sync_step();
}
})
had_messages
}
/// Send block import notifications for all peers.
fn send_import_notifications(&mut self) {
self.mut_peers(|peers| {
for peer in peers {
peer.send_import_notifications();
}
})
self.peers().iter().for_each(|peer| peer.send_import_notifications())
}
/// Send block finalization notifications for all peers.
fn send_finality_notifications(&mut self) {
self.mut_peers(|peers| {
for peer in peers {
peer.send_finality_notifications();
}
})
self.peers().iter().for_each(|peer| peer.send_finality_notifications())
}
/// Restart sync for a peer.
@@ -722,48 +768,30 @@ pub trait TestNetFactory: Sized {
/// Perform synchronization until complete, if provided the
/// given nodes set are excluded from sync.
fn sync_with(&mut self, disconnected: Option<HashSet<usize>>) -> u32 {
fn sync_with(&mut self, disconnect: bool, disconnected: Option<HashSet<usize>>) {
self.start();
let mut total_steps = 0;
let mut done = 0;
loop {
if done > 3 { break; }
if self.done() {
done += 1;
} else {
done = 0;
}
self.sync_step();
self.route(disconnected.clone());
total_steps += 1;
}
total_steps
}
/// Perform synchronization until complete.
fn sync(&mut self) -> u32 {
self.sync_with(None)
}
/// Perform synchronization until complete,
/// excluding sync between certain nodes.
fn sync_with_disconnected(&mut self, disconnected: HashSet<usize>) -> u32 {
self.sync_with(Some(disconnected))
}
/// Do the given amount of sync steps.
fn sync_steps(&mut self, count: usize) {
self.start();
for _ in 0..count {
self.sync_step();
while self.route_single(disconnect, disconnected.clone(), &|_| true) {
// give protocol a chance to do its maintain procedures
self.peers().iter().for_each(|peer| peer.sync_step());
}
}
/// Whether all peers have synced.
/// Deliver at most 1 pending message from every peer.
fn sync_step(&mut self) {
self.route_single(true, None, &|_| true);
}
/// Deliver pending messages until there are no more.
fn sync(&mut self) {
self.sync_with(true, None)
}
/// Deliver pending messages until there are no more. Do not disconnect nodes.
fn sync_without_disconnects(&mut self) {
self.sync_with(false, None)
}
/// Whether all peers have no pending outgoing messages.
fn done(&self) -> bool {
self.peers().iter().all(|p| p.is_done())
}
+3 -11
View File
@@ -48,8 +48,7 @@ fn sync_peers_works() {
net.sync();
for peer in 0..3 {
// Assert peers is up to date.
let peers = net.peer(peer).peers.read();
assert_eq!(peers.len(), 2);
assert_eq!(net.peer(peer).peers.read().len(), 2);
// And then disconnect.
for other in 0..3 {
if other != peer {
@@ -78,9 +77,6 @@ fn sync_cycle_from_offline_to_syncing_to_offline() {
// Generate blocks.
net.peer(2).push_blocks(100, false);
net.start();
net.route_fast();
thread::sleep(Duration::from_millis(100));
net.route_fast();
for peer in 0..3 {
// Online
assert!(!net.peer(peer).is_offline());
@@ -102,7 +98,6 @@ fn sync_cycle_from_offline_to_syncing_to_offline() {
net.peer(peer).on_disconnect(net.peer(other));
}
}
thread::sleep(Duration::from_millis(100));
assert!(net.peer(peer).is_offline());
assert!(!net.peer(peer).is_major_syncing());
}
@@ -116,9 +111,7 @@ fn syncing_node_not_major_syncing_when_disconnected() {
// Generate blocks.
net.peer(2).push_blocks(100, false);
net.start();
net.route_fast();
thread::sleep(Duration::from_millis(100));
net.route_fast();
net.sync_step();
// Peer 1 is major-syncing.
assert!(net.peer(1).is_major_syncing());
@@ -126,7 +119,6 @@ fn syncing_node_not_major_syncing_when_disconnected() {
// Disconnect peer 1 form everyone else.
net.peer(1).on_disconnect(net.peer(0));
net.peer(1).on_disconnect(net.peer(2));
thread::sleep(Duration::from_millis(100));
// Peer 1 is not major-syncing.
assert!(!net.peer(1).is_major_syncing());
@@ -363,7 +355,7 @@ fn blocks_are_not_announced_by_light_nodes() {
let mut disconnected = HashSet::new();
disconnected.insert(0);
disconnected.insert(2);
net.sync_with_disconnected(disconnected);
net.sync_with(true, Some(disconnected));
// peer 0 has the best chain
// peer 1 has the best chain