Make authority discovery configurable (#7465)

* Make authority discovery configurable

This PR makes the authority discovery configurable. So, instead of
having default values for the query interval, publish interval, etc., this
PR adds a configuration to make these values changeable. This will be
useful for tests where authority discovery is required.

* Update client/authority-discovery/src/worker.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Update client/authority-discovery/src/lib.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Update client/authority-discovery/src/lib.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Update client/authority-discovery/src/lib.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Fix compilation

* line width

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Bastian Köcher
2020-10-30 14:59:45 +01:00
committed by GitHub
parent 1f45b4450e
commit 86d5d9609d
3 changed files with 101 additions and 13 deletions
@@ -26,7 +26,7 @@
pub use crate::{service::Service, worker::{NetworkProvider, Worker, Role}};
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use futures::channel::{mpsc, oneshot};
use futures::Stream;
@@ -43,6 +43,49 @@ mod service;
mod tests;
mod worker;
/// Configuration of [`Worker`].
pub struct WorkerConfig {
	/// The interval in which the node will publish its own address on the DHT.
	///
	/// By default this is set to 12 hours.
	pub publish_interval: Duration,
	/// The interval in which the node will query the DHT for new entries.
	///
	/// By default this is set to 10 minutes.
	pub query_interval: Duration,
	/// The time the node will wait before triggering the first DHT query or publish.
	///
	/// By default this is set to 30 seconds.
	///
	/// This default is based on the rough bootstrap time required by libp2p Kademlia.
	pub query_start_delay: Duration,
	/// The interval in which the worker will instruct the peerset to connect to a random subset
	/// of discovered validators.
	///
	/// By default this is set to 10 minutes.
	pub priority_group_set_interval: Duration,
	/// The time the worker will wait after each query interval tick to pass a subset of
	/// the cached authority addresses down to the peerset.
	///
	/// Be aware that the actual delay will be computed by [`Self::query_start_delay`] +
	/// [`Self::priority_group_set_offset`].
	///
	/// By default this is set to 5 minutes.
	pub priority_group_set_offset: Duration,
}
impl Default for WorkerConfig {
	/// Returns the default worker configuration: publish every 12 hours, query the
	/// DHT every 10 minutes, wait 30 seconds before the first query/publish, update
	/// the peerset priority group every 10 minutes with a 5 minute offset.
	fn default() -> Self {
		// Express all intervals in seconds up front to keep the struct literal terse.
		let twelve_hours = Duration::from_secs(12 * 60 * 60);
		let ten_minutes = Duration::from_secs(10 * 60);
		let five_minutes = Duration::from_secs(5 * 60);
		let thirty_seconds = Duration::from_secs(30);

		Self {
			publish_interval: twelve_hours,
			query_interval: ten_minutes,
			query_start_delay: thirty_seconds,
			priority_group_set_interval: ten_minutes,
			priority_group_set_offset: five_minutes,
		}
	}
}
/// Create a new authority discovery [`Worker`] and [`Service`].
///
/// See the struct documentation of each for more details.
@@ -53,6 +96,34 @@ pub fn new_worker_and_service<Client, Network, Block, DhtEventStream>(
role: Role,
prometheus_registry: Option<prometheus_endpoint::Registry>,
) -> (Worker<Client, Network, Block, DhtEventStream>, Service)
where
Block: BlockT + Unpin + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi<Block>>::Api: AuthorityDiscoveryApi<Block, Error = sp_blockchain::Error>,
DhtEventStream: Stream<Item = DhtEvent> + Unpin,
{
new_worker_and_service_with_config(
Default::default(),
client,
network,
dht_event_rx,
role,
prometheus_registry,
)
}
/// Same as [`new_worker_and_service`] but with support for providing the `config`.
///
/// When in doubt use [`new_worker_and_service`] as it will use the default configuration.
pub fn new_worker_and_service_with_config<Client, Network, Block, DhtEventStream>(
config: WorkerConfig,
client: Arc<Client>,
network: Arc<Network>,
dht_event_rx: DhtEventStream,
role: Role,
prometheus_registry: Option<prometheus_endpoint::Registry>,
) -> (Worker<Client, Network, Block, DhtEventStream>, Service)
where
Block: BlockT + Unpin + 'static,
Network: NetworkProvider,
@@ -63,7 +134,13 @@ where
let (to_worker, from_service) = mpsc::channel(0);
let worker = Worker::new(
from_service, client, network, dht_event_rx, role, prometheus_registry,
from_service,
client,
network,
dht_event_rx,
role,
prometheus_registry,
config,
);
let service = Service::new(to_worker);
@@ -58,9 +58,6 @@ type Interval = Box<dyn Stream<Item = ()> + Unpin + Send + Sync>;
const LOG_TARGET: &'static str = "sub-authority-discovery";
/// Upper bound estimation on how long one should wait before accessing the Kademlia DHT.
const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30);
/// Name of the Substrate peerset priority group for authorities discovered through the authority
/// discovery module.
const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
@@ -154,30 +151,33 @@ where
dht_event_rx: DhtEventStream,
role: Role,
prometheus_registry: Option<prometheus_endpoint::Registry>,
config: crate::WorkerConfig,
) -> Self {
// Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h.
// Kademlia's default time-to-live for Dht records is 36h, republishing
// records every 24h through libp2p-kad.
// Given that a node could restart at any point in time, one can not depend on the
// republishing process, thus publishing own external addresses should happen on an interval
// < 36h.
let publish_interval = interval_at(
Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME,
Duration::from_secs(12 * 60 * 60),
Instant::now() + config.query_start_delay,
config.publish_interval,
);
// External addresses of remote authorities can change at any given point in time. The
// interval on which to trigger new queries for the current authorities is a trade off
// between efficiency and performance.
let query_interval_start = Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME;
let query_interval_duration = Duration::from_secs(10 * 60);
let query_interval_start = Instant::now() + config.query_start_delay;
let query_interval_duration = config.query_interval;
let query_interval = interval_at(query_interval_start, query_interval_duration);
// Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when
// comparing `authority_discovery_authority_addresses_requested_total` and
// `authority_discovery_dht_event_received`. With that in mind set the peerset priority
// group on the same interval as the [`query_interval`] above, just delayed by 5 minutes.
// group on the same interval as the [`query_interval`] above,
// just delayed by 5 minutes by default.
let priority_group_set_interval = interval_at(
query_interval_start + Duration::from_secs(5 * 60),
query_interval_duration,
query_interval_start + config.priority_group_set_offset,
config.priority_group_set_interval,
);
let addr_cache = AddrCache::new();
@@ -306,6 +306,7 @@ fn new_registers_metrics() {
Box::pin(dht_event_rx),
Role::PublishAndDiscover(key_store.into()),
Some(registry.clone()),
Default::default(),
);
assert!(registry.gather().len() > 0);
@@ -334,6 +335,7 @@ fn triggers_dht_get_query() {
Box::pin(dht_event_rx),
Role::PublishAndDiscover(key_store.into()),
None,
Default::default(),
);
futures::executor::block_on(async {
@@ -382,6 +384,7 @@ fn publish_discover_cycle() {
Box::pin(dht_event_rx),
Role::PublishAndDiscover(key_store.into()),
None,
Default::default(),
);
worker.publish_ext_addresses().await.unwrap();
@@ -412,6 +415,7 @@ fn publish_discover_cycle() {
Box::pin(dht_event_rx),
Role::PublishAndDiscover(key_store.into()),
None,
Default::default(),
);
dht_event_tx.try_send(dht_event.clone()).unwrap();
@@ -458,6 +462,7 @@ fn terminate_when_event_stream_terminates() {
Box::pin(dht_event_rx),
Role::PublishAndDiscover(key_store.into()),
None,
Default::default(),
).run();
futures::pin_mut!(worker);
@@ -520,6 +525,7 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() {
Box::pin(dht_event_rx),
Role::PublishAndDiscover(Arc::new(key_store)),
None,
Default::default(),
);
// Spawn the authority discovery to make sure it is polled independently.
@@ -596,6 +602,7 @@ fn limit_number_of_addresses_added_to_cache_per_authority() {
Box::pin(dht_event_rx),
Role::Discover,
None,
Default::default(),
);
block_on(worker.refill_pending_lookups_queue()).unwrap();
@@ -648,6 +655,7 @@ fn do_not_cache_addresses_without_peer_id() {
Box::pin(dht_event_rx),
Role::PublishAndDiscover(Arc::new(local_key_store)),
None,
Default::default(),
);
block_on(local_worker.refill_pending_lookups_queue()).unwrap();
@@ -682,6 +690,7 @@ fn addresses_to_publish_adds_p2p() {
Box::pin(dht_event_rx),
Role::PublishAndDiscover(Arc::new(KeyStore::new())),
Some(prometheus_endpoint::Registry::new()),
Default::default(),
);
assert!(
@@ -716,6 +725,7 @@ fn addresses_to_publish_respects_existing_p2p_protocol() {
Box::pin(dht_event_rx),
Role::PublishAndDiscover(Arc::new(KeyStore::new())),
Some(prometheus_endpoint::Registry::new()),
Default::default(),
);
assert_eq!(
@@ -757,6 +767,7 @@ fn lookup_throttling() {
dht_event_rx.boxed(),
Role::Discover,
Some(default_registry().clone()),
Default::default(),
);
let mut pool = LocalPool::new();