More extractions from the Protocol struct (#2641)

* Pass the TransactionPool explicitly

* Extract finality_proof_provider

* Remove Protocol::connected_peers

* Add note and rename function

* Fix tests

* More test fixing

* Revert the WASM locks, I guess

* Add space

* Remove space
This commit is contained in:
Pierre Krieger
2019-05-23 12:07:51 +02:00
committed by GitHub
parent 563a67b8e4
commit c357854015
8 changed files with 103 additions and 74 deletions
+1 -1
View File
@@ -39,7 +39,7 @@ pub struct Params<B: BlockT, S, H: ExHashT> {
/// On-demand service reference.
pub on_demand: Option<Arc<OnDemandService<B>>>,
/// Transaction pool.
pub transaction_pool: Arc<TransactionPool<H, B>>,
pub transaction_pool: Arc<dyn TransactionPool<H, B>>,
/// Protocol specialization.
pub specialization: S,
}
+32 -34
View File
@@ -35,7 +35,6 @@ use crate::specialization::NetworkSpecialization;
use crate::sync::{ChainSync, Context as SyncContext, Status as SyncStatus, SyncState};
use crate::service::{TransactionPool, ExHashT};
use crate::config::{ProtocolConfig, Roles};
use parking_lot::RwLock;
use rustc_hex::ToHex;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
@@ -92,10 +91,6 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
context_data: ContextData<B, H>,
// Connected peers pending Status message.
handshaking_peers: HashMap<PeerId, HandshakingPeer>,
// Connected peers from whom we received a Status message,
// similar to context_data.peers but shared with the SyncProvider.
connected_peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
transaction_pool: Arc<TransactionPool<H, B>>,
}
/// A peer from whom we have received a Status message.
@@ -261,18 +256,14 @@ struct ContextData<B: BlockT, H: ExHashT> {
// All connected peers
peers: HashMap<PeerId, Peer<B, H>>,
pub chain: Arc<Client<B>>,
pub finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>,
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Create a new instance.
pub fn new(
connected_peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
config: ProtocolConfig,
chain: Arc<Client<B>>,
finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>,
on_demand: Option<Arc<OnDemandService<B>>>,
transaction_pool: Arc<TransactionPool<H, B>>,
specialization: S,
) -> error::Result<Protocol<B, S, H>> {
let info = chain.info()?;
@@ -284,7 +275,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
context_data: ContextData {
peers: HashMap::new(),
chain,
finality_proof_provider,
},
on_demand,
genesis_hash: info.chain.genesis_hash,
@@ -292,8 +282,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
specialization: specialization,
consensus_gossip: ConsensusGossip::new(),
handshaking_peers: HashMap::new(),
connected_peers,
transaction_pool: transaction_pool,
})
}
@@ -319,13 +307,17 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
self.sync.status().is_offline()
}
pub fn poll(&mut self, network_out: &mut dyn NetworkOut<B>) -> Poll<void::Void, void::Void> {
pub fn poll(
&mut self,
network_out: &mut dyn NetworkOut<B>,
transaction_pool: &(impl TransactionPool<H, B> + ?Sized)
) -> Poll<void::Void, void::Void> {
while let Ok(Async::Ready(_)) = self.tick_timeout.poll() {
self.tick(network_out);
}
while let Ok(Async::Ready(_)) = self.propagate_timeout.poll() {
self.propagate_extrinsics(network_out);
self.propagate_extrinsics(network_out, transaction_pool);
}
Ok(Async::NotReady)
@@ -364,19 +356,21 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
peer.info.best_hash = info.best_hash;
peer.info.best_number = info.best_number;
}
let mut peers = self.connected_peers.write();
if let Some(ref mut peer) = peers.get_mut(who) {
peer.peer_info.best_hash = info.best_hash;
peer.peer_info.best_number = info.best_number;
}
}
}
/// Returns information about all the peers we are connected to after the handshake message.
pub fn peers_info(&self) -> impl Iterator<Item = (&PeerId, &PeerInfo<B>)> {
self.context_data.peers.iter().map(|(id, peer)| (id, &peer.info))
}
pub fn on_custom_message(
&mut self,
network_out: &mut dyn NetworkOut<B>,
transaction_pool: &(impl TransactionPool<H, B> + ?Sized),
who: PeerId,
message: Message<B>
message: Message<B>,
finality_proof_provider: Option<&FinalityProofProvider<B>>
) -> CustomMessageOutcome<B> {
match message {
GenericMessage::Status(s) => self.on_status_message(network_out, who, s),
@@ -397,7 +391,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
self.on_block_announce(network_out, who.clone(), announce);
self.update_peer_info(&who);
},
GenericMessage::Transactions(m) => self.on_extrinsics(network_out, who, m),
GenericMessage::Transactions(m) =>
self.on_extrinsics(network_out, transaction_pool, who, m),
GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(network_out, who, request),
GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(who, response),
GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(network_out, who, request),
@@ -406,7 +401,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(who, response),
GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(network_out, who, request),
GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(who, response),
GenericMessage::FinalityProofRequest(request) => self.on_finality_proof_request(network_out, who, request),
GenericMessage::FinalityProofRequest(request) =>
self.on_finality_proof_request(network_out, who, request, finality_proof_provider),
GenericMessage::FinalityProofResponse(response) =>
return self.on_finality_proof_response(network_out, who, response),
GenericMessage::Consensus(msg) => {
@@ -489,7 +485,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
// lock all the the peer lists so that add/remove peer events are in order
let removed = {
self.handshaking_peers.remove(&peer);
self.connected_peers.write().remove(&peer);
self.context_data.peers.remove(&peer)
};
if let Some(peer_data) = removed {
@@ -734,16 +729,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let info = match self.handshaking_peers.remove(&who) {
Some(_handshaking) => {
let peer_info = PeerInfo {
PeerInfo {
protocol_version: status.version,
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
};
self.connected_peers
.write()
.insert(who.clone(), ConnectedPeer { peer_info: peer_info.clone() });
peer_info
}
},
None => {
error!(target: "sync", "Received status from previously unconnected node {}", who);
@@ -780,6 +771,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
fn on_extrinsics(
&mut self,
network_out: &mut dyn NetworkOut<B>,
transaction_pool: &(impl TransactionPool<H, B> + ?Sized),
who: PeerId,
extrinsics: message::Transactions<B::Extrinsic>
) {
@@ -791,7 +783,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who);
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
for t in extrinsics {
if let Some(hash) = self.transaction_pool.import(&t) {
if let Some(hash) = transaction_pool.import(&t) {
network_out.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE);
peer.known_extrinsics.insert(hash);
} else {
@@ -802,7 +794,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
/// Call when we must propagate ready extrinsics to peers.
pub fn propagate_extrinsics(&mut self, network_out: &mut dyn NetworkOut<B>) {
pub fn propagate_extrinsics(
&mut self,
network_out: &mut dyn NetworkOut<B>,
transaction_pool: &(impl TransactionPool<H, B> + ?Sized)
) {
debug!(target: "sync", "Propagating extrinsics");
// Accept transactions only when fully synced
@@ -810,7 +806,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
return;
}
let extrinsics = self.transaction_pool.transactions();
let extrinsics = transaction_pool.transactions();
let mut propagated_to = HashMap::new();
for (who, peer) in self.context_data.peers.iter_mut() {
let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
@@ -830,7 +826,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
network_out.send_message(who.clone(), GenericMessage::Transactions(to_send))
}
}
self.transaction_pool.on_broadcasted(propagated_to);
transaction_pool.on_broadcasted(propagated_to);
}
/// Make sure an important block is propagated to peers.
@@ -1203,9 +1200,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
network_out: &mut dyn NetworkOut<B>,
who: PeerId,
request: message::FinalityProofRequest<B::Hash>,
finality_proof_provider: Option<&FinalityProofProvider<B>>
) {
trace!(target: "sync", "Finality proof request from {} for {}", who, request.block);
let finality_proof = self.context_data.finality_proof_provider.as_ref()
let finality_proof = finality_proof_provider.as_ref()
.ok_or_else(|| String::from("Finality provider is not configured"))
.and_then(|provider|
provider.prove_finality(request.block, &request.request).map_err(|e| e.to_string())
+43 -10
View File
@@ -29,6 +29,7 @@ use peerset::PeersetHandle;
use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder};
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
use crate::chain::FinalityProofProvider;
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::message::Message;
use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer};
@@ -41,6 +42,8 @@ use tokio::runtime::Builder as RuntimeBuilder;
/// Interval at which we send status updates on the SyncProvider status stream.
const STATUS_INTERVAL: Duration = Duration::from_millis(5000);
/// Interval at which we update the `peers` field on the main thread.
const CONNECTED_PEERS_INTERVAL: Duration = Duration::from_millis(500);
pub use network_libp2p::PeerId;
@@ -53,8 +56,13 @@ pub trait SyncProvider<B: BlockT>: Send + Sync {
fn status(&self) -> mpsc::UnboundedReceiver<ProtocolStatus<B>>;
/// Get network state.
fn network_state(&self) -> NetworkState;
/// Get currently connected peers
fn peers(&self) -> Vec<(PeerId, PeerInfo<B>)>;
/// Get currently connected peers.
///
/// > **Warning**: This method can return outdated information and should only ever be used
/// > when obtaining outdated information is acceptable.
fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo<B>)>;
/// Are we in the process of downloading the chain?
fn is_major_syncing(&self) -> bool;
}
@@ -206,12 +214,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
let is_major_syncing = Arc::new(AtomicBool::new(false));
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>> = Arc::new(Default::default());
let protocol = Protocol::new(
peers.clone(),
params.config,
params.chain,
params.finality_proof_provider,
params.on_demand,
params.transaction_pool,
params.specialization,
)?;
let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect();
@@ -220,7 +225,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
is_offline.clone(),
is_major_syncing.clone(),
protocol,
peers.clone(),
import_queue.clone(),
params.transaction_pool,
params.finality_proof_provider,
network_port,
protocol_rx,
status_sinks.clone(),
@@ -392,7 +400,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> SyncProvider<B> for Servi
self.network.lock().state()
}
fn peers(&self) -> Vec<(PeerId, PeerInfo<B>)> {
fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo<B>)> {
let peers = (*self.peers.read()).clone();
peers.into_iter().map(|(idx, connected)| (idx, connected.peer_info)).collect()
}
@@ -514,7 +522,10 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
is_offline: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
protocol: Protocol<B, S, H>,
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
import_queue: Box<ImportQueue<B>>,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>,
network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>,
protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
@@ -540,7 +551,10 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
is_major_syncing,
protocol,
service_clone,
peers,
import_queue,
transaction_pool,
finality_proof_provider,
network_port,
protocol_rx,
status_sinks,
@@ -567,7 +581,10 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
is_major_syncing: Arc<AtomicBool>,
mut protocol: Protocol<B, S, H>,
network_service: Arc<Mutex<NetworkService<Message<B>>>>,
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
import_queue: Box<ImportQueue<B>>,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>,
mut network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>,
mut protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
@@ -589,6 +606,8 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
// Interval at which we send status updates on the `status_sinks`.
let mut status_interval = tokio::timer::Interval::new_interval(STATUS_INTERVAL);
// Interval at which we update the `connected_peers` Arc.
let mut connected_peers_interval = tokio::timer::Interval::new_interval(CONNECTED_PEERS_INTERVAL);
futures::future::poll_fn(move || {
while let Ok(Async::Ready(_)) = status_interval.poll() {
@@ -596,7 +615,14 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok());
}
match protocol.poll(&mut Ctxt(&mut network_service.lock(), &peerset)) {
while let Ok(Async::Ready(_)) = connected_peers_interval.poll() {
let infos = protocol.peers_info().map(|(id, info)| {
(id.clone(), ConnectedPeer { peer_info: info.clone() })
}).collect();
*peers.write() = infos;
}
match protocol.poll(&mut Ctxt(&mut network_service.lock(), &peerset), &*transaction_pool) {
Ok(Async::Ready(v)) => void::unreachable(v),
Ok(Async::NotReady) => {}
Err(err) => void::unreachable(err),
@@ -646,7 +672,7 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
protocol.gossip_consensus_message(&mut network_out, topic, engine_id, message, recipient),
ProtocolMsg::BlocksProcessed(hashes, has_error) =>
protocol.blocks_processed(&mut network_out, hashes, has_error),
ProtocolMsg::RestartSync =>
ProtocolMsg::RestartSync =>
protocol.restart(&mut network_out),
ProtocolMsg::AnnounceBlock(hash) =>
protocol.announce_block(&mut network_out, hash),
@@ -664,7 +690,8 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
protocol.request_finality_proof(&mut network_out, &hash, number),
ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) =>
protocol.finality_proof_import_result(requested_block, finalziation_result),
ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(&mut network_out),
ProtocolMsg::PropagateExtrinsics =>
protocol.propagate_extrinsics(&mut network_out, &*transaction_pool),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Tick => protocol.tick(&mut network_out),
#[cfg(any(test, feature = "test-helpers"))]
@@ -692,7 +719,13 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
CustomMessageOutcome::None
},
Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) =>
protocol.on_custom_message(&mut network_out, peer_id, message),
protocol.on_custom_message(
&mut network_out,
&*transaction_pool,
peer_id,
message,
finality_proof_provider.as_ref().map(|p| &**p)
),
Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => {
debug!(target: "sync", "{} clogging messages:", messages.len());
for msg in messages.into_iter().take(5) {
+19 -21
View File
@@ -44,7 +44,7 @@ use crate::message::Message;
use network_libp2p::PeerId;
use parking_lot::{Mutex, RwLock};
use primitives::{H256, sr25519::Public as AuthorityId, Blake2Hasher};
use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolStatus, CustomMessageOutcome, NetworkOut};
use crate::protocol::{Context, Protocol, ProtocolStatus, CustomMessageOutcome, NetworkOut};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor};
use runtime_primitives::{Justification, ConsensusEngineId};
@@ -274,7 +274,6 @@ impl<S: NetworkSpecialization<Block>> Link<Block> for TestLink<S> {
}
pub struct Peer<D, S: NetworkSpecialization<Block>> {
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>>,
peer_id: PeerId,
client: PeersClient,
net_proto_channel: ProtocolChannel<S>,
@@ -411,7 +410,7 @@ impl<S: NetworkSpecialization<Block>> ProtocolChannel<S> {
Ok(Async::Ready(None)) => None,
}))
});
if self.use_tokio {
fut.wait()
} else {
@@ -423,7 +422,6 @@ impl<S: NetworkSpecialization<Block>> ProtocolChannel<S> {
impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
fn new(
protocol_status: Arc<RwLock<ProtocolStatus<Block>>>,
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>>,
client: PeersClient,
import_queue: Box<BasicQueue<Block>>,
use_tokio: bool,
@@ -447,7 +445,6 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
import_queue.start(Box::new(network_link)).expect("Test ImportQueue always starts");
Peer {
protocol_status,
peers,
peer_id: PeerId::random(),
client,
import_queue,
@@ -792,6 +789,8 @@ pub trait TestNetFactory: Sized {
&mut self,
protocol_status: Arc<RwLock<ProtocolStatus<Block>>>,
import_queue: Box<BasicQueue<Block>>,
tx_pool: EmptyTransactionPool,
finality_proof_provider: Option<Arc<FinalityProofProvider<Block>>>,
mut protocol: Protocol<Block, Self::Specialization, Hash>,
network_sender: mpsc::UnboundedSender<NetworkMsg<Block>>,
mut network_to_protocol_rx: mpsc::UnboundedReceiver<FromNetworkMsg<Block>>,
@@ -825,7 +824,13 @@ pub trait TestNetFactory: Sized {
CustomMessageOutcome::None
},
Some(FromNetworkMsg::CustomMessage(peer_id, message)) =>
protocol.on_custom_message(&mut Ctxt(&network_sender), peer_id, message),
protocol.on_custom_message(
&mut Ctxt(&network_sender),
&tx_pool,
peer_id,
message,
finality_proof_provider.as_ref().map(|p| &**p)
),
Some(FromNetworkMsg::Synchronize) => {
let _ = network_sender.unbounded_send(NetworkMsg::Synchronized);
CustomMessageOutcome::None
@@ -876,7 +881,7 @@ pub trait TestNetFactory: Sized {
),
ProtocolMsg::BlocksProcessed(hashes, has_error) =>
protocol.blocks_processed(&mut Ctxt(&network_sender), hashes, has_error),
ProtocolMsg::RestartSync =>
ProtocolMsg::RestartSync =>
protocol.restart(&mut Ctxt(&network_sender)),
ProtocolMsg::AnnounceBlock(hash) =>
protocol.announce_block(&mut Ctxt(&network_sender), hash),
@@ -894,7 +899,8 @@ pub trait TestNetFactory: Sized {
protocol.request_finality_proof(&mut Ctxt(&network_sender), &hash, number),
ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) =>
protocol.finality_proof_import_result(requested_block, finalziation_result),
ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(&mut Ctxt(&network_sender)),
ProtocolMsg::PropagateExtrinsics =>
protocol.propagate_extrinsics(&mut Ctxt(&network_sender), &tx_pool),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Tick => protocol.tick(&mut Ctxt(&network_sender)),
#[cfg(any(test, feature = "test-helpers"))]
@@ -905,7 +911,7 @@ pub trait TestNetFactory: Sized {
}
}
if let Async::Ready(_) = protocol.poll(&mut Ctxt(&network_sender)).unwrap() {
if let Async::Ready(_) = protocol.poll(&mut Ctxt(&network_sender), &tx_pool).unwrap() {
return Ok(Async::Ready(()))
}
@@ -930,7 +936,6 @@ pub trait TestNetFactory: Sized {
/// Add a full peer.
fn add_full_peer(&mut self, config: &ProtocolConfig) {
let client = Arc::new(test_client::new());
let tx_pool = Arc::new(EmptyTransactionPool);
let verifier = self.make_verifier(PeersClient::Full(client.clone()), config);
let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data)
= self.make_block_import(PeersClient::Full(client.clone()));
@@ -944,18 +949,14 @@ pub trait TestNetFactory: Sized {
finality_proof_request_builder,
));
let specialization = self::SpecializationFactory::create();
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>> = Arc::new(Default::default());
let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded();
let (protocol_sender, protocol_rx) = mpsc::unbounded();
let protocol = Protocol::new(
peers.clone(),
config.clone(),
client.clone(),
self.make_finality_proof_provider(PeersClient::Full(client.clone())),
None,
tx_pool,
specialization,
).unwrap();
@@ -963,13 +964,14 @@ pub trait TestNetFactory: Sized {
self.add_peer(
protocol_status.clone(),
import_queue.clone(),
EmptyTransactionPool,
self.make_finality_proof_provider(PeersClient::Full(client.clone())),
protocol,
network_sender.clone(),
network_to_protocol_rx,
protocol_rx,
Arc::new(Peer::new(
protocol_status,
peers,
PeersClient::Full(client),
import_queue,
self.uses_tokio(),
@@ -988,7 +990,6 @@ pub trait TestNetFactory: Sized {
config.roles = Roles::LIGHT;
let client = Arc::new(test_client::new_light());
let tx_pool = Arc::new(EmptyTransactionPool);
let verifier = self.make_verifier(PeersClient::Light(client.clone()), &config);
let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data)
= self.make_block_import(PeersClient::Light(client.clone()));
@@ -1002,18 +1003,14 @@ pub trait TestNetFactory: Sized {
finality_proof_request_builder,
));
let specialization = self::SpecializationFactory::create();
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>> = Arc::new(Default::default());
let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded();
let (protocol_sender, protocol_rx) = mpsc::unbounded();
let protocol = Protocol::new(
peers.clone(),
config,
client.clone(),
self.make_finality_proof_provider(PeersClient::Light(client.clone())),
None,
tx_pool,
specialization,
).unwrap();
@@ -1021,13 +1018,14 @@ pub trait TestNetFactory: Sized {
self.add_peer(
protocol_status.clone(),
import_queue.clone(),
EmptyTransactionPool,
self.make_finality_proof_provider(PeersClient::Light(client.clone())),
protocol,
network_sender.clone(),
network_to_protocol_rx,
protocol_rx,
Arc::new(Peer::new(
protocol_status,
peers,
PeersClient::Light(client),
import_queue,
self.uses_tokio(),
+3 -3
View File
@@ -45,7 +45,7 @@ fn sync_peers_works() {
net.sync();
for peer in 0..3 {
// Assert peers is up to date.
assert_eq!(net.peer(peer).peers.read().len(), 2);
assert_eq!(net.peer(peer).protocol_status.read().num_peers, 2);
// And then disconnect.
for other in 0..3 {
if other != peer {
@@ -56,8 +56,8 @@ fn sync_peers_works() {
net.sync();
// Now peers are disconnected.
for peer in 0..3 {
let peers = net.peer(peer).peers.read();
assert_eq!(peers.len(), 0);
let status = net.peer(peer).protocol_status.read();
assert_eq!(status.num_peers, 0);
}
}
+2 -2
View File
@@ -110,14 +110,14 @@ impl<B: traits::Block> SystemApi<B::Hash, <B::Header as HeaderT>::Number> for Sy
fn system_health(&self) -> Result<Health> {
Ok(Health {
peers: self.sync.peers().len(),
peers: self.sync.peers_debug_info().len(),
is_syncing: self.sync.is_major_syncing(),
should_have_peers: self.should_have_peers,
})
}
fn system_peers(&self) -> Result<Vec<PeerInfo<B::Hash, <B::Header as HeaderT>::Number>>> {
Ok(self.sync.peers().into_iter().map(|(peer_id, p)| PeerInfo {
Ok(self.sync.peers_debug_info().into_iter().map(|(peer_id, p)| PeerInfo {
peer_id: peer_id.to_base58(),
roles: format!("{:?}", p.roles),
protocol_version: p.protocol_version,
+1 -1
View File
@@ -59,7 +59,7 @@ impl network::SyncProvider<Block> for Status {
}
}
fn peers(&self) -> Vec<(PeerId, NetworkPeerInfo<Block>)> {
fn peers_debug_info(&self) -> Vec<(PeerId, NetworkPeerInfo<Block>)> {
let mut peers = vec![];
for _peer in 0..self.peers {
peers.push(
+2 -2
View File
@@ -195,7 +195,7 @@ pub fn connectivity<F: ServiceFactory>(spec: FactoryChainSpec<F>) {
service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer");
}
network.run_until_all_full(|_index, service|
service.network().peers().len() == NUM_NODES as usize - 1
service.network().peers_debug_info().len() == NUM_NODES as usize - 1
);
network.runtime
};
@@ -215,7 +215,7 @@ pub fn connectivity<F: ServiceFactory>(spec: FactoryChainSpec<F>) {
address = node_id.clone();
}
network.run_until_all_full(|_index, service| {
service.network().peers().len() == NUM_NODES as usize - 1
service.network().peers_debug_info().len() == NUM_NODES as usize - 1
});
}
temp.close().expect("Error removing temp dir");