client/authority-discovery: Remove sentry node logic (#7368)

* client/authority-discovery: Remove sentry node logic

The notion of sentry nodes has been deprecated (see [1] for details).
This commit removes support for sentry nodes in the
`client/authority-discovery` module.

While removing `Role::Sentry` this commit also introduces
`Role::Discover`, allowing a node to discover addresses of authorities
without publishing ones own addresses. This will be needed in Polkadot
for collator nodes.

[1] https://github.com/paritytech/substrate/issues/6845

* client/authority-discovery/service: Improve PeerId comment
This commit is contained in:
Max Inden
2020-10-26 11:06:56 +01:00
committed by GitHub
parent 52a49e7f51
commit 653868c01e
6 changed files with 69 additions and 214 deletions
+5 -17
View File
@@ -26,7 +26,7 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use node_primitives::Block;
use node_runtime::RuntimeApi;
use sc_service::{
config::{Role, Configuration}, error::{Error as ServiceError},
config::{Configuration}, error::{Error as ServiceError},
RpcHandlers, TaskManager,
};
use sp_inherents::InherentDataProviders;
@@ -258,21 +258,10 @@ pub fn new_full_base(
}
// Spawn authority discovery module.
if matches!(role, Role::Authority{..} | Role::Sentry {..}) {
let (sentries, authority_discovery_role) = match role {
sc_service::config::Role::Authority { ref sentry_nodes } => (
sentry_nodes.clone(),
sc_authority_discovery::Role::Authority (
keystore_container.keystore(),
),
),
sc_service::config::Role::Sentry {..} => (
vec![],
sc_authority_discovery::Role::Sentry,
),
_ => unreachable!("Due to outer matches! constraint; qed.")
};
if role.is_authority() {
let authority_discovery_role = sc_authority_discovery::Role::PublishAndDiscover(
keystore_container.keystore(),
);
let dht_event_stream = network.event_stream("authority-discovery")
.filter_map(|e| async move { match e {
Event::Dht(e) => Some(e),
@@ -281,7 +270,6 @@ pub fn new_full_base(
let (authority_discovery_worker, _service) = sc_authority_discovery::new_worker_and_service(
client.clone(),
network.clone(),
sentries,
Box::pin(dht_event_stream),
authority_discovery_role,
prometheus_registry.clone(),
@@ -32,7 +32,7 @@ use futures::channel::{mpsc, oneshot};
use futures::Stream;
use sc_client_api::blockchain::HeaderBackend;
use sc_network::{config::MultiaddrWithPeerId, DhtEvent, Multiaddr, PeerId};
use sc_network::{DhtEvent, Multiaddr, PeerId};
use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId};
use sp_runtime::traits::Block as BlockT;
use sp_api::ProvideRuntimeApi;
@@ -44,10 +44,11 @@ mod tests;
mod worker;
/// Create a new authority discovery [`Worker`] and [`Service`].
///
/// See the struct documentation of each for more details.
pub fn new_worker_and_service<Client, Network, Block, DhtEventStream>(
client: Arc<Client>,
network: Arc<Network>,
sentry_nodes: Vec<MultiaddrWithPeerId>,
dht_event_rx: DhtEventStream,
role: Role,
prometheus_registry: Option<prometheus_endpoint::Registry>,
@@ -62,7 +63,7 @@ where
let (to_worker, from_service) = mpsc::channel(0);
let worker = Worker::new(
from_service, client, network, sentry_nodes, dht_event_rx, role, prometheus_registry,
from_service, client, network, dht_event_rx, role, prometheus_registry,
);
let service = Service::new(to_worker);
@@ -43,12 +43,12 @@ impl Service {
/// Returns `None` if no entry was present or connection to the
/// [`crate::Worker`] failed.
///
/// [`Multiaddr`]s returned always include a [`PeerId`] via a
/// [`libp2p::core::multiaddr:Protocol::P2p`] component. [`Multiaddr`]s
/// might differ in their [`PeerId`], e.g. when each [`Multiaddr`]
/// represents a different sentry node. This might change once support for
/// sentry nodes is removed (see
/// https://github.com/paritytech/substrate/issues/6845).
/// Note: [`Multiaddr`]s returned always include a [`PeerId`] via a
/// [`libp2p::core::multiaddr:Protocol::P2p`] component. Equality of
/// [`PeerId`]s across [`Multiaddr`]s returned by a single call is not
/// enforced today, given that there are still authorities out there
/// publishing the addresses of their sentry nodes on the DHT. In the future
/// this guarantee can be provided.
pub async fn get_addresses_by_authority_id(&mut self, authority: AuthorityId) -> Option<Vec<Multiaddr>> {
let (tx, rx) = oneshot::channel();
@@ -55,9 +55,8 @@ fn get_addresses_and_authority_id() {
let (mut worker, mut service) = new_worker_and_service(
test_api,
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
Role::PublishAndDiscover(key_store.into()),
None,
);
worker.inject_addresses(remote_authority_id.clone(), vec![remote_addr.clone()]);
@@ -29,7 +29,6 @@ use futures_timer::Delay;
use addr_cache::AddrCache;
use async_trait::async_trait;
use codec::Decode;
use either::Either;
use libp2p::{core::multiaddr, multihash::Multihash};
use log::{debug, error, log_enabled};
use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register};
@@ -37,7 +36,6 @@ use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
use sc_client_api::blockchain::HeaderBackend;
use sc_network::{
config::MultiaddrWithPeerId,
DhtEvent,
ExHashT,
Multiaddr,
@@ -73,68 +71,47 @@ const MAX_ADDRESSES_PER_AUTHORITY: usize = 10;
/// Maximum number of in-flight DHT lookups at any given point in time.
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;
/// Role an authority discovery module can run as.
/// Role an authority discovery [`Worker`] can run as.
pub enum Role {
/// Actual authority as well as a reference to its key store.
Authority(Arc<dyn CryptoStore>),
/// Sentry node that guards an authority.
///
/// No reference to its key store needed, as sentry nodes don't have an identity to sign
/// addresses with in the first place.
Sentry,
/// Publish own addresses and discover addresses of others.
PublishAndDiscover(Arc<dyn CryptoStore>),
/// Discover addresses of others.
Discover,
}
/// A [`Worker`] makes a given authority discoverable and discovers other
/// authorities.
/// An authority discovery [`Worker`] can publish the local node's addresses as well as discover
/// those of other nodes via a Kademlia DHT.
///
/// The [`Worker`] implements the Future trait. By
/// polling [`Worker`] an authority:
/// When constructed with [`Role::PublishAndDiscover`] a [`Worker`] will
///
/// 1. **Makes itself discoverable**
/// 1. Retrieve its external addresses (including peer id).
///
/// 1. Retrieves its external addresses (including peer id) or the ones of
/// its sentry nodes.
/// 2. Get the list of keys owned by the local node participating in the current authority set.
///
/// 2. Signs the above.
/// 3. Sign the addresses with the keys.
///
/// 3. Puts the signature and the addresses on the libp2p Kademlia DHT.
/// 4. Put addresses and signature as a record with the authority id as a key on a Kademlia DHT.
///
/// When constructed with either [`Role::PublishAndDiscover`] or [`Role::Publish`] a [`Worker`] will
///
/// 2. **Discovers other authorities**
/// 1. Retrieve the current and next set of authorities.
///
/// 1. Retrieves the current and next set of authorities.
/// 2. Start DHT queries for the ids of the authorities.
///
/// 2. Starts DHT queries for the ids of the authorities.
/// 3. Validate the signatures of the retrieved key value pairs.
///
/// 3. Validates the signatures of the retrieved key value pairs.
/// 4. Add the retrieved external addresses as priority nodes to the
/// network peerset.
///
/// 4. Adds the retrieved external addresses as priority nodes to the
/// peerset.
///
/// When run as a sentry node, the [`Worker`] does not publish
/// any addresses to the DHT but still discovers validators and sentry nodes of
/// validators, i.e. only step 2 (Discovers other authorities) is executed.
pub struct Worker<Client, Network, Block, DhtEventStream>
where
Block: BlockT + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi<Block>>::Api: AuthorityDiscoveryApi<Block>,
{
/// Channel receiver for messages send by an [`Service`].
/// 5. Allow querying of the collected addresses via the [`crate::Service`].
pub struct Worker<Client, Network, Block, DhtEventStream> {
/// Channel receiver for messages send by a [`Service`].
from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,
client: Arc<Client>,
network: Arc<Network>,
/// List of sentry node public addresses.
//
// There are 3 states:
// - None: No addresses were specified.
// - Some(vec![]): Addresses were specified, but none could be parsed as proper
// Multiaddresses.
// - Some(vec![a, b, c, ...]): Valid addresses were specified.
sentry_nodes: Option<Vec<Multiaddr>>,
/// Channel we receive Dht events on.
dht_event_rx: DhtEventStream,
@@ -169,15 +146,11 @@ where
AuthorityDiscoveryApi<Block, Error = sp_blockchain::Error>,
DhtEventStream: Stream<Item = DhtEvent> + Unpin,
{
/// Return a new [`Worker`].
///
/// Note: When specifying `sentry_nodes` this module will not advertise the public addresses of
/// the node itself but only the public addresses of its sentry nodes.
/// Construct a [`Worker`].
pub(crate) fn new(
from_service: mpsc::Receiver<ServicetoWorkerMsg>,
client: Arc<Client>,
network: Arc<Network>,
sentry_nodes: Vec<MultiaddrWithPeerId>,
dht_event_rx: DhtEventStream,
role: Role,
prometheus_registry: Option<prometheus_endpoint::Registry>,
@@ -207,12 +180,6 @@ where
query_interval_duration,
);
let sentry_nodes = if !sentry_nodes.is_empty() {
Some(sentry_nodes.into_iter().map(|ma| ma.concat()).collect::<Vec<_>>())
} else {
None
};
let addr_cache = AddrCache::new();
let metrics = match prometheus_registry {
@@ -232,7 +199,6 @@ where
from_service: from_service.fuse(),
client,
network,
sentry_nodes,
dht_event_rx,
publish_interval,
query_interval,
@@ -313,33 +279,23 @@ where
}
fn addresses_to_publish(&self) -> impl ExactSizeIterator<Item = Multiaddr> {
match &self.sentry_nodes {
Some(addrs) => Either::Left(addrs.clone().into_iter()),
None => {
let peer_id: Multihash = self.network.local_peer_id().into();
Either::Right(
self.network.external_addresses()
.into_iter()
.map(move |a| {
if a.iter().any(|p| matches!(p, multiaddr::Protocol::P2p(_))) {
a
} else {
a.with(multiaddr::Protocol::P2p(peer_id.clone()))
}
}),
)
}
}
let peer_id: Multihash = self.network.local_peer_id().into();
self.network.external_addresses()
.into_iter()
.map(move |a| {
if a.iter().any(|p| matches!(p, multiaddr::Protocol::P2p(_))) {
a
} else {
a.with(multiaddr::Protocol::P2p(peer_id.clone()))
}
})
}
/// Publish either our own or if specified the public addresses of our sentry nodes.
/// Publish own public addresses.
async fn publish_ext_addresses(&mut self) -> Result<()> {
let key_store = match &self.role {
Role::Authority(key_store) => key_store,
// Only authority nodes can put addresses (their own or the ones of their sentry nodes)
// on the Dht. Sentry nodes don't have a known identity to authenticate such addresses,
// thus `publish_ext_addresses` becomes a no-op.
Role::Sentry => return Ok(()),
Role::PublishAndDiscover(key_store) => key_store,
Role::Discover => return Ok(()),
};
let addresses = self.addresses_to_publish();
@@ -394,12 +350,12 @@ where
let id = BlockId::hash(self.client.info().best_hash);
let local_keys = match &self.role {
Role::Authority(key_store) => {
Role::PublishAndDiscover(key_store) => {
key_store.sr25519_public_keys(
key_types::AUTHORITY_DISCOVERY
).await.into_iter().collect::<HashSet<_>>()
},
Role::Sentry => HashSet::new(),
Role::Discover => HashSet::new(),
};
let mut authorities = self
@@ -798,13 +754,7 @@ impl Metrics {
// Helper functions for unit testing.
#[cfg(test)]
impl<Block, Client, Network, DhtEventStream> Worker<Client, Network, Block, DhtEventStream>
where
Block: BlockT + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi<Block>>::Api: AuthorityDiscoveryApi<Block>,
{
impl<Block, Client, Network, DhtEventStream> Worker<Client, Network, Block, DhtEventStream> {
pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
self.addr_cache.insert(authority, addresses);
}
@@ -303,9 +303,8 @@ fn new_registers_metrics() {
from_service,
test_api,
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
Role::PublishAndDiscover(key_store.into()),
Some(registry.clone()),
);
@@ -332,9 +331,8 @@ fn triggers_dht_get_query() {
from_service,
test_api,
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
Role::PublishAndDiscover(key_store.into()),
None,
);
@@ -381,9 +379,8 @@ fn publish_discover_cycle() {
from_service,
test_api,
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
Role::PublishAndDiscover(key_store.into()),
None,
);
@@ -412,9 +409,8 @@ fn publish_discover_cycle() {
from_service,
test_api,
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
Role::PublishAndDiscover(key_store.into()),
None,
);
@@ -442,6 +438,7 @@ fn publish_discover_cycle() {
pool.run();
}
/// Don't terminate when sender side of service channel is dropped. Terminate when network event
/// stream terminates.
#[test]
@@ -458,9 +455,8 @@ fn terminate_when_event_stream_terminates() {
from_service,
test_api,
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
Role::PublishAndDiscover(key_store.into()),
None,
).run();
futures::pin_mut!(worker);
@@ -485,7 +481,8 @@ fn terminate_when_event_stream_terminates() {
"Expect the authority discovery module to terminate once the \
sending side of the dht event channel is closed.",
);
});}
});
}
#[test]
fn dont_stop_polling_dht_event_stream_after_bogus_event() {
@@ -520,9 +517,8 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() {
from_service,
test_api,
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(Arc::new(key_store)),
Role::PublishAndDiscover(Arc::new(key_store)),
None,
);
@@ -569,79 +565,6 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() {
});
}
/// In the scenario of a validator publishing the address of its sentry node to
/// the DHT, said sentry node should not add its own Multiaddr to the
/// peerset "authority" priority group.
#[test]
fn never_add_own_address_to_priority_group() {
let validator_key_store = KeyStore::new();
let validator_public = block_on(validator_key_store
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None))
.unwrap();
let sentry_network: Arc<TestNetwork> = Arc::new(Default::default());
let sentry_multiaddr = {
let peer_id = sentry_network.local_peer_id();
let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333".parse().unwrap();
address.with(multiaddr::Protocol::P2p(peer_id.into()))
};
// Address of some other sentry node of `validator`.
let random_multiaddr = {
let peer_id = PeerId::random();
let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
address.with(multiaddr::Protocol::P2p(
peer_id.into(),
))
};
let dht_event = block_on(build_dht_event(
vec![sentry_multiaddr, random_multiaddr.clone()],
validator_public.into(),
&validator_key_store,
));
let (_dht_event_tx, dht_event_rx) = channel(1);
let sentry_test_api = Arc::new(TestApi {
// Make sure the sentry node identifies its validator as an authority.
authorities: vec![validator_public.into()],
});
let (_to_worker, from_service) = mpsc::channel(0);
let mut sentry_worker = Worker::new(
from_service,
sentry_test_api,
sentry_network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Sentry,
None,
);
block_on(sentry_worker.refill_pending_lookups_queue()).unwrap();
sentry_worker.start_new_lookups();
sentry_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
block_on(sentry_worker.set_priority_group()).unwrap();
assert_eq!(
sentry_network.set_priority_group_call.lock().unwrap().len(), 1,
"Expect authority discovery to set the priority set.",
);
assert_eq!(
sentry_network.set_priority_group_call.lock().unwrap()[0],
(
"authorities".to_string(),
HashSet::from_iter(vec![random_multiaddr.clone()].into_iter(),)
),
"Expect authority discovery to only add `random_multiaddr`."
);
}
#[test]
fn limit_number_of_addresses_added_to_cache_per_authority() {
let remote_key_store = KeyStore::new();
@@ -670,9 +593,8 @@ fn limit_number_of_addresses_added_to_cache_per_authority() {
from_service,
Arc::new(TestApi { authorities: vec![remote_public.into()] }),
Arc::new(TestNetwork::default()),
vec![],
Box::pin(dht_event_rx),
Role::Sentry,
Role::Discover,
None,
);
@@ -713,7 +635,6 @@ fn do_not_cache_addresses_without_peer_id() {
let (_dht_event_tx, dht_event_rx) = channel(1);
let local_test_api = Arc::new(TestApi {
// Make sure the sentry node identifies its validator as an authority.
authorities: vec![remote_public.into()],
});
let local_network: Arc<TestNetwork> = Arc::new(Default::default());
@@ -724,9 +645,8 @@ fn do_not_cache_addresses_without_peer_id() {
from_service,
local_test_api,
local_network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(Arc::new(local_key_store)),
Role::PublishAndDiscover(Arc::new(local_key_store)),
None,
);
@@ -759,9 +679,8 @@ fn addresses_to_publish_adds_p2p() {
authorities: vec![],
}),
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(Arc::new(KeyStore::new())),
Role::PublishAndDiscover(Arc::new(KeyStore::new())),
Some(prometheus_endpoint::Registry::new()),
);
@@ -794,9 +713,8 @@ fn addresses_to_publish_respects_existing_p2p_protocol() {
authorities: vec![],
}),
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(Arc::new(KeyStore::new())),
Role::PublishAndDiscover(Arc::new(KeyStore::new())),
Some(prometheus_endpoint::Registry::new()),
);
@@ -836,9 +754,8 @@ fn lookup_throttling() {
from_service,
Arc::new(TestApi { authorities: remote_public_keys.clone() }),
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Sentry,
Role::Discover,
Some(default_registry().clone()),
);