mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
* Add a failing test * Make test not freeze * Fix the bug * Fix spaces * Fix tests * Apply suggestions from code review Co-Authored-By: Toralf Wittner <tw@dtex.org> * Make sure test doesn't succeed if nothing happened * Fix build * Do the events change Co-authored-by: Toralf Wittner <tw@dtex.org>
This commit is contained in:
@@ -126,6 +126,7 @@ impl NetworkParams {
|
||||
},
|
||||
listen_addresses,
|
||||
public_addresses: Vec::new(),
|
||||
notifications_protocols: Vec::new(),
|
||||
node_key,
|
||||
node_name: node_name.to_string(),
|
||||
client_version: client_id.to_string(),
|
||||
|
||||
@@ -31,16 +31,23 @@ pub use crate::protocol::ProtocolConfig;
|
||||
|
||||
use crate::service::ExHashT;
|
||||
|
||||
use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
|
||||
use sp_runtime::traits::{Block as BlockT};
|
||||
use libp2p::identity::{Keypair, ed25519};
|
||||
use libp2p::wasm_ext;
|
||||
use libp2p::{PeerId, Multiaddr, multiaddr};
|
||||
use core::{fmt, iter};
|
||||
use std::{convert::TryFrom, future::Future, pin::Pin, str::FromStr};
|
||||
use std::{error::Error, fs, io::{self, Write}, net::Ipv4Addr, path::{Path, PathBuf}, sync::Arc};
|
||||
use zeroize::Zeroize;
|
||||
use libp2p::identity::{ed25519, Keypair};
|
||||
use libp2p::wasm_ext;
|
||||
use libp2p::{multiaddr, Multiaddr, PeerId};
|
||||
use prometheus_endpoint::Registry;
|
||||
use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
|
||||
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
|
||||
use std::{borrow::Cow, convert::TryFrom, future::Future, pin::Pin, str::FromStr};
|
||||
use std::{
|
||||
error::Error,
|
||||
fs,
|
||||
io::{self, Write},
|
||||
net::Ipv4Addr,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
use zeroize::Zeroize;
|
||||
|
||||
/// Network initialization parameters.
|
||||
pub struct Params<B: BlockT, H: ExHashT> {
|
||||
@@ -317,6 +324,9 @@ pub struct NetworkConfiguration {
|
||||
pub boot_nodes: Vec<MultiaddrWithPeerId>,
|
||||
/// The node key configuration, which determines the node's network identity keypair.
|
||||
pub node_key: NodeKeyConfig,
|
||||
/// List of notifications protocols that the node supports. Must also include a
|
||||
/// `ConsensusEngineId` for backwards-compatibility.
|
||||
pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, [u8]>)>,
|
||||
/// Maximum allowed number of incoming connections.
|
||||
pub in_peers: u32,
|
||||
/// Number of outgoing connections we're trying to maintain.
|
||||
@@ -349,6 +359,7 @@ impl NetworkConfiguration {
|
||||
public_addresses: Vec::new(),
|
||||
boot_nodes: Vec::new(),
|
||||
node_key,
|
||||
notifications_protocols: Vec::new(),
|
||||
in_peers: 25,
|
||||
out_peers: 75,
|
||||
reserved_nodes: Vec::new(),
|
||||
|
||||
@@ -442,6 +442,38 @@ impl ProtocolsHandler for NotifsHandler {
|
||||
) -> Poll<
|
||||
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
|
||||
> {
|
||||
while let Poll::Ready(ev) = self.legacy.poll(cx) {
|
||||
match ev {
|
||||
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: protocol.map_upgrade(EitherUpgrade::B),
|
||||
info: None,
|
||||
}),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Open { endpoint }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Closed { endpoint, reason }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::CustomMessage { message }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Clogged { messages }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Clogged { messages }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::ProtocolError { is_severe, error }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Close(err) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(err))),
|
||||
}
|
||||
}
|
||||
|
||||
for (handler_num, handler) in self.in_handlers.iter_mut().enumerate() {
|
||||
while let Poll::Ready(ev) = handler.poll(cx) {
|
||||
match ev {
|
||||
@@ -495,38 +527,6 @@ impl ProtocolsHandler for NotifsHandler {
|
||||
}
|
||||
}
|
||||
|
||||
while let Poll::Ready(ev) = self.legacy.poll(cx) {
|
||||
match ev {
|
||||
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: protocol.map_upgrade(EitherUpgrade::B),
|
||||
info: None,
|
||||
}),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Open { endpoint }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Closed { endpoint, reason }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::CustomMessage { message }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Clogged { messages }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Clogged { messages }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::ProtocolError { is_severe, error }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Close(err) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(err))),
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ use sp_runtime::{
|
||||
traits::{Block as BlockT, NumberFor},
|
||||
ConsensusEngineId,
|
||||
};
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{HashMap, HashSet},
|
||||
@@ -60,15 +60,20 @@ use std::{
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
str,
|
||||
sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc},
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::Poll,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// Minimum Requirements for a Hash within Networking
|
||||
pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {}
|
||||
|
||||
impl<T> ExHashT for T where
|
||||
T: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static
|
||||
impl<T> ExHashT for T where T: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static
|
||||
{}
|
||||
|
||||
/// Transaction pool interface
|
||||
@@ -284,7 +289,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
|
||||
)?;
|
||||
|
||||
// Build the swarm.
|
||||
let (mut swarm, bandwidth): (Swarm::<B, H>, _) = {
|
||||
let (mut swarm, bandwidth): (Swarm<B, H>, _) = {
|
||||
let user_agent = format!(
|
||||
"{} ({})",
|
||||
params.network_config.client_version,
|
||||
@@ -296,9 +301,14 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
|
||||
};
|
||||
let light_client_handler = {
|
||||
let config = protocol::light_client_handler::Config::new(¶ms.protocol_id);
|
||||
protocol::LightClientHandler::new(config, params.chain, checker, peerset_handle.clone())
|
||||
protocol::LightClientHandler::new(
|
||||
config,
|
||||
params.chain,
|
||||
checker,
|
||||
peerset_handle.clone(),
|
||||
)
|
||||
};
|
||||
let behaviour = futures::executor::block_on(Behaviour::new(
|
||||
let mut behaviour = futures::executor::block_on(Behaviour::new(
|
||||
protocol,
|
||||
params.role,
|
||||
user_agent,
|
||||
@@ -316,6 +326,9 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
|
||||
block_requests,
|
||||
light_client_handler
|
||||
));
|
||||
for (engine_id, protocol_name) in ¶ms.network_config.notifications_protocols {
|
||||
behaviour.register_notifications_protocol(*engine_id, protocol_name.clone());
|
||||
}
|
||||
let (transport, bandwidth) = {
|
||||
let (config_mem, config_wasm, flowctrl) = match params.network_config.transport {
|
||||
TransportConfig::MemoryOnly => (true, None, false),
|
||||
@@ -531,6 +544,11 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
|
||||
}
|
||||
|
||||
impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
|
||||
/// Returns the local `PeerId`.
|
||||
pub fn local_peer_id(&self) -> &PeerId {
|
||||
&self.local_peer_id
|
||||
}
|
||||
|
||||
/// Writes a message on an open notifications channel. Has no effect if the notifications
|
||||
/// channel with this protocol name is closed.
|
||||
///
|
||||
|
||||
@@ -0,0 +1,270 @@
|
||||
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::{config, Event, NetworkService, NetworkWorker};
|
||||
|
||||
use futures::prelude::*;
|
||||
use sp_runtime::traits::{Block as BlockT, Header as _};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _};
|
||||
|
||||
type TestNetworkService = NetworkService<
|
||||
substrate_test_runtime_client::runtime::Block,
|
||||
substrate_test_runtime_client::runtime::Hash,
|
||||
>;
|
||||
|
||||
/// Builds a full node to be used for testing. Returns the node service and its associated events
|
||||
/// stream.
|
||||
///
|
||||
/// > **Note**: We return the events stream in order to not possibly lose events between the
|
||||
/// > construction of the service and the moment the events stream is grabbed.
|
||||
fn build_test_full_node(config: config::NetworkConfiguration)
|
||||
-> (Arc<TestNetworkService>, impl Stream<Item = Event>)
|
||||
{
|
||||
let client = Arc::new(
|
||||
TestClientBuilder::with_default_backend()
|
||||
.build_with_longest_chain()
|
||||
.0,
|
||||
);
|
||||
|
||||
#[derive(Clone)]
|
||||
struct PassThroughVerifier(bool);
|
||||
impl<B: BlockT> sp_consensus::import_queue::Verifier<B> for PassThroughVerifier {
|
||||
fn verify(
|
||||
&mut self,
|
||||
origin: sp_consensus::BlockOrigin,
|
||||
header: B::Header,
|
||||
justification: Option<sp_runtime::Justification>,
|
||||
body: Option<Vec<B::Extrinsic>>,
|
||||
) -> Result<
|
||||
(
|
||||
sp_consensus::BlockImportParams<B, ()>,
|
||||
Option<Vec<(sp_blockchain::well_known_cache_keys::Id, Vec<u8>)>>,
|
||||
),
|
||||
String,
|
||||
> {
|
||||
let maybe_keys = header
|
||||
.digest()
|
||||
.log(|l| {
|
||||
l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"aura"))
|
||||
.or_else(|| {
|
||||
l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"babe"))
|
||||
})
|
||||
})
|
||||
.map(|blob| {
|
||||
vec![(
|
||||
sp_blockchain::well_known_cache_keys::AUTHORITIES,
|
||||
blob.to_vec(),
|
||||
)]
|
||||
});
|
||||
|
||||
let mut import = sp_consensus::BlockImportParams::new(origin, header);
|
||||
import.body = body;
|
||||
import.finalized = self.0;
|
||||
import.justification = justification;
|
||||
import.fork_choice = Some(sp_consensus::ForkChoiceStrategy::LongestChain);
|
||||
Ok((import, maybe_keys))
|
||||
}
|
||||
}
|
||||
|
||||
let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new(
|
||||
PassThroughVerifier(false),
|
||||
Box::new(client.clone()),
|
||||
None,
|
||||
None,
|
||||
));
|
||||
|
||||
let worker = NetworkWorker::new(config::Params {
|
||||
role: config::Role::Full,
|
||||
executor: None,
|
||||
network_config: config,
|
||||
chain: client.clone(),
|
||||
finality_proof_provider: None,
|
||||
finality_proof_request_builder: None,
|
||||
on_demand: None,
|
||||
transaction_pool: Arc::new(crate::service::EmptyTransactionPool),
|
||||
protocol_id: config::ProtocolId::from(&b"/test-protocol-name"[..]),
|
||||
import_queue,
|
||||
block_announce_validator: Box::new(
|
||||
sp_consensus::block_validation::DefaultBlockAnnounceValidator::new(client.clone()),
|
||||
),
|
||||
metrics_registry: None,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let service = worker.service().clone();
|
||||
let event_stream = service.event_stream();
|
||||
|
||||
async_std::task::spawn(async move {
|
||||
futures::pin_mut!(worker);
|
||||
let _ = worker.await;
|
||||
});
|
||||
|
||||
(service, event_stream)
|
||||
}
|
||||
|
||||
const ENGINE_ID: sp_runtime::ConsensusEngineId = *b"foo\0";
|
||||
|
||||
/// Builds two nodes and their associated events stream.
|
||||
/// The nodes are connected together and have the `ENGINE_ID` protocol registered.
|
||||
fn build_nodes_one_proto()
|
||||
-> (Arc<TestNetworkService>, impl Stream<Item = Event>, Arc<TestNetworkService>, impl Stream<Item = Event>)
|
||||
{
|
||||
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
|
||||
|
||||
let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration {
|
||||
notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))],
|
||||
listen_addresses: vec![listen_addr.clone()],
|
||||
transport: config::TransportConfig::MemoryOnly,
|
||||
.. config::NetworkConfiguration::new_local()
|
||||
});
|
||||
|
||||
let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration {
|
||||
notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))],
|
||||
reserved_nodes: vec![config::MultiaddrWithPeerId {
|
||||
multiaddr: listen_addr,
|
||||
peer_id: node1.local_peer_id().clone(),
|
||||
}],
|
||||
transport: config::TransportConfig::MemoryOnly,
|
||||
.. config::NetworkConfiguration::new_local()
|
||||
});
|
||||
|
||||
(node1, events_stream1, node2, events_stream2)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notifications_state_consistent() {
|
||||
// Runs two nodes and ensures that events are propagated out of the API in a consistent
|
||||
// correct order, which means no notification received on a closed substream.
|
||||
|
||||
let (node1, mut events_stream1, node2, mut events_stream2) = build_nodes_one_proto();
|
||||
|
||||
// Write some initial notifications that shouldn't get through.
|
||||
for _ in 0..(rand::random::<u8>() % 5) {
|
||||
node1.write_notification(node2.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec());
|
||||
}
|
||||
for _ in 0..(rand::random::<u8>() % 5) {
|
||||
node2.write_notification(node1.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec());
|
||||
}
|
||||
|
||||
async_std::task::block_on(async move {
|
||||
// True if we have an active substream from node1 to node2.
|
||||
let mut node1_to_node2_open = false;
|
||||
// True if we have an active substream from node2 to node1.
|
||||
let mut node2_to_node1_open = false;
|
||||
// We stop the test after a certain number of iterations.
|
||||
let mut iterations = 0;
|
||||
// Safe guard because we don't want the test to pass if no substream has been open.
|
||||
let mut something_happened = false;
|
||||
|
||||
loop {
|
||||
iterations += 1;
|
||||
if iterations >= 1_000 {
|
||||
assert!(something_happened);
|
||||
break;
|
||||
}
|
||||
|
||||
// Start by sending a notification from node1 to node2 and vice-versa. Part of the
|
||||
// test consists in ensuring that notifications get ignored if the stream isn't open.
|
||||
if rand::random::<u8>() % 5 >= 3 {
|
||||
node1.write_notification(node2.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec());
|
||||
}
|
||||
if rand::random::<u8>() % 5 >= 3 {
|
||||
node2.write_notification(node1.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec());
|
||||
}
|
||||
|
||||
// Also randomly disconnect the two nodes from time to time.
|
||||
if rand::random::<u8>() % 20 == 0 {
|
||||
node1.disconnect_peer(node2.local_peer_id().clone());
|
||||
}
|
||||
if rand::random::<u8>() % 20 == 0 {
|
||||
node2.disconnect_peer(node1.local_peer_id().clone());
|
||||
}
|
||||
|
||||
// Grab next event from either `events_stream1` or `events_stream2`.
|
||||
let next_event = {
|
||||
let next1 = events_stream1.next();
|
||||
let next2 = events_stream2.next();
|
||||
// We also await on a small timer, otherwise it is possible for the test to wait
|
||||
// forever while nothing at all happens on the network.
|
||||
let continue_test = futures_timer::Delay::new(Duration::from_millis(20));
|
||||
match future::select(future::select(next1, next2), continue_test).await {
|
||||
future::Either::Left((future::Either::Left((Some(ev), _)), _)) =>
|
||||
future::Either::Left(ev),
|
||||
future::Either::Left((future::Either::Right((Some(ev), _)), _)) =>
|
||||
future::Either::Right(ev),
|
||||
future::Either::Right(_) => continue,
|
||||
_ => break,
|
||||
}
|
||||
};
|
||||
|
||||
match next_event {
|
||||
future::Either::Left(Event::NotificationStreamOpened { remote, engine_id, .. }) => {
|
||||
something_happened = true;
|
||||
assert!(!node1_to_node2_open);
|
||||
node1_to_node2_open = true;
|
||||
assert_eq!(remote, *node2.local_peer_id());
|
||||
assert_eq!(engine_id, ENGINE_ID);
|
||||
}
|
||||
future::Either::Right(Event::NotificationStreamOpened { remote, engine_id, .. }) => {
|
||||
something_happened = true;
|
||||
assert!(!node2_to_node1_open);
|
||||
node2_to_node1_open = true;
|
||||
assert_eq!(remote, *node1.local_peer_id());
|
||||
assert_eq!(engine_id, ENGINE_ID);
|
||||
}
|
||||
future::Either::Left(Event::NotificationStreamClosed { remote, engine_id, .. }) => {
|
||||
assert!(node1_to_node2_open);
|
||||
node1_to_node2_open = false;
|
||||
assert_eq!(remote, *node2.local_peer_id());
|
||||
assert_eq!(engine_id, ENGINE_ID);
|
||||
}
|
||||
future::Either::Right(Event::NotificationStreamClosed { remote, engine_id, .. }) => {
|
||||
assert!(node2_to_node1_open);
|
||||
node2_to_node1_open = false;
|
||||
assert_eq!(remote, *node1.local_peer_id());
|
||||
assert_eq!(engine_id, ENGINE_ID);
|
||||
}
|
||||
future::Either::Left(Event::NotificationsReceived { remote, .. }) => {
|
||||
assert!(node1_to_node2_open);
|
||||
assert_eq!(remote, *node2.local_peer_id());
|
||||
if rand::random::<u8>() % 5 >= 4 {
|
||||
node1.write_notification(
|
||||
node2.local_peer_id().clone(),
|
||||
ENGINE_ID,
|
||||
b"hello world".to_vec()
|
||||
);
|
||||
}
|
||||
}
|
||||
future::Either::Right(Event::NotificationsReceived { remote, .. }) => {
|
||||
assert!(node2_to_node1_open);
|
||||
assert_eq!(remote, *node1.local_peer_id());
|
||||
if rand::random::<u8>() % 5 >= 4 {
|
||||
node2.write_notification(
|
||||
node1.local_peer_id().clone(),
|
||||
ENGINE_ID,
|
||||
b"hello world".to_vec()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Add new events here.
|
||||
future::Either::Left(Event::Dht(_)) => {}
|
||||
future::Either::Right(Event::Dht(_)) => {}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -41,7 +41,7 @@ use sp_consensus::block_import::{BlockImport, ImportResult};
|
||||
use sp_consensus::Error as ConsensusError;
|
||||
use sp_consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, BlockCheckParams, JustificationImport};
|
||||
use futures::prelude::*;
|
||||
use sc_network::{NetworkWorker, NetworkStateInfo, NetworkService, config::ProtocolId};
|
||||
use sc_network::{NetworkWorker, NetworkService, config::ProtocolId};
|
||||
use sc_network::config::{NetworkConfiguration, TransportConfig, BoxFinalityProofRequestBuilder};
|
||||
use libp2p::PeerId;
|
||||
use parking_lot::Mutex;
|
||||
@@ -189,7 +189,7 @@ pub struct Peer<D> {
|
||||
impl<D> Peer<D> {
|
||||
/// Get this peer ID.
|
||||
pub fn id(&self) -> PeerId {
|
||||
self.network.service().local_peer_id()
|
||||
self.network.service().local_peer_id().clone()
|
||||
}
|
||||
|
||||
/// Returns true if we're major syncing.
|
||||
@@ -628,7 +628,7 @@ pub trait TestNetFactory: Sized {
|
||||
|
||||
self.mut_peers(|peers| {
|
||||
for peer in peers.iter_mut() {
|
||||
peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone());
|
||||
peer.network.add_known_address(network.service().local_peer_id().clone(), listen_addr.clone());
|
||||
}
|
||||
|
||||
let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse());
|
||||
@@ -704,7 +704,7 @@ pub trait TestNetFactory: Sized {
|
||||
|
||||
self.mut_peers(|peers| {
|
||||
for peer in peers.iter_mut() {
|
||||
peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone());
|
||||
peer.network.add_known_address(network.service().local_peer_id().clone(), listen_addr.clone());
|
||||
}
|
||||
|
||||
let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse());
|
||||
|
||||
@@ -37,7 +37,7 @@ use sc_service::{
|
||||
Role,
|
||||
Error,
|
||||
};
|
||||
use sc_network::{multiaddr, Multiaddr, NetworkStateInfo};
|
||||
use sc_network::{multiaddr, Multiaddr};
|
||||
use sc_network::config::{NetworkConfiguration, TransportConfig};
|
||||
use sp_runtime::{generic::BlockId, traits::Block as BlockT};
|
||||
use sp_transaction_pool::TransactionPool;
|
||||
@@ -265,7 +265,7 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
|
||||
let service = SyncService::from(service);
|
||||
|
||||
executor.spawn(service.clone().map_err(|_| ()));
|
||||
let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into()));
|
||||
let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into()));
|
||||
self.authority_nodes.push((self.nodes, service, user_data, addr));
|
||||
self.nodes += 1;
|
||||
}
|
||||
@@ -281,7 +281,7 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
|
||||
let service = SyncService::from(service);
|
||||
|
||||
executor.spawn(service.clone().map_err(|_| ()));
|
||||
let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into()));
|
||||
let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into()));
|
||||
self.full_nodes.push((self.nodes, service, user_data, addr));
|
||||
self.nodes += 1;
|
||||
}
|
||||
@@ -296,7 +296,7 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
|
||||
let service = SyncService::from(light(node_config).expect("Error creating test node service"));
|
||||
|
||||
executor.spawn(service.clone().map_err(|_| ()));
|
||||
let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into()));
|
||||
let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into()));
|
||||
self.light_nodes.push((self.nodes, service, addr));
|
||||
self.nodes += 1;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user