mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 21:01:05 +00:00
client/network: Make NetworkService::set_priority_group async (#7352)
As done with `NetworkService::{add_to,remove_from}_priority_group`, make
`NetworkService::set_priority_group` async as well. This future-proofs
the API should we ever decide to use a bounded channel between
`NetworkService` and `NetworkWorker`.
This commit is contained in:
@@ -27,6 +27,7 @@ use futures::{FutureExt, Stream, StreamExt, stream::Fuse};
|
||||
use futures_timer::Delay;
|
||||
|
||||
use addr_cache::AddrCache;
|
||||
use async_trait::async_trait;
|
||||
use codec::Decode;
|
||||
use either::Either;
|
||||
use libp2p::{core::multiaddr, multihash::Multihash};
|
||||
@@ -267,7 +268,7 @@ where
|
||||
},
|
||||
// Set peerset priority group to a new random set of addresses.
|
||||
_ = self.priority_group_set_interval.next().fuse() => {
|
||||
if let Err(e) = self.set_priority_group() {
|
||||
if let Err(e) = self.set_priority_group().await {
|
||||
error!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to set priority group: {:?}", e,
|
||||
@@ -629,7 +630,7 @@ where
|
||||
|
||||
/// Set the peer set 'authority' priority group to a new random set of
|
||||
/// [`Multiaddr`]s.
|
||||
fn set_priority_group(&self) -> Result<()> {
|
||||
async fn set_priority_group(&self) -> Result<()> {
|
||||
let addresses = self.addr_cache.get_random_subset();
|
||||
|
||||
if addresses.is_empty() {
|
||||
@@ -653,7 +654,7 @@ where
|
||||
.set_priority_group(
|
||||
AUTHORITIES_PRIORITY_GROUP_NAME.to_string(),
|
||||
addresses.into_iter().collect(),
|
||||
)
|
||||
).await
|
||||
.map_err(Error::SettingPeersetPriorityGroup)?;
|
||||
|
||||
Ok(())
|
||||
@@ -663,9 +664,10 @@ where
|
||||
/// NetworkProvider provides [`Worker`] with all necessary hooks into the
|
||||
/// underlying Substrate networking. Using this trait abstraction instead of [`NetworkService`]
|
||||
/// directly is necessary to unit test [`Worker`].
|
||||
#[async_trait]
|
||||
pub trait NetworkProvider: NetworkStateInfo {
|
||||
/// Modify a peerset priority group.
|
||||
fn set_priority_group(
|
||||
async fn set_priority_group(
|
||||
&self,
|
||||
group_id: String,
|
||||
peers: HashSet<libp2p::Multiaddr>,
|
||||
@@ -678,17 +680,18 @@ pub trait NetworkProvider: NetworkStateInfo {
|
||||
fn get_value(&self, key: &libp2p::kad::record::Key);
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<B, H> NetworkProvider for sc_network::NetworkService<B, H>
|
||||
where
|
||||
B: BlockT + 'static,
|
||||
H: ExHashT,
|
||||
{
|
||||
fn set_priority_group(
|
||||
async fn set_priority_group(
|
||||
&self,
|
||||
group_id: String,
|
||||
peers: HashSet<libp2p::Multiaddr>,
|
||||
) -> std::result::Result<(), String> {
|
||||
self.set_priority_group(group_id, peers)
|
||||
self.set_priority_group(group_id, peers).await
|
||||
}
|
||||
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>) {
|
||||
self.put_value(key, value)
|
||||
|
||||
@@ -20,6 +20,7 @@ use crate::worker::schema;
|
||||
|
||||
use std::{iter::FromIterator, sync::{Arc, Mutex}, task::Poll};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::channel::mpsc::{self, channel};
|
||||
use futures::executor::{block_on, LocalPool};
|
||||
use futures::future::FutureExt;
|
||||
@@ -213,8 +214,9 @@ impl Default for TestNetwork {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl NetworkProvider for TestNetwork {
|
||||
fn set_priority_group(
|
||||
async fn set_priority_group(
|
||||
&self,
|
||||
group_id: String,
|
||||
peers: HashSet<Multiaddr>,
|
||||
@@ -424,7 +426,7 @@ fn publish_discover_cycle() {
|
||||
// Make authority discovery handle the event.
|
||||
worker.handle_dht_event(dht_event).await;
|
||||
|
||||
worker.set_priority_group().unwrap();
|
||||
worker.set_priority_group().await.unwrap();
|
||||
|
||||
// Expect authority discovery to set the priority set.
|
||||
assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1);
|
||||
@@ -623,7 +625,7 @@ fn never_add_own_address_to_priority_group() {
|
||||
sentry_worker.start_new_lookups();
|
||||
|
||||
sentry_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
|
||||
sentry_worker.set_priority_group().unwrap();
|
||||
block_on(sentry_worker.set_priority_group()).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
sentry_network.set_priority_group_call.lock().unwrap().len(), 1,
|
||||
|
||||
Reference in New Issue
Block a user