mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-01 04:17:57 +00:00
client/authority-discovery: Limit number of connections to authorities (#4487)
* client/authority-discovery: Limit number of connections to authorities Instead of connecting to all sentry nodes of all authorities, with this patch the authority discovery module does the following: - Choose one sentry node per authority at random. - Choose MAX_NUM_AUTHORITY_CONN out of the above at random. The module uses randomness to prevent hot spots, e.g. all nodes trying to connect to a single node. If the authority discovery module would choose the nodes to connect to at random on each new address that it learns of, the node would go through a lot of connection churn. Instead it creates a random seed at start up and uses this seed for its RNG on each update cycle. * client/authority-discovery: Extract address cache into own module * client/authority-discovery/src/addr_cache: Add basic unit tests * client/authority-discovery: Replace unwrap with expect on [u8] cmp * .maintain/sentry-node/docker-compose.yml: Prefix endpoint flags * client/authority-discovery/src/addr_cache: Use sort_unstable and cmp * client/authority-discovery: Use BTreeMap in addr_cache for sorted iter To reduce connection churn it is preferrable to have `get_subset` of the `addr_cache` to return the same result on repeated calls. `get_subset` iterates a map. To make the process of iteration deterministic, use a `BTreeMap` instead of a `HashMap`.
This commit is contained in:
@@ -47,8 +47,8 @@ services:
|
||||
- "--reserved-nodes"
|
||||
- "/dns4/sentry-a/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi"
|
||||
# Not only bind to localhost.
|
||||
- "--ws-external"
|
||||
- "--rpc-external"
|
||||
- "--unsafe-ws-external"
|
||||
- "--unsafe-rpc-external"
|
||||
# - "--log"
|
||||
# - "sub-libp2p=trace"
|
||||
# - "--log"
|
||||
@@ -88,8 +88,8 @@ services:
|
||||
- "--rpc-cors"
|
||||
- "all"
|
||||
# Not only bind to localhost.
|
||||
- "--ws-external"
|
||||
- "--rpc-external"
|
||||
- "--unsafe-ws-external"
|
||||
- "--unsafe-rpc-external"
|
||||
- "--log"
|
||||
- "sub-authority-discovery=trace"
|
||||
- "--sentry"
|
||||
@@ -121,8 +121,8 @@ services:
|
||||
- "--rpc-cors"
|
||||
- "all"
|
||||
# Not only bind to localhost.
|
||||
- "--ws-external"
|
||||
- "--rpc-external"
|
||||
- "--unsafe-ws-external"
|
||||
- "--unsafe-rpc-external"
|
||||
- "--log"
|
||||
- "sub-authority-discovery=trace"
|
||||
|
||||
|
||||
Generated
+2
-1
@@ -4933,9 +4933,10 @@ dependencies = [
|
||||
"libp2p 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"parity-scale-codec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"quickcheck 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"sc-client-api 2.0.0",
|
||||
"sc-keystore 2.0.0",
|
||||
"sc-network 0.8.0",
|
||||
|
||||
@@ -9,26 +9,27 @@ build = "build.rs"
|
||||
prost-build = "0.5.0"
|
||||
|
||||
[dependencies]
|
||||
sp-authority-discovery = { version = "2.0.0", path = "../../primitives/authority-discovery" }
|
||||
bytes = "0.4.12"
|
||||
sc-client-api = { version = "2.0.0", path = "../api" }
|
||||
codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" }
|
||||
derive_more = "0.99.2"
|
||||
futures = "0.3.1"
|
||||
futures-timer = "2.0"
|
||||
sc-keystore = { version = "2.0.0", path = "../keystore" }
|
||||
libp2p = { version = "0.13.2", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
|
||||
log = "0.4.8"
|
||||
sc-network = { version = "0.8", path = "../network" }
|
||||
sp-core = { version = "2.0.0", path = "../../primitives/core" }
|
||||
sp-blockchain = { version = "2.0.0", path = "../../primitives/blockchain" }
|
||||
prost = "0.5.0"
|
||||
rand = "0.7.2"
|
||||
sc-client-api = { version = "2.0.0", path = "../api" }
|
||||
sc-keystore = { version = "2.0.0", path = "../keystore" }
|
||||
sc-network = { version = "0.8", path = "../network" }
|
||||
serde_json = "1.0.41"
|
||||
sp-authority-discovery = { version = "2.0.0", path = "../../primitives/authority-discovery" }
|
||||
sp-blockchain = { version = "2.0.0", path = "../../primitives/blockchain" }
|
||||
sp-core = { version = "2.0.0", path = "../../primitives/core" }
|
||||
sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" }
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.7.0"
|
||||
parking_lot = "0.9.0"
|
||||
quickcheck = "0.9.0"
|
||||
sc-peerset = { version = "2.0.0", path = "../peerset" }
|
||||
sp-test-primitives = { version = "2.0.0", path = "../../primitives/test-primitives" }
|
||||
sp-api = { version = "2.0.0", path = "../../primitives/api" }
|
||||
sp-test-primitives = { version = "2.0.0", path = "../../primitives/test-primitives" }
|
||||
|
||||
@@ -0,0 +1,200 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
|
||||
use std::{
|
||||
clone::Clone,
|
||||
cmp::{Eq, Ord, PartialEq},
|
||||
collections::BTreeMap,
|
||||
convert::AsRef,
|
||||
hash::Hash,
|
||||
};
|
||||
|
||||
/// The maximum number of authority connections initialized through the authority discovery module.
|
||||
///
|
||||
/// In other words the maximum size of the `authority` peer set priority group.
|
||||
const MAX_NUM_AUTHORITY_CONN: usize = 10;
|
||||
|
||||
/// Cache of Multiaddresses of authority nodes or their sentry nodes.
|
||||
//
|
||||
// The network peerset interface for priority groups lets us only set an entire group, but we
|
||||
// retrieve the addresses of other authorities one by one from the network. To use the peerset
|
||||
// interface we need to cache the addresses and always overwrite the entire peerset priority
|
||||
// group. To ensure this map doesn't grow indefinitely `purge_old_authorities_from_cache`
|
||||
// function is called each time we add a new entry.
|
||||
pub(super) struct AddrCache<Id, Addr> {
|
||||
cache: BTreeMap<Id, Vec<Addr>>,
|
||||
|
||||
/// Random number to seed address selection RNG.
|
||||
///
|
||||
/// A node should only try to connect to a subset of all authorities. To choose this subset one
|
||||
/// uses randomness. The choice should differ between nodes to prevent hot spots, but not within
|
||||
/// each node between each update to prevent connection churn. Thus before each selection we
|
||||
/// seed an RNG with the same seed.
|
||||
rand_addr_selection_seed: u64,
|
||||
}
|
||||
|
||||
impl<Id, Addr> AddrCache<Id, Addr>
|
||||
where
|
||||
Id: Clone + Eq + Hash + Ord,
|
||||
Addr: Clone + PartialEq + AsRef<[u8]>,
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
AddrCache {
|
||||
cache: BTreeMap::new(),
|
||||
rand_addr_selection_seed: rand::thread_rng().gen(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, id: Id, mut addresses: Vec<Addr>) {
|
||||
if addresses.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
addresses.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
|
||||
self.cache.insert(id, addresses);
|
||||
}
|
||||
|
||||
// Each node should connect to a subset of all authorities. In order to prevent hot spots, this
|
||||
// selection is based on randomness. Selecting randomly each time we alter the address cache
|
||||
// would result in connection churn. To reduce this churn a node generates a seed on startup and
|
||||
// uses this seed for a new rng on each update. (One could as well use ones peer id as a seed.
|
||||
// Given that the peer id is publicly known, it would make this process predictable by others,
|
||||
// which might be used as an attack.)
|
||||
pub fn get_subset(&self) -> Vec<Addr> {
|
||||
let mut rng = StdRng::seed_from_u64(self.rand_addr_selection_seed);
|
||||
|
||||
let mut addresses = self
|
||||
.cache
|
||||
.iter()
|
||||
.map(|(_peer_id, addresses)| {
|
||||
addresses
|
||||
.choose(&mut rng)
|
||||
.expect("an empty address vector is never inserted into the cache")
|
||||
})
|
||||
.cloned()
|
||||
.collect::<Vec<Addr>>();
|
||||
|
||||
addresses.dedup();
|
||||
addresses.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
|
||||
|
||||
addresses
|
||||
.choose_multiple(&mut rng, MAX_NUM_AUTHORITY_CONN)
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn retain_ids(&mut self, ids: &Vec<Id>) {
|
||||
let to_remove = self
|
||||
.cache
|
||||
.iter()
|
||||
.filter(|(id, _addresses)| !ids.contains(id))
|
||||
.map(|entry| entry.0)
|
||||
.cloned()
|
||||
.collect::<Vec<Id>>();
|
||||
|
||||
for key in to_remove {
|
||||
self.cache.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use quickcheck::{QuickCheck, TestResult};
|
||||
|
||||
#[test]
|
||||
fn returns_addresses_of_same_authorities_on_repeated_calls() {
|
||||
fn property(input: Vec<(u32, Vec<String>)>) -> TestResult {
|
||||
// Expect less than 1000 authorities.
|
||||
if input.len() > 1000 {
|
||||
return TestResult::discard();
|
||||
}
|
||||
|
||||
// Expect less than 100 addresses per authority.
|
||||
for i in &input {
|
||||
if i.1.len() > 100 {
|
||||
return TestResult::discard();
|
||||
}
|
||||
}
|
||||
|
||||
let mut c = AddrCache::new();
|
||||
|
||||
for (id, addresses) in input {
|
||||
c.insert(id, addresses);
|
||||
}
|
||||
|
||||
let result = c.get_subset();
|
||||
assert!(result.len() <= MAX_NUM_AUTHORITY_CONN);
|
||||
|
||||
for _ in 1..100 {
|
||||
assert_eq!(c.get_subset(), result);
|
||||
}
|
||||
|
||||
TestResult::passed()
|
||||
}
|
||||
|
||||
QuickCheck::new()
|
||||
.max_tests(10)
|
||||
.quickcheck(property as fn(Vec<(u32, Vec<String>)>) -> TestResult)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn returns_same_addresses_of_first_authority_when_second_authority_changes() {
|
||||
let mut c = AddrCache::new();
|
||||
|
||||
// Insert addresses of first authority.
|
||||
let addresses = (1..100)
|
||||
.map(|i| format!("{:?}", i))
|
||||
.collect::<Vec<String>>();
|
||||
c.insert(1, addresses);
|
||||
let first_subset = c.get_subset();
|
||||
assert_eq!(1, first_subset.len());
|
||||
|
||||
// Insert address of second authority.
|
||||
c.insert(2, vec!["a".to_string()]);
|
||||
let second_subset = c.get_subset();
|
||||
assert_eq!(2, second_subset.len());
|
||||
|
||||
// Expect same address of first authority.
|
||||
assert!(second_subset.contains(&first_subset[0]));
|
||||
|
||||
// Alter address of second authority.
|
||||
c.insert(2, vec!["b".to_string()]);
|
||||
let second_subset = c.get_subset();
|
||||
assert_eq!(2, second_subset.len());
|
||||
|
||||
// Expect same address of first authority.
|
||||
assert!(second_subset.contains(&first_subset[0]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn retains_only_entries_of_provided_ids() {
|
||||
let mut cache = AddrCache::new();
|
||||
|
||||
cache.insert(1, vec![vec![10]]);
|
||||
cache.insert(2, vec![vec![20]]);
|
||||
cache.insert(3, vec![vec![30]]);
|
||||
|
||||
cache.retain_ids(&vec![1, 3]);
|
||||
|
||||
let mut subset = cache.get_subset();
|
||||
subset.sort();
|
||||
|
||||
assert_eq!(vec![vec![10], vec![30]], subset);
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,10 @@ pub type Result<T> = std::result::Result<T, Error>;
|
||||
/// Error type for the authority discovery module.
|
||||
#[derive(Debug, derive_more::Display, derive_more::From)]
|
||||
pub enum Error {
|
||||
/// Received dht value found event with records with different keys.
|
||||
ReceivingDhtValueFoundEventWithDifferentKeys,
|
||||
/// Received dht value found event with no records.
|
||||
ReceivingDhtValueFoundEventWithNoRecords,
|
||||
/// Failed to verify a dht payload with the given signature.
|
||||
VerifyingDhtPayload,
|
||||
/// Failed to hash the authority id to be used as a dht key.
|
||||
|
||||
@@ -45,7 +45,6 @@
|
||||
//! 4. Adds the retrieved external addresses as priority nodes to the peerset.
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::convert::TryInto;
|
||||
use std::iter::FromIterator;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
@@ -55,26 +54,26 @@ use futures::task::{Context, Poll};
|
||||
use futures::{Future, FutureExt, Stream, StreamExt};
|
||||
use futures_timer::Delay;
|
||||
|
||||
use sp_authority_discovery::{
|
||||
AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair
|
||||
};
|
||||
use sc_client_api::blockchain::HeaderBackend;
|
||||
use codec::{Decode, Encode};
|
||||
use error::{Error, Result};
|
||||
use log::{debug, error, log_enabled, warn};
|
||||
use libp2p::Multiaddr;
|
||||
use log::{debug, error, log_enabled, warn};
|
||||
use prost::Message;
|
||||
use sc_client_api::blockchain::HeaderBackend;
|
||||
use sc_network::specialization::NetworkSpecialization;
|
||||
use sc_network::{DhtEvent, ExHashT, NetworkStateInfo};
|
||||
use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair};
|
||||
use sp_core::crypto::{key_types, Pair};
|
||||
use sp_core::traits::BareCryptoStorePtr;
|
||||
use prost::Message;
|
||||
use sp_runtime::generic::BlockId;
|
||||
use sp_runtime::traits::{Block as BlockT, ProvideRuntimeApi};
|
||||
use addr_cache::AddrCache;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
mod error;
|
||||
mod addr_cache;
|
||||
/// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs.
|
||||
mod schema {
|
||||
include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs"));
|
||||
@@ -89,12 +88,6 @@ const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30);
|
||||
/// discovery module.
|
||||
const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
|
||||
|
||||
/// The maximum number of sentry node public addresses that we accept per authority.
|
||||
///
|
||||
/// Everything above this threshold should be dropped to prevent a single authority from filling up
|
||||
/// our peer set priority group.
|
||||
const MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY: usize = 5;
|
||||
|
||||
/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities.
|
||||
pub struct AuthorityDiscovery<Client, Network, Block>
|
||||
where
|
||||
@@ -124,12 +117,7 @@ where
|
||||
/// Interval on which to query for addresses of other authorities.
|
||||
query_interval: Interval,
|
||||
|
||||
/// The network peerset interface for priority groups lets us only set an entire group, but we
|
||||
/// retrieve the addresses of other authorities one by one from the network. To use the peerset
|
||||
/// interface we need to cache the addresses and always overwrite the entire peerset priority
|
||||
/// group. To ensure this map doesn't grow indefinitely `purge_old_authorities_from_cache`
|
||||
/// function is called each time we add a new entry.
|
||||
address_cache: HashMap<AuthorityId, Vec<Multiaddr>>,
|
||||
addr_cache: addr_cache::AddrCache<AuthorityId, Multiaddr>,
|
||||
|
||||
phantom: PhantomData<Block>,
|
||||
}
|
||||
@@ -182,22 +170,12 @@ where
|
||||
}
|
||||
}).collect::<Vec<Multiaddr>>();
|
||||
|
||||
if addrs.len() > MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY {
|
||||
warn!(
|
||||
target: "sub-authority-discovery",
|
||||
"More than MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY ({:?}) were specified. Other \
|
||||
nodes will likely ignore the remainder.",
|
||||
MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY,
|
||||
);
|
||||
}
|
||||
|
||||
Some(addrs)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
|
||||
let address_cache = HashMap::new();
|
||||
let addr_cache = AddrCache::new();
|
||||
|
||||
AuthorityDiscovery {
|
||||
client,
|
||||
@@ -207,7 +185,7 @@ where
|
||||
key_store,
|
||||
publish_interval,
|
||||
query_interval,
|
||||
address_cache,
|
||||
addr_cache,
|
||||
phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
@@ -304,86 +282,70 @@ where
|
||||
&mut self,
|
||||
values: Vec<(libp2p::kad::record::Key, Vec<u8>)>,
|
||||
) -> Result<()> {
|
||||
debug!(target: "sub-authority-discovery", "Got Dht value from network.");
|
||||
|
||||
let block_id = BlockId::hash(self.client.info().best_hash);
|
||||
|
||||
// From the Dht we only get the hashed authority id. In order to retrieve the actual
|
||||
// authority id and to ensure it is actually an authority, we match the hash against the
|
||||
// hash of the authority id of all other authorities.
|
||||
let authorities = self.client.runtime_api().authorities(&block_id)?;
|
||||
self.purge_old_authorities_from_cache(&authorities);
|
||||
|
||||
let authorities = authorities
|
||||
.into_iter()
|
||||
.map(|id| hash_authority_id(id.as_ref()).map(|h| (h, id)))
|
||||
.collect::<Result<HashMap<_, _>>>()?;
|
||||
|
||||
for (key, value) in values.iter() {
|
||||
// Check if the event origins from an authority in the current authority set.
|
||||
let authority_id: &AuthorityId = authorities
|
||||
.get(key)
|
||||
.ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
|
||||
|
||||
let schema::SignedAuthorityAddresses {
|
||||
signature,
|
||||
addresses,
|
||||
} = schema::SignedAuthorityAddresses::decode(value).map_err(Error::DecodingProto)?;
|
||||
let signature = AuthoritySignature::decode(&mut &signature[..])
|
||||
.map_err(Error::EncodingDecodingScale)?;
|
||||
|
||||
if !AuthorityPair::verify(&signature, &addresses, authority_id) {
|
||||
return Err(Error::VerifyingDhtPayload);
|
||||
// Ensure `values` is not empty and all its keys equal.
|
||||
let remote_key = values.iter().fold(Ok(None), |acc, (key, _)| {
|
||||
match acc {
|
||||
Ok(None) => Ok(Some(key.clone())),
|
||||
Ok(Some(ref prev_key)) if prev_key != key => Err(
|
||||
Error::ReceivingDhtValueFoundEventWithDifferentKeys
|
||||
),
|
||||
x @ Ok(_) => x,
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
})?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;
|
||||
|
||||
let mut addresses: Vec<libp2p::Multiaddr> = schema::AuthorityAddresses::decode(addresses)
|
||||
.map(|a| a.addresses)
|
||||
.map_err(Error::DecodingProto)?
|
||||
let authorities = {
|
||||
let block_id = BlockId::hash(self.client.info().best_hash);
|
||||
// From the Dht we only get the hashed authority id. In order to retrieve the actual
|
||||
// authority id and to ensure it is actually an authority, we match the hash against the
|
||||
// hash of the authority id of all other authorities.
|
||||
let authorities = self.client.runtime_api().authorities(&block_id)?;
|
||||
self.addr_cache.retain_ids(&authorities);
|
||||
authorities
|
||||
.into_iter()
|
||||
.map(|a| a.try_into())
|
||||
.collect::<std::result::Result<_, _>>()
|
||||
.map_err(Error::ParsingMultiaddress)?;
|
||||
.map(|id| hash_authority_id(id.as_ref()).map(|h| (h, id)))
|
||||
.collect::<Result<HashMap<_, _>>>()?
|
||||
};
|
||||
|
||||
if addresses.len() > MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY {
|
||||
warn!(
|
||||
target: "sub-authority-discovery",
|
||||
"Got more than MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY ({:?}) for Authority \
|
||||
'{:?}' from DHT, dropping the remainder.",
|
||||
MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY, authority_id,
|
||||
);
|
||||
addresses = addresses.into_iter()
|
||||
.take(MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY)
|
||||
.collect();
|
||||
}
|
||||
// Check if the event origins from an authority in the current authority set.
|
||||
let authority_id: &AuthorityId = authorities
|
||||
.get(&remote_key)
|
||||
.ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
|
||||
|
||||
self.address_cache.insert(authority_id.clone(), addresses);
|
||||
let remote_addresses: Vec<Multiaddr> = values.into_iter()
|
||||
.map(|(_k, v)| {
|
||||
let schema::SignedAuthorityAddresses {
|
||||
signature,
|
||||
addresses,
|
||||
} = schema::SignedAuthorityAddresses::decode(v).map_err(Error::DecodingProto)?;
|
||||
let signature = AuthoritySignature::decode(&mut &signature[..])
|
||||
.map_err(Error::EncodingDecodingScale)?;
|
||||
|
||||
if !AuthorityPair::verify(&signature, &addresses, authority_id) {
|
||||
return Err(Error::VerifyingDhtPayload);
|
||||
}
|
||||
|
||||
let addresses: Vec<libp2p::Multiaddr> = schema::AuthorityAddresses::decode(addresses)
|
||||
.map(|a| a.addresses)
|
||||
.map_err(Error::DecodingProto)?
|
||||
.into_iter()
|
||||
.map(|a| a.try_into())
|
||||
.collect::<std::result::Result<_, _>>()
|
||||
.map_err(Error::ParsingMultiaddress)?;
|
||||
|
||||
Ok(addresses)
|
||||
})
|
||||
.collect::<Result<Vec<Vec<Multiaddr>>>>()?
|
||||
.into_iter().flatten().collect();
|
||||
|
||||
if !remote_addresses.is_empty() {
|
||||
self.addr_cache.insert(authority_id.clone(), remote_addresses);
|
||||
self.update_peer_set_priority_group()?;
|
||||
}
|
||||
|
||||
// Let's update the peerset priority group with all the addresses we have in our cache.
|
||||
|
||||
let addresses = HashSet::from_iter(
|
||||
self.address_cache
|
||||
.iter()
|
||||
.map(|(_peer_id, addresses)| addresses.clone())
|
||||
.flatten(),
|
||||
);
|
||||
|
||||
debug!(
|
||||
target: "sub-authority-discovery",
|
||||
"Applying priority group {:#?} to peerset.", addresses,
|
||||
);
|
||||
self.network
|
||||
.set_priority_group(AUTHORITIES_PRIORITY_GROUP_NAME.to_string(), addresses)
|
||||
.map_err(Error::SettingPeersetPriorityGroup)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn purge_old_authorities_from_cache(&mut self, current_authorities: &Vec<AuthorityId>) {
|
||||
self.address_cache
|
||||
.retain(|peer_id, _addresses| current_authorities.contains(peer_id))
|
||||
}
|
||||
|
||||
/// Retrieve all local authority discovery private keys that are within the current authority
|
||||
/// set.
|
||||
fn get_priv_keys_within_authority_set(&mut self) -> Result<Vec<AuthorityPair>> {
|
||||
@@ -429,6 +391,22 @@ where
|
||||
|
||||
Ok(intersection)
|
||||
}
|
||||
|
||||
/// Update the peer set 'authority' priority group.
|
||||
//
|
||||
fn update_peer_set_priority_group(&self) -> Result<()>{
|
||||
let addresses = self.addr_cache.get_subset();
|
||||
|
||||
debug!(
|
||||
target: "sub-authority-discovery",
|
||||
"Applying priority group {:?} to peerset.", addresses,
|
||||
);
|
||||
self.network
|
||||
.set_priority_group(AUTHORITIES_PRIORITY_GROUP_NAME.to_string(), addresses.into_iter().collect())
|
||||
.map_err(Error::SettingPeersetPriorityGroup)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Client, Network, Block> Future for AuthorityDiscovery<Client, Network, Block>
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{iter::FromIterator, sync::{Arc, Mutex}};
|
||||
|
||||
use futures::channel::mpsc::channel;
|
||||
use futures::executor::block_on;
|
||||
|
||||
Reference in New Issue
Block a user