client: Introduce --sentry-nodes flag (#4285)

* client/authority-discovery: Add smoke tests for interval_at

* client/authority-discovery: Fix interval_at to fire on start

* .maintain/sentry-node: Update docker compose file

* client: Introduce --sentry-nodes flag

Enable operators to specify the public addresses of sentry nodes in front
of a validator node so that the validator node can announce the sentry
node addresses instead of its own public addresses on the DHT via the
authority discovery module.

* client/authority-discovery: Break lines at 100 characters

Limit line length to 100 instead of 120 characters.
This commit is contained in:
Max Inden
2019-12-03 18:03:08 +01:00
committed by Gavin Wood
parent 5ec0923285
commit 2de6f82b8a
7 changed files with 205 additions and 80 deletions
+171 -59
View File
@@ -18,8 +18,9 @@
//! Substrate authority discovery.
//!
//! This crate enables Substrate authorities to directly connect to other authorities. [`AuthorityDiscovery`] implements
//! the Future trait. By polling [`AuthorityDiscovery`] an authority:
//! This crate enables Substrate authorities to directly connect to other authorities.
//! [`AuthorityDiscovery`] implements the Future trait. By polling [`AuthorityDiscovery`] an
//! authority:
//!
//!
//! 1. **Makes itself discoverable**
@@ -54,11 +55,14 @@ use futures::task::{Context, Poll};
use futures::{Future, FutureExt, Stream, StreamExt};
use futures_timer::Delay;
use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair};
use authority_discovery_primitives::{
AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair
};
use client_api::blockchain::HeaderBackend;
use codec::{Decode, Encode};
use error::{Error, Result};
use log::{debug, error, log_enabled, warn};
use libp2p::Multiaddr;
use network::specialization::NetworkSpecialization;
use network::{DhtEvent, ExHashT};
use primitives::crypto::{key_types, Pair};
@@ -78,7 +82,8 @@ mod schema {
/// Upper bound estimation on how long one should wait before accessing the Kademlia DHT.
const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30);
/// Name of the Substrate peerset priority group for authorities discovered through the authority discovery module.
/// Name of the Substrate peerset priority group for authorities discovered through the authority
/// discovery module.
const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities.
@@ -93,6 +98,14 @@ where
client: Arc<Client>,
network: Arc<Network>,
/// List of sentry node public addresses.
//
// There are 3 states:
// - None: No addresses were specified.
// - Some(vec![]): Addresses were specified, but none could be parsed as proper
// Multiaddresses.
// - Some(vec![a, b, c, ...]): Valid addresses were specified.
sentry_nodes: Option<Vec<Multiaddr>>,
/// Channel we receive Dht events on.
dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>,
@@ -103,11 +116,12 @@ where
/// Interval on which to query for addresses of other authorities.
query_interval: Interval,
/// The network peerset interface for priority groups lets us only set an entire group, but we retrieve the
/// addresses of other authorities one by one from the network. To use the peerset interface we need to cache the
/// addresses and always overwrite the entire peerset priority group. To ensure this map doesn't grow indefinitely
/// `purge_old_authorities_from_cache` function is called each time we add a new entry.
address_cache: HashMap<AuthorityId, Vec<libp2p::Multiaddr>>,
/// The network peerset interface for priority groups lets us only set an entire group, but we
/// retrieve the addresses of other authorities one by one from the network. To use the peerset
/// interface we need to cache the addresses and always overwrite the entire peerset priority
/// group. To ensure this map doesn't grow indefinitely `purge_old_authorities_from_cache`
/// function is called each time we add a new entry.
address_cache: HashMap<AuthorityId, Vec<Multiaddr>>,
phantom: PhantomData<Block>,
}
@@ -121,32 +135,54 @@ where
Self: Future<Output = ()>,
{
/// Return a new authority discovery.
///
/// Note: When specifying `sentry_nodes` this module will not advertise the public addresses of
/// the node itself but only the public addresses of its sentry nodes.
pub fn new(
client: Arc<Client>,
network: Arc<Network>,
sentry_nodes: Vec<String>,
key_store: BareCryptoStorePtr,
dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>,
) -> Self {
// Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node
// could restart at any point in time, one can not depend on the republishing process, thus publishing own
// external addresses should happen on an interval < 36h.
// Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h.
// Given that a node could restart at any point in time, one can not depend on the
// republishing process, thus publishing own external addresses should happen on an interval
// < 36h.
let publish_interval = interval_at(
Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME,
Duration::from_secs(12 * 60 * 60),
);
// External addresses of other authorities can change at any given point in time. The interval on which to query
// for external addresses of other authorities is a trade off between efficiency and performance.
// External addresses of other authorities can change at any given point in time. The
// interval on which to query for external addresses of other authorities is a trade off
// between efficiency and performance.
let query_interval = interval_at(
Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME,
Duration::from_secs(10 * 60),
);
let sentry_nodes = if !sentry_nodes.is_empty() {
Some(sentry_nodes.into_iter().filter_map(|a| match a.parse() {
Ok(addr) => Some(addr),
Err(e) => {
error!(
target: "sub-authority-discovery",
"Failed to parse sentry node public address '{:?}', continuing anyways.", e,
);
None
}
}).collect())
} else {
None
};
let address_cache = HashMap::new();
AuthorityDiscovery {
client,
network,
sentry_nodes,
dht_event_rx,
key_store,
publish_interval,
@@ -156,18 +192,20 @@ where
}
}
fn publish_own_ext_addresses(&mut self) -> Result<()> {
let addresses = self
.network
.external_addresses()
.into_iter()
.map(|a| {
a.with(libp2p::core::multiaddr::Protocol::P2p(
/// Publish either our own public addresses or, if specified, those of our sentry nodes.
fn publish_ext_addresses(&mut self) -> Result<()> {
let addresses = match &self.sentry_nodes {
Some(addrs) => addrs.clone().into_iter()
.map(|a| a.to_vec())
.collect(),
None => self.network.external_addresses()
.into_iter()
.map(|a| a.with(libp2p::core::multiaddr::Protocol::P2p(
self.network.local_peer_id().into(),
))
})
.map(|a| a.to_vec())
.collect();
)))
.map(|a| a.to_vec())
.collect(),
};
let mut serialized_addresses = vec![];
schema::AuthorityAddresses { addresses }
@@ -217,7 +255,10 @@ where
DhtEvent::ValueFound(v) => {
if log_enabled!(log::Level::Debug) {
let hashes = v.iter().map(|(hash, _value)| hash.clone());
debug!(target: "sub-authority-discovery", "Value for hash '{:?}' found on Dht.", hashes);
debug!(
target: "sub-authority-discovery",
"Value for hash '{:?}' found on Dht.", hashes,
);
}
self.handle_dht_value_found_event(v)?;
@@ -247,8 +288,9 @@ where
let block_id = BlockId::hash(self.client.info().best_hash);
// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure
// it is actually an authority, we match the hash against the hash of the authority id of all other authorities.
// From the Dht we only get the hashed authority id. In order to retrieve the actual
// authority id and to ensure it is actually an authority, we match the hash against the
// hash of the authority id of all other authorities.
let authorities = self.client.runtime_api().authorities(&block_id)?;
self.purge_old_authorities_from_cache(&authorities);
@@ -267,7 +309,8 @@ where
signature,
addresses,
} = schema::SignedAuthorityAddresses::decode(value).map_err(Error::DecodingProto)?;
let signature = AuthoritySignature::decode(&mut &signature[..]).map_err(Error::EncodingDecodingScale)?;
let signature = AuthoritySignature::decode(&mut &signature[..])
.map_err(Error::EncodingDecodingScale)?;
if !AuthorityPair::verify(&signature, &addresses, authority_id) {
return Err(Error::VerifyingDhtPayload);
@@ -293,7 +336,10 @@ where
.flatten(),
);
debug!(target: "sub-authority-discovery", "Applying priority group {:#?} to peerset.", addresses);
debug!(
target: "sub-authority-discovery",
"Applying priority group {:#?} to peerset.", addresses,
);
self.network
.set_priority_group(AUTHORITIES_PRIORITY_GROUP_NAME.to_string(), addresses)
.map_err(Error::SettingPeersetPriorityGroup)?;
@@ -368,20 +414,20 @@ where
self.handle_dht_events(cx)?;
if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {
// Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the
// function calls within this block do a `return`, we don't call `interval.poll` again and thereby the
// underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval
// tick.
// Make sure to poll the interval until it returns `Poll::Pending` once. Otherwise,
// in case one of the function calls within this block does a `return`, we don't
// poll the interval again and thereby the current task is never registered to be
// woken up on the next interval tick.
while let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {}
self.publish_own_ext_addresses()?;
self.publish_ext_addresses()?;
}
if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {
// Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the
// function calls within this block do a `return`, we don't call `interval.poll` again and thereby the
// underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval
// tick.
// Make sure to poll the interval until it returns `Poll::Pending` once. Otherwise,
// in case one of the function calls within this block does a `return`, we don't
// poll the interval again and thereby the current task is never registered to be
// woken up on the next interval tick.
while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {}
self.request_addresses_of_others()?;
@@ -395,13 +441,15 @@ where
Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e),
};
// Make sure to always return NotReady as this is a long running task with the same lifetime as the node itself.
// Make sure to always return `Poll::Pending` as this is a long running task with the same
// lifetime as the node itself.
Poll::Pending
}
}
/// NetworkProvider provides AuthorityDiscovery with all necessary hooks into the underlying Substrate networking. Using
/// this trait abstraction instead of NetworkService directly is necessary to unit test AuthorityDiscovery.
/// NetworkProvider provides AuthorityDiscovery with all necessary hooks into the underlying
/// Substrate networking. Using this trait abstraction instead of NetworkService directly is
/// necessary to unit test AuthorityDiscovery.
pub trait NetworkProvider {
/// Returns the local external addresses.
fn external_addresses(&self) -> Vec<libp2p::Multiaddr>;
@@ -457,14 +505,11 @@ fn hash_authority_id(id: &[u8]) -> Result<libp2p::kad::record::Key> {
}
fn interval_at(start: Instant, duration: Duration) -> Interval {
let stream = futures::stream::unfold((), move |_| {
let wait_time = start.saturating_duration_since(Instant::now());
let stream = futures::stream::unfold(start, move |next| {
let time_until_next = next.saturating_duration_since(Instant::now());
futures::future::join(
Delay::new(wait_time),
Delay::new(duration)
).map(|_| Some(((), ())))
}).map(drop);
Delay::new(time_until_next).map(move |_| Some(((), next + duration)))
});
Box::new(stream)
}
@@ -482,6 +527,67 @@ mod tests {
use std::sync::{Arc, Mutex};
use test_client::runtime::Block;
#[test]
fn interval_at_with_start_now() {
    // Reference point taken before the interval exists, so the measured time covers both
    // its construction and the wait for the first tick.
    let before = Instant::now();

    // The period (10s) is far longer than the bound asserted below; the test can only pass
    // if the first tick is produced at the start instant rather than one period later.
    let mut ticker = interval_at(Instant::now(), Duration::from_secs(10));

    // Block the current thread until the first item of the stream has been produced.
    let _first_tick = futures::executor::block_on(ticker.next());

    let elapsed = Instant::now().saturating_duration_since(before);
    assert!(
        elapsed < Duration::from_secs(1),
        "Expected low resolution instant interval to fire within less than a second.",
    );
}
#[test]
fn interval_at_is_queuing_events() {
    let before = Instant::now();
    let ticker = interval_at(Instant::now(), Duration::from_millis(10));

    // Leave the interval unpolled for 100ms. With a 10ms period, 10 ticks become due in the
    // meantime and are expected to be handed out without any further waiting.
    std::thread::sleep(Duration::from_millis(100));

    // Drain exactly 10 items from the stream.
    let drain_ten = ticker.take(10).collect::<Vec<()>>();
    let _ticks = futures::executor::block_on(drain_ten);

    // Had the ticks not been queued, draining them would have cost roughly another 100ms
    // (10 periods) on top of the sleep above.
    let elapsed = Instant::now().saturating_duration_since(before);
    assert!(
        elapsed < Duration::from_millis(150),
        "Expect interval to /queue/ events when not polled for a while.",
    );
}
#[test]
fn interval_at_with_initial_delay() {
    let before = Instant::now();

    // First tick is scheduled 100ms into the future; the 10s period is irrelevant here since
    // only the first item is awaited.
    let first_tick_at = Instant::now() + Duration::from_millis(100);
    let mut ticker = interval_at(first_tick_at, Duration::from_secs(10));

    // Block the current thread until the first item of the stream has been produced.
    let _first_tick = futures::executor::block_on(ticker.next());

    let elapsed = Instant::now().saturating_duration_since(before);
    assert!(
        elapsed > Duration::from_millis(100),
        "Expected interval with initial delay not to fire right away.",
    );
}
#[derive(Clone)]
struct TestApi {
authorities: Vec<AuthorityId>,
@@ -612,7 +718,8 @@ mod tests {
#[derive(Default)]
struct TestNetwork {
// Whenever functions on `TestNetwork` are called, the function arguments are added to the vectors below.
// Whenever functions on `TestNetwork` are called, the function arguments are added to the
// vectors below.
pub put_value_call: Arc<Mutex<Vec<(libp2p::kad::record::Key, Vec<u8>)>>>,
pub get_value_call: Arc<Mutex<Vec<libp2p::kad::record::Key>>>,
pub set_priority_group_call: Arc<Mutex<Vec<(String, HashSet<libp2p::Multiaddr>)>>>,
@@ -645,17 +752,20 @@ mod tests {
}
#[test]
fn publish_own_ext_addresses_puts_record_on_dht() {
fn publish_ext_addresses_puts_record_on_dht() {
let (_dht_event_tx, dht_event_rx) = channel(1000);
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
let public = key_store.write().sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None).unwrap();
let public = key_store.write()
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
.unwrap();
let test_api = Arc::new(TestApi {authorities: vec![public.into()]});
let mut authority_discovery =
AuthorityDiscovery::new(test_api, network.clone(), key_store, dht_event_rx.boxed());
let mut authority_discovery = AuthorityDiscovery::new(
test_api, network.clone(), vec![], key_store, dht_event_rx.boxed(),
);
authority_discovery.publish_own_ext_addresses().unwrap();
authority_discovery.publish_ext_addresses().unwrap();
// Expect authority discovery to put a new record onto the dht.
assert_eq!(network.put_value_call.lock().unwrap().len(), 1);
@@ -676,8 +786,9 @@ mod tests {
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
let mut authority_discovery =
AuthorityDiscovery::new(test_api, network.clone(), key_store, dht_event_rx.boxed());
let mut authority_discovery = AuthorityDiscovery::new(
test_api, network.clone(), vec![], key_store, dht_event_rx.boxed(),
);
authority_discovery.request_addresses_of_others().unwrap();
@@ -695,8 +806,9 @@ mod tests {
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
let mut authority_discovery =
AuthorityDiscovery::new(test_api, network.clone(), key_store, dht_event_rx.boxed());
let mut authority_discovery = AuthorityDiscovery::new(
test_api, network.clone(), vec![], key_store, dht_event_rx.boxed(),
);
// Create sample dht event.