client/authority-discovery: Publish and query on exponential interval (#7545)

* client/authority-discovery: Publish and query on exponential interval

When a node starts up, publishing and querying might fail for various
reasons, for example because the node is not yet fully bootstrapped on
the DHT. Thus one should retry sooner rather than later. On the other
hand, a long-running node is likely well connected, and thus timely
retries are not needed. For this reason, use an exponentially increasing
interval for `publish_interval`, `query_interval` and
`priority_group_set_interval` instead of a constant interval.

* client/authority-discovery/src/interval.rs: Add license header

* .maintain/gitlab: Ensure adder collator tests are run on CI
This commit is contained in:
Max Inden
2020-11-23 17:34:37 +01:00
committed by GitHub
parent 1871a95088
commit d692d173f2
6 changed files with 119 additions and 136 deletions
@@ -14,17 +14,16 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{error::{Error, Result}, ServicetoWorkerMsg};
use crate::{error::{Error, Result}, interval::ExpIncInterval, ServicetoWorkerMsg};
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use futures::channel::mpsc;
use futures::{FutureExt, Stream, StreamExt, stream::Fuse};
use futures_timer::Delay;
use addr_cache::AddrCache;
use async_trait::async_trait;
@@ -54,8 +53,6 @@ mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); }
#[cfg(test)]
pub mod tests;
/// Boxed, type-erased stream of `()` ticks; the type of the worker's interval fields and the
/// return type of `interval_at`.
type Interval = Box<dyn Stream<Item = ()> + Unpin + Send + Sync>;
const LOG_TARGET: &'static str = "sub-authority-discovery";
/// Name of the Substrate peerset priority group for authorities discovered through the authority
@@ -113,12 +110,12 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
dht_event_rx: DhtEventStream,
/// Interval to be proactive, publishing own addresses.
publish_interval: Interval,
publish_interval: ExpIncInterval,
/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
query_interval: Interval,
query_interval: ExpIncInterval,
/// Interval on which to set the peerset priority group to a new random
/// set of addresses.
priority_group_set_interval: Interval,
priority_group_set_interval: ExpIncInterval,
/// Queue of throttled lookups pending to be passed to the network.
pending_lookups: Vec<AuthorityId>,
@@ -153,31 +150,26 @@ where
prometheus_registry: Option<prometheus_endpoint::Registry>,
config: crate::WorkerConfig,
) -> Self {
// Kademlia's default time-to-live for Dht records is 36h, republishing
// records every 24h through libp2p-kad.
// Given that a node could restart at any point in time, one can not depend on the
// republishing process, thus publishing own external addresses should happen on an interval
// < 36h.
let publish_interval = interval_at(
Instant::now() + config.query_start_delay,
config.publish_interval,
// When a node starts up, publishing and querying might fail for various reasons, for
// example because the node is not yet fully bootstrapped on the DHT. Thus one should retry
// sooner rather than later. On the other hand, a long-running node is likely well connected,
// and thus timely retries are not needed. For this reason, use an exponentially increasing
// interval for `publish_interval`, `query_interval` and `priority_group_set_interval`
// instead of a constant interval.
let publish_interval = ExpIncInterval::new(
Duration::from_secs(2),
config.max_publish_interval,
);
// External addresses of remote authorities can change at any given point in time. The
// interval on which to trigger new queries for the current authorities is a trade off
// between efficiency and performance.
let query_interval_start = Instant::now() + config.query_start_delay;
let query_interval_duration = config.query_interval;
let query_interval = interval_at(query_interval_start, query_interval_duration);
// Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when
// comparing `authority_discovery_authority_addresses_requested_total` and
// `authority_discovery_dht_event_received`. With that in mind set the peerset priority
// group on the same interval as the [`query_interval`] above,
// just delayed by 5 minutes by default.
let priority_group_set_interval = interval_at(
query_interval_start + config.priority_group_set_offset,
config.priority_group_set_interval,
let query_interval = ExpIncInterval::new(
Duration::from_secs(2),
config.max_query_interval,
);
let priority_group_set_interval = ExpIncInterval::new(
Duration::from_secs(2),
// Trade-off between node connection churn and connectivity. Using half of
// [`crate::WorkerConfig::max_query_interval`] to update priority group once at the
// beginning and once in the middle of each query interval.
config.max_query_interval / 2,
);
let addr_cache = AddrCache::new();
@@ -413,7 +405,7 @@ where
}
if log_enabled!(log::Level::Debug) {
let hashes = v.iter().map(|(hash, _value)| hash.clone());
let hashes: Vec<_> = v.iter().map(|(hash, _value)| hash.clone()).collect();
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' found on Dht.", hashes,
@@ -449,6 +441,11 @@ where
}
},
DhtEvent::ValuePut(hash) => {
// Fast forward the exponentially increasing interval to the configured maximum. In
// case this was the first successful address publishing there is no need for a
// timely retry.
self.publish_interval.set_to_max();
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
}
@@ -661,16 +658,6 @@ fn hash_authority_id(id: &[u8]) -> libp2p::kad::record::Key {
libp2p::kad::record::Key::new(&libp2p::multihash::Sha2_256::digest(id))
}
/// Builds an endless stream of `()` ticks, the first due at `start` and each subsequent one
/// due `duration` after the previous deadline.
///
/// Deadlines are derived from `start`, not from the moment the previous tick fired. A tick
/// whose deadline has already passed is therefore delivered without any additional wait.
fn interval_at(start: Instant, duration: Duration) -> Interval {
    Box::new(futures::stream::unfold(start, move |deadline| {
        // Saturates to zero when `deadline` already lies in the past.
        let wait = deadline.saturating_duration_since(Instant::now());

        // Emit one tick once the delay elapses and carry the following deadline as state.
        Delay::new(wait).map(move |_| {
            let following = deadline + duration;
            Some(((), following))
        })
    }))
}
/// Prometheus metrics for a [`Worker`].
#[derive(Clone)]
pub(crate) struct Metrics {