Check every minute whether authority-discovery key has changed (#8575)

* Check every minute whether authority-discovery key has changed

* Fix test

* Fix comment

* Use HashSet for latest_published_keys

* More fixing

* God I'm tired, sorry
This commit is contained in:
Pierre Krieger
2021-04-08 18:41:23 +02:00
committed by GitHub
parent 1f67933afc
commit 1b939bcb53
3 changed files with 44 additions and 9 deletions
@@ -53,6 +53,11 @@ pub struct WorkerConfig {
/// ///
/// By default this is set to 1 hour. /// By default this is set to 1 hour.
pub max_publish_interval: Duration, pub max_publish_interval: Duration,
/// Interval at which the keystore is queried. If the keys have changed, unconditionally
/// re-publish its addresses on the DHT.
///
/// By default this is set to 1 minute.
pub keystore_refresh_interval: Duration,
/// The maximum interval in which the node will query the DHT for new entries. /// The maximum interval in which the node will query the DHT for new entries.
/// ///
/// By default this is set to 10 minutes. /// By default this is set to 10 minutes.
@@ -67,6 +72,7 @@ impl Default for WorkerConfig {
// not depend on the republishing process, thus publishing own external addresses should // not depend on the republishing process, thus publishing own external addresses should
// happen on an interval < 36h. // happen on an interval < 36h.
max_publish_interval: Duration::from_secs(1 * 60 * 60), max_publish_interval: Duration::from_secs(1 * 60 * 60),
keystore_refresh_interval: Duration::from_secs(60),
// External addresses of remote authorities can change at any given point in time. The // External addresses of remote authorities can change at any given point in time. The
// interval on which to trigger new queries for the current and next authorities is a trade // interval on which to trigger new queries for the current and next authorities is a trade
// off between efficiency and performance. // off between efficiency and performance.
@@ -25,7 +25,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::{FutureExt, Stream, StreamExt, stream::Fuse}; use futures::{future, FutureExt, Stream, StreamExt, stream::Fuse};
use addr_cache::AddrCache; use addr_cache::AddrCache;
use async_trait::async_trait; use async_trait::async_trait;
@@ -44,7 +44,7 @@ use sc_network::{
PeerId, PeerId,
}; };
use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair}; use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair};
use sp_core::crypto::{key_types, Pair}; use sp_core::crypto::{key_types, CryptoTypePublicPair, Pair};
use sp_keystore::CryptoStore; use sp_keystore::CryptoStore;
use sp_runtime::{traits::Block as BlockT, generic::BlockId}; use sp_runtime::{traits::Block as BlockT, generic::BlockId};
use sp_api::ProvideRuntimeApi; use sp_api::ProvideRuntimeApi;
@@ -109,6 +109,13 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
/// Interval to be proactive, publishing own addresses. /// Interval to be proactive, publishing own addresses.
publish_interval: ExpIncInterval, publish_interval: ExpIncInterval,
/// Pro-actively publish our own addresses at this interval, if the keys in the keystore
/// have changed.
publish_if_changed_interval: ExpIncInterval,
/// List of keys onto which addresses have been published at the latest publication.
/// Used to check whether they have changed.
latest_published_keys: HashSet<CryptoTypePublicPair>,
/// Interval at which to request addresses of authorities, refilling the pending lookups queue. /// Interval at which to request addresses of authorities, refilling the pending lookups queue.
query_interval: ExpIncInterval, query_interval: ExpIncInterval,
@@ -160,6 +167,13 @@ where
config.max_query_interval, config.max_query_interval,
); );
// An `ExpIncInterval` is overkill here because the interval is constant, but consistency
// is more simple.
let publish_if_changed_interval = ExpIncInterval::new(
config.keystore_refresh_interval,
config.keystore_refresh_interval
);
let addr_cache = AddrCache::new(); let addr_cache = AddrCache::new();
let metrics = match prometheus_registry { let metrics = match prometheus_registry {
@@ -181,6 +195,8 @@ where
network, network,
dht_event_rx, dht_event_rx,
publish_interval, publish_interval,
publish_if_changed_interval,
latest_published_keys: HashSet::new(),
query_interval, query_interval,
pending_lookups: Vec::new(), pending_lookups: Vec::new(),
in_flight_lookups: HashMap::new(), in_flight_lookups: HashMap::new(),
@@ -212,8 +228,11 @@ where
self.process_message_from_service(msg); self.process_message_from_service(msg);
}, },
// Publish own addresses. // Publish own addresses.
_ = self.publish_interval.next().fuse() => { only_if_changed = future::select(
if let Err(e) = self.publish_ext_addresses().await { self.publish_interval.next().map(|_| false),
self.publish_if_changed_interval.next().map(|_| true)
).map(|e| e.factor_first().0).fuse() => {
if let Err(e) = self.publish_ext_addresses(only_if_changed).await {
error!( error!(
target: LOG_TARGET, target: LOG_TARGET,
"Failed to publish external addresses: {:?}", e, "Failed to publish external addresses: {:?}", e,
@@ -262,7 +281,10 @@ where
} }
/// Publish own public addresses. /// Publish own public addresses.
async fn publish_ext_addresses(&mut self) -> Result<()> { ///
/// If `only_if_changed` is true, the function has no effect if the list of keys to publish
/// is equal to `self.latest_published_keys`.
async fn publish_ext_addresses(&mut self, only_if_changed: bool) -> Result<()> {
let key_store = match &self.role { let key_store = match &self.role {
Role::PublishAndDiscover(key_store) => key_store, Role::PublishAndDiscover(key_store) => key_store,
Role::Discover => return Ok(()), Role::Discover => return Ok(()),
@@ -285,15 +307,20 @@ where
let keys = Worker::<Client, Network, Block, DhtEventStream>::get_own_public_keys_within_authority_set( let keys = Worker::<Client, Network, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
key_store.clone(), key_store.clone(),
self.client.as_ref(), self.client.as_ref(),
).await?.into_iter().map(Into::into).collect::<Vec<_>>(); ).await?.into_iter().map(Into::into).collect::<HashSet<_>>();
if only_if_changed && keys == self.latest_published_keys {
return Ok(())
}
let keys_vec = keys.iter().cloned().collect::<Vec<_>>();
let signatures = key_store.sign_with_all( let signatures = key_store.sign_with_all(
key_types::AUTHORITY_DISCOVERY, key_types::AUTHORITY_DISCOVERY,
keys.clone(), keys_vec.clone(),
serialized_addresses.as_slice(), serialized_addresses.as_slice(),
).await.map_err(|_| Error::Signing)?; ).await.map_err(|_| Error::Signing)?;
for (sign_result, key) in signatures.into_iter().zip(keys) { for (sign_result, key) in signatures.into_iter().zip(keys_vec.iter()) {
let mut signed_addresses = vec![]; let mut signed_addresses = vec![];
// Verify that all signatures exist for all provided keys. // Verify that all signatures exist for all provided keys.
@@ -313,6 +340,8 @@ where
); );
} }
self.latest_published_keys = keys;
Ok(()) Ok(())
} }
@@ -294,7 +294,7 @@ fn publish_discover_cycle() {
Default::default(), Default::default(),
); );
worker.publish_ext_addresses().await.unwrap(); worker.publish_ext_addresses(false).await.unwrap();
// Expect authority discovery to put a new record onto the dht. // Expect authority discovery to put a new record onto the dht.
assert_eq!(network.put_value_call.lock().unwrap().len(), 1); assert_eq!(network.put_value_call.lock().unwrap().len(), 1);