diff --git a/substrate/client/authority-discovery/src/lib.rs b/substrate/client/authority-discovery/src/lib.rs index 818eb1beb3..469c0851f1 100644 --- a/substrate/client/authority-discovery/src/lib.rs +++ b/substrate/client/authority-discovery/src/lib.rs @@ -53,6 +53,11 @@ pub struct WorkerConfig { /// /// By default this is set to 1 hour. pub max_publish_interval: Duration, + /// Interval at which the keystore is queried. If the keys have changed, unconditionally + /// re-publish its addresses on the DHT. + /// + /// By default this is set to 1 minute. + pub keystore_refresh_interval: Duration, /// The maximum interval in which the node will query the DHT for new entries. /// /// By default this is set to 10 minutes. @@ -67,6 +72,7 @@ impl Default for WorkerConfig { // not depend on the republishing process, thus publishing own external addresses should // happen on an interval < 36h. max_publish_interval: Duration::from_secs(1 * 60 * 60), + keystore_refresh_interval: Duration::from_secs(60), // External addresses of remote authorities can change at any given point in time. The // interval on which to trigger new queries for the current and next authorities is a trade // off between efficiency and performance. diff --git a/substrate/client/authority-discovery/src/worker.rs b/substrate/client/authority-discovery/src/worker.rs index b1fb89669b..f05c6d4604 100644 --- a/substrate/client/authority-discovery/src/worker.rs +++ b/substrate/client/authority-discovery/src/worker.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use std::time::Duration; use futures::channel::mpsc; -use futures::{FutureExt, Stream, StreamExt, stream::Fuse}; +use futures::{future, FutureExt, Stream, StreamExt, stream::Fuse}; use addr_cache::AddrCache; use async_trait::async_trait; @@ -44,7 +44,7 @@ use sc_network::{ PeerId, }; use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair}; -use sp_core::crypto::{key_types, Pair}; +use sp_core::crypto::{key_types, CryptoTypePublicPair, Pair}; use sp_keystore::CryptoStore; use sp_runtime::{traits::Block as BlockT, generic::BlockId}; use sp_api::ProvideRuntimeApi; @@ -109,6 +109,13 @@ pub struct Worker { /// Interval to be proactive, publishing own addresses. publish_interval: ExpIncInterval, + /// Pro-actively publish our own addresses at this interval, if the keys in the keystore + /// have changed. + publish_if_changed_interval: ExpIncInterval, + /// List of keys onto which addresses have been published at the latest publication. + /// Used to check whether they have changed. + latest_published_keys: HashSet, + /// Interval at which to request addresses of authorities, refilling the pending lookups queue. query_interval: ExpIncInterval, @@ -160,6 +167,13 @@ where config.max_query_interval, ); + // An `ExpIncInterval` is overkill here because the interval is constant, but consistency + // is more simple. + let publish_if_changed_interval = ExpIncInterval::new( + config.keystore_refresh_interval, + config.keystore_refresh_interval + ); + let addr_cache = AddrCache::new(); let metrics = match prometheus_registry { @@ -181,6 +195,8 @@ where network, dht_event_rx, publish_interval, + publish_if_changed_interval, + latest_published_keys: HashSet::new(), query_interval, pending_lookups: Vec::new(), in_flight_lookups: HashMap::new(), @@ -212,8 +228,11 @@ where self.process_message_from_service(msg); }, // Publish own addresses. - _ = self.publish_interval.next().fuse() => { - if let Err(e) = self.publish_ext_addresses().await { + only_if_changed = future::select( + self.publish_interval.next().map(|_| false), + self.publish_if_changed_interval.next().map(|_| true) + ).map(|e| e.factor_first().0).fuse() => { + if let Err(e) = self.publish_ext_addresses(only_if_changed).await { error!( target: LOG_TARGET, "Failed to publish external addresses: {:?}", e, @@ -262,7 +281,10 @@ where } /// Publish own public addresses. - async fn publish_ext_addresses(&mut self) -> Result<()> { + /// + /// If `only_if_changed` is true, the function has no effect if the list of keys to publish + /// is equal to `self.latest_published_keys`. + async fn publish_ext_addresses(&mut self, only_if_changed: bool) -> Result<()> { let key_store = match &self.role { Role::PublishAndDiscover(key_store) => key_store, Role::Discover => return Ok(()), @@ -285,15 +307,20 @@ where let keys = Worker::::get_own_public_keys_within_authority_set( key_store.clone(), self.client.as_ref(), - ).await?.into_iter().map(Into::into).collect::>(); + ).await?.into_iter().map(Into::into).collect::>(); + if only_if_changed && keys == self.latest_published_keys { + return Ok(()) + } + + let keys_vec = keys.iter().cloned().collect::>(); let signatures = key_store.sign_with_all( key_types::AUTHORITY_DISCOVERY, - keys.clone(), + keys_vec.clone(), serialized_addresses.as_slice(), ).await.map_err(|_| Error::Signing)?; - for (sign_result, key) in signatures.into_iter().zip(keys) { + for (sign_result, key) in signatures.into_iter().zip(keys_vec.iter()) { let mut signed_addresses = vec![]; // Verify that all signatures exist for all provided keys. @@ -313,6 +340,8 @@ where ); } + self.latest_published_keys = keys; + Ok(()) } diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs index 04f597aa26..b702cd8c40 100644 --- a/substrate/client/authority-discovery/src/worker/tests.rs +++ b/substrate/client/authority-discovery/src/worker/tests.rs @@ -294,7 +294,7 @@ fn publish_discover_cycle() { Default::default(), ); - worker.publish_ext_addresses().await.unwrap(); + worker.publish_ext_addresses(false).await.unwrap(); // Expect authority discovery to put a new record onto the dht. assert_eq!(network.put_value_call.lock().unwrap().len(), 1);