Update to libp2p 0.37 (#8625)

* Update to libp2p 0.37

* Line widths

* Fix tests
This commit is contained in:
Pierre Krieger
2021-04-18 10:04:45 +02:00
committed by GitHub
parent b6b107030d
commit d64f79924a
19 changed files with 220 additions and 168 deletions
+23 -7
View File
@@ -573,9 +573,19 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
let with_peer_id = addr.clone()
.with(Protocol::P2p(self.local_peer_id.clone().into()));
self.known_external_addresses.remove(&with_peer_id);
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_expired_listen_addr(k, addr)
NetworkBehaviour::inject_expired_external_addr(k, addr)
}
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_expired_listen_addr(k, id, addr)
}
}
@@ -585,9 +595,15 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
fn inject_new_listener(&mut self, id: ListenerId) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_new_listen_addr(k, addr)
NetworkBehaviour::inject_new_listener(k, id)
}
}
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_new_listen_addr(k, id, addr)
}
}
@@ -892,7 +908,7 @@ mod tests {
first_swarm_peer_id_and_addr = Some((keypair.public().into_peer_id(), listen_addr.clone()))
}
Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap();
swarm.listen_on(listen_addr.clone()).unwrap();
(swarm, listen_addr)
}).collect::<Vec<_>>();
@@ -915,13 +931,13 @@ mod tests {
DiscoveryOut::UnroutablePeer(other) | DiscoveryOut::Discovered(other) => {
// Call `add_self_reported_address` to simulate identify happening.
let addr = swarms.iter().find_map(|(s, a)|
if s.local_peer_id == other {
if s.behaviour().local_peer_id == other {
Some(a.clone())
} else {
None
})
.unwrap();
swarms[swarm_n].0.add_self_reported_address(
swarms[swarm_n].0.behaviour_mut().add_self_reported_address(
&other,
[protocol_name_from_protocol_id(&protocol_id)].iter(),
addr,
+21 -9
View File
@@ -23,7 +23,7 @@ use libp2p::core::connection::{ConnectionId, ListenerId};
use libp2p::core::{ConnectedPoint, either::EitherOutput, PeerId, PublicKey};
use libp2p::swarm::{IntoProtocolsHandler, IntoProtocolsHandlerSelect, ProtocolsHandler};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::identify::{Identify, IdentifyEvent, IdentifyInfo};
use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent, IdentifyInfo};
use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess};
use log::{debug, trace, error};
use smallvec::SmallVec;
@@ -86,8 +86,9 @@ impl PeerInfoBehaviour {
local_public_key: PublicKey,
) -> Self {
let identify = {
let proto_version = "/substrate/1.0".to_string();
Identify::new(proto_version, user_agent, local_public_key)
let cfg = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key)
.with_agent_version(user_agent);
Identify::new(cfg)
};
PeerInfoBehaviour {
@@ -253,14 +254,19 @@ impl NetworkBehaviour for PeerInfoBehaviour {
self.identify.inject_dial_failure(peer_id);
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
self.ping.inject_new_listen_addr(addr);
self.identify.inject_new_listen_addr(addr);
fn inject_new_listener(&mut self, id: ListenerId) {
self.ping.inject_new_listener(id);
self.identify.inject_new_listener(id);
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
self.ping.inject_expired_listen_addr(addr);
self.identify.inject_expired_listen_addr(addr);
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.ping.inject_new_listen_addr(id, addr);
self.identify.inject_new_listen_addr(id, addr);
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.ping.inject_expired_listen_addr(id, addr);
self.identify.inject_expired_listen_addr(id, addr);
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
@@ -268,6 +274,11 @@ impl NetworkBehaviour for PeerInfoBehaviour {
self.identify.inject_new_external_addr(addr);
}
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
self.ping.inject_expired_external_addr(addr);
self.identify.inject_expired_external_addr(addr);
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn error::Error + 'static)) {
self.ping.inject_listener_error(id, err);
self.identify.inject_listener_error(id, err);
@@ -323,6 +334,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
}
IdentifyEvent::Error { peer_id, error } =>
debug!(target: "sub-libp2p", "Identification with peer {:?} failed => {}", peer_id, error),
IdentifyEvent::Pushed { .. } => {}
IdentifyEvent::Sent { .. } => {}
}
},
+12 -4
View File
@@ -1523,18 +1523,26 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
self.behaviour.inject_dial_failure(peer_id)
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
self.behaviour.inject_new_listen_addr(addr)
fn inject_new_listener(&mut self, id: ListenerId) {
self.behaviour.inject_new_listener(id)
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
self.behaviour.inject_expired_listen_addr(addr)
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.behaviour.inject_new_listen_addr(id, addr)
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.behaviour.inject_expired_listen_addr(id, addr)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.behaviour.inject_new_external_addr(addr)
}
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
self.behaviour.inject_expired_external_addr(addr)
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
self.behaviour.inject_listener_error(id, err);
}
@@ -97,7 +97,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
behaviour,
keypairs[index].public().into_peer_id()
);
Swarm::listen_on(&mut swarm, addrs[index].clone()).unwrap();
swarm.listen_on(addrs[index].clone()).unwrap();
out.push(swarm);
}
@@ -192,18 +192,26 @@ impl NetworkBehaviour for CustomProtoWithAddr {
self.inner.inject_dial_failure(peer_id)
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_new_listen_addr(addr)
fn inject_new_listener(&mut self, id: ListenerId) {
self.inner.inject_new_listener(id)
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_expired_listen_addr(addr)
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.inner.inject_new_listen_addr(id, addr)
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.inner.inject_expired_listen_addr(id, addr)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_new_external_addr(addr)
}
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_expired_external_addr(addr)
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn error::Error + 'static)) {
self.inner.inject_listener_error(id, err);
}
@@ -245,7 +253,7 @@ fn reconnect_after_disconnect() {
ServiceState::NotConnected => {
service1_state = ServiceState::FirstConnec;
if service2_state == ServiceState::FirstConnec {
service1.disconnect_peer(
service1.behaviour_mut().disconnect_peer(
Swarm::local_peer_id(&service2),
sc_peerset::SetId::from(0)
);
@@ -267,7 +275,7 @@ fn reconnect_after_disconnect() {
ServiceState::NotConnected => {
service2_state = ServiceState::FirstConnec;
if service1_state == ServiceState::FirstConnec {
service1.disconnect_peer(
service1.behaviour_mut().disconnect_peer(
Swarm::local_peer_id(&service2),
sc_peerset::SetId::from(0)
);
@@ -428,9 +428,15 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
}
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_expired_listen_addr(p, addr)
NetworkBehaviour::inject_expired_external_addr(p, addr)
}
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_expired_listen_addr(p, id, addr)
}
}
@@ -440,9 +446,15 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
}
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
fn inject_new_listener(&mut self, id: ListenerId) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_new_listen_addr(p, addr)
NetworkBehaviour::inject_new_listener(p, id)
}
}
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_new_listen_addr(p, id, addr)
}
}
@@ -930,7 +942,7 @@ mod tests {
let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap();
swarm.listen_on(listen_addr.clone()).unwrap();
(swarm, listen_addr)
}
@@ -1000,7 +1012,7 @@ mod tests {
match swarm.next_event().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
let (sender, receiver) = oneshot::channel();
swarm.send_request(
swarm.behaviour_mut().send_request(
&peer_id,
protocol_name,
b"this is a request".to_vec(),
@@ -1090,7 +1102,7 @@ mod tests {
match swarm.next_event().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
let (sender, receiver) = oneshot::channel();
swarm.send_request(
swarm.behaviour_mut().send_request(
&peer_id,
protocol_name,
b"this is a request".to_vec(),
@@ -1182,7 +1194,7 @@ mod tests {
// Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test,
// so they wouldn't connect to each other.
Swarm::dial_addr(&mut swarm_1, listen_add_2).unwrap();
swarm_1.dial_addr(listen_add_2).unwrap();
// Run swarm 2 in the background, receiving two requests.
pool.spawner().spawn_obj(
@@ -1235,14 +1247,14 @@ mod tests {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
let (sender_1, receiver_1) = oneshot::channel();
let (sender_2, receiver_2) = oneshot::channel();
swarm_1.send_request(
swarm_1.behaviour_mut().send_request(
&peer_id,
protocol_name_1,
b"this is a request".to_vec(),
sender_1,
IfDisconnected::ImmediateError,
);
swarm_1.send_request(
swarm_1.behaviour_mut().send_request(
&peer_id,
protocol_name_2,
b"this is a request".to_vec(),
+56 -52
View File
@@ -465,47 +465,47 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
/// Returns the number of peers we're connected to.
pub fn num_connected_peers(&self) -> usize {
self.network_service.user_protocol().num_connected_peers()
self.network_service.behaviour().user_protocol().num_connected_peers()
}
/// Returns the number of peers we're connected to and that are being queried.
pub fn num_active_peers(&self) -> usize {
self.network_service.user_protocol().num_active_peers()
self.network_service.behaviour().user_protocol().num_active_peers()
}
/// Current global sync state.
pub fn sync_state(&self) -> SyncState {
self.network_service.user_protocol().sync_state()
self.network_service.behaviour().user_protocol().sync_state()
}
/// Target sync block number.
pub fn best_seen_block(&self) -> Option<NumberFor<B>> {
self.network_service.user_protocol().best_seen_block()
self.network_service.behaviour().user_protocol().best_seen_block()
}
/// Number of peers participating in syncing.
pub fn num_sync_peers(&self) -> u32 {
self.network_service.user_protocol().num_sync_peers()
self.network_service.behaviour().user_protocol().num_sync_peers()
}
/// Number of blocks in the import queue.
pub fn num_queued_blocks(&self) -> u32 {
self.network_service.user_protocol().num_queued_blocks()
self.network_service.behaviour().user_protocol().num_queued_blocks()
}
/// Returns the number of downloaded blocks.
pub fn num_downloaded_blocks(&self) -> usize {
self.network_service.user_protocol().num_downloaded_blocks()
self.network_service.behaviour().user_protocol().num_downloaded_blocks()
}
/// Number of active sync requests.
pub fn num_sync_requests(&self) -> usize {
self.network_service.user_protocol().num_sync_requests()
self.network_service.behaviour().user_protocol().num_sync_requests()
}
/// Adds an address for a node.
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.network_service.add_known_address(peer_id, addr);
self.network_service.behaviour_mut().add_known_address(peer_id, addr);
}
/// Return a `NetworkService` that can be shared through the code base and can be used to
@@ -516,12 +516,12 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
/// You must call this when a new block is finalized by the client.
pub fn on_block_finalized(&mut self, hash: B::Hash, header: B::Header) {
self.network_service.user_protocol_mut().on_block_finalized(hash, &header);
self.network_service.behaviour_mut().user_protocol_mut().on_block_finalized(hash, &header);
}
/// Inform the network service about new best imported block.
pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor<B>) {
self.network_service.user_protocol_mut().new_best_block_imported(hash, number);
self.network_service.behaviour_mut().user_protocol_mut().new_best_block_imported(hash, number);
}
/// Returns the local `PeerId`.
@@ -542,15 +542,15 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
/// everywhere about this. Please don't use this function to retrieve actual information.
pub fn network_state(&mut self) -> NetworkState {
let swarm = &mut self.network_service;
let open = swarm.user_protocol().open_peers().cloned().collect::<Vec<_>>();
let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
let connected_peers = {
let swarm = &mut *swarm;
open.iter().filter_map(move |peer_id| {
let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id)
let known_addresses = NetworkBehaviour::addresses_of_peer(swarm.behaviour_mut(), peer_id)
.into_iter().collect();
let endpoint = if let Some(e) = swarm.node(peer_id).map(|i| i.endpoint()) {
let endpoint = if let Some(e) = swarm.behaviour_mut().node(peer_id).map(|i| i.endpoint()) {
e.clone().into()
} else {
error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \
@@ -560,9 +560,9 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
Some((peer_id.to_base58(), NetworkStatePeer {
endpoint,
version_string: swarm.node(peer_id)
version_string: swarm.behaviour_mut().node(peer_id)
.and_then(|i| i.client_version().map(|s| s.to_owned())),
latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()),
latest_ping_time: swarm.behaviour_mut().node(peer_id).and_then(|i| i.latest_ping()),
known_addresses,
}))
}).collect()
@@ -570,14 +570,14 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
let not_connected_peers = {
let swarm = &mut *swarm;
swarm.known_peers().into_iter()
swarm.behaviour_mut().known_peers().into_iter()
.filter(|p| open.iter().all(|n| n != p))
.map(move |peer_id| {
(peer_id.to_base58(), NetworkStateNotConnectedPeer {
version_string: swarm.node(&peer_id)
version_string: swarm.behaviour_mut().node(&peer_id)
.and_then(|i| i.client_version().map(|s| s.to_owned())),
latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()),
known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id)
latest_ping_time: swarm.behaviour_mut().node(&peer_id).and_then(|i| i.latest_ping()),
known_addresses: NetworkBehaviour::addresses_of_peer(swarm.behaviour_mut(), &peer_id)
.into_iter().collect(),
})
})
@@ -585,8 +585,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
};
let peer_id = Swarm::<B>::local_peer_id(&swarm).to_base58();
let listened_addresses = Swarm::<B>::listeners(&swarm).cloned().collect();
let external_addresses = Swarm::<B>::external_addresses(&swarm)
let listened_addresses = swarm.listeners().cloned().collect();
let external_addresses = swarm.external_addresses()
.map(|r| &r.addr)
.cloned()
.collect();
@@ -597,13 +597,13 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
external_addresses,
connected_peers,
not_connected_peers,
peerset: swarm.user_protocol_mut().peerset_debug_info(),
peerset: swarm.behaviour_mut().user_protocol_mut().peerset_debug_info(),
}
}
/// Get currently connected peers.
pub fn peers_debug_info(&mut self) -> Vec<(PeerId, PeerInfo<B>)> {
self.network_service.user_protocol_mut()
self.network_service.behaviour_mut().user_protocol_mut()
.peers_info()
.map(|(id, info)| (id.clone(), info.clone()))
.collect()
@@ -1354,7 +1354,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
// Check for new incoming light client requests.
if let Some(light_client_rqs) = this.light_client_rqs.as_mut() {
while let Poll::Ready(Some(rq)) = light_client_rqs.poll_next_unpin(cx) {
let result = this.network_service.light_client_request(rq);
let result = this.network_service.behaviour_mut().light_client_request(rq);
match result {
Ok(()) => {},
Err(light_client_requests::sender::SendRequestError::TooManyRequests) => {
@@ -1393,46 +1393,46 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
match msg {
ServiceToWorkerMsg::AnnounceBlock(hash, data) =>
this.network_service.user_protocol_mut().announce_block(hash, data),
this.network_service.behaviour_mut().user_protocol_mut().announce_block(hash, data),
ServiceToWorkerMsg::RequestJustification(hash, number) =>
this.network_service.user_protocol_mut().request_justification(&hash, number),
this.network_service.behaviour_mut().user_protocol_mut().request_justification(&hash, number),
ServiceToWorkerMsg::PropagateTransaction(hash) =>
this.tx_handler_controller.propagate_transaction(hash),
ServiceToWorkerMsg::PropagateTransactions =>
this.tx_handler_controller.propagate_transactions(),
ServiceToWorkerMsg::GetValue(key) =>
this.network_service.get_value(&key),
this.network_service.behaviour_mut().get_value(&key),
ServiceToWorkerMsg::PutValue(key, value) =>
this.network_service.put_value(key, value),
this.network_service.behaviour_mut().put_value(key, value),
ServiceToWorkerMsg::SetReservedOnly(reserved_only) =>
this.network_service.user_protocol_mut().set_reserved_only(reserved_only),
this.network_service.behaviour_mut().user_protocol_mut().set_reserved_only(reserved_only),
ServiceToWorkerMsg::SetReserved(peers) =>
this.network_service.user_protocol_mut().set_reserved_peers(peers),
this.network_service.behaviour_mut().user_protocol_mut().set_reserved_peers(peers),
ServiceToWorkerMsg::AddReserved(peer_id) =>
this.network_service.user_protocol_mut().add_reserved_peer(peer_id),
this.network_service.behaviour_mut().user_protocol_mut().add_reserved_peer(peer_id),
ServiceToWorkerMsg::RemoveReserved(peer_id) =>
this.network_service.user_protocol_mut().remove_reserved_peer(peer_id),
this.network_service.behaviour_mut().user_protocol_mut().remove_reserved_peer(peer_id),
ServiceToWorkerMsg::AddSetReserved(protocol, peer_id) =>
this.network_service.user_protocol_mut().add_set_reserved_peer(protocol, peer_id),
this.network_service.behaviour_mut().user_protocol_mut().add_set_reserved_peer(protocol, peer_id),
ServiceToWorkerMsg::RemoveSetReserved(protocol, peer_id) =>
this.network_service.user_protocol_mut().remove_set_reserved_peer(protocol, peer_id),
this.network_service.behaviour_mut().user_protocol_mut().remove_set_reserved_peer(protocol, peer_id),
ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
this.network_service.add_known_address(peer_id, addr),
this.network_service.behaviour_mut().add_known_address(peer_id, addr),
ServiceToWorkerMsg::AddToPeersSet(protocol, peer_id) =>
this.network_service.user_protocol_mut().add_to_peers_set(protocol, peer_id),
this.network_service.behaviour_mut().user_protocol_mut().add_to_peers_set(protocol, peer_id),
ServiceToWorkerMsg::RemoveFromPeersSet(protocol, peer_id) =>
this.network_service.user_protocol_mut().remove_from_peers_set(protocol, peer_id),
this.network_service.behaviour_mut().user_protocol_mut().remove_from_peers_set(protocol, peer_id),
ServiceToWorkerMsg::SyncFork(peer_ids, hash, number) =>
this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
this.network_service.behaviour_mut().user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
ServiceToWorkerMsg::EventStream(sender) =>
this.event_streams.push(sender),
ServiceToWorkerMsg::Request { target, protocol, request, pending_response, connect } => {
this.network_service.send_request(&target, &protocol, request, pending_response, connect);
this.network_service.behaviour_mut().send_request(&target, &protocol, request, pending_response, connect);
},
ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) =>
this.network_service.user_protocol_mut().disconnect_peer(&who, &protocol_name),
this.network_service.behaviour_mut().user_protocol_mut().disconnect_peer(&who, &protocol_name),
ServiceToWorkerMsg::NewBestBlockImported(hash, number) =>
this.network_service.user_protocol_mut().new_best_block_imported(hash, number),
this.network_service.behaviour_mut().user_protocol_mut().new_best_block_imported(hash, number),
}
}
@@ -1777,7 +1777,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
};
}
let num_connected_peers = this.network_service.user_protocol_mut().num_connected_peers();
let num_connected_peers = this.network_service.behaviour_mut().user_protocol_mut().num_connected_peers();
// Update the variables shared with the `NetworkService`.
this.num_connected.store(num_connected_peers, Ordering::Relaxed);
@@ -1789,7 +1789,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
*this.external_addresses.lock() = external_addresses;
}
let is_major_syncing = match this.network_service.user_protocol_mut().sync_state() {
let is_major_syncing = match this.network_service.behaviour_mut().user_protocol_mut().sync_state() {
SyncState::Idle => false,
SyncState::Downloading => true,
};
@@ -1799,21 +1799,25 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
if let Some(metrics) = this.metrics.as_ref() {
for (proto, buckets) in this.network_service.num_entries_per_kbucket() {
for (proto, buckets) in this.network_service.behaviour_mut().num_entries_per_kbucket() {
for (lower_ilog2_bucket_bound, num_entries) in buckets {
metrics.kbuckets_num_nodes
.with_label_values(&[&proto.as_ref(), &lower_ilog2_bucket_bound.to_string()])
.set(num_entries as u64);
}
}
for (proto, num_entries) in this.network_service.num_kademlia_records() {
for (proto, num_entries) in this.network_service.behaviour_mut().num_kademlia_records() {
metrics.kademlia_records_count.with_label_values(&[&proto.as_ref()]).set(num_entries as u64);
}
for (proto, num_entries) in this.network_service.kademlia_records_total_size() {
for (proto, num_entries) in this.network_service.behaviour_mut().kademlia_records_total_size() {
metrics.kademlia_records_sizes_total.with_label_values(&[&proto.as_ref()]).set(num_entries as u64);
}
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
metrics.peerset_num_discovered.set(
this.network_service.behaviour_mut().user_protocol().num_discovered_peers() as u64
);
metrics.peerset_num_requested.set(
this.network_service.behaviour_mut().user_protocol().requested_peers().count() as u64
);
metrics.pending_connections.set(
Swarm::network_info(&this.network_service).connection_counters().num_pending() as u64
);
@@ -1841,13 +1845,13 @@ impl<'a, B: BlockT> Link<B> for NetworkLink<'a, B> {
count: usize,
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
) {
self.protocol.user_protocol_mut().on_blocks_processed(imported, count, results)
self.protocol.behaviour_mut().user_protocol_mut().on_blocks_processed(imported, count, results)
}
fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
self.protocol.user_protocol_mut().justification_import_result(who, hash.clone(), number, success);
self.protocol.behaviour_mut().user_protocol_mut().justification_import_result(who, hash.clone(), number, success);
}
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.protocol.user_protocol_mut().request_justification(hash, number)
self.protocol.behaviour_mut().user_protocol_mut().request_justification(hash, number)
}
}
+5
View File
@@ -59,6 +59,11 @@ impl<T: Hash + Eq> LruHashSet<T> {
}
false
}
/// Removes an element from the set if it is present.
pub fn remove(&mut self, e: &T) -> bool {
self.set.remove(e)
}
}
#[cfg(test)]