Rewrite network protocol/service to use channels (#1340)

* rewrite network protocol/service to use channels

* remove use of unwrap

* re-introduce with_spec

* remove unnecessary mut

* remove unused param

* improve with_spec, add with_gossip

* rename job to task

* style: re-add comma

* remove extra string allocs

* rename use of channel

* turn TODO into FIXME

* remove mut in match

* remove Self in new

* pass headers by value to network service

* remove network sender from service

* remove TODO

* better expect

* rationalize use of network sender in ondemand
This commit is contained in:
Gregory Terzian
2019-02-06 19:54:02 +08:00
committed by Bastian Köcher
parent 8aae19e2db
commit a2d2ed69ab
19 changed files with 1314 additions and 903 deletions
+219 -170
View File
@@ -16,37 +16,41 @@
#![allow(missing_docs)]
#[cfg(test)]
mod sync;
#[cfg(test)]
mod block_import;
#[cfg(test)]
mod sync;
use std::collections::{VecDeque, HashSet, HashMap};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use parking_lot::RwLock;
use client;
use client::block_builder::BlockBuilder;
use primitives::{H256, Ed25519AuthorityId};
use runtime_primitives::Justification;
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor, Zero};
use io::SyncIo;
use protocol::{Context, Protocol, ProtocolContext};
use codec::{Decode, Encode};
use config::ProtocolConfig;
use service::{NetworkLink, TransactionPool};
use network_libp2p::{NodeIndex, PeerId, Severity};
use keyring::Keyring;
use codec::Encode;
use consensus::{BlockOrigin, ImportBlock, JustificationImport, ForkChoiceStrategy, Error as ConsensusError, ErrorKind as ConsensusErrorKind};
use consensus::import_queue::{import_many_blocks, ImportQueue, ImportQueueStatus, IncomingBlock};
use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier};
use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind};
use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport};
use consensus_gossip::{ConsensusGossip, ConsensusMessage};
use crossbeam_channel::{self as channel, Sender};
use futures::Future;
use futures::sync::{mpsc, oneshot};
use keyring::Keyring;
use network_libp2p::{NodeIndex, ProtocolId, Severity};
use parking_lot::Mutex;
use primitives::{H256, Ed25519AuthorityId};
use protocol::{Context, Protocol, ProtocolMsg, ProtocolStatus};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, Zero, NumberFor};
use runtime_primitives::Justification;
use service::{network_channel, NetworkChan, NetworkLink, NetworkMsg, NetworkPort, TransactionPool};
use specialization::NetworkSpecialization;
use consensus_gossip::ConsensusGossip;
use service::ExecuteInContext;
use test_client;
pub use test_client::runtime::{Block, Hash, Transfer, Extrinsic};
pub use test_client::runtime::{Block, Extrinsic, Hash, Transfer};
pub use test_client::TestClient;
#[cfg(any(test, feature = "test-helpers"))]
@@ -61,7 +65,7 @@ impl<B: BlockT> ImportCB<B> {
ImportCB(RefCell::new(None))
}
fn set<F>(&self, cb: Box<F>)
where F: 'static + Fn(BlockOrigin, Vec<IncomingBlock<B>>) -> bool
where F: 'static + Fn(BlockOrigin, Vec<IncomingBlock<B>>) -> bool,
{
*self.0.borrow_mut() = Some(cb);
}
@@ -168,7 +172,7 @@ impl<B: 'static + BlockT, V: 'static + Verifier<B>> ImportQueue<B> for SyncImpor
&link,
None,
(origin, new_blocks),
verifier,
verifier
)
}));
Ok(())
@@ -204,23 +208,13 @@ impl<B: 'static + BlockT, V: 'static + Verifier<B>> ImportQueue<B> for SyncImpor
}
}
struct DummyContextExecutor(Arc<Protocol<Block, DummySpecialization, Hash>>, Arc<RwLock<VecDeque<TestPacket>>>);
unsafe impl Send for DummyContextExecutor {}
unsafe impl Sync for DummyContextExecutor {}
impl ExecuteInContext<Block> for DummyContextExecutor {
fn execute_in_context<F: Fn(&mut Context<Block>)>(&self, closure: F) {
let mut io = TestIo::new(&self.1, None);
let mut context = ProtocolContext::new(&self.0.context_data(), &mut io);
closure(&mut context);
}
}
/// The test specialization.
pub struct DummySpecialization { }
impl NetworkSpecialization<Block> for DummySpecialization {
fn status(&self) -> Vec<u8> { vec![] }
fn status(&self) -> Vec<u8> {
vec![]
}
fn on_connect(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex, _status: ::message::Status<Block>) {
}
@@ -232,184 +226,198 @@ impl NetworkSpecialization<Block> for DummySpecialization {
&mut self,
_ctx: &mut Context<Block>,
_peer_id: NodeIndex,
_message: &mut Option<::message::Message<Block>>
_message: &mut Option<::message::Message<Block>>,
) {
}
}
pub struct TestIo<'p> {
queue: &'p RwLock<VecDeque<TestPacket>>,
pub to_disconnect: HashSet<NodeIndex>,
packets: Vec<TestPacket>,
_sender: Option<NodeIndex>,
}
impl<'p> TestIo<'p> where {
pub fn new(queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<NodeIndex>) -> TestIo<'p> {
TestIo {
queue: queue,
_sender: sender,
to_disconnect: HashSet::new(),
packets: Vec::new(),
}
}
}
impl<'p> Drop for TestIo<'p> {
fn drop(&mut self) {
self.queue.write().extend(self.packets.drain(..));
}
}
impl<'p> SyncIo for TestIo<'p> {
fn report_peer(&mut self, who: NodeIndex, _reason: Severity) {
self.to_disconnect.insert(who);
}
fn send(&mut self, who: NodeIndex, data: Vec<u8>) {
self.packets.push(TestPacket {
data: data,
recipient: who,
});
}
fn peer_debug_info(&self, _who: NodeIndex) -> String {
"unknown".to_string()
}
fn peer_id(&self, _peer_id: NodeIndex) -> Option<PeerId> {
None
}
}
/// Mocked subprotocol packet
pub struct TestPacket {
data: Vec<u8>,
recipient: NodeIndex,
}
pub type PeersClient = client::Client<test_client::Backend, test_client::Executor, Block, test_client::runtime::RuntimeApi>;
pub struct Peer<V: Verifier<Block>, D> {
pub struct Peer<V: 'static + Verifier<Block>, D> {
client: Arc<PeersClient>,
pub sync: Arc<Protocol<Block, DummySpecialization, Hash>>,
pub queue: Arc<RwLock<VecDeque<TestPacket>>>,
pub protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
network_port: Mutex<NetworkPort>,
import_queue: Arc<SyncImportQueue<Block, V>>,
executor: Arc<DummyContextExecutor>,
/// Some custom data set up at initialization time.
network_sender: NetworkChan,
pub data: D,
}
impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
fn new(
client: Arc<PeersClient>,
sync: Arc<Protocol<Block, DummySpecialization, Hash>>,
queue: Arc<RwLock<VecDeque<TestPacket>>>,
import_queue: Arc<SyncImportQueue<Block, V>>,
protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
network_sender: NetworkChan,
network_port: NetworkPort,
data: D,
) -> Self {
let executor = Arc::new(DummyContextExecutor(sync.clone(), queue.clone()));
Peer { client, sync, queue, import_queue, executor, data }
let network_port = Mutex::new(network_port);
Peer {
client,
protocol_sender,
import_queue,
network_sender,
network_port,
data,
}
}
/// Called after blockchain has been populated to updated current state.
fn start(&self) {
// Update the sync state to the latest chain state.
let info = self.client.info().expect("In-mem client does not fail");
let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
let header = self
.client
.header(&BlockId::Hash(info.chain.best_hash))
.unwrap()
.unwrap();
let network_link = NetworkLink {
sync: Arc::downgrade(self.sync.sync()),
context: Arc::downgrade(&self.executor),
protocol_sender: self.protocol_sender.clone(),
network_sender: self.network_sender.clone(),
};
self.import_queue.start(network_link).expect("Test ImportQueue always starts");
self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header);
let _ = self
.protocol_sender
.send(ProtocolMsg::BlockImported(info.chain.best_hash, header));
}
pub fn on_block_imported(
&self,
hash: <Block as BlockT>::Hash,
header: &<Block as BlockT>::Header,
) {
let _ = self
.protocol_sender
.send(ProtocolMsg::BlockImported(hash, header.clone()));
}
/// Called on connection to other indicated peer.
fn on_connect(&self, other: NodeIndex) {
self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other);
}
pub fn consensus_gossip(&self) -> &RwLock<ConsensusGossip<Block>> {
self.sync.consensus_gossip()
let _ = self.protocol_sender.send(ProtocolMsg::PeerConnected(other, String::new()));
}
/// Called on disconnect from other indicated peer.
fn on_disconnect(&self, other: NodeIndex) {
let mut io = TestIo::new(&self.queue, Some(other));
self.sync.on_peer_disconnected(&mut io, other);
let _ = self
.protocol_sender
.send(ProtocolMsg::PeerDisconnected(other, String::new()));
}
/// Receive a message from another peer. Return a set of peers to disconnect.
fn receive_message(&self, from: NodeIndex, msg: TestPacket) -> HashSet<NodeIndex> {
let mut io = TestIo::new(&self.queue, Some(from));
self.sync.handle_packet(&mut io, from, &msg.data);
self.flush();
io.to_disconnect.clone()
}
#[cfg(test)]
fn with_io<'a, F, U>(&'a self, f: F) -> U where F: FnOnce(&mut TestIo<'a>) -> U {
let mut io = TestIo::new(&self.queue, None);
f(&mut io)
fn receive_message(&self, from: NodeIndex, msg: Vec<u8>) {
match Decode::decode(&mut (&msg as &[u8])) {
Some(m) => {
let _ = self
.protocol_sender
.send(ProtocolMsg::CustomMessage(from, m));
}
None => {
let _ = self.network_sender.send(NetworkMsg::ReportPeer(
from,
Severity::Bad("Peer sent us a packet with invalid format".to_string()),
));
}
}
}
/// Produce the next pending message to send to another peer.
fn pending_message(&self) -> Option<TestPacket> {
self.flush();
self.queue.write().pop_front()
fn pending_message(&self) -> Option<NetworkMsg> {
select! {
recv(self.network_port.lock().receiver()) -> msg => return msg.ok(),
// If there are no messages ready, give protocol a change to send one.
recv(channel::after(Duration::from_millis(100))) -> _ => return None,
}
}
/// Produce the next pending message to send to another peer, without waiting.
fn pending_message_fast(&self) -> Option<NetworkMsg> {
self.network_port.lock().receiver().try_recv().ok()
}
/// Whether this peer is done syncing (has no messages to send).
fn is_done(&self) -> bool {
self.queue.read().is_empty()
self.network_port.lock().receiver().is_empty()
}
/// Execute a "sync step". This is called for each peer after it sends a packet.
fn sync_step(&self) {
self.flush();
self.sync.tick(&mut TestIo::new(&self.queue, None));
let _ = self.protocol_sender.send(ProtocolMsg::Tick);
}
/// Send block import notifications.
fn send_import_notifications(&self) {
let info = self.client.info().expect("In-mem client does not fail");
let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header);
let _ = self
.protocol_sender
.send(ProtocolMsg::BlockImported(info.chain.best_hash, header));
}
/// Send block finalization notifications.
pub fn send_finality_notifications(&self) {
let info = self.client.info().expect("In-mem client does not fail");
let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap();
self.sync.on_block_finalized(&mut TestIo::new(&self.queue, None), info.chain.finalized_hash, &header);
let _ = self
.protocol_sender
.send(ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone()));
}
/// Restart sync for a peer.
fn restart_sync(&self) {
self.sync.abort();
let _ = self.protocol_sender.send(ProtocolMsg::Abort);
}
fn flush(&self) {
pub fn status(&self) -> ProtocolStatus<Block> {
let (sender, port) = channel::unbounded();
let _ = self.protocol_sender.send(ProtocolMsg::Status(sender));
port.recv().unwrap()
}
/// Push a message into the gossip network and relay to peers.
/// `TestNet::sync_step` needs to be called to ensure it's propagated.
pub fn gossip_message(&self, topic: Hash, data: Vec<u8>, broadcast: bool) {
self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data, broadcast);
pub fn gossip_message(&self, topic: <Block as BlockT>::Hash, data: Vec<u8>, broadcast: bool) {
let _ = self
.protocol_sender
.send(ProtocolMsg::GossipConsensusMessage(topic, data, broadcast));
}
pub fn consensus_gossip_collect_garbage_for(&self, topic: <Block as BlockT>::Hash) {
self.with_gossip(move |gossip, _| gossip.collect_garbage(|t| t == &topic))
}
/// access the underlying consensus gossip handler
pub fn consensus_gossip_messages_for(
&self,
topic: <Block as BlockT>::Hash,
) -> mpsc::UnboundedReceiver<ConsensusMessage> {
let (tx, rx) = oneshot::channel();
self.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(topic);
let _ = tx.send(inner_rx);
});
rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully")
}
/// Execute a closure with the consensus gossip.
pub fn with_gossip<F>(&self, f: F)
where F: FnOnce(&mut ConsensusGossip<Block>, &mut Context<Block>) + Send + 'static
{
let _ = self
.protocol_sender
.send(ProtocolMsg::ExecuteWithGossip(Box::new(f)));
}
/// Announce a block to peers.
pub fn announce_block(&self, block: Hash) {
self.sync.announce_block(&mut TestIo::new(&self.queue, None), block);
let _ = self.protocol_sender.send(ProtocolMsg::AnnounceBlock(block));
}
/// Request a justification for the given block.
#[cfg(test)]
fn request_justification(&self, hash: &::primitives::H256, number: NumberFor<Block>) {
self.executor.execute_in_context(|context| {
self.sync.sync().write().request_justification(hash, number, context);
})
let _ = self
.protocol_sender
.send(ProtocolMsg::RequestJustification(hash.clone(), number));
}
/// Add blocks to the peer -- edit the block before adding
@@ -429,23 +437,28 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
let builder = self.client.new_block_at(&at).unwrap();
let block = edit_block(builder);
let hash = block.header.hash();
trace!("Generating {}, (#{}, parent={})", hash, block.header.number, block.header.parent_hash);
trace!(
"Generating {}, (#{}, parent={})",
hash,
block.header.number,
block.header.parent_hash
);
let header = block.header.clone();
at = BlockId::Hash(hash);
// NOTE: if we use a non-synchronous queue in the test-net in the future,
// this may not work.
self.import_queue.import_blocks(origin, vec![
IncomingBlock {
origin: None,
self.import_queue.import_blocks(
origin,
vec![IncomingBlock {
origin: None,
hash,
header: Some(header),
body: Some(block.extrinsics),
justification: None,
},
}
]);
}
}
/// Push blocks to the peer (simplified: with or without a TX)
@@ -483,13 +496,6 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
});
}
/// Execute a function with specialization for this peer.
pub fn with_spec<F, U>(&self, f: F) -> U
where F: FnOnce(&mut DummySpecialization, &mut Context<Block>) -> U
{
self.sync.with_spec(&mut TestIo::new(&self.queue, None), f)
}
/// Get a reference to the client.
pub fn client(&self) -> &Arc<PeersClient> {
&self.client
@@ -554,23 +560,26 @@ pub trait TestNetFactory: Sized {
let tx_pool = Arc::new(EmptyTransactionPool);
let verifier = self.make_verifier(client.clone(), config);
let (block_import, justification_import, data) = self.make_block_import(client.clone());
let (network_sender, network_port) = network_channel(ProtocolId::default());
let import_queue = Arc::new(SyncImportQueue::new(verifier, block_import, justification_import));
let specialization = DummySpecialization { };
let sync = Protocol::new(
let protocol_sender = Protocol::new(
network_sender.clone(),
config.clone(),
client.clone(),
import_queue.clone(),
None,
tx_pool,
specialization
specialization,
).unwrap();
let peer = Arc::new(Peer::new(
client,
Arc::new(sync),
Arc::new(RwLock::new(VecDeque::new())),
import_queue,
protocol_sender,
network_sender,
network_port,
data,
));
@@ -594,44 +603,58 @@ pub trait TestNetFactory: Sized {
}
}
});
self.route(None);
self.set_started(true);
}
/// Do one step of routing.
fn route(&mut self) {
fn route(&mut self, disconnected: Option<HashSet<NodeIndex>>) {
self.mut_peers(move |peers| {
let mut to_disconnect = HashSet::new();
for peer in 0..peers.len() {
let packet = peers[peer].pending_message();
if let Some(packet) = packet {
let disconnecting = {
let recipient = packet.recipient;
trace!(target: "sync", "--- {} -> {} ---", peer, recipient);
let to_disconnect = peers[recipient].receive_message(peer as NodeIndex, packet);
for d in &to_disconnect {
// notify this that disconnecting peers are disconnecting
peers[recipient].on_disconnect(*d as NodeIndex);
match packet {
None => continue,
Some(NetworkMsg::Outgoing(recipient, packet)) => {
if let Some(disconnected) = disconnected.as_ref() {
let mut current = HashSet::new();
current.insert(peer);
current.insert(recipient);
// Not routing message between "disconnected" nodes.
if disconnected.is_subset(&current) {
continue;
}
}
to_disconnect
};
for d in &disconnecting {
// notify other peers that this peer is disconnecting
peers[*d].on_disconnect(peer as NodeIndex);
peers[recipient].receive_message(peer as NodeIndex, packet)
}
Some(NetworkMsg::ReportPeer(who, _)) => {
to_disconnect.insert(who);
}
Some(_msg) => continue,
}
}
for d in to_disconnect {
for peer in 0..peers.len() {
peers[peer].on_disconnect(d);
}
}
});
}
/// Route messages between peers until all queues are empty.
fn route_until_complete(&mut self) {
while !self.done() {
self.route()
}
/// Route all pending outgoing messages, without waiting or disconnecting.
fn route_fast(&mut self) {
self.mut_peers(move |peers| {
for peer in 0..peers.len() {
while let Some(NetworkMsg::Outgoing(recipient, packet)) = peers[peer].pending_message_fast() {
peers[recipient].receive_message(peer as NodeIndex, packet)
}
}
});
}
/// Do a step of synchronization.
fn sync_step(&mut self) {
self.route();
self.route(None);
self.mut_peers(|peers| {
for peer in peers {
@@ -667,10 +690,26 @@ pub trait TestNetFactory: Sized {
fn sync(&mut self) -> u32 {
self.start();
let mut total_steps = 0;
self.sync_step();
self.route(None);
while !self.done() {
total_steps += 1;
self.route(None);
}
total_steps
}
/// Perform synchronization until complete,
/// excluding sync between certain nodes.
fn sync_with_disconnected(&mut self, disconnected: HashSet<NodeIndex>) -> u32 {
self.start();
let mut total_steps = 0;
self.sync_step();
self.route(Some(disconnected.clone()));
while !self.done() {
self.sync_step();
total_steps += 1;
self.route();
self.route(Some(disconnected.clone()));
}
total_steps
}
@@ -685,6 +724,16 @@ pub trait TestNetFactory: Sized {
/// Whether all peers have synced.
fn done(&self) -> bool {
for _ in 0..10 {
if self.peers().iter().all(|p| p.is_done()) {
// If all peers are done, wait a little bit
// in case one is still about to send a message.
thread::sleep(Duration::from_millis(1000));
continue;
}
// Do another round of routing.
return false
}
self.peers().iter().all(|p| p.is_done())
}
}
+15 -8
View File
@@ -18,7 +18,10 @@ use client::backend::Backend;
use client::blockchain::HeaderBackend as BlockchainHeaderBackend;
use config::Roles;
use consensus::BlockOrigin;
use network_libp2p::NodeIndex;
use sync::SyncState;
use std::{thread, time};
use std::collections::HashSet;
use super::*;
#[test]
@@ -29,7 +32,7 @@ fn sync_from_two_peers_works() {
net.peer(2).push_blocks(100, false);
net.sync();
assert!(net.peer(0).client.backend().blockchain().equals_to(net.peer(1).client.backend().blockchain()));
let status = net.peer(0).sync.status();
let status = net.peer(0).status();
assert_eq!(status.sync.state, SyncState::Idle);
}
@@ -49,9 +52,12 @@ fn sync_from_two_peers_with_ancestry_search_works() {
fn sync_long_chain_works() {
let mut net = TestNet::new(2);
net.peer(1).push_blocks(500, false);
net.sync_steps(3);
assert_eq!(net.peer(0).sync.status().sync.state, SyncState::Downloading);
net.sync();
// Wait for peer 0 to import blocks received over the network.
thread::sleep(time::Duration::from_millis(1000));
net.sync();
// Wait for peers to get up to speed.
thread::sleep(time::Duration::from_millis(1000));
assert!(net.peer(0).client.backend().blockchain().equals_to(net.peer(1).client.backend().blockchain()));
}
@@ -137,7 +143,7 @@ fn own_blocks_are_announced() {
net.peer(0).generate_blocks(1, BlockOrigin::Own, |builder| builder.bake().unwrap());
let header = net.peer(0).client().header(&BlockId::Number(1)).unwrap().unwrap();
net.peer(0).with_io(|io| net.peer(0).sync.on_block_imported(io, header.hash(), &header));
net.peer(0).on_block_imported(header.hash(), &header);
net.sync();
assert_eq!(net.peer(0).client.backend().blockchain().info().unwrap().best_number, 1);
assert_eq!(net.peer(1).client.backend().blockchain().info().unwrap().best_number, 1);
@@ -166,10 +172,11 @@ fn blocks_are_not_announced_by_light_nodes() {
net.peer(0).on_connect(1);
net.peer(1).on_connect(2);
// generate block at peer0 && run sync
while !net.done() {
net.sync_step();
}
// Only sync between 0 -> 1, and 1 -> 2
let mut disconnected = HashSet::new();
disconnected.insert(0 as NodeIndex);
disconnected.insert(2 as NodeIndex);
net.sync_with_disconnected(disconnected);
// peer 0 has the best chain
// peer 1 has the best chain